package consumers

import (
	"encoding/json"
	"fmt"
	"log"
	"strconv"
	"sync"
	"time"

	"goInOut/adaptors"
	"goInOut/consumers/AXY_SK"
	"goInOut/dbOperate"
	"goInOut/dbOperate/_kafka"
	"goInOut/models"
	"goInOut/monitors"

	"gopkg.in/yaml.v3"
)

// consumerAxySkAlarm evaluates "SK" (dual-control, alarm type 3077) triggers:
// it loads trigger configuration from PostgreSQL, reads the currently active
// 3007/3008 alarms from Elasticsearch, and publishes alarm generation /
// elimination messages to Kafka.
type consumerAxySkAlarm struct {
	// Buffered channel of themes handed over by onData.
	dataCache  chan *models.EsTheme
	alarmCache map[string]models.EsAlarm
	// Parsed configuration.
	Info     AXY_SK.ConfigFile
	InEs     dbOperate.ESHelper
	OutKafka _kafka.KafkaHelper
	infoPg   *dbOperate.DBHelper
	sinkMap  sync.Map
	lock     sync.Mutex
	logTagId int
	monitor  *monitors.CommonMonitor
	// Trigger configuration loaded from the database.
	stationAlarmTrigger    []AXY_SK.StationAlarmTrigger
	configAlarmTrigger     []AXY_SK.AlarmTrigger
	historyStationAlarmMap map[string]AXY_SK.HistoryAlarm
}

// LoadConfigJson parses cfgStr into the.Info.
//
// Despite the name, the text is decoded with yaml.Unmarshal. The function
// logs and then panics when decoding fails.
func (the *consumerAxySkAlarm) LoadConfigJson(cfgStr string) {
	err := yaml.Unmarshal([]byte(cfgStr), &the.Info)
	if err != nil {
		log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
		panic(err)
	}
}

// Initial loads the configuration and then initialises, in order, the Kafka
// helper, the ES helper, the PostgreSQL helper and the cron monitor. The first
// error encountered is returned.
func (the *consumerAxySkAlarm) Initial(cfg string) error {
	the.sinkMap = sync.Map{}
	the.dataCache = make(chan *models.EsTheme, 1000)

	the.LoadConfigJson(cfg)
	if err := the.inputInitial(); err != nil {
		return err
	}
	if err := the.outputInitial(); err != nil {
		return err
	}
	if err := the.infoComponentInitial(); err != nil {
		return err
	}
	return the.monitorInitial()
}

// inputInitial builds and initialises the Kafka helper from the "Out.Kafka"
// section of the configuration. (The method name says "input", but what it
// sets up is the.OutKafka.) It always returns nil.
func (the *consumerAxySkAlarm) inputInitial() error {
	the.OutKafka = _kafka.KafkaHelper{
		Brokers: the.Info.IoConfig.Out.Kafka.Brokers,
		GroupId: the.Info.IoConfig.Out.Kafka.GroupId,
	}
	the.OutKafka.Initial()
	return nil
}

// outputInitial builds the ES helper from the "In.Es" section of the
// configuration. (The method name says "output", but what it sets up is
// the.InEs.) It always returns nil.
func (the *consumerAxySkAlarm) outputInitial() error {
	the.InEs = *dbOperate.NewESHelper(
		the.Info.IoConfig.In.Es.Address,
		the.Info.IoConfig.In.Es.Auth.UserName,
		the.Info.IoConfig.In.Es.Auth.Password,
	)
	return nil
}

// infoComponentInitial creates the PostgreSQL helper used to query trigger
// configuration. It always returns nil.
func (the *consumerAxySkAlarm) infoComponentInitial() error {
	pgConnStr := the.Info.QueryComponent.Pg.Connect
	the.infoPg = dbOperate.NewDBHelper("postgres", pgConnStr)
	return nil
}

// monitorInitial starts the monitor and registers updateTriggerStationConfig
// for the task named "cron" in the.Info.Monitor. Any other task name is only
// logged. It always returns nil.
func (the *consumerAxySkAlarm) monitorInitial() error {
	the.monitor = &monitors.CommonMonitor{
		MonitorHelper: &monitors.MonitorHelper{},
	}
	the.monitor.Start()

	for taskName, cron := range the.Info.Monitor {
		switch taskName {
		case "cron":
			the.monitor.RegisterTask(cron, the.updateTriggerStationConfig)
		default:
			log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron)
		}
	}
	// For local testing:
	// the.updateTriggerStationConfig()
	return nil
}

