diff --git a/adaptors/安心云es主题特征to广东省平台.go b/adaptors/安心云es主题特征to广东省平台.go new file mode 100644 index 0000000..d9878fb --- /dev/null +++ b/adaptors/安心云es主题特征to广东省平台.go @@ -0,0 +1,190 @@ +package adaptors + +import ( + "encoding/json" + "fmt" + "goInOut/consumers/HBJCAS" + "goInOut/consumers/HBJCAS/protoFiles_hb" + "goInOut/dbOperate" + "goInOut/models" + "google.golang.org/protobuf/proto" + "log" + "math" + "strconv" + "strings" + "time" +) + +// Adaptor_AXYES_HBGL 统一采集软件数据 转换 湘潭健康监测平台 +type Adaptor_AXYES_GDJKJC struct { + //传感器code转换信息 + PointInfo map[int64]map[int64]int64 + StructInfo map[int64]string + //一些必要信息 + Info map[string]string + Redis *dbOperate.RedisHelper +} + +func (the Adaptor_AXYES_GDJKJC) Transform(structId int64, factorId int, rawMsg string) []NeedPush { + esAggDateHistogram := HBJCAS.EsThemeAggDateHistogram{} + var needPush []NeedPush + err := json.Unmarshal([]byte(rawMsg), &esAggDateHistogram) + if err != nil { + return nil + } + + Payload := the.EsAggTopToHBJCAS(structId, factorId, esAggDateHistogram) + if len(Payload) == 0 { + return needPush + } + + needPush = append(needPush, NeedPush{ + Payload: Payload, + }) + return needPush +} + +func (the Adaptor_AXYES_GDJKJC) EsAggTopToHBJCAS(structId int64, factorId int, esAggs HBJCAS.EsThemeAggDateHistogram) (result []byte) { + buckets := esAggs.Aggregations.GroupSensor.Buckets + if len(buckets) == 0 { + log.Printf("[s=%d,f=%d] ,es agg数据数量==0", structId, factorId) + return + } + //设施唯一编码(省平台) + uniqueCode := the.getUniqueCode(structId) + if uniqueCode == "" { + log.Printf("structId=%d,无匹配省平台uniqueCode", structId) + return + } + //数据汇总 + complexData := &protoFiles_hb.ComplexData{} + for _, sensorBucket := range buckets { + sensorId := sensorBucket.Key + for _, dateBucket := range sensorBucket.GroupDate.Buckets { + //优先redis获取 + station := models.Station{} + k1 := fmt.Sprintf("station:%d", sensorId) + errRedis := the.Redis.GetObj(k1, &station) + if errRedis != nil { + log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签异常", structId, factorId, sensorId) + continue + } + monitorCode := the.getPointCodeFromLabel(station.Labels) + if monitorCode == 0 { + log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签,信息转换int64异常,跳过", structId, factorId, sensorId) + continue + } + + dataDefinition := &protoFiles_hb.DataDefinition{ + DataType: protoFiles_hb.DataType_STATISTICS, + UniqueCode: fmt.Sprintf("%d", uniqueCode), //乃积沟大桥 + DataBody: the.EsAgg2StatisticData(factorId, monitorCode, dateBucket), + } + complexData.SensorData = append(complexData.SensorData, dataDefinition) + } + } + v, _ := json.Marshal(complexData) + log.Printf("[s:%d,f:%d] 特征数据=> %s", structId, factorId, v) + result, _ = proto.Marshal(complexData) + return result +} +func (the Adaptor_AXYES_GDJKJC) getMonitorTypeByFactorId(factorId int) protoFiles_hb.MonitoryType { + //桥墩倾斜 15 裂缝18 支座位移20 桥面振动28 加速度三项监测592 + switch factorId { + case 15: + return protoFiles_hb.MonitoryType_INC + case 18: + return protoFiles_hb.MonitoryType_CRK + case 20: + return protoFiles_hb.MonitoryType_DIS + case 28: + return protoFiles_hb.MonitoryType_VIB + case 592: + return protoFiles_hb.MonitoryType_VIB + default: + log.Printf("factorId=%d,无匹配的MonitorType", factorId) + return protoFiles_hb.MonitoryType_CMM + } +} + +func (the Adaptor_AXYES_GDJKJC) EsAgg2StatisticData(factorId int, monitorCode int64, dateBucket HBJCAS.BucketsXY) *protoFiles_hb.DataDefinition_StatisticData { + Atime := dateBucket.KeyAsString.Add(-8 * time.Hour).UnixMilli() + maxAbsoluteValueX := max(math.Abs(dateBucket.X.Max), math.Abs(dateBucket.X.Min)) + avgValueX := dateBucket.X.Avg + rootMeanSquareX := math.Sqrt(dateBucket.X.SumOfSquares / float64(dateBucket.X.Count)) + + maxAbsoluteValueY := max(math.Abs(dateBucket.Y.Max), math.Abs(dateBucket.Y.Min)) + avgValueY := dateBucket.Y.Avg + rootMeanSquareY := math.Sqrt(dateBucket.Y.SumOfSquares / float64(dateBucket.Y.Count)) + + monitoryType := the.getMonitorTypeByFactorId(factorId) + dataDefinitionStatisticData := &protoFiles_hb.DataDefinition_StatisticData{ + StatisticData: &protoFiles_hb.StatisticData{ + MonitorType: monitoryType, + MonitorCode: monitorCode, //测点唯一编码 + EventTime: Atime, + Interval: 60 * 1000, + }, + } + + switch factorId { + case 15: //倾角 + dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Inc{Inc: &protoFiles_hb.INCStatistic{ + MaxAbsoluteValueX: float32(maxAbsoluteValueX), + AvgValueX: float32(avgValueX), + RootMeanSquareX: float32(rootMeanSquareX), + MaxAbsoluteValueY: float32(maxAbsoluteValueY), + AvgValueY: float32(avgValueY), + RootMeanSquareY: float32(rootMeanSquareY), + }} + case 18: //裂缝监测 + dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Crk{Crk: &protoFiles_hb.CRKStatistic{ + MaxAbsoluteValue: float32(maxAbsoluteValueX), + AvgValue: float32(avgValueX), + RootMeanSquare: float32(rootMeanSquareX), + TotalAbsoluteValue: float32(dateBucket.X.Max - dateBucket.X.Min), + }} + case 20: //支座位移 + dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Dis{Dis: &protoFiles_hb.DISStatistic{ + MaxAbsoluteValue: float32(maxAbsoluteValueX), + AvgValue: float32(avgValueX), + RootMeanSquare: float32(rootMeanSquareX), + TotalAbsoluteValue: float32(dateBucket.X.Max - dateBucket.X.Min), + }} + case 28: //振动 + dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Vib{Vib: &protoFiles_hb.VIBStatistic{ + MaxAbsoluteValue: float32(maxAbsoluteValueX), + RootMeanSquare: float32(rootMeanSquareY), + }} + case 592: //加速度三项监测 + dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Vib{Vib: &protoFiles_hb.VIBStatistic{ + MaxAbsoluteValue: float32(maxAbsoluteValueX), + RootMeanSquare: float32(rootMeanSquareX), + }} + } + + return dataDefinitionStatisticData +} + +func (the Adaptor_AXYES_GDJKJC) getUniqueCode(structId int64) (uniqueCode string) { + if v, ok := the.StructInfo[structId]; ok { + uniqueCode = v + } + return uniqueCode +} + +func (the Adaptor_AXYES_GDJKJC) getPointCodeFromLabel(label string) int64 { + //解析label {13010600001} + pointUniqueCode := int64(0) + if len(label) > 2 { + newLabel := strings.TrimLeft(label, "{") + str := strings.TrimRight(newLabel, "}") + codeInt64, err := strconv.ParseInt(str, 10, 64) + if err != nil { + log.Printf("测点标签转换异常[%s]", label) + } + pointUniqueCode = codeInt64 + } + + return pointUniqueCode +} diff --git a/config/init.go b/config/init.go index c6686c5..9d35391 100644 --- a/config/init.go +++ b/config/init.go @@ -36,10 +36,15 @@ func LoadConfigJson() map[string]string { return allConfig } -func LoadConfig() map[string]string { +func LoadConfig(path ...string) map[string]string { allConfig := make(map[string]string) - files, err := os.ReadDir("configFiles") + defaultDir := "configFiles" + if len(path) != 0 { + defaultDir = path[0] + } + + files, err := os.ReadDir(defaultDir) if err != nil { log.Fatal(err) } @@ -51,7 +56,7 @@ func LoadConfig() map[string]string { continue } // 读取配置 - configBytes, _ := os.ReadFile(fmt.Sprintf("configFiles/%s", file.Name())) + configBytes, _ := os.ReadFile(fmt.Sprintf("%s/%s", defaultDir, file.Name())) consumer := new(Consumer) // 将 JSON 格式的数据解析到结构体中 err = yaml.Unmarshal(configBytes, &consumer) diff --git a/consumers/GDJKJC/config.go b/consumers/GDJKJC/config.go new file mode 100644 index 0000000..007257c --- /dev/null +++ b/consumers/GDJKJC/config.go @@ -0,0 +1,29 @@ +package GDJKJC + +import "goInOut/config" + +type ConfigFile struct { + IoConfig ioConfig `yaml:"ioConfig"` + OtherInfo map[string]string `yaml:"info"` + PointInfo map[int64]map[int64]int64 `yaml:"pointInfo"` + StructInfo map[int64]string `yaml:"structInfo"` + Monitor map[string]string `yaml:"monitor"` + QueryComponent queryComponent `yaml:"queryComponent"` +} +type ioConfig struct { + In In `yaml:"in"` + Out Out `yaml:"out"` +} +type In struct { + Http config.HttpConfig `yaml:"http"` +} + +type Out struct { + Http config.HttpConfig `yaml:"http"` +} + +type queryComponent struct { + Redis struct { + Address string `yaml:"address"` + } `yaml:"redis"` +} diff --git a/consumers/HBJCAS/config.go b/consumers/HBJCAS/config.go index fa0fba9..30f18fe 100644 --- a/consumers/HBJCAS/config.go +++ b/consumers/HBJCAS/config.go @@ -19,7 +19,7 @@ type In struct { } type Out struct { - Http config.HttpConfig `yaml:"http"` + Mqtt config.MqttConfig `yaml:"mqtt"` } type queryComponent struct { diff --git a/consumers/consumerAXYES2GDJKJC.go b/consumers/consumerAXYES2GDJKJC.go index 50db175..0bb9f92 100644 --- a/consumers/consumerAXYES2GDJKJC.go +++ b/consumers/consumerAXYES2GDJKJC.go @@ -5,7 +5,7 @@ import ( "encoding/hex" "fmt" "goInOut/adaptors" - "goInOut/consumers/HBJCAS" + "goInOut/consumers/GDJKJC" "goInOut/dbOperate" "goInOut/monitors" "goInOut/utils" @@ -18,7 +18,7 @@ type consumerAXYES2GDJKJC struct { //数据缓存管道 ch chan []adaptors.NeedPush //具体配置 - Info HBJCAS.ConfigFile + Info GDJKJC.ConfigFile InHttp *dbOperate.HttpHelper outHttp *dbOperate.HttpHelper monitor *monitors.CommonMonitor @@ -101,9 +101,9 @@ func (the *consumerAXYES2GDJKJC) Work() { }() } -func (the *consumerAXYES2GDJKJC) getAdaptor() (adaptor adaptors.Adaptor_AXYES_HBGL) { +func (the *consumerAXYES2GDJKJC) getAdaptor() (adaptor adaptors.Adaptor_AXYES_GDJKJC) { - return adaptors.Adaptor_AXYES_HBGL{ + return adaptors.Adaptor_AXYES_GDJKJC{ Redis: the.infoRedis, } } diff --git a/consumers/consumer_test.go b/consumers/consumer_test.go index c99fea3..bb36a7a 100644 --- a/consumers/consumer_test.go +++ b/consumers/consumer_test.go @@ -2,7 +2,7 @@ package consumers import ( "fmt" - "goUpload/config" + "goInOut/config" "log" "testing" "time" @@ -260,3 +260,14 @@ func Test_MYX_GDGS(t *testing.T) { consumerMyx.onFileData() } + +func Test_GDJKJC(t *testing.T) { + log.Printf("广州省平台") + myConfigs := config.LoadConfig("../configFiles") + configBody := myConfigs["consumerAXYES2GDJKJC"] + _consumerAXYES2GDJKJC := new(consumerAXYES2GDJKJC) + _consumerAXYES2GDJKJC.Initial(configBody) + //go _consumerGZGZM.RefreshTask() + + time.Sleep(2 * time.Second) +}