From ab01ae7fba13dddf02bb2440ab6dc6faf4ba89ec Mon Sep 17 00:00:00 2001 From: lucas Date: Fri, 28 Feb 2025 16:58:08 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E6=B7=BB=E5=8A=A0=20=E9=A3=8E?= =?UTF-8?q?=E9=80=9F156=20=E9=A3=8E=E5=90=91225=20=E5=90=88=E5=B9=B6?= =?UTF-8?q?=E6=B5=8B=E7=82=B9=E6=95=B0=E6=8D=AE=20=E6=8E=A8=E9=80=81=20?= =?UTF-8?q?=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../知物云es主题特征to中交华联.go | 47 +++++++++++++++++-- consumers/consumerGZG2ZJHL.go | 37 +++++++++++---- dbOperate/httpHelper.go | 3 +- utils/timeRange.go | 17 +++++-- 4 files changed, 85 insertions(+), 19 deletions(-) diff --git a/adaptors/知物云es主题特征to中交华联.go b/adaptors/知物云es主题特征to中交华联.go index beae81e..c753e8b 100644 --- a/adaptors/知物云es主题特征to中交华联.go +++ b/adaptors/知物云es主题特征to中交华联.go @@ -26,9 +26,13 @@ type Adaptor_ZWYES_ZJHL struct { Redis *dbOperate.RedisHelper //果子沟特殊处理 缓存与湿度关联的温度测点数据 cacheTemp map[int64]*protoFiles_zjhl_v3.DataDefinition_StatisticData + //果子沟特殊处理 缓存与风向关联的风速测点数据 + cacheSpeed map[int64]*protoFiles_zjhl_v3.DataDefinition_StatisticData } var wsdMonitorCodeMap = map[int64]int64{ + 6500030001: 0, + 6500030002: 0, 6500030003: 0, 6500030004: 0, 6500030005: 0} @@ -101,7 +105,22 @@ func (the *Adaptor_ZWYES_ZJHL) EsAggTopToHBJCAS(structId int64, factorId int, es log.Printf("缓存关联label[%d]的温度数据 => %s", monitorCode, string(tempStr)) continue } - + } + if factorId == 156 { //风速156 // 关联风向的 风速测点 只缓存 + if _, exists := wsdMonitorCodeMap[monitorCode]; exists { + if the.cacheSpeed == nil { + the.cacheSpeed = make(map[int64]*protoFiles_zjhl_v3.DataDefinition_StatisticData) + } + if monitorCode == 6500030002 { //6500030002 有多个风速 只选测点72183 + if sensorId != 72183 { + continue + } + } + the.cacheSpeed[monitorCode] = dataBody + tempStr, _ := json.Marshal(dataBody) + log.Printf("缓存关联label[%d]的风速数据 => %s", monitorCode, string(tempStr)) + } + continue } dataDefinition := &protoFiles_zjhl_v3.DataDefinition{ @@ -114,11 +133,16 @@ func (the *Adaptor_ZWYES_ZJHL) EsAggTopToHBJCAS(structId int64, factorId int, es } } - //湿度处理后 清理之前的缓存温度 + //湿度883处理后 清理之前的缓存温度 if factorId == 883 { the.cacheTemp = make(map[int64]*protoFiles_zjhl_v3.DataDefinition_StatisticData) } + //风向225处理后 清理之前的缓存风速 + if factorId == 225 { + the.cacheSpeed = make(map[int64]*protoFiles_zjhl_v3.DataDefinition_StatisticData) + } + v, _ := json.Marshal(complexData) log.Printf("[struct:%d,factor:%d] 特征数据=> %s", structId, factorId, v) result, _ = proto.Marshal(complexData) @@ -126,7 +150,7 @@ func (the *Adaptor_ZWYES_ZJHL) EsAggTopToHBJCAS(structId int64, factorId int, es return result } func (the *Adaptor_ZWYES_ZJHL) getMonitorTypeByFactorId(factorId int) protoFiles_zjhl_v3.MonitoryType { - //结构温度4 桥墩倾斜 15 裂缝18 支座位移20 桥面振动28 加速度三项监测592 + //结构温度4 桥墩倾斜 15 裂缝18 支座位移20 桥面振动28 风速156 加速度三项监测592 switch factorId { case 4: return protoFiles_zjhl_v3.MonitoryType_TMP @@ -146,6 +170,8 @@ func (the *Adaptor_ZWYES_ZJHL) getMonitorTypeByFactorId(factorId int) protoFiles return protoFiles_zjhl_v3.MonitoryType_DIS case 28: return protoFiles_zjhl_v3.MonitoryType_VIB + case 156: //风速测点 需要合并 同标签的风向测点 组成风速风向 + return protoFiles_zjhl_v3.MonitoryType_UAN case 883: //湿度测点 需要合并 同标签的温度测点 组成温湿度 return protoFiles_zjhl_v3.MonitoryType_RHS case 935: @@ -243,6 +269,21 @@ func (the *Adaptor_ZWYES_ZJHL) EsAgg2StatisticData(factorId int, monitorCode int MaxAbsoluteValue: float32(maxAbsoluteValueX), RootMeanSquare: float32(rootMeanSquareY), }} + case 156: //风速 + dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_zjhl_v3.StatisticData_Uan{Uan: &protoFiles_zjhl_v3.UANStatistic{ + AvgVelocity: float32(avgValueX), + //AvgDirection: float32(0), + }} + case 225: //风向 + statisticDataUan := &protoFiles_zjhl_v3.StatisticData_Uan{Uan: &protoFiles_zjhl_v3.UANStatistic{ + //AvgVelocity: float32(0), + AvgDirection: float32(avgValueX), + }} + + if relatedSpeed, ok := the.cacheSpeed[monitorCode]; ok { + statisticDataUan.Uan.AvgVelocity = relatedSpeed.StatisticData.GetUan().GetAvgVelocity() + } + dataDefinitionStatisticData.StatisticData.DataBody = statisticDataUan case 883: //湿度 ->最终拼接为 温湿度 statisticDataRhs := &protoFiles_zjhl_v3.StatisticData_Rhs{Rhs: &protoFiles_zjhl_v3.RHSStatistic{ //MaxTemperature: float32(dateBucket.X.Max), diff --git a/consumers/consumerGZG2ZJHL.go b/consumers/consumerGZG2ZJHL.go index 5ba9b77..27f1473 100644 --- a/consumers/consumerGZG2ZJHL.go +++ b/consumers/consumerGZG2ZJHL.go @@ -59,7 +59,7 @@ func (the *consumerGZG2ZJHL) InputInitial() error { for taskName, cron := range the.Info.Monitor { switch taskName { case "cron10min": - //the.monitor.RegisterTask(cron, the.getEs10minAggData) + the.monitor.RegisterTask(cron, the.getEs10minAggData) case "cron1hour": the.monitor.RegisterTask(cron, the.getEs1HourAggData) default: @@ -131,8 +131,8 @@ func (the *consumerGZG2ZJHL) getStructIds() []int64 { func (the *consumerGZG2ZJHL) getEs1HourAggData() { start, end := utils.GetTimeRangeByHour(-1) log.Printf("查询数据时间范围 %s - %s", start, end) - //hourFactorIds := []int{883, 2, 7, 935, 4, 6, 11} - hourFactorIds := []int{4, 883} + hourFactorIds := []int{4, 6, 7, 11, 19, 24, 883, 935} + //hourFactorIds := []int{4, 883} //湿度883 -> 要当作温湿度处理,合并温度监测中的3个有相同label的测点数据,组合成虚拟温湿度数据 //温湿度2(要温度和湿度数据拼接), 源于温度测点和湿度测点 特殊的3个label测点算温湿度 //6500030003(温度测点 72179,湿度测点 72191),6500030004(温度测点 72178,湿度测点 72192),6500030005(温度测点 72180,湿度测点 72166) @@ -159,24 +159,23 @@ func (the *consumerGZG2ZJHL) getEs1HourAggData() { } } } - } func (the *consumerGZG2ZJHL) getEs10minAggData() { - //utils.GetTimeRangeBy10min() 由于振动数据实时性问题 改用一小时统一上报 - start, end := utils.GetTimeRangeByHour(-1) + start, end := utils.GetTimeRangeBy10minByOffset(-20) + //start, end := "2025-02-28T15:30:00.000+08:00", "2025-02-28T15:40:00.000+08:00" log.Printf("查询10min数据时间范围 %s - %s", start, end) - factorIds := []int{28, 592} //监测因素 592 -> 结构物[5222]隧道河北承德广仁岭隧道(上行) 的加速度三项监测 + factorIds := []int{156, 225} //监测因素 风速156 风向225 需要合并风向 structIds := the.getStructIds() for _, structId := range structIds { + adaptor := the.getAdaptor() + adaptor.PointInfo = the.Info.PointInfo + adaptor.StructInfo = the.Info.StructInfo for _, factorId := range factorIds { esQuery := the.getESQueryStrBy10min(structId, factorId, start, end) auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) - adaptor := the.getAdaptor() - adaptor.PointInfo = the.Info.PointInfo - adaptor.StructInfo = the.Info.StructInfo needPushes := adaptor.Transform(structId, factorId, esAggResultStr) for i := range needPushes { needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) @@ -420,6 +419,24 @@ func getEsAggSubSqlByZwyFactorId(factorId int) string { "field": "data.trms" } } +}` + case 156: + subAggSQl = ` +{ + "x": { + "extended_stats": { + "field": "data.speed" + } + } +}` + case 225: + subAggSQl = ` +{ + "x": { + "extended_stats": { + "field": "data.direction" + } + } }` case 883: //湿度 (后期需要合并3个温度 拼成 温湿度) subAggSQl = ` diff --git a/dbOperate/httpHelper.go b/dbOperate/httpHelper.go index b80876b..b491521 100644 --- a/dbOperate/httpHelper.go +++ b/dbOperate/httpHelper.go @@ -54,7 +54,8 @@ func (the *HttpHelper) HttpGetWithHeader(queryBody string, headerMap map[string] resp, err := client.Do(req) if err != nil { - fmt.Println(err) + log.Printf("查询esproxy 异常=>%s", err.Error()) + return "" } defer func(resp *http.Response) { if resp != nil && resp.Body != nil { diff --git a/utils/timeRange.go b/utils/timeRange.go index a9b4b40..8b90dfb 100644 --- a/utils/timeRange.go +++ b/utils/timeRange.go @@ -1,7 +1,6 @@ package utils import ( - "log" "time" ) @@ -20,10 +19,18 @@ func GetTimeRangeByHour(durationHour int) (start, stop string) { } func GetTimeRangeBy10min() (start, stop string) { - - m := time.Now().Minute() % 10 - log.Println(m) - startTime := time.Now().Add(time.Minute * -1 * time.Duration(m)) + now := time.Now() + m := now.Minute() % 10 + startTime := now.Add(time.Minute * -1 * time.Duration(m)) + start = startTime.Add(time.Minute * -10).Format("2006-01-02T15:04:00.000+08:00") + stop = startTime.Format("2006-01-02T15:04:00.000+08:00") + return +} +func GetTimeRangeBy10minByOffset(offsetMin int) (start, stop string) { + offset := time.Duration(offsetMin) * time.Minute + now := time.Now().Add(offset) + m := now.Minute() % 10 + startTime := now.Add(time.Minute * -1 * time.Duration(m)) start = startTime.Add(time.Minute * -10).Format("2006-01-02T15:04:00.000+08:00") stop = startTime.Format("2006-01-02T15:04:00.000+08:00") return