// updateTriggerStationConfig reloads the per-station trigger rows from
// PostgreSQL, folds consecutive rows that share struct/factor/level into one
// AlarmTrigger with several StationIds, and then runs judgeSK immediately.
// When the query fails it logs and returns without judging.
func (the *consumerAxySkAlarm) updateTriggerStationConfig() {
	log.Printf("--> 定时 查询pg 更新 双控配置信息")
	sql := `SELECT at.*, s.id as station_id, s.name as station_name, ts.name as struct_name FROM t_alarm_trigger as at left join t_sensor s on at.struct_id = s.structure and at.factor_id = s.factor left join t_structure ts on at.struct_id = ts.id order by s.id, at.alarm_level;`
	err := the.infoPg.Query(&the.stationAlarmTrigger, sql)
	if err != nil {
		log.Printf("查询数据库异常:err-> %s", err.Error())
		return
	}

	the.configAlarmTrigger = []AXY_SK.AlarmTrigger{}
	previousTag := ""
	// NOTE(review): only *consecutive* rows with the same tag are merged, and
	// the query orders by station id first — confirm that ordering is intended.
	for i, trigger := range the.stationAlarmTrigger {
		the.stationAlarmTrigger[i].ConditionArray = trigger.ConditionRaw
		triggerTag := fmt.Sprintf("%d_%d_%d", trigger.StructId, trigger.FactorId, trigger.AlarmLevel)
		if previousTag == triggerTag {
			last := &the.configAlarmTrigger[len(the.configAlarmTrigger)-1]
			last.StationIds = append(last.StationIds, trigger.StationId)
			continue
		}
		previousTag = triggerTag
		the.configAlarmTrigger = append(the.configAlarmTrigger, AXY_SK.AlarmTrigger{
			Id:             trigger.Id,
			StructId:       trigger.StructId,
			FactorId:       trigger.FactorId,
			AlarmLevel:     trigger.AlarmLevel,
			ConditionRaw:   trigger.ConditionRaw,
			ConditionArray: trigger.ConditionRaw,
			Rule:           trigger.Rule,
			StationIds:     []int{trigger.StationId},
			StructName:     trigger.StructName,
		})
	}
	log.Printf("当前共 %d条 启用配置", len(the.configAlarmTrigger))

	// Evaluate right away instead of waiting for the next tick.
	the.judgeSK()
}

