Browse Source

update 支持 特殊 温湿度 温度+测点测点,合并为一个温湿度 6500030003: 0,

6500030004: 0,
	6500030005: 0}
dev
lucas 2 months ago
parent
commit
5f81e78b28
  1. 55
      adaptors/知物云es主题特征to中交华联.go
  2. 16
      consumers/consumerGZG2ZJHL.go

55
adaptors/知物云es主题特征to中交华联.go

@ -24,9 +24,16 @@ type Adaptor_ZWYES_ZJHL struct {
//一些必要信息 //一些必要信息
Info map[string]string Info map[string]string
Redis *dbOperate.RedisHelper Redis *dbOperate.RedisHelper
//果子沟特殊处理 缓存与湿度关联的温度测点数据
cacheTemp map[int64]*protoFiles_zjhl_v3.DataDefinition_StatisticData
} }
func (the Adaptor_ZWYES_ZJHL) Transform(structId int64, factorId int, rawMsg string) []NeedPush { var wsdMonitorCodeMap = map[int64]int64{
6500030003: 0,
6500030004: 0,
6500030005: 0}
func (the *Adaptor_ZWYES_ZJHL) Transform(structId int64, factorId int, rawMsg string) []NeedPush {
esAggDateHistogram := GZG2ZJHL.EsThemeAggDateHistogram{} esAggDateHistogram := GZG2ZJHL.EsThemeAggDateHistogram{}
var needPush []NeedPush var needPush []NeedPush
err := json.Unmarshal([]byte(rawMsg), &esAggDateHistogram) err := json.Unmarshal([]byte(rawMsg), &esAggDateHistogram)
@ -45,7 +52,7 @@ func (the Adaptor_ZWYES_ZJHL) Transform(structId int64, factorId int, rawMsg str
return needPush return needPush
} }
func (the Adaptor_ZWYES_ZJHL) EsAggTopToHBJCAS(structId int64, factorId int, esAggs GZG2ZJHL.EsThemeAggDateHistogram) (result []byte) { func (the *Adaptor_ZWYES_ZJHL) EsAggTopToHBJCAS(structId int64, factorId int, esAggs GZG2ZJHL.EsThemeAggDateHistogram) (result []byte) {
buckets := esAggs.Aggregations.GroupSensor.Buckets buckets := esAggs.Aggregations.GroupSensor.Buckets
if len(buckets) == 0 { if len(buckets) == 0 {
log.Printf("[s=%d,f=%d] ,es agg数据数量==0", structId, factorId) log.Printf("[s=%d,f=%d] ,es agg数据数量==0", structId, factorId)
@ -81,6 +88,22 @@ func (the Adaptor_ZWYES_ZJHL) EsAggTopToHBJCAS(structId int64, factorId int, esA
log.Printf("[s:%d,f:%d]测点[%d] 特征数据组包异常,跳过", structId, factorId, sensorId) log.Printf("[s:%d,f:%d]测点[%d] 特征数据组包异常,跳过", structId, factorId, sensorId)
continue continue
} }
//温度 特殊处理 : 温度 特殊测点缓存 便于后续湿度测断合并
//湿度 特殊处理 : 果子沟 湿度和温度 同label里面的monitorCode 需要组合成温湿度 一起上报
if factorId == 4 { //关联湿度的 温度测点 只缓存
if _, exists := wsdMonitorCodeMap[monitorCode]; exists {
if the.cacheTemp == nil {
the.cacheTemp = make(map[int64]*protoFiles_zjhl_v3.DataDefinition_StatisticData)
}
the.cacheTemp[monitorCode] = dataBody
tempStr, _ := json.Marshal(dataBody)
log.Printf("缓存关联label[%d]的温度数据 => %s", monitorCode, string(tempStr))
continue
}
}
dataDefinition := &protoFiles_zjhl_v3.DataDefinition{ dataDefinition := &protoFiles_zjhl_v3.DataDefinition{
DataType: protoFiles_zjhl_v3.DataType_STATISTICS, DataType: protoFiles_zjhl_v3.DataType_STATISTICS,
//BridgeCode: fmt.Sprintf("%d", uniqueCode), //提示 不传该字段 //BridgeCode: fmt.Sprintf("%d", uniqueCode), //提示 不传该字段
@ -90,13 +113,19 @@ func (the Adaptor_ZWYES_ZJHL) EsAggTopToHBJCAS(structId int64, factorId int, esA
complexData.SensorData = append(complexData.SensorData, dataDefinition) complexData.SensorData = append(complexData.SensorData, dataDefinition)
} }
} }
//湿度处理后 清理之前的缓存温度
if factorId == 883 {
the.cacheTemp = make(map[int64]*protoFiles_zjhl_v3.DataDefinition_StatisticData)
}
v, _ := json.Marshal(complexData) v, _ := json.Marshal(complexData)
log.Printf("[struct:%d,factor:%d] 特征数据=> %s", structId, factorId, v) log.Printf("[struct:%d,factor:%d] 特征数据=> %s", structId, factorId, v)
result, _ = proto.Marshal(complexData) result, _ = proto.Marshal(complexData)
log.Printf("[struct:%d,factor:%d] protobuf数据=> %s", structId, factorId, hex.EncodeToString(result)) log.Printf("[struct:%d,factor:%d] protobuf数据=> %s", structId, factorId, hex.EncodeToString(result))
return result return result
} }
func (the Adaptor_ZWYES_ZJHL) getMonitorTypeByFactorId(factorId int) protoFiles_zjhl_v3.MonitoryType { func (the *Adaptor_ZWYES_ZJHL) getMonitorTypeByFactorId(factorId int) protoFiles_zjhl_v3.MonitoryType {
//结构温度4 桥墩倾斜 15 裂缝18 支座位移20 桥面振动28 加速度三项监测592 //结构温度4 桥墩倾斜 15 裂缝18 支座位移20 桥面振动28 加速度三项监测592
switch factorId { switch factorId {
case 4: case 4:
@ -117,6 +146,8 @@ func (the Adaptor_ZWYES_ZJHL) getMonitorTypeByFactorId(factorId int) protoFiles_
return protoFiles_zjhl_v3.MonitoryType_DIS return protoFiles_zjhl_v3.MonitoryType_DIS
case 28: case 28:
return protoFiles_zjhl_v3.MonitoryType_VIB return protoFiles_zjhl_v3.MonitoryType_VIB
case 883: //湿度测点 需要合并 同标签的温度测点 组成温湿度
return protoFiles_zjhl_v3.MonitoryType_RHS
case 935: case 935:
return protoFiles_zjhl_v3.MonitoryType_GNSS return protoFiles_zjhl_v3.MonitoryType_GNSS
default: default:
@ -125,7 +156,7 @@ func (the Adaptor_ZWYES_ZJHL) getMonitorTypeByFactorId(factorId int) protoFiles_
} }
} }
func (the Adaptor_ZWYES_ZJHL) EsAgg2StatisticData(factorId int, monitorCode int64, dateBucket GZG2ZJHL.BucketsXYZ) *protoFiles_zjhl_v3.DataDefinition_StatisticData { func (the *Adaptor_ZWYES_ZJHL) EsAgg2StatisticData(factorId int, monitorCode int64, dateBucket GZG2ZJHL.BucketsXYZ) *protoFiles_zjhl_v3.DataDefinition_StatisticData {
Atime := dateBucket.KeyAsString.Add(-8 * time.Hour).UnixMilli() Atime := dateBucket.KeyAsString.Add(-8 * time.Hour).UnixMilli()
monitoryType := the.getMonitorTypeByFactorId(factorId) monitoryType := the.getMonitorTypeByFactorId(factorId)
@ -213,7 +244,7 @@ func (the Adaptor_ZWYES_ZJHL) EsAgg2StatisticData(factorId int, monitorCode int6
RootMeanSquare: float32(rootMeanSquareY), RootMeanSquare: float32(rootMeanSquareY),
}} }}
case 883: //湿度 ->最终拼接为 温湿度 case 883: //湿度 ->最终拼接为 温湿度
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_zjhl_v3.StatisticData_Rhs{Rhs: &protoFiles_zjhl_v3.RHSStatistic{ statisticDataRhs := &protoFiles_zjhl_v3.StatisticData_Rhs{Rhs: &protoFiles_zjhl_v3.RHSStatistic{
//MaxTemperature: float32(dateBucket.X.Max), //MaxTemperature: float32(dateBucket.X.Max),
//MinTemperature: float32(dateBucket.X.Min), //MinTemperature: float32(dateBucket.X.Min),
//AvgTemperature: float32(avgValueX), //AvgTemperature: float32(avgValueX),
@ -223,6 +254,16 @@ func (the Adaptor_ZWYES_ZJHL) EsAgg2StatisticData(factorId int, monitorCode int6
AvgHumidity: float32(avgValueX), AvgHumidity: float32(avgValueX),
HumidityExceedDuration: 0, HumidityExceedDuration: 0,
}} }}
if relatedTemp, ok := the.cacheTemp[monitorCode]; ok {
statisticDataRhs.Rhs.MaxTemperature = relatedTemp.StatisticData.GetTmp().GetMaxTemperature()
statisticDataRhs.Rhs.MinTemperature = relatedTemp.StatisticData.GetTmp().GetMinTemperature()
statisticDataRhs.Rhs.AvgTemperature = relatedTemp.StatisticData.GetTmp().GetAvgTemperature()
statisticDataRhs.Rhs.MaxTemperatureDifference = relatedTemp.StatisticData.GetTmp().GetMaxDifference()
}
dataDefinitionStatisticData.StatisticData.DataBody = statisticDataRhs
case 935: //gnss case 935: //gnss
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_zjhl_v3.StatisticData_Gnss{Gnss: &protoFiles_zjhl_v3.GNSSStatistic{ dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_zjhl_v3.StatisticData_Gnss{Gnss: &protoFiles_zjhl_v3.GNSSStatistic{
// 统计时间范围内的空间变形X绝对最大值 // 统计时间范围内的空间变形X绝对最大值
@ -249,14 +290,14 @@ func (the Adaptor_ZWYES_ZJHL) EsAgg2StatisticData(factorId int, monitorCode int6
return dataDefinitionStatisticData return dataDefinitionStatisticData
} }
func (the Adaptor_ZWYES_ZJHL) getUniqueCode(structId int64) (uniqueCode int64) { func (the *Adaptor_ZWYES_ZJHL) getUniqueCode(structId int64) (uniqueCode int64) {
if v, ok := the.StructInfo[structId]; ok { if v, ok := the.StructInfo[structId]; ok {
uniqueCode = v uniqueCode = v
} }
return uniqueCode return uniqueCode
} }
func (the Adaptor_ZWYES_ZJHL) getPointCodeFromLabel(label string) int64 { func (the *Adaptor_ZWYES_ZJHL) getPointCodeFromLabel(label string) int64 {
//解析label {ID:6500030042} //解析label {ID:6500030042}
pointUniqueCode := int64(0) pointUniqueCode := int64(0)
if len(label) > 5 { if len(label) > 5 {

16
consumers/consumerGZG2ZJHL.go

@ -129,23 +129,25 @@ func (the *consumerGZG2ZJHL) getStructIds() []int64 {
return structIds return structIds
} }
func (the *consumerGZG2ZJHL) getEs1HourAggData() { func (the *consumerGZG2ZJHL) getEs1HourAggData() {
start, end := utils.GetTimeRangeByHour(-5) start, end := utils.GetTimeRangeByHour(-1)
log.Printf("查询数据时间范围 %s - %s", start, end) log.Printf("查询数据时间范围 %s - %s", start, end)
//hourFactorIds := []int{883, 2, 7, 935, 4, 6, 11} //hourFactorIds := []int{883, 2, 7, 935, 4, 6, 11}
hourFactorIds := []int{935} hourFactorIds := []int{4, 883}
//湿度883 -> 要当作温湿度处理,合并温度监测中的3个有相同label的测点数据,组合成虚拟温湿度数据 //湿度883 -> 要当作温湿度处理,合并温度监测中的3个有相同label的测点数据,组合成虚拟温湿度数据
//温湿度2(要温度和湿度数据拼接), 源于温度测点和湿度测点 特殊的3个label测点算温湿度 6500030003,6500030004,6500030005 //温湿度2(要温度和湿度数据拼接), 源于温度测点和湿度测点 特殊的3个label测点算温湿度
//Gnss 935 ,4(温度),6索力, 15, 18, 20 //6500030003(温度测点 72179,湿度测点 72191),6500030004(温度测点 72178,湿度测点 72192),6500030005(温度测点 72180,湿度测点 72166)
//935(Gnss) ,883(湿度,果子沟特有),4(温度),6索力, 15, 18, 20
structIds := the.getStructIds() structIds := the.getStructIds()
for _, structId := range structIds { for _, structId := range structIds {
adaptor := the.getAdaptor()
adaptor.PointInfo = the.Info.PointInfo
adaptor.StructInfo = the.Info.StructInfo
for _, factorId := range hourFactorIds { for _, factorId := range hourFactorIds {
esQuery := the.getESQueryStrByHour(structId, factorId, start, end) esQuery := the.getESQueryStrByHour(structId, factorId, start, end)
auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"}
esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth)
adaptor := the.getAdaptor()
adaptor.PointInfo = the.Info.PointInfo
adaptor.StructInfo = the.Info.StructInfo
needPushes := adaptor.Transform(structId, factorId, esAggResultStr) needPushes := adaptor.Transform(structId, factorId, esAggResultStr)
for i := range needPushes { for i := range needPushes {
needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload)

Loading…
Cancel
Save