package consumers

import (
	"encoding/json"
	"fmt"
	"log"
	"sync"
	"time"

	"goInOut/adaptors"
	"goInOut/consumers/AXY_SK"
	"goInOut/dbOperate"
	"goInOut/dbOperate/_kafka"
	"goInOut/models"
	"goInOut/monitors"

	"gopkg.in/yaml.v3"
)

// consumerAxySkAlarm consumes theme data from Kafka, buffers the latest record
// per sensor for periodic bulk writes to Elasticsearch, and on a schedule
// evaluates the configured alarm triggers against the alarms stored in
// Elasticsearch, publishing 3077 alarm generation / elimination messages.
type consumerAxySkAlarm struct {
	// Channel buffering incoming theme data between onData and Work.
	dataCache  chan *models.EsTheme
	alarmCache map[string]models.EsAlarm
	// Parsed configuration.
	Info     AXY_SK.ConfigFile
	InKafka  _kafka.KafkaHelper
	OutEs    dbOperate.ESHelper
	infoPg   *dbOperate.DBHelper
	sinkMap  sync.Map
	lock     sync.Mutex
	logTagId int
	monitor  *monitors.CommonMonitor
	// Trigger configuration loaded from the database.
	stationAlarmTrigger    []AXY_SK.StationAlarmTrigger
	configAlarmTrigger     []AXY_SK.AlarmTrigger
	historyStationAlarmMap map[string]AXY_SK.HistoryAlarm
}

// LoadConfigJson decodes cfgStr into the.Info. Despite the name, decoding goes
// through yaml.Unmarshal. A decode failure is logged and then panics.
func (the *consumerAxySkAlarm) LoadConfigJson(cfgStr string) {
	err := yaml.Unmarshal([]byte(cfgStr), &the.Info)
	if err != nil {
		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
		panic(err)
	}
}

// Initial loads the configuration and then initialises, in order, the Kafka
// input, the Elasticsearch output, the PostgreSQL helper, the in-memory alarm
// history and the scheduled monitor. The first error encountered is returned.
func (the *consumerAxySkAlarm) Initial(cfg string) error {
	the.sinkMap = sync.Map{}
	the.dataCache = make(chan *models.EsTheme, 1000)

	the.LoadConfigJson(cfg)

	if err := the.inputInitial(); err != nil {
		return err
	}
	if err := the.outputInitial(); err != nil {
		return err
	}
	if err := the.infoComponentInitial(); err != nil {
		return err
	}
	the.loadHistoryRecord()
	return the.monitorInitial()
}

// inputInitial builds the Kafka helper from the configured brokers and group
// id, subscribes onData to every configured input topic and starts the helper's
// worker. It always returns nil.
func (the *consumerAxySkAlarm) inputInitial() error {
	// Data input.
	the.InKafka = _kafka.KafkaHelper{
		Brokers: the.Info.IoConfig.In.Kafka.Brokers,
		GroupId: the.Info.IoConfig.In.Kafka.GroupId,
	}
	the.InKafka.Initial()
	for _, inTopic := range the.Info.IoConfig.In.Kafka.Topics {
		the.InKafka.Subscribe(inTopic, the.onData)
	}
	the.InKafka.Worker()
	return nil
}

// outputInitial creates the Elasticsearch helper from the configured address
// and credentials. It always returns nil.
func (the *consumerAxySkAlarm) outputInitial() error {
	// Data output.
	the.OutEs = *dbOperate.NewESHelper(
		the.Info.IoConfig.Out.Es.Address,
		the.Info.IoConfig.Out.Es.Auth.UserName,
		the.Info.IoConfig.Out.Es.Auth.Password,
	)
	return nil
}

// infoComponentInitial creates the PostgreSQL helper used to query trigger
// configuration. It always returns nil.
func (the *consumerAxySkAlarm) infoComponentInitial() error {
	pgConnStr := the.Info.QueryComponent.Pg.Connect
	the.infoPg = dbOperate.NewDBHelper("postgres", pgConnStr)
	return nil
}

// loadHistoryRecord resets the per-station alarm history to an empty map.
func (the *consumerAxySkAlarm) loadHistoryRecord() {
	the.historyStationAlarmMap = map[string]AXY_SK.HistoryAlarm{}
}