// judgeSK walks every configured trigger, compares the active 3007/3008
// alarms found in ES against the trigger rule, and publishes 3077 alarm
// generation or elimination messages to Kafka. It always returns "".
func (the *consumerAxySkAlarm) judgeSK() string {
	// Stations that already fired during this pass. Once a station has fired
	// for one trigger it is skipped for the following ones (the configuration
	// query is sorted by alarm level).
	onceTriggerStationAlarmMap := map[string]string{}

	for i, trigger := range the.configAlarmTrigger {
		log.Printf("开始处理--> 第[%d]配置 id=%d trigger", i, trigger.Id)
		if len(trigger.StationIds) == 0 {
			continue
		}
		the.updateEsAlarmTriggerHistory(trigger.StructId)

		// Query the active trigger-part alarms of the configured stations.
		esSql := the.getEsAlarmTriggerPartQueryStrByStationIds(trigger.StructId, trigger.StationIds)
		alarms, err := the.InEs.SearchAlarm(the.Info.IoConfig.In.Es.Index, esSql)
		if err != nil {
			log.Printf("es查询异常err -> %s", err.Error())
			continue
		}

		stationAlarmMap := map[string]AXY_SK.StationAlarmGroup{}
		for _, alarm := range alarms {
			// Explicit per-iteration copy: the address is stored below, and
			// with pre-Go-1.22 loop semantics every pointer would otherwise
			// alias the same loop variable.
			alarm := alarm
			group := stationAlarmMap[alarm.SourceId]
			switch alarm.AlarmTypeCode {
			case "3007":
				group.Alarm3007 = &alarm
			case "3008":
				group.Alarm3008 = &alarm
			}
			stationAlarmMap[alarm.SourceId] = group
		}

		// Decide, per station, whether the rule is satisfied.
		for sid, stationAlarmInfo := range stationAlarmMap {
			if v, ok := onceTriggerStationAlarmMap[sid]; ok {
				log.Printf("测点[%s]本次已经触发过[%s],不再重复触发", sid, v)
				continue
			}

			isAlarm, level, detail, alarmTime, triggerTypeCodes := the.isRuleAlarm(trigger, stationAlarmInfo)
			log.Printf("trigger.Id=%d, 测点[%s]判断双控 告警[%v] level=%d", trigger.Id, sid, isAlarm, level)

			// Prefer the 3008 alarm as template when both are present.
			var alarmInfoTemplate *models.EsAlarm
			if stationAlarmInfo.Alarm3007 != nil {
				alarmInfoTemplate = stationAlarmInfo.Alarm3007
			}
			if stationAlarmInfo.Alarm3008 != nil {
				alarmInfoTemplate = stationAlarmInfo.Alarm3008
			}

			if isAlarm && alarmInfoTemplate != nil {
				// Skip alarms that are already recorded in the history.
				isHisAlarm, hisK := the.isHistoryAlarm(sid, level, alarmTime)
				if isHisAlarm {
					continue
				}
				conditionStr := fmt.Sprintf("st:%d,f:%d,level:%d", trigger.StructId, trigger.FactorId, level)
				onceTriggerStationAlarmMap[sid] = conditionStr

				// Record the alarm in the history before publishing.
				log.Printf("trigger.Id=%d, 测点[%s]本次触发双控,%s,时刻[%s]level=%d",
					trigger.Id, sid, hisK, alarmTime.Format("2006-01-02 15:04:05.000"), level)
				the.historyStationAlarmMap[hisK] = AXY_SK.HistoryAlarm{
					SourceId:   sid,
					AlarmLevel: level,
					Time:       alarmTime,
				}
				payload := the.skAlarmInfo(alarmInfoTemplate, level, detail, alarmTime, triggerTypeCodes, trigger.StructName)
				the.OutKafka.Publish(the.Info.IoConfig.Out.Kafka.AlarmTopic, payload)
				continue
			}

			// Rule not satisfied: eliminate a previously raised 3077 alarm.
			hisK := fmt.Sprintf("%s_3077000%d", sid, trigger.AlarmLevel)
			if level == 0 {
				level = trigger.AlarmLevel
			}
			v, ok := the.historyStationAlarmMap[hisK]
			if !ok {
				continue
			}
			log.Printf("trigger.Id=%d, 测点[%s]本次 恢复双控%s,时刻[%s]level=%d",
				trigger.Id, sid, hisK, v.Time.Format("2006-01-02 15:04:05.000"), level)

			var payload []byte
			if alarmInfoTemplate != nil {
				payload = the.skAlarmElimination(alarmInfoTemplate, level, detail, trigger.StructName)
			} else {
				// The station only has alarms that are neither 3007 nor 3008,
				// so there is no template to copy identifiers from; build the
				// elimination from the trigger and the station id instead of
				// dereferencing a nil template.
				payload = the.skAlarmAutoElimination(trigger.StructId, trigger.StructName, sid, "", level, "")
			}
			the.OutKafka.Publish(the.Info.IoConfig.Out.Kafka.AlarmTopic, payload)
			delete(the.historyStationAlarmMap, hisK)
		}

		// Stations without any active alarm need to be recovered.
		// The set must be allocated: writing to a nil map panics.
		alarmStationIds := make(map[string]struct{}, len(alarms))
		for _, alarm := range alarms {
			alarmStationIds[alarm.SourceId] = struct{}{}
		}
		var noAlarmStationIds []string
		for _, stationId := range trigger.StationIds {
			stationIdStr := strconv.Itoa(stationId)
			if _, ok := alarmStationIds[stationIdStr]; !ok {
				noAlarmStationIds = append(noAlarmStationIds, stationIdStr)
			}
		}

		// Publish an automatic elimination for each alarm-free station.
		// NOTE(review): this is sent on every pass regardless of whether a
		// 3077 alarm is recorded in the history — confirm that is intended.
		log.Printf("无告警,待恢复的测点ids==%v", noAlarmStationIds)
		for _, stationId := range noAlarmStationIds {
			content := "原始告警都已恢复"
			payload := the.skAlarmAutoElimination(trigger.StructId, trigger.StructName, stationId, "", trigger.AlarmLevel, content)
			the.OutKafka.Publish(the.Info.IoConfig.Out.Kafka.AlarmTopic, payload)
			hisK := fmt.Sprintf("%s_3077000%d", stationId, trigger.AlarmLevel)
			delete(the.historyStationAlarmMap, hisK)
		}
	}
	return ""
}

// isHistoryAlarm reports whether an alarm for sourceId at the given level and
// time is already covered by the loaded history. It checks the history key of
// every level from `level` down to 1.
//
// The second result is the last key that was examined, not necessarily the key
// that matched. NOTE(review): for level >= 1 that is always the level-1 key,
// and judgeSK stores new history entries under it — confirm this is intended.
func (the *consumerAxySkAlarm) isHistoryAlarm(sourceId string, level int, alarmTime time.Time) (bool, string) {
	isHis := false
	// Important: in the alarm process, once a level-1 alarm exists a later
	// level-2 trigger is attributed to the level-1 alarm. So when the level-2
	// entry is missing, the higher-severity levels must be checked as well.
	rawK := fmt.Sprintf("%s_3077000%d", sourceId, level)
	hisK := rawK
	for i := level; i >= 1; i-- {
		hisK = fmt.Sprintf("%s_3077000%d", sourceId, i)
		hv, ok := the.historyStationAlarmMap[hisK]
		if !ok {
			continue
		}
		switch {
		case !alarmTime.After(hv.Time):
			log.Printf("测点[%s]本次触发时刻[%s]%s(实际es=%s) 对比历史有效时刻[%s] 非新",
				sourceId,
				alarmTime.Format("2006-01-02 15:04:05.000"),
				rawK, hisK,
				hv.Time.Format("2006-01-02 15:04:05.000"),
			)
			isHis = true
		case hv.AlarmLevel < level:
			// Filter out triggers of lower severity than the recorded alarm.
			log.Printf("测点[%s]本次触发 %s(实际es=%s) 低于 历史有效等级%d,历史时刻%s,不再重复触发",
				sourceId, rawK, hisK, hv.AlarmLevel, hv.Time.Format("2006-01-02 15:04:05.000"))
			isHis = true
		}
	}
	return isHis, hisK
}

// marshalAlarm encodes msg as JSON. An encoding error is logged and the
// (nil) payload is returned, so a failure is visible instead of silent.
func (the *consumerAxySkAlarm) marshalAlarm(msg models.KafkaAlarm) []byte {
	payload, err := json.Marshal(msg)
	if err != nil {
		log.Printf("序列化双控告警消息异常 err=%v", err)
	}
	return payload
}

// skAlarmInfo builds the JSON payload of a 3077 "AlarmGeneration" message.
// Structure and source identifiers are copied from alarmInfoTemplate, which
// must be non-nil. The time is alarmTime shifted by +8h and rendered with a
// literal "+0800" suffix.
func (the *consumerAxySkAlarm) skAlarmInfo(alarmInfoTemplate *models.EsAlarm, level int, detail string, alarmTime time.Time, triggerTypeCodes []string, structName string) []byte {
	alarmMsg := models.KafkaAlarm{
		MessageMode:   "AlarmGeneration",
		StructureId:   alarmInfoTemplate.StructureId,
		StructureName: structName,
		SourceId:      alarmInfoTemplate.SourceId,
		SourceName:    alarmInfoTemplate.SourceName,
		AlarmTypeCode: "3077",
		AlarmCode:     fmt.Sprintf("3077000%d", level),
		Content:       detail,
		Time:          alarmTime.Add(8 * time.Hour).Format("2006-01-02T15:04:05.000+0800"),
		SourceTypeId:  2, // 0: DTU, 1: sensor, 2: station
		Sponsor:       "goInOut_axySkAlarm",
		Extras:        nil,
		SubDevices:    triggerTypeCodes,
	}
	return the.marshalAlarm(alarmMsg)
}

