Browse Source

update 添加 风速156 风向225 合并测点数据 推送 支持

dev
lucas 2 months ago
parent
commit
ab01ae7fba
  1. 47
      adaptors/知物云es主题特征to中交华联.go
  2. 37
      consumers/consumerGZG2ZJHL.go
  3. 3
      dbOperate/httpHelper.go
  4. 17
      utils/timeRange.go

47
adaptors/知物云es主题特征to中交华联.go

@ -26,9 +26,13 @@ type Adaptor_ZWYES_ZJHL struct {
Redis *dbOperate.RedisHelper
//果子沟特殊处理 缓存与湿度关联的温度测点数据
cacheTemp map[int64]*protoFiles_zjhl_v3.DataDefinition_StatisticData
//果子沟特殊处理 缓存与风向关联的风速测点数据
cacheSpeed map[int64]*protoFiles_zjhl_v3.DataDefinition_StatisticData
}
var wsdMonitorCodeMap = map[int64]int64{
6500030001: 0,
6500030002: 0,
6500030003: 0,
6500030004: 0,
6500030005: 0}
@ -101,7 +105,22 @@ func (the *Adaptor_ZWYES_ZJHL) EsAggTopToHBJCAS(structId int64, factorId int, es
log.Printf("缓存关联label[%d]的温度数据 => %s", monitorCode, string(tempStr))
continue
}
}
if factorId == 156 { //风速156 // 关联风向的 风速测点 只缓存
if _, exists := wsdMonitorCodeMap[monitorCode]; exists {
if the.cacheSpeed == nil {
the.cacheSpeed = make(map[int64]*protoFiles_zjhl_v3.DataDefinition_StatisticData)
}
if monitorCode == 6500030002 { //6500030002 有多个风速 只选测点72183
if sensorId != 72183 {
continue
}
}
the.cacheSpeed[monitorCode] = dataBody
tempStr, _ := json.Marshal(dataBody)
log.Printf("缓存关联label[%d]的风速数据 => %s", monitorCode, string(tempStr))
}
continue
}
dataDefinition := &protoFiles_zjhl_v3.DataDefinition{
@ -114,11 +133,16 @@ func (the *Adaptor_ZWYES_ZJHL) EsAggTopToHBJCAS(structId int64, factorId int, es
}
}
//湿度处理后 清理之前的缓存温度
//湿度883处理后 清理之前的缓存温度
if factorId == 883 {
the.cacheTemp = make(map[int64]*protoFiles_zjhl_v3.DataDefinition_StatisticData)
}
//风向225处理后 清理之前的缓存风速
if factorId == 225 {
the.cacheSpeed = make(map[int64]*protoFiles_zjhl_v3.DataDefinition_StatisticData)
}
v, _ := json.Marshal(complexData)
log.Printf("[struct:%d,factor:%d] 特征数据=> %s", structId, factorId, v)
result, _ = proto.Marshal(complexData)
@ -126,7 +150,7 @@ func (the *Adaptor_ZWYES_ZJHL) EsAggTopToHBJCAS(structId int64, factorId int, es
return result
}
func (the *Adaptor_ZWYES_ZJHL) getMonitorTypeByFactorId(factorId int) protoFiles_zjhl_v3.MonitoryType {
//结构温度4 桥墩倾斜 15 裂缝18 支座位移20 桥面振动28 加速度三项监测592
//结构温度4 桥墩倾斜 15 裂缝18 支座位移20 桥面振动28 风速156 加速度三项监测592
switch factorId {
case 4:
return protoFiles_zjhl_v3.MonitoryType_TMP
@ -146,6 +170,8 @@ func (the *Adaptor_ZWYES_ZJHL) getMonitorTypeByFactorId(factorId int) protoFiles
return protoFiles_zjhl_v3.MonitoryType_DIS
case 28:
return protoFiles_zjhl_v3.MonitoryType_VIB
case 156: //风速测点 需要合并 同标签的风向测点 组成风速风向
return protoFiles_zjhl_v3.MonitoryType_UAN
case 883: //湿度测点 需要合并 同标签的温度测点 组成温湿度
return protoFiles_zjhl_v3.MonitoryType_RHS
case 935:
@ -243,6 +269,21 @@ func (the *Adaptor_ZWYES_ZJHL) EsAgg2StatisticData(factorId int, monitorCode int
MaxAbsoluteValue: float32(maxAbsoluteValueX),
RootMeanSquare: float32(rootMeanSquareY),
}}
case 156: //风速
dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_zjhl_v3.StatisticData_Uan{Uan: &protoFiles_zjhl_v3.UANStatistic{
AvgVelocity: float32(avgValueX),
//AvgDirection: float32(0),
}}
case 225: //风向
statisticDataUan := &protoFiles_zjhl_v3.StatisticData_Uan{Uan: &protoFiles_zjhl_v3.UANStatistic{
//AvgVelocity: float32(0),
AvgDirection: float32(avgValueX),
}}
if relatedSpeed, ok := the.cacheSpeed[monitorCode]; ok {
statisticDataUan.Uan.AvgVelocity = relatedSpeed.StatisticData.GetUan().GetAvgVelocity()
}
dataDefinitionStatisticData.StatisticData.DataBody = statisticDataUan
case 883: //湿度 ->最终拼接为 温湿度
statisticDataRhs := &protoFiles_zjhl_v3.StatisticData_Rhs{Rhs: &protoFiles_zjhl_v3.RHSStatistic{
//MaxTemperature: float32(dateBucket.X.Max),

37
consumers/consumerGZG2ZJHL.go

@ -59,7 +59,7 @@ func (the *consumerGZG2ZJHL) InputInitial() error {
for taskName, cron := range the.Info.Monitor {
switch taskName {
case "cron10min":
//the.monitor.RegisterTask(cron, the.getEs10minAggData)
the.monitor.RegisterTask(cron, the.getEs10minAggData)
case "cron1hour":
the.monitor.RegisterTask(cron, the.getEs1HourAggData)
default:
@ -131,8 +131,8 @@ func (the *consumerGZG2ZJHL) getStructIds() []int64 {
func (the *consumerGZG2ZJHL) getEs1HourAggData() {
start, end := utils.GetTimeRangeByHour(-1)
log.Printf("查询数据时间范围 %s - %s", start, end)
//hourFactorIds := []int{883, 2, 7, 935, 4, 6, 11}
hourFactorIds := []int{4, 883}
hourFactorIds := []int{4, 6, 7, 11, 19, 24, 883, 935}
//hourFactorIds := []int{4, 883}
//湿度883 -> 要当作温湿度处理,合并温度监测中的3个有相同label的测点数据,组合成虚拟温湿度数据
//温湿度2(要温度和湿度数据拼接), 源于温度测点和湿度测点 特殊的3个label测点算温湿度
//6500030003(温度测点 72179,湿度测点 72191),6500030004(温度测点 72178,湿度测点 72192),6500030005(温度测点 72180,湿度测点 72166)
@ -159,24 +159,23 @@ func (the *consumerGZG2ZJHL) getEs1HourAggData() {
}
}
}
}
func (the *consumerGZG2ZJHL) getEs10minAggData() {
//utils.GetTimeRangeBy10min() 由于振动数据实时性问题 改用一小时统一上报
start, end := utils.GetTimeRangeByHour(-1)
start, end := utils.GetTimeRangeBy10minByOffset(-20)
//start, end := "2025-02-28T15:30:00.000+08:00", "2025-02-28T15:40:00.000+08:00"
log.Printf("查询10min数据时间范围 %s - %s", start, end)
factorIds := []int{28, 592} //监测因素 592 -> 结构物[5222]隧道河北承德广仁岭隧道(上行) 的加速度三项监测
factorIds := []int{156, 225} //监测因素 风速156 风向225 需要合并风向
structIds := the.getStructIds()
for _, structId := range structIds {
adaptor := the.getAdaptor()
adaptor.PointInfo = the.Info.PointInfo
adaptor.StructInfo = the.Info.StructInfo
for _, factorId := range factorIds {
esQuery := the.getESQueryStrBy10min(structId, factorId, start, end)
auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"}
esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth)
adaptor := the.getAdaptor()
adaptor.PointInfo = the.Info.PointInfo
adaptor.StructInfo = the.Info.StructInfo
needPushes := adaptor.Transform(structId, factorId, esAggResultStr)
for i := range needPushes {
needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload)
@ -420,6 +419,24 @@ func getEsAggSubSqlByZwyFactorId(factorId int) string {
"field": "data.trms"
}
}
}`
case 156:
subAggSQl = `
{
"x": {
"extended_stats": {
"field": "data.speed"
}
}
}`
case 225:
subAggSQl = `
{
"x": {
"extended_stats": {
"field": "data.direction"
}
}
}`
case 883: //湿度 (后期需要合并3个温度 拼成 温湿度)
subAggSQl = `

3
dbOperate/httpHelper.go

@ -54,7 +54,8 @@ func (the *HttpHelper) HttpGetWithHeader(queryBody string, headerMap map[string]
resp, err := client.Do(req)
if err != nil {
fmt.Println(err)
log.Printf("查询esproxy 异常=>%s", err.Error())
return ""
}
defer func(resp *http.Response) {
if resp != nil && resp.Body != nil {

17
utils/timeRange.go

@ -1,7 +1,6 @@
package utils
import (
"log"
"time"
)
@ -20,10 +19,18 @@ func GetTimeRangeByHour(durationHour int) (start, stop string) {
}
func GetTimeRangeBy10min() (start, stop string) {
m := time.Now().Minute() % 10
log.Println(m)
startTime := time.Now().Add(time.Minute * -1 * time.Duration(m))
now := time.Now()
m := now.Minute() % 10
startTime := now.Add(time.Minute * -1 * time.Duration(m))
start = startTime.Add(time.Minute * -10).Format("2006-01-02T15:04:00.000+08:00")
stop = startTime.Format("2006-01-02T15:04:00.000+08:00")
return
}
func GetTimeRangeBy10minByOffset(offsetMin int) (start, stop string) {
offset := time.Duration(offsetMin) * time.Minute
now := time.Now().Add(offset)
m := now.Minute() % 10
startTime := now.Add(time.Minute * -1 * time.Duration(m))
start = startTime.Add(time.Minute * -10).Format("2006-01-02T15:04:00.000+08:00")
stop = startTime.Format("2006-01-02T15:04:00.000+08:00")
return

Loading…
Cancel
Save