// monitorInitial starts the monitor and registers updateTriggerConfig under the
// cron expression configured for the task named "cron". Any other task name is
// logged as unmatched. It always returns nil.
func (the *consumerAxySkAlarm) monitorInitial() error {
	the.monitor = &monitors.CommonMonitor{
		MonitorHelper: &monitors.MonitorHelper{},
	}
	the.monitor.Start()
	for taskName, cron := range the.Info.Monitor {
		switch taskName {
		case "cron":
			the.monitor.RegisterTask(cron, the.updateTriggerConfig)
		default:
			log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron)
		}
	}
	// For testing:
	// the.updateTriggerConfig()
	return nil
}

// updateTriggerConfig reloads the alarm trigger rows from PostgreSQL (ordered
// by struct id, factor id and alarm level), copies each row's ConditionRaw into
// ConditionArray and then immediately runs judgeSK. A query error is logged and
// aborts the update.
func (the *consumerAxySkAlarm) updateTriggerConfig() {
	log.Printf("--> 定时 查询pg 更新 双控配置信息")
	sql := `SELECT at.* FROM t_alarm_trigger as at order by at.struct_id, at.factor_id,at.alarm_level asc;`
	err := the.infoPg.Query(&the.configAlarmTrigger, sql)
	if err != nil {
		log.Printf("查询数据库异常:err-> %s", err.Error())
		return
	}

	for i, trigger := range the.configAlarmTrigger {
		the.configAlarmTrigger[i].ConditionArray = trigger.ConditionRaw
	}
	log.Printf("当前共 %d条 启用配置", len(the.configAlarmTrigger))
	// Evaluate immediately with the fresh configuration.
	the.judgeSK()
}

// updateTriggerStationConfig reloads the trigger rows joined with their sensors
// into stationAlarmTrigger, copies ConditionRaw into ConditionArray and then
// runs judgeSK. A query error is logged and aborts the update.
func (the *consumerAxySkAlarm) updateTriggerStationConfig() {
	log.Printf("--> 定时 查询pg 更新 双控配置信息")
	sql := `SELECT at.*,s.id as station_id,s.name as station_name FROM t_alarm_trigger as at left join t_sensor s on at.struct_id=s.structure and at.factor_id=s.factor;`
	err := the.infoPg.Query(&the.stationAlarmTrigger, sql)
	if err != nil {
		log.Printf("查询数据库异常:err-> %s", err.Error())
		return
	}

	for i, trigger := range the.stationAlarmTrigger {
		the.stationAlarmTrigger[i].ConditionArray = trigger.ConditionRaw
	}
	log.Printf("当前共 %d条 启用配置", len(the.stationAlarmTrigger))
	// Evaluate immediately with the fresh configuration.
	the.judgeSK()
}

