diff --git a/adaptors/知物云es主题特征to中交华联.go b/adaptors/知物云es主题特征to中交华联.go index 2397c14..beae81e 100644 --- a/adaptors/知物云es主题特征to中交华联.go +++ b/adaptors/知物云es主题特征to中交华联.go @@ -24,9 +24,16 @@ type Adaptor_ZWYES_ZJHL struct { //一些必要信息 Info map[string]string Redis *dbOperate.RedisHelper + //果子沟特殊处理 缓存与湿度关联的温度测点数据 + cacheTemp map[int64]*protoFiles_zjhl_v3.DataDefinition_StatisticData } -func (the Adaptor_ZWYES_ZJHL) Transform(structId int64, factorId int, rawMsg string) []NeedPush { +var wsdMonitorCodeMap = map[int64]int64{ + 6500030003: 0, + 6500030004: 0, + 6500030005: 0} + +func (the *Adaptor_ZWYES_ZJHL) Transform(structId int64, factorId int, rawMsg string) []NeedPush { esAggDateHistogram := GZG2ZJHL.EsThemeAggDateHistogram{} var needPush []NeedPush err := json.Unmarshal([]byte(rawMsg), &esAggDateHistogram) @@ -45,7 +52,7 @@ func (the Adaptor_ZWYES_ZJHL) Transform(structId int64, factorId int, rawMsg str return needPush } -func (the Adaptor_ZWYES_ZJHL) EsAggTopToHBJCAS(structId int64, factorId int, esAggs GZG2ZJHL.EsThemeAggDateHistogram) (result []byte) { +func (the *Adaptor_ZWYES_ZJHL) EsAggTopToHBJCAS(structId int64, factorId int, esAggs GZG2ZJHL.EsThemeAggDateHistogram) (result []byte) { buckets := esAggs.Aggregations.GroupSensor.Buckets if len(buckets) == 0 { log.Printf("[s=%d,f=%d] ,es agg数据数量==0", structId, factorId) @@ -81,6 +88,22 @@ func (the Adaptor_ZWYES_ZJHL) EsAggTopToHBJCAS(structId int64, factorId int, esA log.Printf("[s:%d,f:%d]测点[%d] 特征数据组包异常,跳过", structId, factorId, sensorId) continue } + //温度 特殊处理 : 温度 特殊测点缓存 便于后续湿度测断合并 + //湿度 特殊处理 : 果子沟 湿度和温度 同label里面的monitorCode 需要组合成温湿度 一起上报 + + if factorId == 4 { //关联湿度的 温度测点 只缓存 + if _, exists := wsdMonitorCodeMap[monitorCode]; exists { + if the.cacheTemp == nil { + the.cacheTemp = make(map[int64]*protoFiles_zjhl_v3.DataDefinition_StatisticData) + } + the.cacheTemp[monitorCode] = dataBody + tempStr, _ := json.Marshal(dataBody) + log.Printf("缓存关联label[%d]的温度数据 => %s", monitorCode, string(tempStr)) + continue + } + + } + dataDefinition := &protoFiles_zjhl_v3.DataDefinition{ DataType: protoFiles_zjhl_v3.DataType_STATISTICS, //BridgeCode: fmt.Sprintf("%d", uniqueCode), //提示 不传该字段 @@ -90,13 +113,19 @@ func (the Adaptor_ZWYES_ZJHL) EsAggTopToHBJCAS(structId int64, factorId int, esA complexData.SensorData = append(complexData.SensorData, dataDefinition) } } + + //湿度处理后 清理之前的缓存温度 + if factorId == 883 { + the.cacheTemp = make(map[int64]*protoFiles_zjhl_v3.DataDefinition_StatisticData) + } + v, _ := json.Marshal(complexData) log.Printf("[struct:%d,factor:%d] 特征数据=> %s", structId, factorId, v) result, _ = proto.Marshal(complexData) log.Printf("[struct:%d,factor:%d] protobuf数据=> %s", structId, factorId, hex.EncodeToString(result)) return result } -func (the Adaptor_ZWYES_ZJHL) getMonitorTypeByFactorId(factorId int) protoFiles_zjhl_v3.MonitoryType { +func (the *Adaptor_ZWYES_ZJHL) getMonitorTypeByFactorId(factorId int) protoFiles_zjhl_v3.MonitoryType { //结构温度4 桥墩倾斜 15 裂缝18 支座位移20 桥面振动28 加速度三项监测592 switch factorId { case 4: @@ -117,6 +146,8 @@ func (the Adaptor_ZWYES_ZJHL) getMonitorTypeByFactorId(factorId int) protoFiles_ return protoFiles_zjhl_v3.MonitoryType_DIS case 28: return protoFiles_zjhl_v3.MonitoryType_VIB + case 883: //湿度测点 需要合并 同标签的温度测点 组成温湿度 + return protoFiles_zjhl_v3.MonitoryType_RHS case 935: return protoFiles_zjhl_v3.MonitoryType_GNSS default: @@ -125,7 +156,7 @@ func (the Adaptor_ZWYES_ZJHL) getMonitorTypeByFactorId(factorId int) protoFiles_ } } -func (the Adaptor_ZWYES_ZJHL) EsAgg2StatisticData(factorId int, monitorCode int64, dateBucket GZG2ZJHL.BucketsXYZ) *protoFiles_zjhl_v3.DataDefinition_StatisticData { +func (the *Adaptor_ZWYES_ZJHL) EsAgg2StatisticData(factorId int, monitorCode int64, dateBucket GZG2ZJHL.BucketsXYZ) *protoFiles_zjhl_v3.DataDefinition_StatisticData { Atime := dateBucket.KeyAsString.Add(-8 * time.Hour).UnixMilli() monitoryType := the.getMonitorTypeByFactorId(factorId) @@ -213,7 +244,7 @@ func (the Adaptor_ZWYES_ZJHL) EsAgg2StatisticData(factorId int, monitorCode int6 RootMeanSquare: float32(rootMeanSquareY), }} case 883: //湿度 ->最终拼接为 温湿度 - dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_zjhl_v3.StatisticData_Rhs{Rhs: &protoFiles_zjhl_v3.RHSStatistic{ + statisticDataRhs := &protoFiles_zjhl_v3.StatisticData_Rhs{Rhs: &protoFiles_zjhl_v3.RHSStatistic{ //MaxTemperature: float32(dateBucket.X.Max), //MinTemperature: float32(dateBucket.X.Min), //AvgTemperature: float32(avgValueX), @@ -223,6 +254,16 @@ func (the Adaptor_ZWYES_ZJHL) EsAgg2StatisticData(factorId int, monitorCode int6 AvgHumidity: float32(avgValueX), HumidityExceedDuration: 0, }} + + if relatedTemp, ok := the.cacheTemp[monitorCode]; ok { + statisticDataRhs.Rhs.MaxTemperature = relatedTemp.StatisticData.GetTmp().GetMaxTemperature() + statisticDataRhs.Rhs.MinTemperature = relatedTemp.StatisticData.GetTmp().GetMinTemperature() + statisticDataRhs.Rhs.AvgTemperature = relatedTemp.StatisticData.GetTmp().GetAvgTemperature() + statisticDataRhs.Rhs.MaxTemperatureDifference = relatedTemp.StatisticData.GetTmp().GetMaxDifference() + } + + dataDefinitionStatisticData.StatisticData.DataBody = statisticDataRhs + case 935: //gnss dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_zjhl_v3.StatisticData_Gnss{Gnss: &protoFiles_zjhl_v3.GNSSStatistic{ // 统计时间范围内的空间变形X绝对最大值 @@ -249,14 +290,14 @@ func (the Adaptor_ZWYES_ZJHL) EsAgg2StatisticData(factorId int, monitorCode int6 return dataDefinitionStatisticData } -func (the Adaptor_ZWYES_ZJHL) getUniqueCode(structId int64) (uniqueCode int64) { +func (the *Adaptor_ZWYES_ZJHL) getUniqueCode(structId int64) (uniqueCode int64) { if v, ok := the.StructInfo[structId]; ok { uniqueCode = v } return uniqueCode } -func (the Adaptor_ZWYES_ZJHL) getPointCodeFromLabel(label string) int64 { +func (the *Adaptor_ZWYES_ZJHL) getPointCodeFromLabel(label string) int64 { //解析label {ID:6500030042} pointUniqueCode := int64(0) if len(label) > 5 { diff --git a/consumers/consumerGZG2ZJHL.go b/consumers/consumerGZG2ZJHL.go index 98e2ed3..5ba9b77 100644 --- a/consumers/consumerGZG2ZJHL.go +++ b/consumers/consumerGZG2ZJHL.go @@ -129,23 +129,25 @@ func (the *consumerGZG2ZJHL) getStructIds() []int64 { return structIds } func (the *consumerGZG2ZJHL) getEs1HourAggData() { - start, end := utils.GetTimeRangeByHour(-5) + start, end := utils.GetTimeRangeByHour(-1) log.Printf("查询数据时间范围 %s - %s", start, end) //hourFactorIds := []int{883, 2, 7, 935, 4, 6, 11} - hourFactorIds := []int{935} + hourFactorIds := []int{4, 883} //湿度883 -> 要当作温湿度处理,合并温度监测中的3个有相同label的测点数据,组合成虚拟温湿度数据 - //温湿度2(要温度和湿度数据拼接), 源于温度测点和湿度测点 特殊的3个label测点算温湿度 6500030003,6500030004,6500030005 - //Gnss 935 ,4(温度),6索力, 15, 18, 20 + //温湿度2(要温度和湿度数据拼接), 源于温度测点和湿度测点 特殊的3个label测点算温湿度 + //6500030003(温度测点 72179,湿度测点 72191),6500030004(温度测点 72178,湿度测点 72192),6500030005(温度测点 72180,湿度测点 72166) + + //935(Gnss) ,883(湿度,果子沟特有),4(温度),6索力, 15, 18, 20 structIds := the.getStructIds() for _, structId := range structIds { + adaptor := the.getAdaptor() + adaptor.PointInfo = the.Info.PointInfo + adaptor.StructInfo = the.Info.StructInfo for _, factorId := range hourFactorIds { esQuery := the.getESQueryStrByHour(structId, factorId, start, end) auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) - adaptor := the.getAdaptor() - adaptor.PointInfo = the.Info.PointInfo - adaptor.StructInfo = the.Info.StructInfo needPushes := adaptor.Transform(structId, factorId, esAggResultStr) for i := range needPushes { needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload)