diff --git a/dataSource/kafka/kafka_handler.go b/dataSource/kafka/kafka_handler.go index bd2eccc..67251e9 100644 --- a/dataSource/kafka/kafka_handler.go +++ b/dataSource/kafka/kafka_handler.go @@ -2,9 +2,9 @@ package kafka import ( "dataSource" - "fmt" "gitea.anxinyun.cn/container/common_utils/configLoad" "gitea.anxinyun.cn/container/common_utils/kafkaHelper" + "log" ) type KafkaDataSource struct { @@ -40,7 +40,7 @@ func (s *KafkaDataSource) Producer() { // 创建消息处理器 handler := NewMessageHandler(cfgName) if handler == nil { - fmt.Printf("No handler found for topic %s\n", cfgName) + log.Printf("Kafka topic【%s】未定义处理者。\n", cfgName) continue } // 订阅主题 和 消息处理 @@ -60,8 +60,10 @@ type IMessageHandler interface { func NewMessageHandler(cfgName string) IMessageHandler { switch cfgName { case "data_raw": + log.Printf("Kafka topic【%s】已注册处理者IotaDataHandler\n", cfgName) return IotaDataHandler{} case "data_agg": + log.Printf("Kafka topic【%s】已注册处理者AggDataHandler\n", cfgName) return NewAggDataHandler() default: return nil diff --git a/et_Info/InfoHandler.go b/et_Info/InfoHandler.go index 7a5e18f..68c7c7e 100644 --- a/et_Info/InfoHandler.go +++ b/et_Info/InfoHandler.go @@ -29,6 +29,11 @@ func (the *InfoHandler) GetStage() stages.Stage { func (the *InfoHandler) getStationInfo(p *common_models.ProcessData) *common_models.ProcessData { + // TODO 测试 DeviceId = 22c76344-1eb2-4508-8aa6-4550c010e8f7 ,sensorId=18 + //if p.DeviceData.DeviceId != "22c76344-1eb2-4508-8aa6-4550c010e8f7" { + // return &common_models.ProcessData{} + //} + s, err := the.configHelper.GetDeviceStationObjs(p.DeviceData.DeviceId) if err == nil && s != nil { p.Stations = s @@ -64,15 +69,24 @@ func (the *InfoHandler) getFormulaInfo(p *common_models.ProcessData) { if err != nil { panic(err) } - for i2, device := range p.Stations[i].Info.Devices { - formulaInfo, err := the.configHelper.GetFormulaInfo(device.FormulaId) + //for i2, device := range p.Stations[i].Info.Devices { + // formulaInfo, err := the.configHelper.GetFormulaInfo(device.FormulaId) + // if err == nil { + // p.Stations[i].Info.Devices[i2].FormulaInfo = formulaInfo + // p.Stations[i].Info.Devices[i2].DeviceFactorProto = deviceFactorProto + // } + //} + + // TODO #TEST BEGIN 2024-10-01 测点设备没有公式信息,测试时先从设备监测原型中获取 + for i2, _ := range p.Stations[i].Info.Devices { + p.Stations[i].Info.Devices[i2].FormulaId = deviceFactorProto.Formula + formulaInfo, err := the.configHelper.GetFormulaInfo(deviceFactorProto.Formula) if err == nil { p.Stations[i].Info.Devices[i2].FormulaInfo = formulaInfo p.Stations[i].Info.Devices[i2].DeviceFactorProto = deviceFactorProto } - } - + // #TEST END } } diff --git a/et_calc/group/timeStrategy.go b/et_calc/group/timeStrategy.go index 5807472..dad0968 100644 --- a/et_calc/group/timeStrategy.go +++ b/et_calc/group/timeStrategy.go @@ -8,8 +8,8 @@ import ( ) const ( - data_active_expire_sec int = 900 - data_report_expire_sec int = 3600 + data_active_expire_sec int = 900 // 15分钟 + data_report_expire_sec int = 3600 // 1小时 ) // TimeStrategy 测点组合计算 超时时间判断策略 diff --git a/node/app/et_node.go b/node/app/et_node.go index 5d71579..02bb387 100644 --- a/node/app/et_node.go +++ b/node/app/et_node.go @@ -52,8 +52,35 @@ func (the *EtNode) IotaDataHandler(iotaData common_models.IotaData, reply *bool) return err } +// 是沉降测试数据 +func isSettleData(data map[string]interface{}) bool { + // {"pressure":23.09,"temperature":24.93,"ssagee":16.44} + validKeys := map[string]bool{ + "pressure": true, + "temperature": true, + "ssagee": true, + } + + if len(data) != 3 { + return false + } + + for key := range data { + if !validKeys[key] { + return false + } + } + return true +} + // ConsumerProcess 将 IotaData -> ProcessData func (the *EtNode) ConsumerProcess(iotaData *common_models.IotaData) error { + //TODO #TEST BEGIN 测试静力水准仪 (现在有计算公式的单测点计算有问题,为了能跑通 沉降分组计算 测试) + //if !isSettleData(iotaData.Data.Data) { + // return nil + //} + // #TEST END + deviceData, err := the.recvDataHandler.OnDataHandler(*iotaData) if err != nil { return err @@ -64,7 +91,7 @@ func (the *EtNode) ConsumerProcess(iotaData *common_models.IotaData) error { } log.Printf("rpc处理设备数据[%s]-time[%v]-[%v]", deviceData.DeviceId, deviceData.AcqTime, deviceData.Raw) - //log.Printf("rpc处理设备数据[%s]-time[%s]-data:%v", iotaData.DeviceId, iotaData.TriggerTime, iotaData.ThemeData.ThemeData) + the.ch <- &common_models.ProcessData{ DeviceData: *deviceData, Stations: []common_models.Station{}, diff --git a/node/stages/stage.go b/node/stages/stage.go index dc421ef..85ca6a6 100644 --- a/node/stages/stage.go +++ b/node/stages/stage.go @@ -66,6 +66,7 @@ func (s *Stage) handlerTimeOutCheck(stageName, deviceId string) { case <-s.execOver: case <-time.After(defaultTimeout): log.Printf("=====================") + //TODO #TEST BEGIN 测试时可以注释掉下面这行,否则调试时超时,会引发一个 panic,导致程序中断。 log.Panicf("stage[%s] ->[%s] 流程处理,超时[%v],请排查", stageName, deviceId, defaultTimeout) } }