From b776bc6e5ba0c450fcc3a8af8c63cc8ccf293ae7 Mon Sep 17 00:00:00 2001 From: lucas Date: Thu, 16 Jan 2025 17:24:19 +0800 Subject: [PATCH] =?UTF-8?q?update=E7=9C=81=E5=B9=B3=E5=8F=B0=E6=B5=8B?= =?UTF-8?q?=E7=82=B9=E4=BB=8E=20label=E8=8E=B7=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...s主题特征to河北公路设施监测.go | 38 +++++++++++++++--- ...监测_承德_轻量化特征数据.yaml | 27 +++++-------- consumers/HBJCAS/config.go | 19 ++++++--- consumers/consumerHBJCAS.go | 28 +++++++++---- models/station.go | 39 +++++++++++++++++++ 5 files changed, 114 insertions(+), 37 deletions(-) create mode 100644 models/station.go diff --git a/adaptors/安心云es主题特征to河北公路设施监测.go b/adaptors/安心云es主题特征to河北公路设施监测.go index f9dd09b..b018a62 100644 --- a/adaptors/安心云es主题特征to河北公路设施监测.go +++ b/adaptors/安心云es主题特征to河北公路设施监测.go @@ -2,12 +2,16 @@ package adaptors import ( "encoding/json" + "fmt" "goInOut/consumers/HBJCAS" "goInOut/consumers/HBJCAS/protoFiles_hb" + "goInOut/dbOperate" + "goInOut/models" "google.golang.org/protobuf/proto" "log" "math" "strconv" + "strings" "time" ) @@ -17,7 +21,8 @@ type Adaptor_AXYES_HBGL struct { PointInfo map[int64]map[int64]int64 StructInfo map[int64]int64 //一些必要信息 - Info map[string]string + Info map[string]string + Redis *dbOperate.RedisHelper } func (the Adaptor_AXYES_HBGL) Transform(structId int64, factorId int, rawMsg string) []NeedPush { @@ -55,19 +60,24 @@ func (the Adaptor_AXYES_HBGL) EsAggTopToHBJCAS(structId int64, factorId int, esA complexData := &protoFiles_hb.ComplexData{} for _, sensorBucket := range buckets { sensorId := sensorBucket.Key - monitorCode := int64(0) for _, dateBucket := range sensorBucket.GroupDate.Buckets { - if _, ok := the.PointInfo[structId]; !ok { + //优先redis获取 + station := models.Station{} + k1 := fmt.Sprintf("station:%d", sensorId) + errRedis := the.Redis.GetObj(k1, &station) + if errRedis != nil { + log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签异常", structId, factorId, sensorId) continue } - if _, ok := the.PointInfo[structId][sensorId]; !ok { + monitorCode := the.getPointCodeFromLabel(station.Labels) + if monitorCode == 0 { + log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签,信息转换int64异常,跳过", structId, factorId, sensorId) continue } - monitorCode = the.PointInfo[structId][sensorId] dataDefinition := &protoFiles_hb.DataDefinition{ DataType: protoFiles_hb.DataType_STATISTICS, - UniqueCode: strconv.FormatInt(uniqueCode, 10), //乃积沟大桥 + UniqueCode: fmt.Sprintf("%d", uniqueCode), //乃积沟大桥 DataBody: the.EsAgg2StatisticData(factorId, monitorCode, dateBucket), } complexData.SensorData = append(complexData.SensorData, dataDefinition) @@ -155,3 +165,19 @@ func (the Adaptor_AXYES_HBGL) getUniqueCode(structId int64) (uniqueCode int64) { } return uniqueCode } + +func (the Adaptor_AXYES_HBGL) getPointCodeFromLabel(label string) int64 { + //解析label {13010600001} + pointUniqueCode := int64(0) + if len(label) > 2 { + newLabel := strings.TrimLeft(label, "{") + str := strings.TrimRight(newLabel, "}") + codeInt64, err := strconv.ParseInt(str, 10, 64) + if err != nil { + log.Printf("测点标签转换异常[%s]", label) + } + pointUniqueCode = codeInt64 + } + + return pointUniqueCode +} diff --git a/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.yaml b/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.yaml index a3772cc..2626f70 100644 --- a/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.yaml +++ b/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.yaml @@ -14,31 +14,22 @@ ioConfig: - t/province/1307 monitor: #振动是触发式,数据迟缓 cron10min也改成1小时一次 最多上报6条,不进行10min实时上报 - cron10min: 20 0/1 * * * #6/10 * * * * + cron10min: 40 0/1 * * * #6/10 * * * * #普通类型 特征数据 - cron1hour: 11 0/1 * * * + cron1hour: 20 0/1 * * * info: rc4key: sK3kJttzZyf7aZI94zSYgytBYCrfZRt6yil2 + + +queryComponent: + redis: + address: 10.8.30.160:30379 #结构物id对应 structInfo: 5011: 130110 5016: 130109 + #点位id对应信息 pointInfo: #测点类型支持 桥墩倾斜 15 裂缝18 支座位移20 桥面振动28 - 5011: #河北承德横河子中桥 (axy structureId) - #裂缝 axy sensorId映射 省平台 pointUniqueCode - 68397: 1301100001 - 68398: 1301100002 - 68399: 1301100003 - #振动 - 68400: 1301100004 - 5016: #河北承德乃积沟大桥 - #桥墩倾斜 - 68384: 13010900001 - 68385: 13010900002 - 68386: 13010900003 - #支座位移 - 68387: 13010900004 - 68388: 13010900005 - 68389: 13010900006 + diff --git a/consumers/HBJCAS/config.go b/consumers/HBJCAS/config.go index 3217ac9..30f18fe 100644 --- a/consumers/HBJCAS/config.go +++ b/consumers/HBJCAS/config.go @@ -3,11 +3,12 @@ package HBJCAS import "goInOut/config" type ConfigFile struct { - IoConfig ioConfig `yaml:"ioConfig"` - OtherInfo map[string]string `yaml:"info"` - PointInfo map[int64]map[int64]int64 `yaml:"pointInfo"` - StructInfo map[int64]int64 `yaml:"structInfo"` - Monitor map[string]string `yaml:"monitor"` + IoConfig ioConfig `yaml:"ioConfig"` + OtherInfo map[string]string `yaml:"info"` + PointInfo map[int64]map[int64]int64 `yaml:"pointInfo"` + StructInfo map[int64]int64 `yaml:"structInfo"` + Monitor map[string]string `yaml:"monitor"` + QueryComponent queryComponent `yaml:"queryComponent"` } type ioConfig struct { In In `yaml:"in"` @@ -18,5 +19,11 @@ type In struct { } type Out struct { - Mqtt config.MqttConfig `json:"mqtt"` + Mqtt config.MqttConfig `yaml:"mqtt"` +} + +type queryComponent struct { + Redis struct { + Address string `yaml:"address"` + } `yaml:"redis"` } diff --git a/consumers/consumerHBJCAS.go b/consumers/consumerHBJCAS.go index b110626..aac26fa 100644 --- a/consumers/consumerHBJCAS.go +++ b/consumers/consumerHBJCAS.go @@ -18,10 +18,11 @@ type consumerHBJCAS struct { //数据缓存管道 ch chan []adaptors.NeedPush //具体配置 - Info HBJCAS.ConfigFile - InHttp *dbOperate.HttpHelper - outMqtt *dbOperate.MqttHelper - monitor *monitors.CommonMonitor + Info HBJCAS.ConfigFile + InHttp *dbOperate.HttpHelper + outMqtt *dbOperate.MqttHelper + monitor *monitors.CommonMonitor + infoRedis *dbOperate.RedisHelper } func (the *consumerHBJCAS) LoadConfigJson(cfgStr string) { @@ -40,6 +41,10 @@ func (the *consumerHBJCAS) Initial(cfg string) error { return err } err = the.OutputInitial() + if err != nil { + return err + } + err = the.infoComponentInitial() return err } func (the *consumerHBJCAS) InputInitial() error { @@ -78,6 +83,13 @@ func (the *consumerHBJCAS) OutputInitial() error { "consumers/HBJCAS/ssl/client-key.pem") return nil } + +func (the *consumerHBJCAS) infoComponentInitial() error { + //数据出口 + addr := the.Info.QueryComponent.Redis.Address + the.infoRedis = dbOperate.NewRedisHelper("", addr) + return nil +} func (the *consumerHBJCAS) Work() { go func() { for { @@ -106,12 +118,14 @@ func (the *consumerHBJCAS) Work() { func (the *consumerHBJCAS) getAdaptor() (adaptor adaptors.Adaptor_AXYES_HBGL) { - return adaptors.Adaptor_AXYES_HBGL{} + return adaptors.Adaptor_AXYES_HBGL{ + Redis: the.infoRedis, + } } func (the *consumerHBJCAS) getStructIds() []int64 { var structIds []int64 - for strutId, _ := range the.Info.PointInfo { + for strutId, _ := range the.Info.StructInfo { structIds = append(structIds, strutId) } return structIds @@ -146,7 +160,7 @@ func (the *consumerHBJCAS) getEs1HourAggData() { func (the *consumerHBJCAS) getEs10minAggData() { //utils.GetTimeRangeBy10min() 由于振动数据实时性问题 改用一小时统一上报 start, end := utils.GetTimeRangeByHour(-1) - log.Printf("查询数据时间范围 %s - %s", start, end) + log.Printf("查询10min数据时间范围 %s - %s", start, end) factorIds := []int{28} structIds := the.getStructIds() for _, structId := range structIds { diff --git a/models/station.go b/models/station.go new file mode 100644 index 0000000..d75672d --- /dev/null +++ b/models/station.go @@ -0,0 +1,39 @@ +package models + +import ( + "encoding/json" +) + +type Station struct { + Id int `json:"id"` + Name string `json:"name"` + Structure int `json:"structure"` + ThingId string `json:"thingId"` + StructName string `json:"struct_name"` + Factor int `json:"factor"` + ManualData bool `json:"manual_data"` + Formula interface{} `json:"formula"` + ParamsValue interface{} `json:"params_value"` + FacName string `json:"fac_name"` + Proto string `json:"proto"` + Devices []struct { + Params map[string]any `json:"params"` + IotaDeviceId string `json:"iota_device_id"` + IotaDeviceSerial int `json:"iota_device_serial"` + } `json:"devices"` + //测点标签 {channelCode:6d608735-6dce-420c-9ffa-58ede93292e1,低频} + Labels string `json:"labels"` + //关联 广东省长大桥梁结构健康监测平台 + ChannelCode string `json:"channelCode"` +} + +// redis序列化 +func (m *Station) MarshalBinary() (data []byte, err error) { + return json.Marshal(m) +} + +// redis序列化 +func (m *Station) UnmarshalBinary(data []byte) error { + return json.Unmarshal(data, m) + +}