// judgeSK evaluates every configured trigger. For each trigger it queries
// Elasticsearch for the structure's alarms, groups the 3007/3008 alarms by
// station (SourceId), and per station either publishes a 3077 alarm generation
// message or an elimination message to the configured alarm topic.
//
// A station that already fired during this call is skipped for the remaining
// triggers. The return value is always the empty string.
func (the *consumerAxySkAlarm) judgeSK() string {
	// Stations that already fired in this run, keyed by station id. The trigger
	// query is sorted, so the first match wins and later triggers are skipped.
	onceTriggerStationAlarmMap := map[string]string{}

	for i, trigger := range the.configAlarmTrigger {
		log.Printf("开始处理--> 第[%d] id=%d trigger配置", i, trigger.Id)

		// Query the alarms of the structure this trigger is configured for.
		esSql := the.getEsAlarmTriggerQueryStr(trigger.StructId)
		alarms, err := the.OutEs.SearchAlarm(the.Info.IoConfig.Out.Es.Index, esSql)
		if err != nil {
			log.Printf("es查询异常err -> %s", err.Error())
			continue
		}

		stationAlarmMap := map[string]AXY_SK.StationAlarmGroup{}
		for _, alarm := range alarms {
			// Per-iteration copy: the address is stored below, and with
			// pre-Go-1.22 loop semantics every pointer would otherwise alias
			// the same range variable.
			alarm := alarm
			group := stationAlarmMap[alarm.SourceId] // zero value when absent
			switch alarm.AlarmTypeCode {
			case "3007":
				group.Alarm3007 = &alarm
			case "3008":
				group.Alarm3008 = &alarm
			}
			stationAlarmMap[alarm.SourceId] = group
		}

		// Decide per station whether the trigger is satisfied.
		for sid, stationAlarmInfo := range stationAlarmMap {
			log.Printf("判断测点[%s] 是否满足双控告警", sid)
			if v, ok := onceTriggerStationAlarmMap[sid]; ok {
				log.Printf("测点[%s]本次已经触发过[%s],不再重复触发", sid, v)
				continue
			}

			// The outgoing message is built from one of the source alarms;
			// 3008 takes precedence over 3007 when both exist.
			var alarmInfoTemplate *models.EsAlarm
			if stationAlarmInfo.Alarm3007 != nil {
				alarmInfoTemplate = stationAlarmInfo.Alarm3007
			}
			if stationAlarmInfo.Alarm3008 != nil {
				alarmInfoTemplate = stationAlarmInfo.Alarm3008
			}
			if alarmInfoTemplate == nil {
				// Only alarms of other type codes exist for this station, so
				// there is nothing to build a message from. Previously this
				// path dereferenced a nil template.
				log.Printf("测点[%s] 无3007/3008告警,跳过双控判断", sid)
				continue
			}

			isAlarm, level, detail := the.isRuleAlarm(trigger, stationAlarmInfo)
			log.Printf("测点[%s] 双控判定 isAlarm=%t level=%d detail=%s", sid, isAlarm, level, detail)

			if !isAlarm {
				payload := the.skAlarmElimination(alarmInfoTemplate, level, detail)
				the.InKafka.Publish(the.Info.IoConfig.In.Kafka.AlarmTopic, payload)
				continue
			}

			conditionStr := fmt.Sprintf("st:%d,f:%d,level:%d", trigger.StructId, trigger.FactorId, level)
			onceTriggerStationAlarmMap[sid] = conditionStr

			payload, now := the.skAlarmInfo(alarmInfoTemplate, level, detail)

			// Compare against the recorded history for this station.
			// NOTE(review): both checks read as if v were the current alarm and
			// level/now the history. In particular v.Time is an earlier
			// time.Now(), so !v.Time.After(now) appears to always hold and a
			// station with history is never published again. Behaviour is left
			// unchanged here - confirm the intended de-duplication rule.
			if v, ok := the.historyStationAlarmMap[sid]; ok {
				if v.AlarmLevel > level {
					// Filter out lower-level alarms.
					log.Printf("测点[%s]本次触发 level=[%d] > 历史有效等级%d,不再重复触发", sid, v.AlarmLevel, level)
					continue
				}
				if !v.Time.After(now) {
					log.Printf("测点[%s]本次触发时刻[%s] 对比历史有效时刻[%s] 非新", sid, v.Time.Format("2006-01-02 15:04:05"), now.Format("2006-01-02 15:04:05"))
					continue
				}
			}

			// Record the alarm in the history, then publish it.
			the.historyStationAlarmMap[sid] = AXY_SK.HistoryAlarm{
				SourceId:   sid,
				Condition:  conditionStr,
				AlarmLevel: level,
				Time:       now,
			}
			the.InKafka.Publish(the.Info.IoConfig.In.Kafka.AlarmTopic, payload)
		}
	}
	return ""
}

// skAlarmInfo builds the JSON payload of a 3077 "AlarmGeneration" message from
// the given source alarm, level and detail text. It returns the payload and the
// timestamp used for the message. alarmInfoTemplate must not be nil.
func (the *consumerAxySkAlarm) skAlarmInfo(alarmInfoTemplate *models.EsAlarm, level int, detail string) ([]byte, time.Time) {
	now := time.Now()
	alarmMsg := models.KafkaAlarm{
		MessageMode:   "AlarmGeneration",
		StructureId:   alarmInfoTemplate.StructureId,
		StructureName: "",
		SourceId:      alarmInfoTemplate.SourceId,
		SourceName:    alarmInfoTemplate.SourceName,
		AlarmTypeCode: "3077",
		AlarmCode:     fmt.Sprintf("3077000%d", level),
		Content:       detail,
		// NOTE(review): "+0800" is literal text in this layout, not a zone
		// verb, so the offset is only correct when local time is UTC+8.
		Time:         now.Format("2006-01-02T15:04:05+0800"),
		SourceTypeId: 2, // 0: DTU, 1: sensor, 2: station
		Sponsor:      "goInOut_axySkAlarm",
		Extras:       nil,
		SubDevices:   nil,
	}
	payload, err := json.Marshal(alarmMsg)
	if err != nil {
		// The payload is nil in this case; surface the cause instead of
		// dropping it silently.
		log.Printf("skAlarmInfo json.Marshal err=%v", err)
	}
	return payload, now
}

