package consumers

import (
	"encoding/json"
	"fmt"
	"log"
	"time"

	"goInOut/adaptors"
	"goInOut/consumers/HBJCAS"
	"goInOut/dbOperate"
	"goInOut/monitors"
	"goInOut/utils"
)

// consumerHBJCAS queries aggregated data over HTTP, converts it with an
// adaptor and publishes the resulting messages over MQTT.
type consumerHBJCAS struct {
	// ch buffers batches of messages between getEsData (producer) and the
	// goroutine started by Work (consumer).
	ch chan []adaptors.NeedPush

	// Info is the consumer configuration decoded by LoadConfigJson.
	Info HBJCAS.ConfigFile

	InHttp  *dbOperate.HttpHelper
	outMqtt *dbOperate.MqttHelper
	monitor *monitors.CommonMonitor
}

// LoadConfigJson decodes the JSON document cfgStr into the.Info.
//
// It logs and panics when cfgStr cannot be decoded; the method has no error
// return, so the panic is the only failure signal available to the caller.
func (the *consumerHBJCAS) LoadConfigJson(cfgStr string) {
	// Parse the JSON data into the configuration struct.
	err := json.Unmarshal([]byte(cfgStr), &the.Info)
	if err != nil {
		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
		panic(err)
	}
}

// Initial loads the configuration from cfg and then sets up the input side
// followed by the output side. It returns the first initialisation error.
// A malformed cfg makes LoadConfigJson panic rather than return an error.
func (the *consumerHBJCAS) Initial(cfg string) error {
	the.LoadConfigJson(cfg)
	if err := the.InputInitial(); err != nil {
		return err
	}
	return the.OutputInitial()
}

// InputInitial creates the buffer channel, the HTTP helper used for queries
// and the monitor built from the configured CronStr, then starts the monitor
// and registers getEsData with it. It always returns nil.
func (the *consumerHBJCAS) InputInitial() error {
	the.ch = make(chan []adaptors.NeedPush, 200)

	// Data input.
	the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""}
	the.monitor = &monitors.CommonMonitor{
		MonitorHelper: &monitors.MonitorHelper{CronStr: the.Info.IoConfig.In.CronStr},
	}
	the.monitor.Start()
	the.monitor.RegisterFun(the.getEsData)
	return nil
}

// OutputInitial creates the MQTT helper from the configured output settings.
// It always returns nil.
func (the *consumerHBJCAS) OutputInitial() error {
	// Data output.
	the.outMqtt = dbOperate.MqttInitial(
		the.Info.IoConfig.Out.Mqtt.Host,
		the.Info.IoConfig.Out.Mqtt.Port,
		the.Info.IoConfig.Out.Mqtt.ClientId,
		the.Info.IoConfig.Out.Mqtt.UserName,
		the.Info.IoConfig.Out.Mqtt.Password,
		false, // Depends on the specific project.
		"")
	return nil
}

// Work starts a background goroutine that drains the.ch and publishes every
// message that has a non-empty topic over MQTT. It pauses 100ms after each
// batch.
//
// The goroutine ranges over the channel, so it ends if the.ch is ever closed
// instead of looping forever on zero-value receives.
func (the *consumerHBJCAS) Work() {
	go func() {
		for needPushList := range the.ch {
			if len(the.ch) > 0 {
				log.Printf("取出ch数据,剩余[%d] ", len(the.ch))
			}

			for _, push := range needPushList {
				if push.Topic != "" {
					the.outMqtt.Publish(push.Topic, push.Payload)
				}
			}

			time.Sleep(100 * time.Millisecond)
		}
	}()
}

// getAdaptor returns a new adaptor used to transform query results.
func (the *consumerHBJCAS) getAdaptor() (adaptor adaptors.Adaptor_AXYES_HBGL) {
	return adaptors.Adaptor_AXYES_HBGL{}
}

// getEsData is the function registered with the monitor. For each factor id
// it builds a query for the time range returned by utils.GetTimeRangeByHour(-1),
// sends it through the HTTP helper, transforms the response with the adaptor
// and queues any non-empty result on the.ch.
//
// The send on the.ch blocks while the channel buffer is full.
func (the *consumerHBJCAS) getEsData() {
	structureId := the.getStructureId()
	// presumably the previous hour — confirm against utils.GetTimeRangeByHour.
	start, end := utils.GetTimeRangeByHour(-1)
	log.Printf("查询数据时间范围 %s - %s", start, end)
	// Fixed range for manual debugging:
	//start := "2024-02-05T00:00:00.000+0800"
	//end := "2024-02-05T23:59:59.999+0800"

	factorIds := []string{"15", "20", "28"}
	for _, factorId := range factorIds {
		esQuery := the.getESQueryStr(structureId, factorId, start, end)
		// NOTE(review): the credential is hard-coded and the scheme reads
		// "Bear" rather than "Bearer"; left untouched because the server side
		// is not visible here — confirm and move to configuration.
		auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"}
		esAggResult := the.InHttp.HttpGetWithHeader(esQuery, auth)

		adaptor := the.getAdaptor()
		needPush := adaptor.Transform(esAggResult)
		if len(needPush) > 0 {
			the.ch <- needPush
		}
	}
}

// getESQueryStr renders the aggregation query for one structure and factor
// over the range [start, end].
//
// structureId and factorId are inserted unquoted as JSON values, start and
// end as quoted strings. The sub-aggregation comes from
// getEsAggSubSqlByFactorId; for a factor id it does not know the rendered
// query has no value after the innermost "aggs" key.
func (the *consumerHBJCAS) getESQueryStr(structureId, factorId, start, end string) string {
	aggSubSql := getEsAggSubSqlByFactorId(factorId)
	esQuery := fmt.Sprintf(` { "size": 0, "query": { "bool": { "must": [ { "term": { "structure": { "value": %s } } }, { "term": { "factor": { "value": %s } } }, { "range": { "collect_time": { "gte": "%s", "lte": "%s" } } } ] } }, "aggs": { "groupBySensorId": { "terms": { "field": "sensor" }, "aggs": { "groupDate": { "date_histogram": { "field": "collect_time", "interval": "hour", "time_zone": "Asia/Shanghai" }, "aggs": %s } } } } } `, structureId, factorId, start, end, aggSubSql)
	return esQuery
}

// getEsAggSubSqlByFactorId returns the extended_stats sub-aggregation JSON
// for the given factor id, or the empty string for any other id.
//
// Factor ids: 15 = pier tilt, 20 = bearing displacement, 28 = deck vibration.
func getEsAggSubSqlByFactorId(factorId string) string {
	subAggSQl := ""
	switch factorId {
	case "15":
		subAggSQl = ` { "x": { "extended_stats": { "field": "data.x" } }, "y": { "extended_stats": { "field": "data.y" } } }`
	case "20":
		subAggSQl = ` { "displacement": { "extended_stats": { "field": "data.displacement" } } }`
	case "28":
		subAggSQl = ` { "trms": { "extended_stats": { "field": "data.trms" } } }`
	}
	return subAggSQl
}

// getStructureId returns the structure id configured under the
// "structureId" key of the.Info.OtherInfo.
//
// When the key is missing or its value is empty, the default id is returned;
// an empty id would otherwise leave the "value" field of the query without an
// operand.
func (the *consumerHBJCAS) getStructureId() string {
	// Structure id of the Naijigou Bridge in Chengde, Hebei.
	const defaultStructureID = "5016"

	structureId, ok := the.Info.OtherInfo["structureId"]
	if !ok || structureId == "" {
		return defaultStructureID
	}
	return structureId
}