// skAlarmElimination builds the JSON payload of a 3077 "AlarmAutoElimination"
// message whose identifiers are copied from alarmInfoTemplate, which must be
// non-nil. The detail argument is accepted but the message content is empty.
func (the *consumerAxySkAlarm) skAlarmElimination(alarmInfoTemplate *models.EsAlarm, level int, detail string, structName string) []byte {
	alarmMsg := models.KafkaAlarm{
		MessageMode:   "AlarmAutoElimination",
		StructureId:   alarmInfoTemplate.StructureId,
		StructureName: structName,
		SourceId:      alarmInfoTemplate.SourceId,
		SourceName:    alarmInfoTemplate.SourceName,
		AlarmTypeCode: "3077",
		AlarmCode:     fmt.Sprintf("3077000%d", level),
		Content:       "",
		Time:          time.Now().Format("2006-01-02T15:04:05.000+0800"),
		SourceTypeId:  2, // 0: DTU, 1: sensor, 2: station
		Sponsor:       "goInOut_axySkAlarm",
		Extras:        nil,
		SubDevices:    nil,
	}
	return the.marshalAlarm(alarmMsg)
}

// skAlarmAutoElimination builds the JSON payload of a 3077
// "AlarmAutoElimination" message from explicit identifiers, for callers that
// have no ES alarm to use as a template.
func (the *consumerAxySkAlarm) skAlarmAutoElimination(structureId int, structName string, sourceId, sourceName string, level int, content string) []byte {
	alarmMsg := models.KafkaAlarm{
		MessageMode:   "AlarmAutoElimination",
		StructureId:   structureId,
		StructureName: structName,
		SourceId:      sourceId,
		SourceName:    sourceName,
		AlarmTypeCode: "3077",
		AlarmCode:     fmt.Sprintf("3077000%d", level),
		Content:       content,
		Time:          time.Now().Format("2006-01-02T15:04:05.000+0800"),
		SourceTypeId:  2, // 0: DTU, 1: sensor, 2: station
		Sponsor:       "goInOut_axySkAlarm",
		Extras:        nil,
		SubDevices:    nil,
	}
	return the.marshalAlarm(alarmMsg)
}

// isRuleAlarm evaluates trigger against the 3007/3008 alarms of one station.
//
// Rule 0 requires every configured condition to hold; rule 1 requires any one
// of them. Condition 0 refers to Alarm3007 and condition 1 to Alarm3008; a
// condition holds when that alarm exists and its CurrentLevel is not greater
// than trigger.AlarmLevel. Any other rule value yields no alarm.
//
// It returns whether the rule fired, the level (trigger.AlarmLevel when fired,
// otherwise 0), the accumulated detail text, the latest EndTime among the
// contributing alarms, and their alarm type codes.
func (the *consumerAxySkAlarm) isRuleAlarm(trigger AXY_SK.AlarmTrigger, stationAlarm AXY_SK.StationAlarmGroup) (bool, int, string, time.Time, []string) {
	level := 0
	detail := ""
	dt := time.Time{}
	var triggerTypeCodes []string

	// Rule 0: 3007 and 3008 are both required.
	if trigger.Rule == 0 {
		isAlarm := true
		for _, conditionInt := range trigger.ConditionArray {
			switch conditionInt {
			case 0:
				if stationAlarm.Alarm3007 == nil || stationAlarm.Alarm3007.CurrentLevel > trigger.AlarmLevel {
					isAlarm = false
					detail += "测量值恢复正常"
				}
				// isAlarm can only still be true here when Alarm3007 is
				// non-nil, so the dereferences below are safe.
				if isAlarm {
					if len(detail) > 0 {
						detail += "且 "
					}
					if stationAlarm.Alarm3007.EndTime.After(dt) {
						dt = stationAlarm.Alarm3007.EndTime
					}
					detail += stationAlarm.Alarm3007.Detail
					triggerTypeCodes = append(triggerTypeCodes, stationAlarm.Alarm3007.AlarmTypeCode)
				}
			case 1:
				if stationAlarm.Alarm3008 == nil || stationAlarm.Alarm3008.CurrentLevel > trigger.AlarmLevel {
					isAlarm = false
					detail += "变化速率恢复正常"
				}
				// Same reasoning as above, for Alarm3008.
				if isAlarm {
					if len(detail) > 0 {
						detail += "且 "
					}
					detail += stationAlarm.Alarm3008.Detail
					if stationAlarm.Alarm3008.EndTime.After(dt) {
						dt = stationAlarm.Alarm3008.EndTime
					}
					triggerTypeCodes = append(triggerTypeCodes, stationAlarm.Alarm3008.AlarmTypeCode)
				}
			}
		}
		if isAlarm {
			level = trigger.AlarmLevel
		}
		return isAlarm, level, detail, dt, triggerTypeCodes
	}

	// Rule 1: either 3007 or 3008 is enough.
	if trigger.Rule == 1 {
		isAlarm := false
		for _, conditionInt := range trigger.ConditionArray {
			switch conditionInt {
			case 0:
				if stationAlarm.Alarm3007 != nil && stationAlarm.Alarm3007.CurrentLevel <= trigger.AlarmLevel {
					isAlarm = true
					detail += stationAlarm.Alarm3007.Detail
					if stationAlarm.Alarm3007.EndTime.After(dt) {
						dt = stationAlarm.Alarm3007.EndTime
					}
					triggerTypeCodes = append(triggerTypeCodes, stationAlarm.Alarm3007.AlarmTypeCode)
				}
			case 1:
				if stationAlarm.Alarm3008 != nil && stationAlarm.Alarm3008.CurrentLevel <= trigger.AlarmLevel {
					isAlarm = true
					detail += stationAlarm.Alarm3008.Detail
					if stationAlarm.Alarm3008.EndTime.After(dt) {
						dt = stationAlarm.Alarm3008.EndTime
					}
					triggerTypeCodes = append(triggerTypeCodes, stationAlarm.Alarm3008.AlarmTypeCode)
				}
			}
			// Stop at the first condition that fires.
			if isAlarm {
				level = trigger.AlarmLevel
				break
			}
		}
		return isAlarm, level, detail, dt, triggerTypeCodes
	}

	return false, level, detail, dt, triggerTypeCodes
}

