package consumers import ( "fmt" "goInOut/adaptors" "goInOut/consumers/GDJKJC" "goInOut/dbOperate" "goInOut/monitors" "goInOut/utils" "gopkg.in/yaml.v3" "log" "strconv" "time" ) type consumerAXYES2GDJKJC struct { //数据缓存管道 ch chan []adaptors.NeedPush //具体配置 ConfigInfo GDJKJC.ConfigFile InHttp *dbOperate.HttpHelper outHttp *dbOperate.HttpHelper monitor *monitors.CommonMonitor infoRedis *dbOperate.RedisHelper } func (the *consumerAXYES2GDJKJC) LoadConfigJson(cfgStr string) { err := yaml.Unmarshal([]byte(cfgStr), &the.ConfigInfo) if err != nil { log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) panic(err) } } func (the *consumerAXYES2GDJKJC) Initial(cfg string) error { the.LoadConfigJson(cfg) err := the.InputInitial() if err != nil { return err } err = the.OutputInitial() if err != nil { return err } err = the.infoComponentInitial() return err } func (the *consumerAXYES2GDJKJC) InputInitial() error { the.ch = make(chan []adaptors.NeedPush, 200) //数据入口 the.InHttp = &dbOperate.HttpHelper{Url: the.ConfigInfo.IoConfig.In.Http.Url, Token: ""} the.monitor = &monitors.CommonMonitor{ MonitorHelper: &monitors.MonitorHelper{}, } the.monitor.Start() for taskName, cron := range the.ConfigInfo.Monitor { switch taskName { case "cron10min": the.monitor.RegisterTask(cron, the.GetEs10minAggData) case "cron1hour": the.monitor.RegisterTask(cron, the.getEs1HourAggData) default: log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron) } } return nil } func (the *consumerAXYES2GDJKJC) OutputInitial() error { //数据出口 the.outHttp = &dbOperate.HttpHelper{Url: the.ConfigInfo.IoConfig.Out.Http.Url, Token: ""} return nil } func (the *consumerAXYES2GDJKJC) infoComponentInitial() error { //数据出口 addr := the.ConfigInfo.QueryComponent.Redis.Address the.infoRedis = dbOperate.NewRedisHelper("", addr) return nil } func (the *consumerAXYES2GDJKJC) Work() { go func() { for { needPushList := <-the.ch if len(the.ch) > 0 { log.Printf("取出ch数据,剩余[%d] ", len(the.ch)) } for _, push := range needPushList { structIdStr := push.Topic if _, ok := the.ConfigInfo.Info[structIdStr]; !ok { log.Printf("structId=%s 无匹配的省平台AppKeySecret", structIdStr) continue } appKey := the.ConfigInfo.Info[structIdStr].AppKey appSecret := the.ConfigInfo.Info[structIdStr].AppSecret rnd := strconv.FormatInt(time.Now().Unix(), 10) Header := map[string]string{ "appKey": appKey, "rnd": rnd, "sign": utils.GetSign(string(push.Payload), rnd, appKey, appSecret), } log.Printf("推送数据=%s", push.Payload) _, err := the.outHttp.PublishWithHeader(push.Payload, Header) if err != nil { log.Printf("数据推送异常=> %s", err.Error()) return } } time.Sleep(100 * time.Millisecond) } }() } func (the *consumerAXYES2GDJKJC) getAdaptor() (adaptor adaptors.Adaptor_AXYES_GDJKJC) { return adaptors.Adaptor_AXYES_GDJKJC{ Redis: the.infoRedis, } } func (the *consumerAXYES2GDJKJC) getStructIds() []int64 { var structIds []int64 for strutId, _ := range the.ConfigInfo.StructInfo { structIds = append(structIds, strutId) } return structIds } func (the *consumerAXYES2GDJKJC) getEs1HourAggData() { start, end := utils.GetTimeRangeByHour(-1) log.Printf("查询数据时间范围 %s - %s", start, end) hourFactorIds := []int{15} //[]int{11, 15, 18, 20} //应变11 桥墩倾斜15 裂缝监测18 支护结构变形63 structIds := the.getStructIds() for _, structId := range structIds { for _, factorId := range hourFactorIds { esQuery := the.getESQueryStrByHour(structId, factorId, start, end) auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) lenRes := len(esAggResultStr) if lenRes < 250 { log.Printf("[s=%d,f=%d],es agg 返回无数据", structId, factorId) continue } adaptor := the.getAdaptor() adaptor.PointInfo = the.ConfigInfo.PointInfo adaptor.StructInfo = the.ConfigInfo.StructInfo needPushes := adaptor.Transform(structId, factorId, esAggResultStr) if len(needPushes) > 0 { the.ch <- needPushes } } } } func (the *consumerAXYES2GDJKJC) GetEs10minAggData() { //utils.GetTimeRangeBy10min() 由于振动数据实时性问题 改用一小时统一上报 start, end := utils.GetTimeRangeByHour(-1) log.Printf("查询10min数据时间范围 %s - %s", start, end) factorIds := []int{28} //桥面振动 28 structIds := the.getStructIds() for _, structId := range structIds { for _, factorId := range factorIds { esQuery := the.getESQueryStrBy10min(structId, factorId, start, end) auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) adaptor := the.getAdaptor() adaptor.PointInfo = the.ConfigInfo.PointInfo adaptor.StructInfo = the.ConfigInfo.StructInfo needPushes := adaptor.Transform(structId, factorId, esAggResultStr) if len(needPushes) > 0 { the.ch <- needPushes } } } } func (the *consumerAXYES2GDJKJC) getESQueryStrByHour(structureId int64, factorId int, start, end string) string { aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId) esQuery := fmt.Sprintf(` { "size": 0, "query": { "bool": { "must": [ { "term": { "structure": { "value": %d } } }, { "term": { "factor": { "value": %d } } }, { "range": { "collect_time": { "gte": "%s", "lt": "%s" } } } ] } }, "aggs": { "groupSensor": { "terms": { "field": "sensor" }, "aggs": { "groupDate": { "date_histogram": { "field": "collect_time", "interval": "1h", "time_zone": "Asia/Shanghai", "min_doc_count": 1 }, "aggs": %s } } } } } `, structureId, factorId, start, end, aggSubSql) return esQuery } func (the *consumerAXYES2GDJKJC) getESQueryStrBy10min(structureId int64, factorId int, start, end string) string { aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId) esQuery := fmt.Sprintf(` { "size": 0, "query": { "bool": { "must": [ { "term": { "structure": { "value": %d } } }, { "term": { "factor": { "value": %d } } }, { "range": { "collect_time": { "gte": "%s", "lte": "%s" } } } ] } }, "aggs": { "groupSensor": { "terms": { "field": "sensor" }, "aggs": { "groupDate": { "date_histogram": { "field": "collect_time", "interval": "10m", "time_zone": "Asia/Shanghai", "min_doc_count": 1 }, "aggs": %s } } } } } `, structureId, factorId, start, end, aggSubSql) return esQuery }