package consumers

import (
	"encoding/json"
	"fmt"
	"log"
	"time"

	"goInOut/adaptors"
	"goInOut/consumers/AlarmCombination"
	"goInOut/dbOperate"
	"goInOut/dbOperate/_kafka"
	"goInOut/monitors"

	"gopkg.in/yaml.v3"
)

// consumerAlarmCombination reads alarm-combination rules from PostgreSQL,
// queries an HTTP endpoint (an Elasticsearch aggregation) for matching alarms,
// transforms the result through an adaptor and publishes it to Kafka.
type consumerAlarmCombination struct {
	// ch buffers batches of messages waiting to be published.
	ch chan []adaptors.NeedPush
	// Info is the parsed configuration file.
	Info     AlarmCombination.ConfigFile
	InHttp   *dbOperate.HttpHelper
	outKafka *_kafka.KafkaHelper
	monitor  *monitors.CommonMonitor

	infoRedis *dbOperate.RedisHelper
	infoPg    *dbOperate.DBHelper
	// combinationInfo holds the combination rules loaded from the database.
	combinationInfo []AlarmCombination.CombinationInfo
}

// LoadConfigJson parses cfgStr into the.Info. Despite the name, the input is
// decoded as YAML. It logs and panics when the configuration cannot be parsed.
func (the *consumerAlarmCombination) LoadConfigJson(cfgStr string) {
	err := yaml.Unmarshal([]byte(cfgStr), &the.Info)
	if err != nil {
		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
		panic(err)
	}
}

// Initial loads the configuration and then sets up the input side, the output
// side and the database helper, in that order. It returns the first error
// reported by any of those steps.
func (the *consumerAlarmCombination) Initial(cfg string) error {
	the.LoadConfigJson(cfg)
	if err := the.InputInitial(); err != nil {
		return err
	}
	if err := the.OutputInitial(); err != nil {
		return err
	}
	return the.infoComponentInitial()
}

// InputInitial creates the publish queue, the HTTP helper used for queries and
// the monitor, then registers the configured scheduled tasks. Only the task
// named "cron" is recognised; any other name is logged and ignored.
func (the *consumerAlarmCombination) InputInitial() error {
	the.ch = make(chan []adaptors.NeedPush, 200)

	// Data entry point.
	the.InHttp = &dbOperate.HttpHelper{Url: the.Info.IoConfig.In.Http.Url, Token: ""}

	the.monitor = &monitors.CommonMonitor{
		MonitorHelper: &monitors.MonitorHelper{},
	}
	the.monitor.Start()

	for taskName, cron := range the.Info.Monitor {
		switch taskName {
		case "cron":
			the.monitor.RegisterTask(cron, the.updateCombinationInfo)
		default:
			log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron)
		}
	}
	return nil
}

// OutputInitial creates the Kafka helper from the configured brokers and
// group id. It always returns nil.
func (the *consumerAlarmCombination) OutputInitial() error {
	// Data exit point.
	the.outKafka = _kafka.KafkaInitial(
		the.Info.IoConfig.Out.Kafka.Brokers,
		the.Info.IoConfig.Out.Kafka.GroupId)
	return nil
}

// infoComponentInitial creates the PostgreSQL helper used to load the
// combination rules. It always returns nil.
func (the *consumerAlarmCombination) infoComponentInitial() error {
	pgConnStr := the.Info.QueryComponent.Pg.Connect
	the.infoPg = dbOperate.NewDBHelper("postgres", pgConnStr)
	return nil
}

// Work starts a background goroutine that drains the publish queue. A message
// carrying its own topic is published to that topic only; a message without a
// topic is published to every topic listed in the configuration. The goroutine
// pauses 100ms after each batch and ends if the queue is ever closed.
func (the *consumerAlarmCombination) Work() {
	go func() {
		// Ranging over the channel behaves like a blocking receive while the
		// channel is open, and stops instead of spinning on zero values if it
		// is closed.
		for needPushList := range the.ch {
			if len(the.ch) > 0 {
				log.Printf("取出ch数据,剩余[%d] ", len(the.ch))
			}

			for _, push := range needPushList {
				if push.Topic != "" {
					the.outKafka.Publish(push.Topic, push.Payload)
					continue
				}
				// No topic on the message: fall back to the configured topics.
				for _, topic := range the.Info.IoConfig.Out.Kafka.Topics {
					the.outKafka.Publish(topic, push.Payload)
				}
			}

			time.Sleep(100 * time.Millisecond)
		}
	}()
}

// getAdaptor returns a fresh adaptor used to turn query results into messages.
func (the *consumerAlarmCombination) getAdaptor() (adaptor adaptors.Adaptor_ZWY_AlarmCombin) {
	return adaptors.Adaptor_ZWY_AlarmCombin{}
}