// skAlarmElimination builds the JSON payload of a 3077 "AlarmAutoElimination"
// message from the given source alarm and level. The detail argument is not
// used; Content is always empty. alarmInfoTemplate must not be nil.
func (the *consumerAxySkAlarm) skAlarmElimination(alarmInfoTemplate *models.EsAlarm, level int, detail string) []byte {
	alarmMsg := models.KafkaAlarm{
		MessageMode:   "AlarmAutoElimination",
		StructureId:   alarmInfoTemplate.StructureId,
		StructureName: "",
		SourceId:      alarmInfoTemplate.SourceId,
		SourceName:    alarmInfoTemplate.SourceName,
		AlarmTypeCode: "3077",
		AlarmCode:     fmt.Sprintf("3077000%d", level),
		Content:       "",
		Time:          time.Now().Format("2006-01-02T15:04:05+0800"),
		SourceTypeId:  2, // 0: DTU, 1: sensor, 2: station
		Sponsor:       "goInOut_axySkAlarm",
		Extras:        nil,
		SubDevices:    nil,
	}
	payload, err := json.Marshal(alarmMsg)
	if err != nil {
		log.Printf("skAlarmElimination json.Marshal err=%v", err)
	}
	return payload
}

// isRuleAlarm reports whether stationAlarm satisfies trigger. It returns the
// verdict, the trigger's alarm level (0 when not satisfied) and the
// concatenated detail text of the contributing alarms.
//
// Rule 0 requires every listed condition (0 = alarm 3007, 1 = alarm 3008) to be
// present with CurrentLevel not greater than the trigger level. Rule 1 is
// satisfied by the first listed condition that matches. Any other rule value
// yields false.
func (the *consumerAxySkAlarm) isRuleAlarm(trigger AXY_SK.AlarmTrigger, stationAlarm AXY_SK.StationAlarmGroup) (bool, int, string) {
	level := 0
	detail := ""

	// Rule 0: all listed conditions must hold.
	if trigger.Rule == 0 {
		isAlarm := true
		for _, conditionInt := range trigger.ConditionArray {
			switch conditionInt {
			case 0:
				if stationAlarm.Alarm3007 == nil || stationAlarm.Alarm3007.CurrentLevel > trigger.AlarmLevel {
					isAlarm = false
				}
				// isAlarm can only still be true here when Alarm3007 is non-nil.
				if isAlarm {
					if len(detail) > 0 {
						detail += "且 "
					}
					detail += stationAlarm.Alarm3007.Detail
				}
			case 1:
				if stationAlarm.Alarm3008 == nil || stationAlarm.Alarm3008.CurrentLevel > trigger.AlarmLevel {
					isAlarm = false
				}
				if isAlarm {
					if len(detail) > 0 {
						detail += "且 "
					}
					detail += stationAlarm.Alarm3008.Detail
				}
			}
		}
		if isAlarm {
			level = trigger.AlarmLevel
		}
		return isAlarm, level, detail
	}

	// Rule 1: any one listed condition is enough.
	if trigger.Rule == 1 {
		isAlarm := false
		for _, conditionInt := range trigger.ConditionArray {
			// NOTE(review): the comparison direction here (>) is the opposite
			// of rule 0, which treats CurrentLevel > AlarmLevel as "not
			// satisfied" - confirm which one is intended.
			switch conditionInt {
			case 0:
				// A missing alarm cannot satisfy the condition; the nil guard
				// prevents a nil dereference for stations without a 3007 alarm.
				if a := stationAlarm.Alarm3007; a != nil && a.CurrentLevel > trigger.AlarmLevel {
					isAlarm = true
					detail += a.Detail
				}
			case 1:
				if a := stationAlarm.Alarm3008; a != nil && a.CurrentLevel > trigger.AlarmLevel {
					isAlarm = true
					detail += a.Detail
				}
			}
			if isAlarm {
				level = trigger.AlarmLevel
				break
			}
		}
		return isAlarm, level, detail
	}

	return false, level, detail
}

