From 0d01bd4f983fefb9df37cf9a2a844831472af404 Mon Sep 17 00:00:00 2001 From: lucas Date: Mon, 13 Jan 2025 18:10:47 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E6=9B=B4=E6=96=B0=E5=80=BE=E8=A7=92?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=20=E9=85=8D=E7=BD=AE=E4=BF=A1=E6=81=AF=20?= =?UTF-8?q?=E8=8E=B7=E5=8F=96=20=E7=BB=93=E6=9E=84=E7=89=A9=E5=92=8C?= =?UTF-8?q?=E6=B5=8B=E7=82=B9=E6=98=A0=E5=B0=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...s主题特征to河北公路设施监测.go | 99 ++++++++++++++----- ...监测_承德_轻量化特征数据.yaml | 15 +-- consumers/HBJCAS/config.go | 10 +- consumers/consumerHBJCAS.go | 57 ++++++----- models/esThemeAggDateHistogram.go | 4 +- 5 files changed, 124 insertions(+), 61 deletions(-) diff --git a/adaptors/安心云es主题特征to河北公路设施监测.go b/adaptors/安心云es主题特征to河北公路设施监测.go index aacd6c0..05219c2 100644 --- a/adaptors/安心云es主题特征to河北公路设施监测.go +++ b/adaptors/安心云es主题特征to河北公路设施监测.go @@ -6,21 +6,21 @@ import ( "goInOut/consumers/HBJCAS/protoFiles_hb" "goInOut/models" "google.golang.org/protobuf/proto" + "math" + "strconv" "time" ) // Adaptor_AXYES_HBGL 统一采集软件数据 转换 湘潭健康监测平台 type Adaptor_AXYES_HBGL struct { //传感器code转换信息 - GnssMap map[string]string - RainMap map[string]string - NBWYMap map[string]string - DXSWMap map[string]string + PointInfo map[int64]map[int64]int64 + StructInfo map[int64]int64 //一些必要信息 Info map[string]string } -func (the Adaptor_AXYES_HBGL) Transform(rawMsg string) []NeedPush { +func (the Adaptor_AXYES_HBGL) Transform(structId int64, factorId int, rawMsg string) []NeedPush { esAggDateHistogram := models.EsThemeAggDateHistogram{} var needPush []NeedPush err := json.Unmarshal([]byte(rawMsg), &esAggDateHistogram) @@ -28,44 +28,41 @@ func (the Adaptor_AXYES_HBGL) Transform(rawMsg string) []NeedPush { return nil } needPush = append(needPush, NeedPush{ - Payload: the.EsAggTopToHBJCAS(esAggDateHistogram), + Payload: the.EsAggTopToHBJCAS(structId, factorId, esAggDateHistogram), }) return needPush } -func (the Adaptor_AXYES_HBGL) EsAggTopToHBJCAS(esAggs models.EsThemeAggDateHistogram) (result []byte) { +func (the Adaptor_AXYES_HBGL) EsAggTopToHBJCAS(structId int64, factorId int, esAggs models.EsThemeAggDateHistogram) (result []byte) { buckets := esAggs.Aggregations.GroupSensor.Buckets if len(buckets) == 0 { log.Info("es agg数据数量==0") return } + //设施唯一编码(省平台) + uniqueCode := the.getUniqueCode(structId) + if uniqueCode == 0 { + log.Printf("structId=%d,无匹配省平台uniqueCode", structId) + return + } //数据汇总 complexData := &protoFiles_hb.ComplexData{} for _, sensorBucket := range buckets { - //sensorId := sensorBucket.Key + sensorId := sensorBucket.Key + monitorCode := int64(0) for _, dateBucket := range sensorBucket.GroupDate.Buckets { - Atime := dateBucket.KeyAsString - dataDefinitionStatisticData := &protoFiles_hb.DataDefinition_StatisticData{ - StatisticData: &protoFiles_hb.StatisticData{ - MonitorType: protoFiles_hb.MonitoryType_INC, - MonitorCode: 13000100001, //测点唯一编码 - EventTime: Atime.Add(-8 * time.Hour).UnixMilli(), - Interval: 60 * 1000, - DataBody: &protoFiles_hb.StatisticData_Inc{Inc: &protoFiles_hb.INCStatistic{ - MaxAbsoluteValueX: 0, - AvgValueX: 0, - RootMeanSquareX: 0, - MaxAbsoluteValueY: 0, - AvgValueY: 0, - RootMeanSquareY: 0, - }}, - }, + if _, ok := the.PointInfo[structId]; !ok { + continue } + if _, ok := the.PointInfo[structId][sensorId]; !ok { + continue + } + monitorCode = the.PointInfo[structId][sensorId] dataDefinition := &protoFiles_hb.DataDefinition{ DataType: protoFiles_hb.DataType_STATISTICS, - UniqueCode: "130109", //乃积沟大桥 - DataBody: dataDefinitionStatisticData, + UniqueCode: strconv.FormatInt(uniqueCode, 10), //乃积沟大桥 + DataBody: the.EsAgg2StatisticData(factorId, monitorCode, dateBucket), } complexData.SensorData = append(complexData.SensorData, dataDefinition) @@ -76,3 +73,53 @@ func (the Adaptor_AXYES_HBGL) EsAggTopToHBJCAS(esAggs models.EsThemeAggDateHisto result, _ = proto.Marshal(complexData) return result } +func (the Adaptor_AXYES_HBGL) getMonitorTypeByFactorId(factorId int) protoFiles_hb.MonitoryType { + //桥墩倾斜 15 支座位移20 桥面振动28 + switch factorId { + case 15: + return protoFiles_hb.MonitoryType_INC + case 20: + return protoFiles_hb.MonitoryType_AND + case 28: + return protoFiles_hb.MonitoryType_VIB + default: + return protoFiles_hb.MonitoryType_CMM + } +} +func (the Adaptor_AXYES_HBGL) EsAgg2StatisticData(factorId int, monitorCode int64, dateBucket models.BucketsXY) *protoFiles_hb.DataDefinition_StatisticData { + Atime := dateBucket.KeyAsString.Add(-8 * time.Hour).UnixMilli() + maxAbsoluteValueX := max(math.Abs(dateBucket.X.Max), math.Abs(dateBucket.X.Min)) + avgValueX := dateBucket.X.Avg + maxAbsoluteValueY := max(math.Abs(dateBucket.Y.Max), math.Abs(dateBucket.Y.Min)) + avgValueY := dateBucket.Y.Avg + + rootMeanSquareX := math.Sqrt(dateBucket.X.SumOfSquares / float64(dateBucket.X.Count)) + rootMeanSquareY := math.Sqrt(dateBucket.Y.SumOfSquares / float64(dateBucket.Y.Count)) + + monitoryType := the.getMonitorTypeByFactorId(factorId) + dataDefinitionStatisticData := &protoFiles_hb.DataDefinition_StatisticData{ + StatisticData: &protoFiles_hb.StatisticData{ + MonitorType: monitoryType, + MonitorCode: monitorCode, //测点唯一编码 + EventTime: Atime, + Interval: 60 * 1000, + DataBody: &protoFiles_hb.StatisticData_Inc{Inc: &protoFiles_hb.INCStatistic{ + MaxAbsoluteValueX: float32(maxAbsoluteValueX), + AvgValueX: float32(avgValueX), + RootMeanSquareX: float32(rootMeanSquareX), + MaxAbsoluteValueY: float32(maxAbsoluteValueY), + AvgValueY: float32(avgValueY), + RootMeanSquareY: float32(rootMeanSquareY), + }}, + }, + } + + return dataDefinitionStatisticData +} + +func (the Adaptor_AXYES_HBGL) getUniqueCode(structId int64) (uniqueCode int64) { + if v, ok := the.StructInfo[structId]; ok { + uniqueCode = v + } + return uniqueCode +} diff --git a/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.yaml b/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.yaml index bf7ee9e..fb2aea6 100644 --- a/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.yaml +++ b/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.yaml @@ -3,7 +3,7 @@ ioConfig: in: http: url: https://esproxy.anxinyun.cn/anxincloud_themes/_search - cronStr: 55 0/1 * * * + cronStr: 48 0/1 * * * out: mqtt: host: 10.8.30.160 @@ -14,10 +14,13 @@ ioConfig: topics: - t/province/1307 info: - structureId: '5016' rc4key: sK3kJttzZyf7aZI94zSYgytBYCrfZRt6yil2 +#结构物id对应 +structInfo: + 5016: 130109 +#点位id对应信息 pointInfo: - '5016': - '68684': 13000100001 - '68685': 13000100002 - '68686': 13000100003 + 5016: #河北承德乃积沟大桥 + 68384: 13000100001 + 68385: 13000100002 + 68386: 13000100003 diff --git a/consumers/HBJCAS/config.go b/consumers/HBJCAS/config.go index c75771c..e27f6ea 100644 --- a/consumers/HBJCAS/config.go +++ b/consumers/HBJCAS/config.go @@ -3,18 +3,20 @@ package HBJCAS import "goInOut/config" type ConfigFile struct { - IoConfig ioConfig `yaml:"ioConfig"` - OtherInfo map[string]string `yaml:"info"` + IoConfig ioConfig `yaml:"ioConfig"` + OtherInfo map[string]string `yaml:"info"` + PointInfo map[int64]map[int64]int64 `yaml:"pointInfo"` + StructInfo map[int64]int64 `yaml:"structInfo"` } type ioConfig struct { In In `yaml:"in"` - Out OUT `yaml:"out"` + Out Out `yaml:"out"` } type In struct { Http config.HttpConfig `yaml:"http"` CronStr string `yaml:"cronStr"` } -type OUT struct { +type Out struct { Mqtt config.MqttConfig `json:"mqtt"` } diff --git a/consumers/consumerHBJCAS.go b/consumers/consumerHBJCAS.go index e916b93..be1bc2c 100644 --- a/consumers/consumerHBJCAS.go +++ b/consumers/consumerHBJCAS.go @@ -25,7 +25,7 @@ type consumerHBJCAS struct { } func (the *consumerHBJCAS) LoadConfigJson(cfgStr string) { - // 将 JSON 格式的数据解析到结构体中 + // 将 yaml 格式的数据解析到结构体中 err := yaml.Unmarshal([]byte(cfgStr), &the.Info) if err != nil { log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) @@ -96,27 +96,38 @@ func (the *consumerHBJCAS) getAdaptor() (adaptor adaptors.Adaptor_AXYES_HBGL) { return adaptors.Adaptor_AXYES_HBGL{} } +func (the *consumerHBJCAS) getStructIds() []int64 { + var structIds []int64 + for strutId, _ := range the.Info.PointInfo { + structIds = append(structIds, strutId) + } + return structIds +} func (the *consumerHBJCAS) getEsData() { - structureId := the.getStructureId() start, end := utils.GetTimeRangeByHour(-1) log.Printf("查询数据时间范围 %s - %s", start, end) + hourFactorIds := []int{15, 20} //, "28" + structIds := the.getStructIds() + for _, structId := range structIds { + for _, factorId := range hourFactorIds { + esQuery := the.getESQueryStrByHour(structId, factorId, start, end) + auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} + esAggResult := the.InHttp.HttpGetWithHeader(esQuery, auth) + + adaptor := the.getAdaptor() + adaptor.PointInfo = the.Info.PointInfo + adaptor.StructInfo = the.Info.StructInfo + needPushes := adaptor.Transform(structId, factorId, esAggResult) + for i := range needPushes { + needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) + } - hourFactorIds := []string{"15", "20"} //, "28" - for _, factorId := range hourFactorIds { - esQuery := the.getESQueryStrByHour(structureId, factorId, start, end) - auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} - esAggResult := the.InHttp.HttpGetWithHeader(esQuery, auth) - - adaptor := the.getAdaptor() - needPushes := adaptor.Transform(esAggResult) - for i := range needPushes { - needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) - } - - if len(needPushes) > 0 { - the.ch <- needPushes + if len(needPushes) > 0 { + the.ch <- needPushes + } } } + } func (the *consumerHBJCAS) crc16rc4(transBytes []byte) []byte { resultByCrc16 := utils.NewCRC16CCITT().GetWCRCin(transBytes) @@ -134,7 +145,7 @@ func (the *consumerHBJCAS) crc16rc4(transBytes []byte) []byte { log.Printf("rc4加密结果=> %s ", hex.EncodeToString(dest1)) return dest1 } -func (the *consumerHBJCAS) getESQueryStrByHour(structureId, factorId, start, end string) string { +func (the *consumerHBJCAS) getESQueryStrByHour(structureId int64, factorId int, start, end string) string { aggSubSql := getEsAggSubSqlByFactorId(factorId) esQuery := fmt.Sprintf(` { @@ -145,14 +156,14 @@ func (the *consumerHBJCAS) getESQueryStrByHour(structureId, factorId, start, end { "term": { "structure": { - "value": %s + "value": %d } } }, { "term": { "factor": { - "value": %s + "value": %d } } }, @@ -190,11 +201,11 @@ func (the *consumerHBJCAS) getESQueryStrByHour(structureId, factorId, start, end return esQuery } -func getEsAggSubSqlByFactorId(factorId string) string { +func getEsAggSubSqlByFactorId(factorId int) string { //桥墩倾斜 15 支座位移20 桥面振动28 subAggSQl := "" switch factorId { - case "15": + case 15: subAggSQl = ` { "x": { @@ -208,7 +219,7 @@ func getEsAggSubSqlByFactorId(factorId string) string { } } }` - case "20": + case 20: subAggSQl = ` { "displacement": { @@ -217,7 +228,7 @@ func getEsAggSubSqlByFactorId(factorId string) string { } } }` - case "28": + case 28: subAggSQl = ` { "trms": { diff --git a/models/esThemeAggDateHistogram.go b/models/esThemeAggDateHistogram.go index 6c1d7e7..e846c80 100644 --- a/models/esThemeAggDateHistogram.go +++ b/models/esThemeAggDateHistogram.go @@ -27,8 +27,8 @@ type GPBySensorIdAggByDateHistogram struct { } type Buckets struct { - Key int `json:"key"` - DocCount int `json:"doc_count"` + Key int64 `json:"key"` + DocCount int64 `json:"doc_count"` GroupDate struct { Buckets []BucketsXY `json:"buckets"` } `json:"groupDate"`