From 86cd1918c4c13f829f0fcc547726c5fdacacaa6c Mon Sep 17 00:00:00 2001 From: lucas Date: Sun, 12 Jan 2025 15:18:51 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=20=E5=B0=8F=E6=97=B6?= =?UTF-8?q?=E8=81=9A=E9=9B=86=E7=9A=84=20es=E6=9F=A5=E8=AF=A2=20=E6=8B=BC?= =?UTF-8?q?=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...s主题特征to河北公路设施监测.go | 35 +++ ...监测_承德_轻量化特征数据.json | 5 +- consumers/HBJCAS/config.go | 4 +- consumers/consumerHBJCAS.go | 213 ++++++++++++++++++ consumers/consumerManage.go | 3 + utils/timeRange.go | 2 +- 6 files changed, 256 insertions(+), 6 deletions(-) create mode 100644 adaptors/安心云es主题特征to河北公路设施监测.go create mode 100644 consumers/consumerHBJCAS.go diff --git a/adaptors/安心云es主题特征to河北公路设施监测.go b/adaptors/安心云es主题特征to河北公路设施监测.go new file mode 100644 index 0000000..38a4e44 --- /dev/null +++ b/adaptors/安心云es主题特征to河北公路设施监测.go @@ -0,0 +1,35 @@ +package adaptors + +import ( + "encoding/json" + "goInOut/models" + "strings" +) + +// Adaptor_AXYES_HBGL 统一采集软件数据 转换 湘潭健康监测平台 +type Adaptor_AXYES_HBGL struct { + //传感器code转换信息 + GnssMap map[string]string + RainMap map[string]string + NBWYMap map[string]string + DXSWMap map[string]string + //一些必要信息 + Info map[string]string +} + +func (the Adaptor_AXYES_HBGL) Transform(rawMsg string) []NeedPush { + esAggTop := models.EsAggTop{} + var needPush []NeedPush + err := json.Unmarshal([]byte(rawMsg), &esAggTop) + if err != nil { + return nil + } + return needPush +} + +func (the Adaptor_AXYES_HBGL) EsAggTopToHBKS(esAggTop models.EsAggTop) (result []byte) { + + fileContent := strings.Builder{} + + return []byte(fileContent.String()) +} diff --git a/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.json b/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.json index 1470d60..10dc0ed 100644 --- a/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.json +++ b/configFiles/config_河北省公路基础设施监测_承德_轻量化特征数据.json @@ -5,7 +5,7 @@ "http": { "url": "https://esproxy.anxinyun.cn/anxincloud_themes/_search" }, - "cronStr": "48 0/1 * * *" + "cronStr": "16 0/1 * * *" }, "out": { "mqtt": { @@ -22,7 +22,6 @@ } }, "info": { - "common": { - } + "structureId": "5016" } } \ No newline at end of file diff --git a/consumers/HBJCAS/config.go b/consumers/HBJCAS/config.go index 6720049..7e4ae9d 100644 --- a/consumers/HBJCAS/config.go +++ b/consumers/HBJCAS/config.go @@ -12,8 +12,8 @@ type ioConfig struct { Out OUT `json:"out"` } type In struct { - Es config.EsConfig `json:"es"` - CronStr string `json:"cronStr"` + Http config.HttpConfig `json:"http"` + CronStr string `json:"cronStr"` } type OUT struct { diff --git a/consumers/consumerHBJCAS.go b/consumers/consumerHBJCAS.go new file mode 100644 index 0000000..82d4bd8 --- /dev/null +++ b/consumers/consumerHBJCAS.go @@ -0,0 +1,213 @@ +package consumers + +import ( + "encoding/json" + "fmt" + "goInOut/adaptors" + "goInOut/consumers/HBJCAS" + "goInOut/dbOperate" + "goInOut/monitors" + "goInOut/utils" + "log" + "time" +) + +type consumerHBJCAS struct { + //数据缓存管道 + ch chan []adaptors.NeedPush + //具体配置 + Info HBJCAS.ConfigFile + InHttp *dbOperate.HttpHelper + outMqtt *dbOperate.MqttHelper + monitor *monitors.CommonMonitor +} + +func (the *consumerHBJCAS) LoadConfigJson(cfgStr string) { + // 将 JSON 格式的数据解析到结构体中 + err := json.Unmarshal([]byte(cfgStr), &the.Info) + if err != nil { + log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) + panic(err) + } +} + +func (the *consumerHBJCAS) Initial(cfg string) error { + the.LoadConfigJson(cfg) + err := the.InputInitial() + if err != nil { + return err + } + err = the.OutputInitial() + return err +} +func (the *consumerHBJCAS) InputInitial() error { + the.ch = make(chan []adaptors.NeedPush, 200) + //数据入口 + the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""} + the.monitor = &monitors.CommonMonitor{ + MonitorHelper: &monitors.MonitorHelper{CronStr: the.Info.IoConfig.In.CronStr}, + } + the.monitor.Start() + the.monitor.RegisterFun(the.getEsData) + return nil +} +func (the *consumerHBJCAS) OutputInitial() error { + //数据出口 + the.outMqtt = dbOperate.MqttInitial( + the.Info.IoConfig.Out.Mqtt.Host, + the.Info.IoConfig.Out.Mqtt.Port, + the.Info.IoConfig.Out.Mqtt.ClientId, + the.Info.IoConfig.Out.Mqtt.UserName, + the.Info.IoConfig.Out.Mqtt.Password, + false, //按照具体项目来 + "") + return nil +} +func (the *consumerHBJCAS) Work() { + go func() { + for { + needPushList := <-the.ch + if len(the.ch) > 0 { + log.Printf("取出ch数据,剩余[%d] ", len(the.ch)) + } + + for _, push := range needPushList { + if push.Topic != "" { + the.outMqtt.Publish(push.Topic, push.Payload) + } + } + + time.Sleep(100 * time.Millisecond) + } + }() +} + +func (the *consumerHBJCAS) getAdaptor() (adaptor adaptors.Adaptor_AXYES_HBGL) { + + return adaptors.Adaptor_AXYES_HBGL{} +} + +func (the *consumerHBJCAS) getEsData() { + structureId := the.getStructureId() + start, end := utils.GetTimeRangeByHour(-1) + log.Printf("查询数据时间范围 %s - %s", start, end) + //start := "2024-02-05T00:00:00.000+0800" + //end := "2024-02-05T23:59:59.999+0800" + + factorIds := []string{"15", "20", "28"} + for _, factorId := range factorIds { + esQuery := the.getESQueryStr(structureId, factorId, start, end) + auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} + esAggResult := the.InHttp.HttpGetWithHeader(esQuery, auth) + + adaptor := the.getAdaptor() + needPush := adaptor.Transform(esAggResult) + if len(needPush) > 0 { + the.ch <- needPush + } + } +} + +func (the *consumerHBJCAS) getESQueryStr(structureId, factorId, start, end string) string { + aggSubSql := getEsAggSubSqlByFactorId(factorId) + esQuery := fmt.Sprintf(` +{ + "size": 0, + "query": { + "bool": { + "must": [ + { + "term": { + "structure": { + "value": %s + } + } + }, + { + "term": { + "factor": { + "value": %s + } + } + }, + { + "range": { + "collect_time": { + "gte": "%s", + "lte": "%s" + } + } + } + ] + } + }, + "aggs": { + "groupBySensorId": { + "terms": { + "field": "sensor" + }, + "aggs": { + "groupDate": { + "date_histogram": { + "field": "collect_time", + "interval": "hour", + "time_zone": "Asia/Shanghai" + }, + "aggs": %s + } + } + } + } +} +`, structureId, factorId, start, end, aggSubSql) + + return esQuery +} + +func getEsAggSubSqlByFactorId(factorId string) string { + //桥墩倾斜 15 支座位移20 桥面振动28 + subAggSQl := "" + switch factorId { + case "15": + subAggSQl = ` +{ + "x": { + "extended_stats": { + "field": "data.x" + } + }, + "y": { + "extended_stats": { + "field": "data.y" + } + } +}` + case "20": + subAggSQl = ` +{ + "displacement": { + "extended_stats": { + "field": "data.displacement" + } + } +}` + case "28": + subAggSQl = ` +{ + "trms": { + "extended_stats": { + "field": "data.trms" + } + } +}` + } + return subAggSQl +} + +func (the *consumerHBJCAS) getStructureId() string { + structureId, ok := the.Info.OtherInfo["structureId"] + if !ok { + structureId = "5016" //河北承德乃积沟大桥结构物id=5016 + } + return structureId +} diff --git a/consumers/consumerManage.go b/consumers/consumerManage.go index 68cb921..06d657b 100644 --- a/consumers/consumerManage.go +++ b/consumers/consumerManage.go @@ -31,6 +31,9 @@ func GetConsumer(name string) (consumer IConsumer) { case "consumerSinoGnssMySQL": consumer = new(consumerSinoGnssMySQL) + + case "consumerHBJCAS": + consumer = new(consumerHBJCAS) default: consumer = nil } diff --git a/utils/timeRange.go b/utils/timeRange.go index b5a10cd..9db1e08 100644 --- a/utils/timeRange.go +++ b/utils/timeRange.go @@ -15,7 +15,7 @@ func GetTimeRangeByMinute(durationMinute int) (start, stop string) { func GetTimeRangeByHour(durationHour int) (start, stop string) { start = time.Now().Add(time.Hour * time.Duration(durationHour)).Format("2006-01-02T15:00:00.000+08:00") - stop = time.Now().Format("2006-01-02T15:04:00.000+08:00") + stop = time.Now().Format("2006-01-02T15:00:00.000+08:00") return }