From 72d75c6d9fdf5e18eedafc5f9ec7f92811de709b Mon Sep 17 00:00:00 2001 From: lucas Date: Tue, 14 Jan 2025 16:40:11 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=20=E6=8C=AF=E5=8A=A8?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E4=B8=8A=E6=8A=A5=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...s主题特征to河北公路设施监测.go | 13 +- ...监测_承德_轻量化特征数据.yaml | 7 +- consumers/HBJCAS/config.go | 4 +- .../HBJCAS/dataModel.go | 2 +- consumers/consumerCDJYSN.go | 2 +- consumers/consumerHBJCAS.go | 127 ++++++++++++++++-- dbOperate/httpHelper.go | 4 +- monitors/commonMonitor.go | 4 +- monitors/httpMonitor.go | 5 +- monitors/monitorHelper.go | 6 +- 10 files changed, 142 insertions(+), 32 deletions(-) rename models/esThemeAggDateHistogram.go => consumers/HBJCAS/dataModel.go (99%) diff --git a/adaptors/安心云es主题特征to河北公路设施监测.go b/adaptors/安心云es主题特征to河北公路设施监测.go index cc7369d..f827736 100644 --- a/adaptors/安心云es主题特征to河北公路设施监测.go +++ b/adaptors/安心云es主题特征to河北公路设施监测.go @@ -3,8 +3,8 @@ package adaptors import ( "encoding/json" "github.com/labstack/gommon/log" + "goInOut/consumers/HBJCAS" "goInOut/consumers/HBJCAS/protoFiles_hb" - "goInOut/models" "google.golang.org/protobuf/proto" "math" "strconv" @@ -21,7 +21,7 @@ type Adaptor_AXYES_HBGL struct { } func (the Adaptor_AXYES_HBGL) Transform(structId int64, factorId int, rawMsg string) []NeedPush { - esAggDateHistogram := models.EsThemeAggDateHistogram{} + esAggDateHistogram := HBJCAS.EsThemeAggDateHistogram{} var needPush []NeedPush err := json.Unmarshal([]byte(rawMsg), &esAggDateHistogram) if err != nil { @@ -39,7 +39,7 @@ func (the Adaptor_AXYES_HBGL) Transform(structId int64, factorId int, rawMsg str return needPush } -func (the Adaptor_AXYES_HBGL) EsAggTopToHBJCAS(structId int64, factorId int, esAggs models.EsThemeAggDateHistogram) (result []byte) { +func (the Adaptor_AXYES_HBGL) EsAggTopToHBJCAS(structId int64, factorId int, esAggs HBJCAS.EsThemeAggDateHistogram) (result []byte) { buckets := esAggs.Aggregations.GroupSensor.Buckets if len(buckets) == 0 { log.Info("es agg数据数量==0") @@ -93,7 +93,7 @@ func (the Adaptor_AXYES_HBGL) getMonitorTypeByFactorId(factorId int) protoFiles_ } } -func (the Adaptor_AXYES_HBGL) EsAgg2StatisticData(factorId int, monitorCode int64, dateBucket models.BucketsXY) *protoFiles_hb.DataDefinition_StatisticData { +func (the Adaptor_AXYES_HBGL) EsAgg2StatisticData(factorId int, monitorCode int64, dateBucket HBJCAS.BucketsXY) *protoFiles_hb.DataDefinition_StatisticData { Atime := dateBucket.KeyAsString.Add(-8 * time.Hour).UnixMilli() maxAbsoluteValueX := max(math.Abs(dateBucket.X.Max), math.Abs(dateBucket.X.Min)) avgValueX := dateBucket.X.Avg @@ -137,6 +137,11 @@ func (the Adaptor_AXYES_HBGL) EsAgg2StatisticData(factorId int, monitorCode int6 RootMeanSquare: float32(rootMeanSquareX), TotalAbsoluteValue: float32(dateBucket.X.Max - dateBucket.X.Min), }} + case 28: //振动 + dataDefinitionStatisticData.StatisticData.DataBody = &protoFiles_hb.StatisticData_Vib{Vib: &protoFiles_hb.VIBStatistic{ + MaxAbsoluteValue: float32(maxAbsoluteValueX), + RootMeanSquare: float32(rootMeanSquareY), + }} } return dataDefinitionStatisticData diff --git a/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.yaml b/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.yaml index 7b4b612..fc5675d 100644 --- a/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.yaml +++ b/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.yaml @@ -3,7 +3,6 @@ ioConfig: in: http: url: https://esproxy.anxinyun.cn/anxincloud_themes/_search - cronStr: 11 0/1 * * * out: mqtt: host: 10.8.30.160 @@ -13,6 +12,10 @@ ioConfig: clientId: chengDe topics: - t/province/1307 +monitor: + #由于振动是触发式,数据迟缓 cron10min也改成1小时一次 上报多条 + cron10min: 30 0/1 * * * #6/10 * * * * + cron1hour: 11 0/1 * * * info: rc4key: sK3kJttzZyf7aZI94zSYgytBYCrfZRt6yil2 #结构物id对应 @@ -27,9 +30,11 @@ pointInfo: 68398: 1301100002 68399: 1301100001 5016: #河北承德乃积沟大桥 + #桥墩倾斜 68384: 13010900001 68385: 13010900002 68386: 13010900003 + #支座位移 68387: 13010900004 68388: 13010900005 68389: 13010900006 diff --git a/consumers/HBJCAS/config.go b/consumers/HBJCAS/config.go index e27f6ea..3217ac9 100644 --- a/consumers/HBJCAS/config.go +++ b/consumers/HBJCAS/config.go @@ -7,14 +7,14 @@ type ConfigFile struct { OtherInfo map[string]string `yaml:"info"` PointInfo map[int64]map[int64]int64 `yaml:"pointInfo"` StructInfo map[int64]int64 `yaml:"structInfo"` + Monitor map[string]string `yaml:"monitor"` } type ioConfig struct { In In `yaml:"in"` Out Out `yaml:"out"` } type In struct { - Http config.HttpConfig `yaml:"http"` - CronStr string `yaml:"cronStr"` + Http config.HttpConfig `yaml:"http"` } type Out struct { diff --git a/models/esThemeAggDateHistogram.go b/consumers/HBJCAS/dataModel.go similarity index 99% rename from models/esThemeAggDateHistogram.go rename to consumers/HBJCAS/dataModel.go index 3279988..1d7ad3b 100644 --- a/models/esThemeAggDateHistogram.go +++ b/consumers/HBJCAS/dataModel.go @@ -1,4 +1,4 @@ -package models +package HBJCAS import "time" diff --git a/consumers/consumerCDJYSN.go b/consumers/consumerCDJYSN.go index 7ea6857..4cbea58 100644 --- a/consumers/consumerCDJYSN.go +++ b/consumers/consumerCDJYSN.go @@ -52,7 +52,7 @@ func (the *consumerCDJYSN) InputInitial() error { } the.InHttp.Start() - the.InHttp.RegisterFun(the.getEsData) + the.InHttp.RegisterTask(the.ConfigInfo.IoConfig.In.CronStr, the.getEsData) return nil } func (the *consumerCDJYSN) OutputInitial() error { diff --git a/consumers/consumerHBJCAS.go b/consumers/consumerHBJCAS.go index e58a203..ce28470 100644 --- a/consumers/consumerHBJCAS.go +++ b/consumers/consumerHBJCAS.go @@ -47,10 +47,20 @@ func (the *consumerHBJCAS) InputInitial() error { //数据入口 the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""} the.monitor = &monitors.CommonMonitor{ - MonitorHelper: &monitors.MonitorHelper{CronStr: the.Info.IoConfig.In.CronStr}, + MonitorHelper: &monitors.MonitorHelper{}, } + for taskName, cron := range the.Info.Monitor { + switch taskName { + case "cron10min": + the.monitor.RegisterTask(cron, the.getEs1HourAggData) + case "cron1hour": + the.monitor.RegisterTask(cron, the.getEs10minAggData) + default: + log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron) + } + } + the.monitor.Start() - the.monitor.RegisterFun(the.getEsHourAggData) return nil } func (the *consumerHBJCAS) OutputInitial() error { @@ -103,21 +113,48 @@ func (the *consumerHBJCAS) getStructIds() []int64 { } return structIds } -func (the *consumerHBJCAS) getEsHourAggData() { +func (the *consumerHBJCAS) getEs1HourAggData() { start, end := utils.GetTimeRangeByHour(-1) log.Printf("查询数据时间范围 %s - %s", start, end) - hourFactorIds := []int{18, 15, 20} //15, 20 , 28 + hourFactorIds := []int{15, 18, 20} structIds := the.getStructIds() for _, structId := range structIds { for _, factorId := range hourFactorIds { esQuery := the.getESQueryStrByHour(structId, factorId, start, end) auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} - esAggResult := the.InHttp.HttpGetWithHeader(esQuery, auth) + esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) + + adaptor := the.getAdaptor() + adaptor.PointInfo = the.Info.PointInfo + adaptor.StructInfo = the.Info.StructInfo + needPushes := adaptor.Transform(structId, factorId, esAggResultStr) + for i := range needPushes { + needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) + } + + if len(needPushes) > 0 { + the.ch <- needPushes + } + } + } + +} + +func (the *consumerHBJCAS) getEs10minAggData() { + start, end := utils.GetTimeRangeBy10min() + log.Printf("查询数据时间范围 %s - %s", start, end) + factorIds := []int{28} + structIds := the.getStructIds() + for _, structId := range structIds { + for _, factorId := range factorIds { + esQuery := the.getESQueryStrBy10min(structId, factorId, start, end) + auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} + esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) adaptor := the.getAdaptor() adaptor.PointInfo = the.Info.PointInfo adaptor.StructInfo = the.Info.StructInfo - needPushes := adaptor.Transform(structId, factorId, esAggResult) + needPushes := adaptor.Transform(structId, factorId, esAggResultStr) for i := range needPushes { needPushes[i].Payload = the.crc16rc4(needPushes[i].Payload) } @@ -129,6 +166,7 @@ func (the *consumerHBJCAS) getEsHourAggData() { } } + func (the *consumerHBJCAS) crc16rc4(transBytes []byte) []byte { resultByCrc16 := utils.NewCRC16CCITT().GetWCRCin(transBytes) needRC4 := append(transBytes, resultByCrc16...) @@ -187,8 +225,66 @@ func (the *consumerHBJCAS) getESQueryStrByHour(structureId int64, factorId int, "groupDate": { "date_histogram": { "field": "collect_time", - "interval": "hour", - "time_zone": "Asia/Shanghai" + "interval": "1h", + "time_zone": "Asia/Shanghai", + "min_doc_count": 1 + }, + "aggs": %s + } + } + } + } +} +`, structureId, factorId, start, end, aggSubSql) + + return esQuery +} + +func (the *consumerHBJCAS) getESQueryStrBy10min(structureId int64, factorId int, start, end string) string { + aggSubSql := getEsAggSubSqlByFactorId(factorId) + esQuery := fmt.Sprintf(` +{ + "size": 0, + "query": { + "bool": { + "must": [ + { + "term": { + "structure": { + "value": %d + } + } + }, + { + "term": { + "factor": { + "value": %d + } + } + }, + { + "range": { + "collect_time": { + "gte": "%s", + "lte": "%s" + } + } + } + ] + } + }, + "aggs": { + "groupSensor": { + "terms": { + "field": "sensor" + }, + "aggs": { + "groupDate": { + "date_histogram": { + "field": "collect_time", + "interval": "10m", + "time_zone": "Asia/Shanghai", + "min_doc_count": 1 }, "aggs": %s } @@ -240,11 +336,16 @@ func getEsAggSubSqlByFactorId(factorId int) string { case 28: subAggSQl = ` { - "trms": { - "extended_stats": { - "field": "data.trms" - } - } + "x": { + "extended_stats": { + "field": "data.pv" + } + }, + "y": { + "extended_stats": { + "field": "data.trms" + } + } }` } return subAggSQl diff --git a/dbOperate/httpHelper.go b/dbOperate/httpHelper.go index b6277da..b80876b 100644 --- a/dbOperate/httpHelper.go +++ b/dbOperate/httpHelper.go @@ -36,7 +36,7 @@ func (the *HttpHelper) HttpGet(queryBody string) string { fmt.Println(err) } defer resp.Body.Close() - fmt.Println("http get 请求,url", url, " <- code=", resp.StatusCode) + log.Println("http get 请求,url", url, " <- code=", resp.StatusCode) body, err := io.ReadAll(resp.Body) return string(body) } @@ -64,7 +64,7 @@ func (the *HttpHelper) HttpGetWithHeader(queryBody string, headerMap map[string] } } }(resp) - fmt.Println("http get 请求,url", url, " <- code=", resp.StatusCode) + log.Println("http get 请求,url", url, " <- code=", resp.StatusCode) body, err := io.ReadAll(resp.Body) return string(body) } diff --git a/monitors/commonMonitor.go b/monitors/commonMonitor.go index c702665..fb84e62 100644 --- a/monitors/commonMonitor.go +++ b/monitors/commonMonitor.go @@ -4,8 +4,8 @@ type CommonMonitor struct { *MonitorHelper } -func (the *CommonMonitor) RegisterFun(fun func()) { - the.registerFun(fun) +func (the *CommonMonitor) RegisterTask(cron string, fun func()) { + the.registerTask(cron, fun) } func (the *CommonMonitor) Start() { diff --git a/monitors/httpMonitor.go b/monitors/httpMonitor.go index b3d14cb..5b1a534 100644 --- a/monitors/httpMonitor.go +++ b/monitors/httpMonitor.go @@ -7,10 +7,9 @@ type HttpMonitor struct { *MonitorHelper } -func (the *HttpMonitor) RegisterFun(fun func()) { - the.registerFun(fun) +func (the *HttpMonitor) RegisterTask(cron string, fun func()) { + the.registerTask(cron, fun) } - func (the *HttpMonitor) Start() { the.HttpClient.Initial() the.initial() diff --git a/monitors/monitorHelper.go b/monitors/monitorHelper.go index 5ad8b4f..ff78aec 100644 --- a/monitors/monitorHelper.go +++ b/monitors/monitorHelper.go @@ -15,9 +15,9 @@ func (the *MonitorHelper) initial() { log.Printf("cronStr=%s", the.CronStr) } -// RegisterFun 注册定时器方法 -func (the *MonitorHelper) registerFun(task func()) { - entryID, err := the.Cron.AddFunc(the.CronStr, task) +// RegisterTask 注册定时器方法 +func (the *MonitorHelper) registerTask(cron string, task func()) { + entryID, err := the.Cron.AddFunc(cron, task) if err != nil { log.Printf("cron 定时任务[%v]添加异常:%s", entryID, err.Error()) }