// getEs1minAlarmData runs the aggregation query for every loaded combination
// rule, transforms each usable response and enqueues the resulting messages.
// The send blocks when the queue is full.
func (the *consumerAlarmCombination) getEs1minAlarmData() {
	adaptor := the.getAdaptor()

	for _, comInfos := range the.combinationInfo {
		// The query filters on source_type_id = 2 and state in (0, 1, 2).
		// Per the original notes, source type: 0 DTU, 1 sensor, 2 measuring point;
		// state: 0 new alarm, 1 count updated, 2 level raised, 3 auto recovered,
		// 4 manually recovered, 5 recovered awaiting confirmation.
		esQuery := the.getESAlarmAggQueryStr(comInfos)

		// NOTE(review): hard-coded credential; it should come from configuration.
		// The scheme is spelled "Bear" rather than "Bearer" — confirm this is what
		// the server expects before changing it.
		auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"}
		esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth)

		// Responses shorter than 120 bytes are treated as carrying no usable data.
		lenRes := len(esAggResultStr)
		if lenRes < 120 {
			log.Printf("[s=%d]es agg 返回 无 有效数据,resp=%s ", comInfos.StructId, esAggResultStr)
			continue
		}

		needPushes := adaptor.Transform(comInfos, esAggResultStr)
		if len(needPushes) > 0 {
			the.ch <- needPushes
		}
	}
}

// updateCombinationInfo reloads the combination rules from the database,
// decodes each rule's JSON configuration and then immediately runs the alarm
// query. A database error aborts the refresh. A rule whose configuration
// cannot be decoded is logged and left without decoded items, so that one bad
// row does not stop the remaining rules from being processed.
func (the *consumerAlarmCombination) updateCombinationInfo() {
	log.Printf("更新 数据库 组合配置信息")
	sql := `SELECT a.*,s."name" as struct_name FROM "t_alarm_combination" as a left join t_structure as s ON a.struct_id=s."id"`
	//sql += " where a.id=1" // for testing only
	err := the.infoPg.Query(&the.combinationInfo, sql)
	if err != nil {
		log.Printf("查询数据库异常:err-> %s", err.Error())
		return
	}

	for i, info := range the.combinationInfo {
		// Decode into the loop copy first so a failed decode never stores a
		// partially filled result in the shared slice.
		if err := json.Unmarshal([]byte(info.ConfigStr), &info.ConfigItems); err != nil {
			log.Printf("[s=%d]解析组合配置异常:err-> %v", info.StructId, err)
			continue
		}
		the.combinationInfo[i].ConfigItems = info.ConfigItems
	}
	log.Printf("共刷新%d条配置", len(the.combinationInfo))

	// Trigger a query right away.
	the.getEs1minAlarmData()
}

// getESAlarmAggQueryStr builds the aggregation query for one combination rule.
// The query matches the rule's structure id, source_type_id 2, states 0/1/2 and
// the station ids collected from all of the rule's config items, and groups
// the hits by source_id ordered by descending count.
func (the *consumerAlarmCombination) getESAlarmAggQueryStr(onceConfig AlarmCombination.CombinationInfo) string {
	structureId := onceConfig.StructId

	// Start from an empty, non-nil slice: a nil slice marshals to `null`, which
	// is not a valid value list for the terms clause below.
	allStationId := []int{}
	for _, item := range onceConfig.ConfigItems {
		allStationId = append(allStationId, item.StationIds...)
	}
	// Marshalling a []int cannot fail, so the error is deliberately ignored.
	allStationIdStr, _ := json.Marshal(allStationId)

	esQuery := fmt.Sprintf(`
{
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "term": {
            "structure_id": {
              "value": %d
            }
          }
        },
        {
          "term": {
            "source_type_id": {
              "value": 2
            }
          }
        },
        {
          "terms": {
            "state": [
              0,
              1,
              2
            ]
          }
        },
        {
          "terms": {
            "source_id": %s
          }
        }
      ]
    }
  },
  "aggs": {
    "groupBySensor": {
      "terms": {
        "field": "source_id",
        "order": {
          "_count": "DESC"
        }
      }
    }
  }
}
`, structureId, allStationIdStr)

	return esQuery
}

// getStructureId returns the "structureId" entry of the configuration's
// OtherInfo map and panics (after logging) when the entry is missing.
func (the *consumerAlarmCombination) getStructureId() string {
	structureId, ok := the.Info.OtherInfo["structureId"]
	if !ok {
		log.Panicf("无法识别有效的structureId")
	}
	return structureId
}