// updateEsAlarmTriggerHistory replaces the.historyStationAlarmMap with the
// active 3077 alarms of structId found in ES, keyed by "<sourceId>_<alarmCode>".
// A query error is logged; the map is still reset and filled from whatever
// SearchAlarm returned.
func (the *consumerAxySkAlarm) updateEsAlarmTriggerHistory(structId int) {
	esSql := the.getEsAlarmTriggerHistoryQueryStr(structId)
	alarms, err := the.InEs.SearchAlarm(the.Info.IoConfig.In.Es.Index, esSql)
	if err != nil {
		log.Printf("结构物[%d] 查询历史有效AlarmTrigger 异常=>%s", structId, err.Error())
	}

	the.historyStationAlarmMap = make(map[string]AXY_SK.HistoryAlarm)
	for _, alarm := range alarms {
		hisK := fmt.Sprintf("%s_%s", alarm.SourceId, alarm.AlarmCode)
		the.historyStationAlarmMap[hisK] = AXY_SK.HistoryAlarm{
			SourceId:   alarm.SourceId,
			Time:       alarm.EndTime,
			AlarmLevel: alarm.CurrentLevel,
		}
		log.Printf("加载双控历史告警 hisK=%s,time=%v", hisK, alarm.EndTime.Format("2006-01-02T15:04:05.000+0800"))
	}
	log.Printf("双控历史告警 共 %d条", len(the.historyStationAlarmMap))
}

// getEsAlarmTriggerHistoryQueryStr returns the ES query body selecting up to
// 100 alarms of type "3077" with state 0, 1 or 2 for the given structure.
func (the *consumerAxySkAlarm) getEsAlarmTriggerHistoryQueryStr(structId int) string {
	esQuery := fmt.Sprintf(` { "size": 100, "query": { "bool": { "must": [ { "term": { "structure_id": { "value": %d } } }, { "term": { "alarm_type_code": { "value": "3077" } } }, { "terms": { "state": [ 0, 1, 2 ] } } ] } } }`, structId)
	return esQuery
}

// getEsAlarmTriggerPartQueryStr returns the ES query body selecting up to 100
// alarms flagged isTriggerPart with state 0, 1 or 2 for the given structure.
func (the *consumerAxySkAlarm) getEsAlarmTriggerPartQueryStr(structId int) string {
	esQuery := fmt.Sprintf(` { "size": 100, "query": { "bool": { "must": [ { "term": { "structure_id": { "value": %d } } }, { "term": { "isTriggerPart": { "value": true } } }, { "terms": { "state": [ 0, 1, 2 ] } } ] } } }`, structId)
	return esQuery
}

