Browse Source

添加注释

dev
yfh 4 weeks ago
parent
commit
d18c3bc18c
  1. 6
      dataSource/kafka/kafka_handler.go
  2. 22
      et_Info/InfoHandler.go
  3. 4
      et_calc/group/timeStrategy.go
  4. 29
      node/app/et_node.go
  5. 1
      node/stages/stage.go

6
dataSource/kafka/kafka_handler.go

@ -2,9 +2,9 @@ package kafka
import ( import (
"dataSource" "dataSource"
"fmt"
"gitea.anxinyun.cn/container/common_utils/configLoad" "gitea.anxinyun.cn/container/common_utils/configLoad"
"gitea.anxinyun.cn/container/common_utils/kafkaHelper" "gitea.anxinyun.cn/container/common_utils/kafkaHelper"
"log"
) )
type KafkaDataSource struct { type KafkaDataSource struct {
@ -40,7 +40,7 @@ func (s *KafkaDataSource) Producer() {
// 创建消息处理器 // 创建消息处理器
handler := NewMessageHandler(cfgName) handler := NewMessageHandler(cfgName)
if handler == nil { if handler == nil {
fmt.Printf("No handler found for topic %s\n", cfgName) log.Printf("Kafka topic【%s】未定义处理者。\n", cfgName)
continue continue
} }
// 订阅主题 和 消息处理 // 订阅主题 和 消息处理
@ -60,8 +60,10 @@ type IMessageHandler interface {
func NewMessageHandler(cfgName string) IMessageHandler { func NewMessageHandler(cfgName string) IMessageHandler {
switch cfgName { switch cfgName {
case "data_raw": case "data_raw":
log.Printf("Kafka topic【%s】已注册处理者IotaDataHandler\n", cfgName)
return IotaDataHandler{} return IotaDataHandler{}
case "data_agg": case "data_agg":
log.Printf("Kafka topic【%s】已注册处理者AggDataHandler\n", cfgName)
return NewAggDataHandler() return NewAggDataHandler()
default: default:
return nil return nil

22
et_Info/InfoHandler.go

@ -29,6 +29,11 @@ func (the *InfoHandler) GetStage() stages.Stage {
func (the *InfoHandler) getStationInfo(p *common_models.ProcessData) *common_models.ProcessData { func (the *InfoHandler) getStationInfo(p *common_models.ProcessData) *common_models.ProcessData {
// TODO 测试 DeviceId = 22c76344-1eb2-4508-8aa6-4550c010e8f7 ,sensorId=18
//if p.DeviceData.DeviceId != "22c76344-1eb2-4508-8aa6-4550c010e8f7" {
// return &common_models.ProcessData{}
//}
s, err := the.configHelper.GetDeviceStationObjs(p.DeviceData.DeviceId) s, err := the.configHelper.GetDeviceStationObjs(p.DeviceData.DeviceId)
if err == nil && s != nil { if err == nil && s != nil {
p.Stations = s p.Stations = s
@ -64,15 +69,24 @@ func (the *InfoHandler) getFormulaInfo(p *common_models.ProcessData) {
if err != nil { if err != nil {
panic(err) panic(err)
} }
for i2, device := range p.Stations[i].Info.Devices { //for i2, device := range p.Stations[i].Info.Devices {
formulaInfo, err := the.configHelper.GetFormulaInfo(device.FormulaId) // formulaInfo, err := the.configHelper.GetFormulaInfo(device.FormulaId)
// if err == nil {
// p.Stations[i].Info.Devices[i2].FormulaInfo = formulaInfo
// p.Stations[i].Info.Devices[i2].DeviceFactorProto = deviceFactorProto
// }
//}
// TODO #TEST BEGIN 2024-10-01 测点设备没有公式信息,测试时先从设备监测原型中获取
for i2, _ := range p.Stations[i].Info.Devices {
p.Stations[i].Info.Devices[i2].FormulaId = deviceFactorProto.Formula
formulaInfo, err := the.configHelper.GetFormulaInfo(deviceFactorProto.Formula)
if err == nil { if err == nil {
p.Stations[i].Info.Devices[i2].FormulaInfo = formulaInfo p.Stations[i].Info.Devices[i2].FormulaInfo = formulaInfo
p.Stations[i].Info.Devices[i2].DeviceFactorProto = deviceFactorProto p.Stations[i].Info.Devices[i2].DeviceFactorProto = deviceFactorProto
} }
} }
// #TEST END
} }
} }

4
et_calc/group/timeStrategy.go

@ -8,8 +8,8 @@ import (
) )
const ( const (
data_active_expire_sec int = 900 data_active_expire_sec int = 900 // 15分钟
data_report_expire_sec int = 3600 data_report_expire_sec int = 3600 // 1小时
) )
// TimeStrategy 测点组合计算 超时时间判断策略 // TimeStrategy 测点组合计算 超时时间判断策略

29
node/app/et_node.go

@ -52,8 +52,35 @@ func (the *EtNode) IotaDataHandler(iotaData common_models.IotaData, reply *bool)
return err return err
} }
// 是沉降测试数据
func isSettleData(data map[string]interface{}) bool {
// {"pressure":23.09,"temperature":24.93,"ssagee":16.44}
validKeys := map[string]bool{
"pressure": true,
"temperature": true,
"ssagee": true,
}
if len(data) != 3 {
return false
}
for key := range data {
if !validKeys[key] {
return false
}
}
return true
}
// ConsumerProcess 将 IotaData -> ProcessData // ConsumerProcess 将 IotaData -> ProcessData
func (the *EtNode) ConsumerProcess(iotaData *common_models.IotaData) error { func (the *EtNode) ConsumerProcess(iotaData *common_models.IotaData) error {
//TODO #TEST BEGIN 测试静力水准仪 (现在有计算公式的单测点计算有问题,为了能跑通 沉降分组计算 测试)
//if !isSettleData(iotaData.Data.Data) {
// return nil
//}
// #TEST END
deviceData, err := the.recvDataHandler.OnDataHandler(*iotaData) deviceData, err := the.recvDataHandler.OnDataHandler(*iotaData)
if err != nil { if err != nil {
return err return err
@ -64,7 +91,7 @@ func (the *EtNode) ConsumerProcess(iotaData *common_models.IotaData) error {
} }
log.Printf("rpc处理设备数据[%s]-time[%v]-[%v]", deviceData.DeviceId, deviceData.AcqTime, deviceData.Raw) log.Printf("rpc处理设备数据[%s]-time[%v]-[%v]", deviceData.DeviceId, deviceData.AcqTime, deviceData.Raw)
//log.Printf("rpc处理设备数据[%s]-time[%s]-data:%v", iotaData.DeviceId, iotaData.TriggerTime, iotaData.ThemeData.ThemeData)
the.ch <- &common_models.ProcessData{ the.ch <- &common_models.ProcessData{
DeviceData: *deviceData, DeviceData: *deviceData,
Stations: []common_models.Station{}, Stations: []common_models.Station{},

1
node/stages/stage.go

@ -66,6 +66,7 @@ func (s *Stage) handlerTimeOutCheck(stageName, deviceId string) {
case <-s.execOver: case <-s.execOver:
case <-time.After(defaultTimeout): case <-time.After(defaultTimeout):
log.Printf("=====================") log.Printf("=====================")
//TODO #TEST BEGIN 测试时可以注释掉下面这行,否则调试时超时,会引发一个 panic,导致程序中断。
log.Panicf("stage[%s] ->[%s] 流程处理,超时[%v],请排查", stageName, deviceId, defaultTimeout) log.Panicf("stage[%s] ->[%s] 流程处理,超时[%v],请排查", stageName, deviceId, defaultTimeout)
} }
} }

Loading…
Cancel
Save