Browse Source

update省平台测点从 label获取

dev
lucas 3 months ago
parent
commit
b776bc6e5b
  1. 36
      adaptors/安心云es主题特征to河北公路设施监测.go
  2. 27
      configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.yaml
  3. 9
      consumers/HBJCAS/config.go
  4. 20
      consumers/consumerHBJCAS.go
  5. 39
      models/station.go

36
adaptors/安心云es主题特征to河北公路设施监测.go

@ -2,12 +2,16 @@ package adaptors
import (
"encoding/json"
"fmt"
"goInOut/consumers/HBJCAS"
"goInOut/consumers/HBJCAS/protoFiles_hb"
"goInOut/dbOperate"
"goInOut/models"
"google.golang.org/protobuf/proto"
"log"
"math"
"strconv"
"strings"
"time"
)
@ -18,6 +22,7 @@ type Adaptor_AXYES_HBGL struct {
StructInfo map[int64]int64
//一些必要信息
Info map[string]string
Redis *dbOperate.RedisHelper
}
func (the Adaptor_AXYES_HBGL) Transform(structId int64, factorId int, rawMsg string) []NeedPush {
@ -55,19 +60,24 @@ func (the Adaptor_AXYES_HBGL) EsAggTopToHBJCAS(structId int64, factorId int, esA
complexData := &protoFiles_hb.ComplexData{}
for _, sensorBucket := range buckets {
sensorId := sensorBucket.Key
monitorCode := int64(0)
for _, dateBucket := range sensorBucket.GroupDate.Buckets {
if _, ok := the.PointInfo[structId]; !ok {
//优先redis获取
station := models.Station{}
k1 := fmt.Sprintf("station:%d", sensorId)
errRedis := the.Redis.GetObj(k1, &station)
if errRedis != nil {
log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签异常", structId, factorId, sensorId)
continue
}
if _, ok := the.PointInfo[structId][sensorId]; !ok {
monitorCode := the.getPointCodeFromLabel(station.Labels)
if monitorCode == 0 {
log.Printf("redis 获取[s:%d,f:%d]测点[%d]标签,信息转换int64异常,跳过", structId, factorId, sensorId)
continue
}
monitorCode = the.PointInfo[structId][sensorId]
dataDefinition := &protoFiles_hb.DataDefinition{
DataType: protoFiles_hb.DataType_STATISTICS,
UniqueCode: strconv.FormatInt(uniqueCode, 10), //乃积沟大桥
UniqueCode: fmt.Sprintf("%d", uniqueCode), //乃积沟大桥
DataBody: the.EsAgg2StatisticData(factorId, monitorCode, dateBucket),
}
complexData.SensorData = append(complexData.SensorData, dataDefinition)
@ -155,3 +165,19 @@ func (the Adaptor_AXYES_HBGL) getUniqueCode(structId int64) (uniqueCode int64) {
}
return uniqueCode
}
func (the Adaptor_AXYES_HBGL) getPointCodeFromLabel(label string) int64 {
//解析label {13010600001}
pointUniqueCode := int64(0)
if len(label) > 2 {
newLabel := strings.TrimLeft(label, "{")
str := strings.TrimRight(newLabel, "}")
codeInt64, err := strconv.ParseInt(str, 10, 64)
if err != nil {
log.Printf("测点标签转换异常[%s]", label)
}
pointUniqueCode = codeInt64
}
return pointUniqueCode
}

27
configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.yaml

@ -14,31 +14,22 @@ ioConfig:
- t/province/1307
monitor:
#振动是触发式,数据迟缓 cron10min也改成1小时一次 最多上报6条,不进行10min实时上报
cron10min: 20 0/1 * * * #6/10 * * * *
cron10min: 40 0/1 * * * #6/10 * * * *
#普通类型 特征数据
cron1hour: 11 0/1 * * *
cron1hour: 20 0/1 * * *
info:
rc4key: sK3kJttzZyf7aZI94zSYgytBYCrfZRt6yil2
queryComponent:
redis:
address: 10.8.30.160:30379
#结构物id对应
structInfo:
5011: 130110
5016: 130109
#点位id对应信息
pointInfo: #测点类型支持 桥墩倾斜 15 裂缝18 支座位移20 桥面振动28
5011: #河北承德横河子中桥 (axy structureId)
#裂缝 axy sensorId映射 省平台 pointUniqueCode
68397: 1301100001
68398: 1301100002
68399: 1301100003
#振动
68400: 1301100004
5016: #河北承德乃积沟大桥
#桥墩倾斜
68384: 13010900001
68385: 13010900002
68386: 13010900003
#支座位移
68387: 13010900004
68388: 13010900005
68389: 13010900006

9
consumers/HBJCAS/config.go

@ -8,6 +8,7 @@ type ConfigFile struct {
PointInfo map[int64]map[int64]int64 `yaml:"pointInfo"`
StructInfo map[int64]int64 `yaml:"structInfo"`
Monitor map[string]string `yaml:"monitor"`
QueryComponent queryComponent `yaml:"queryComponent"`
}
type ioConfig struct {
In In `yaml:"in"`
@ -18,5 +19,11 @@ type In struct {
}
type Out struct {
Mqtt config.MqttConfig `json:"mqtt"`
Mqtt config.MqttConfig `yaml:"mqtt"`
}
type queryComponent struct {
Redis struct {
Address string `yaml:"address"`
} `yaml:"redis"`
}

20
consumers/consumerHBJCAS.go

@ -22,6 +22,7 @@ type consumerHBJCAS struct {
InHttp *dbOperate.HttpHelper
outMqtt *dbOperate.MqttHelper
monitor *monitors.CommonMonitor
infoRedis *dbOperate.RedisHelper
}
func (the *consumerHBJCAS) LoadConfigJson(cfgStr string) {
@ -40,6 +41,10 @@ func (the *consumerHBJCAS) Initial(cfg string) error {
return err
}
err = the.OutputInitial()
if err != nil {
return err
}
err = the.infoComponentInitial()
return err
}
func (the *consumerHBJCAS) InputInitial() error {
@ -78,6 +83,13 @@ func (the *consumerHBJCAS) OutputInitial() error {
"consumers/HBJCAS/ssl/client-key.pem")
return nil
}
func (the *consumerHBJCAS) infoComponentInitial() error {
//数据出口
addr := the.Info.QueryComponent.Redis.Address
the.infoRedis = dbOperate.NewRedisHelper("", addr)
return nil
}
func (the *consumerHBJCAS) Work() {
go func() {
for {
@ -106,12 +118,14 @@ func (the *consumerHBJCAS) Work() {
func (the *consumerHBJCAS) getAdaptor() (adaptor adaptors.Adaptor_AXYES_HBGL) {
return adaptors.Adaptor_AXYES_HBGL{}
return adaptors.Adaptor_AXYES_HBGL{
Redis: the.infoRedis,
}
}
func (the *consumerHBJCAS) getStructIds() []int64 {
var structIds []int64
for strutId, _ := range the.Info.PointInfo {
for strutId, _ := range the.Info.StructInfo {
structIds = append(structIds, strutId)
}
return structIds
@ -146,7 +160,7 @@ func (the *consumerHBJCAS) getEs1HourAggData() {
func (the *consumerHBJCAS) getEs10minAggData() {
//utils.GetTimeRangeBy10min() 由于振动数据实时性问题 改用一小时统一上报
start, end := utils.GetTimeRangeByHour(-1)
log.Printf("查询数据时间范围 %s - %s", start, end)
log.Printf("查询10min数据时间范围 %s - %s", start, end)
factorIds := []int{28}
structIds := the.getStructIds()
for _, structId := range structIds {

39
models/station.go

@ -0,0 +1,39 @@
package models
import (
"encoding/json"
)
type Station struct {
Id int `json:"id"`
Name string `json:"name"`
Structure int `json:"structure"`
ThingId string `json:"thingId"`
StructName string `json:"struct_name"`
Factor int `json:"factor"`
ManualData bool `json:"manual_data"`
Formula interface{} `json:"formula"`
ParamsValue interface{} `json:"params_value"`
FacName string `json:"fac_name"`
Proto string `json:"proto"`
Devices []struct {
Params map[string]any `json:"params"`
IotaDeviceId string `json:"iota_device_id"`
IotaDeviceSerial int `json:"iota_device_serial"`
} `json:"devices"`
//测点标签 {channelCode:6d608735-6dce-420c-9ffa-58ede93292e1,低频}
Labels string `json:"labels"`
//关联 广东省长大桥梁结构健康监测平台
ChannelCode string `json:"channelCode"`
}
// redis序列化
func (m *Station) MarshalBinary() (data []byte, err error) {
return json.Marshal(m)
}
// redis序列化
func (m *Station) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, m)
}
Loading…
Cancel
Save