// getEsAlarmTriggerPartQueryStrByStationIds is getEsAlarmTriggerPartQueryStr
// further restricted to the given station ids via a "terms" clause on
// source_id.
func (the *consumerAxySkAlarm) getEsAlarmTriggerPartQueryStrByStationIds(structId int, stationIds []int) string {
	// A nil slice would marshal to "null", which is not a valid terms list.
	sourceIds := "[]"
	if len(stationIds) > 0 {
		// Marshalling a []int cannot fail, so the error is ignored.
		s, _ := json.Marshal(stationIds)
		sourceIds = string(s)
	}
	esQuery := fmt.Sprintf(` { "size": 100, "query": { "bool": { "must": [ { "term": { "structure_id": { "value": %d } } }, { "term": { "isTriggerPart": { "value": true } } }, { "terms": { "state": [ 0, 1, 2 ] } }, { "terms": { "source_id": %s } } ] } } }`, structId, sourceIds)
	return esQuery
}

// sinkTask calls toSink on every tick of the interval configured under
// In.Es.Interval (seconds). It never returns.
//
// NOTE(review): time.NewTicker panics on a non-positive duration, so the
// configured interval must be greater than zero.
func (the *consumerAxySkAlarm) sinkTask() {
	intervalSec := the.Info.IoConfig.In.Es.Interval
	ticker := time.NewTicker(time.Duration(intervalSec) * time.Second)
	defer ticker.Stop()
	for {
		<-ticker.C
		the.toSink()
	}
}

// toSink writes every theme currently held in sinkMap to ES in one bulk call
// and then clears the map. It holds the.lock for the whole operation.
func (the *consumerAxySkAlarm) toSink() {
	var themes []models.EsTheme
	the.lock.Lock()
	defer the.lock.Unlock()

	the.sinkMap.Range(func(key, value any) bool {
		v, ok := value.(*models.EsTheme)
		if !ok {
			log.Printf("!!! toSink -> Range 类型转换异常 [%v]", key)
			return true
		}
		themes = append(themes, *v)
		// Temporary diagnostic logging for the tagged station.
		if v.Sensor == the.logTagId {
			bs, _ := json.Marshal(v)
			log.Printf("toSink -> Range 标记测点数据 [%d] %s ", the.logTagId, string(bs))
		}
		return true
	})

	if len(themes) > 0 {
		index := the.Info.IoConfig.In.Es.Index
		log.Printf("写入es [%s] %d条", index, len(themes))
		the.InEs.BulkWriteThemes2Es(index, themes)
		the.sinkMap.Clear()
	}
}

// Work starts two background goroutines: the periodic sink (sinkTask) and a
// loop that moves themes from dataCache into sinkMap keyed by sensor id, so
// only the latest theme per sensor is kept. Neither goroutine is ever stopped.
func (the *consumerAxySkAlarm) Work() {
	log.Printf("监控 指定设备 logTagId=[%d]", the.logTagId)
	go the.sinkTask()
	go func() {
		for {
			pushEsTheme := <-the.dataCache

			if pushEsTheme.Sensor == the.logTagId {
				bs, _ := json.Marshal(pushEsTheme)
				log.Printf("存储 标记测点数据 [%d] %s ", the.logTagId, string(bs))
			}

			// Store the valid theme in the cache.
			the.lock.Lock()
			the.sinkMap.Store(pushEsTheme.Sensor, pushEsTheme)
			the.lock.Unlock()
		}
	}()
}

// onData converts one incoming message with Adaptor_Savoir_LastTheme and, when
// the result carries data, queues it on dataCache. A result without data is
// logged only for the tagged station. It always returns true.
func (the *consumerAxySkAlarm) onData(topic string, msg string) bool {
	adaptor := adaptors.Adaptor_Savoir_LastTheme{}
	needPush := adaptor.Transform(topic, msg)

	if needPush != nil && needPush.Data != nil {
		the.dataCache <- needPush
		return true
	}

	if needPush != nil && needPush.Sensor == the.logTagId {
		// Only marshal when the line is actually going to be logged.
		s, _ := json.Marshal(needPush)
		log.Printf("onData 测点[%d] 异常needPush= %s", needPush.Sensor, s)
	}
	return true
}