Browse Source

update 更新广东省平台 特征数据推送

dev
lucas 1 month ago
parent
commit
1ecd067489
  1. 12
      adaptors/安心云es主题特征to广东省平台.go
  2. 13
      configFiles/config_安心云测点特征数据_广东省平台.yaml
  3. 16
      consumers/GDJKJC/config.go
  4. 47
      consumers/consumerAXYES2GDJKJC.go

12
adaptors/安心云es主题特征to广东省平台.go

@ -18,15 +18,13 @@ import (
// Adaptor_AXYES_HBGL 统一采集软件数据 转换 湘潭健康监测平台 // Adaptor_AXYES_HBGL 统一采集软件数据 转换 湘潭健康监测平台
type Adaptor_AXYES_GDJKJC struct { type Adaptor_AXYES_GDJKJC struct {
//传感器code转换信息 StructInfo map[int]string
PointInfo map[int64]map[int64]int64
StructInfo map[int64]string
//一些必要信息 //一些必要信息
Info map[string]string Info map[string]string
Redis *dbOperate.RedisHelper Redis *dbOperate.RedisHelper
} }
func (the Adaptor_AXYES_GDJKJC) Transform(structId int64, factorId int, rawMsg string) []NeedPush { func (the Adaptor_AXYES_GDJKJC) Transform(structId int, factorId int, rawMsg string) []NeedPush {
esAggDateHistogram := HBJCAS.EsThemeAggDateHistogram{} esAggDateHistogram := HBJCAS.EsThemeAggDateHistogram{}
var needPush []NeedPush var needPush []NeedPush
err := json.Unmarshal([]byte(rawMsg), &esAggDateHistogram) err := json.Unmarshal([]byte(rawMsg), &esAggDateHistogram)
@ -41,7 +39,7 @@ func (the Adaptor_AXYES_GDJKJC) Transform(structId int64, factorId int, rawMsg s
for _, payload := range Payloads { for _, payload := range Payloads {
needPush = append(needPush, NeedPush{ needPush = append(needPush, NeedPush{
Topic: strconv.FormatInt(structId, 10), Topic: strconv.Itoa(structId),
Payload: payload, Payload: payload,
}) })
} }
@ -49,7 +47,7 @@ func (the Adaptor_AXYES_GDJKJC) Transform(structId int64, factorId int, rawMsg s
return needPush return needPush
} }
func (the Adaptor_AXYES_GDJKJC) EsAggTopToGDJKJC(structId int64, factorId int, esAggs HBJCAS.EsThemeAggDateHistogram) (result [][]byte) { func (the Adaptor_AXYES_GDJKJC) EsAggTopToGDJKJC(structId int, factorId int, esAggs HBJCAS.EsThemeAggDateHistogram) (result [][]byte) {
buckets := esAggs.Aggregations.GroupSensor.Buckets buckets := esAggs.Aggregations.GroupSensor.Buckets
if len(buckets) == 0 { if len(buckets) == 0 {
log.Printf("[s=%d,f=%d] ,es agg数据数量==0", structId, factorId) log.Printf("[s=%d,f=%d] ,es agg数据数量==0", structId, factorId)
@ -164,7 +162,7 @@ func (the Adaptor_AXYES_GDJKJC) EsAgg2StatisticData(factorId int, monitorCode st
return dataBytes return dataBytes
} }
func (the Adaptor_AXYES_GDJKJC) getUniqueCode(structId int64) (uniqueCode string) { func (the Adaptor_AXYES_GDJKJC) getUniqueCode(structId int) (uniqueCode string) {
if v, ok := the.StructInfo[structId]; ok { if v, ok := the.StructInfo[structId]; ok {
uniqueCode = v uniqueCode = v
} }

13
configFiles/config_安心云测点特征数据_广东省平台.yaml

@ -9,9 +9,9 @@ ioConfig:
method: "post" method: "post"
monitor: monitor:
#振动是触发式,数据迟缓 cron10min也改成1小时一次 最多上报6条,不进行10min实时上报 #振动是触发式,数据迟缓 cron10min也改成1小时一次 最多上报6条,不进行10min实时上报
cron10min: 56 0/1 * * * #6/10 * * * * cron10min: 25 0/1 * * * #6/10 * * * *
#普通类型 特征数据 #普通类型 特征数据
cron1hour: 35 0/1 * * * cron1hour: 28 0/1 * * *
info: info:
5450: #隆江大桥 5450: #隆江大桥
appKey: db43bc5d74534348 appKey: db43bc5d74534348
@ -28,8 +28,11 @@ queryComponent:
address: 10.8.30.160:30379 address: 10.8.30.160:30379
#结构物id对应 #结构物id对应
structInfo: structInfo:
5450: G15445224L1120 #隆江大桥 bridge:
5455: G15441581L1320 #螺河特大桥 5450: G15445224L1120 #隆江大桥
5456: G15441581L1310 #螺河东大桥 5455: G15441581L1320 #螺河特大桥
5456: G15441581L1310 #螺河东大桥
slope: #隧道无特征数据 预留
5452: project11223-

16
consumers/GDJKJC/config.go

@ -3,12 +3,11 @@ package GDJKJC
import "goInOut/config" import "goInOut/config"
type ConfigFile struct { type ConfigFile struct {
IoConfig ioConfig `yaml:"ioConfig"` IoConfig ioConfig `yaml:"ioConfig"`
Info map[string]AppKeySecret `yaml:"info"` Info map[string]AppKeySecret `yaml:"info"`
PointInfo map[int64]map[int64]int64 `yaml:"pointInfo"` StructInfo structInfo `yaml:"structInfo"`
StructInfo map[int64]string `yaml:"structInfo"` Monitor map[string]string `yaml:"monitor"`
Monitor map[string]string `yaml:"monitor"` QueryComponent queryComponent `yaml:"queryComponent"`
QueryComponent queryComponent `yaml:"queryComponent"`
} }
type ioConfig struct { type ioConfig struct {
In In `yaml:"in"` In In `yaml:"in"`
@ -32,3 +31,8 @@ type AppKeySecret struct {
AppKey string `yaml:"appKey"` AppKey string `yaml:"appKey"`
AppSecret string `yaml:"appSecret"` AppSecret string `yaml:"appSecret"`
} }
type structInfo struct {
Bridge map[int]string `yaml:"bridge"`
Slope map[int]string `yaml:"slope"`
}

47
consumers/consumerAXYES2GDJKJC.go

@ -123,9 +123,16 @@ func (the *consumerAXYES2GDJKJC) getAdaptor() (adaptor adaptors.Adaptor_AXYES_GD
} }
} }
func (the *consumerAXYES2GDJKJC) getStructIds() []int64 { func (the *consumerAXYES2GDJKJC) getStructIdsByBridge() []int {
var structIds []int64 var structIds []int
for strutId, _ := range the.ConfigInfo.StructInfo { for strutId, _ := range the.ConfigInfo.StructInfo.Bridge {
structIds = append(structIds, strutId)
}
return structIds
}
func (the *consumerAXYES2GDJKJC) getStructIdsBySlope() []int {
var structIds []int
for strutId, _ := range the.ConfigInfo.StructInfo.Slope {
structIds = append(structIds, strutId) structIds = append(structIds, strutId)
} }
return structIds return structIds
@ -134,22 +141,34 @@ func (the *consumerAXYES2GDJKJC) getEs1HourAggData() {
start, end := utils.GetTimeRangeByHour(-1) start, end := utils.GetTimeRangeByHour(-1)
log.Printf("查询数据时间范围 %s - %s", start, end) log.Printf("查询数据时间范围 %s - %s", start, end)
hourFactorIds := []int{15} //[]int{11, 15, 18, 20} //应变11 桥墩倾斜15 裂缝监测18 支护结构变形63 hourFactorIds := []int{15} //[]int{11, 15, 18, 20} //应变11 桥墩倾斜15 裂缝监测18 支护结构变形63
structIds := the.getStructIds() structIds := the.getStructIdsByBridge()
the.handlerHourAggData(start, end, "bridge", structIds, hourFactorIds)
}
func (the *consumerAXYES2GDJKJC) handlerHourAggData(start, end, structType string, structIds, factorIds []int) {
adaptor := the.getAdaptor()
switch structType {
case "bridge":
adaptor.StructInfo = the.ConfigInfo.StructInfo.Bridge
case "slope":
adaptor.StructInfo = the.ConfigInfo.StructInfo.Slope
default:
log.Printf("无 匹配的结构物类型 => %s", structType)
return
}
for _, structId := range structIds { for _, structId := range structIds {
for _, factorId := range hourFactorIds { for _, factorId := range factorIds {
esQuery := the.getESQueryStrByHour(structId, factorId, start, end) esQuery := the.getESQueryStrByHour(structId, factorId, start, end)
auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"}
esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth)
lenRes := len(esAggResultStr) lenRes := len(esAggResultStr)
if lenRes < 250 { if lenRes < 250 {
log.Printf("[s=%d,f=%d],es agg 返回无数据", structId, factorId) log.Printf("[s=%d,f=%d],es agg 返回无数据 len<250", structId, factorId)
continue continue
} }
adaptor := the.getAdaptor()
adaptor.PointInfo = the.ConfigInfo.PointInfo
adaptor.StructInfo = the.ConfigInfo.StructInfo
needPushes := adaptor.Transform(structId, factorId, esAggResultStr) needPushes := adaptor.Transform(structId, factorId, esAggResultStr)
if len(needPushes) > 0 { if len(needPushes) > 0 {
@ -157,7 +176,6 @@ func (the *consumerAXYES2GDJKJC) getEs1HourAggData() {
} }
} }
} }
} }
func (the *consumerAXYES2GDJKJC) GetEs10minAggData() { func (the *consumerAXYES2GDJKJC) GetEs10minAggData() {
@ -165,7 +183,7 @@ func (the *consumerAXYES2GDJKJC) GetEs10minAggData() {
start, end := utils.GetTimeRangeByHour(-1) start, end := utils.GetTimeRangeByHour(-1)
log.Printf("查询10min数据时间范围 %s - %s", start, end) log.Printf("查询10min数据时间范围 %s - %s", start, end)
factorIds := []int{28} //桥面振动 28 factorIds := []int{28} //桥面振动 28
structIds := the.getStructIds() structIds := the.getStructIdsByBridge()
for _, structId := range structIds { for _, structId := range structIds {
for _, factorId := range factorIds { for _, factorId := range factorIds {
esQuery := the.getESQueryStrBy10min(structId, factorId, start, end) esQuery := the.getESQueryStrBy10min(structId, factorId, start, end)
@ -173,8 +191,7 @@ func (the *consumerAXYES2GDJKJC) GetEs10minAggData() {
esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth)
adaptor := the.getAdaptor() adaptor := the.getAdaptor()
adaptor.PointInfo = the.ConfigInfo.PointInfo adaptor.StructInfo = the.ConfigInfo.StructInfo.Bridge
adaptor.StructInfo = the.ConfigInfo.StructInfo
needPushes := adaptor.Transform(structId, factorId, esAggResultStr) needPushes := adaptor.Transform(structId, factorId, esAggResultStr)
if len(needPushes) > 0 { if len(needPushes) > 0 {
@ -185,7 +202,7 @@ func (the *consumerAXYES2GDJKJC) GetEs10minAggData() {
} }
func (the *consumerAXYES2GDJKJC) getESQueryStrByHour(structureId int64, factorId int, start, end string) string { func (the *consumerAXYES2GDJKJC) getESQueryStrByHour(structureId int, factorId int, start, end string) string {
aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId) aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId)
esQuery := fmt.Sprintf(` esQuery := fmt.Sprintf(`
{ {
@ -242,7 +259,7 @@ func (the *consumerAXYES2GDJKJC) getESQueryStrByHour(structureId int64, factorId
return esQuery return esQuery
} }
func (the *consumerAXYES2GDJKJC) getESQueryStrBy10min(structureId int64, factorId int, start, end string) string { func (the *consumerAXYES2GDJKJC) getESQueryStrBy10min(structureId int, factorId int, start, end string) string {
aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId) aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId)
esQuery := fmt.Sprintf(` esQuery := fmt.Sprintf(`
{ {

Loading…
Cancel
Save