// getEsAlarmTriggerQueryStr returns the Elasticsearch query body selecting the
// alarms of structure structId that have isTriggerPart set to true and a state
// of 0, 1 or 2.
func (the *consumerAxySkAlarm) getEsAlarmTriggerQueryStr(structId int) string {
	esQuery := fmt.Sprintf(` { "query": { "bool": { "must": [ { "term": { "structure_id": { "value": %d } } }, { "term": { "isTriggerPart": { "value": true } } }, { "terms": { "state": [ 0, 1, 2 ] } } ] } } }`, structId)
	return esQuery
}

// sinkTask calls toSink once per configured Elasticsearch interval (in
// seconds), forever. time.NewTicker panics when the interval is not positive.
func (the *consumerAxySkAlarm) sinkTask() {
	intervalSec := the.Info.IoConfig.Out.Es.Interval
	ticker := time.NewTicker(time.Duration(intervalSec) * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		the.toSink()
	}
}

// toSink writes every theme currently held in sinkMap to Elasticsearch in one
// bulk request and then clears the map. The lock is held for the whole
// operation. Nothing is written when the map holds no themes.
func (the *consumerAxySkAlarm) toSink() {
	var themes []models.EsTheme
	the.lock.Lock()
	defer the.lock.Unlock()

	the.sinkMap.Range(func(key, value any) bool {
		v, ok := value.(*models.EsTheme)
		if !ok {
			log.Printf("!!! toSink -> Range 类型转换异常 [%v]", key)
			return true
		}
		themes = append(themes, *v)
		// Temporary logging for the tagged sensor.
		if v.Sensor == the.logTagId {
			bs, _ := json.Marshal(v) // best effort: used for a debug log line only
			log.Printf("toSink -> Range 标记测点数据 [%d] %s ", the.logTagId, string(bs))
		}
		return true
	})

	if len(themes) == 0 {
		return
	}
	index := the.Info.IoConfig.Out.Es.Index
	log.Printf("写入es [%s] %d条", index, len(themes))
	the.OutEs.BulkWriteThemes2Es(index, themes)
	the.sinkMap.Clear()
}

// Work starts two goroutines: the periodic sink task, and a loop that moves
// themes from dataCache into sinkMap keyed by sensor, so that only the latest
// theme per sensor is kept until the next sink.
func (the *consumerAxySkAlarm) Work() {
	log.Printf("监控 指定设备 logTagId=[%d]", the.logTagId)
	go the.sinkTask()
	go func() {
		for pushEsTheme := range the.dataCache {
			if pushEsTheme.Sensor == the.logTagId {
				bs, _ := json.Marshal(pushEsTheme) // best effort: used for a debug log line only
				log.Printf("存储 标记测点数据 [%d] %s ", the.logTagId, string(bs))
			}

			// Store valid data in the cache.
			the.lock.Lock()
			the.sinkMap.Store(pushEsTheme.Sensor, pushEsTheme)
			the.lock.Unlock()
		}
	}()
}

// onData is the Kafka message callback. It transforms the message with
// Adaptor_Savoir_LastTheme and queues the result on dataCache when it carries
// data. A result without data is logged only for the tagged sensor. It always
// returns true.
func (the *consumerAxySkAlarm) onData(topic string, msg string) bool {
	//if len(msg) > 80 {
	//	log.Printf("recv:[%s]:%s ...", topic, msg[:80])
	//}
	adaptor := adaptors.Adaptor_Savoir_LastTheme{}
	needPush := adaptor.Transform(topic, msg)

	if needPush != nil && needPush.Data != nil {
		the.dataCache <- needPush
		return true
	}

	// Marshal only when the line is actually logged.
	if needPush != nil && needPush.Sensor == the.logTagId {
		s, _ := json.Marshal(needPush) // best effort: used for a debug log line only
		log.Printf("onData 测点[%d] 异常needPush= %s", needPush.Sensor, s)
	}
	return true
}