From 7e43bf57f53ddc67f92b57e821ba2cb6340de489 Mon Sep 17 00:00:00 2001 From: lucas Date: Fri, 29 Aug 2025 14:19:16 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E6=9B=B4=E6=96=B0=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- configFiles/config_安心云告警双控.yaml | 4 +- consumers/AXY_SK/dataModel.go | 1 - consumers/consumerAxySkAlarm.go | 79 +++++++++++++++---- 3 files changed, 66 insertions(+), 18 deletions(-) diff --git a/configFiles/config_安心云告警双控.yaml b/configFiles/config_安心云告警双控.yaml index a3eb178..f3e2b4f 100644 --- a/configFiles/config_安心云告警双控.yaml +++ b/configFiles/config_安心云告警双控.yaml @@ -7,12 +7,12 @@ ioConfig: groupId: axy_sk_alarm_inout alarmTopic: anxinyun_alarm #推送告警的主题 topics: - - szbb + - no out: es: address: - "http://10.8.30.160:30092" - index: "native_alarms" #推送告警索引 + index: native_sk_alarms #推送告警索引 auth: userName: post password: 123 diff --git a/consumers/AXY_SK/dataModel.go b/consumers/AXY_SK/dataModel.go index 726fefd..d484238 100644 --- a/consumers/AXY_SK/dataModel.go +++ b/consumers/AXY_SK/dataModel.go @@ -30,7 +30,6 @@ type StationAlarmGroup struct { type HistoryAlarm struct { SourceId string - Condition string Time time.Time AlarmLevel int } diff --git a/consumers/consumerAxySkAlarm.go b/consumers/consumerAxySkAlarm.go index 3c60bd7..e6abe51 100644 --- a/consumers/consumerAxySkAlarm.go +++ b/consumers/consumerAxySkAlarm.go @@ -60,8 +60,7 @@ func (the *consumerAxySkAlarm) Initial(cfg string) error { if err != nil { return err } - the.loadHistoryRecord() - + the.historyStationAlarmMap = make(map[string]AXY_SK.HistoryAlarm) err = the.monitorInitial() return err } @@ -97,10 +96,6 @@ func (the *consumerAxySkAlarm) infoComponentInitial() error { return nil } -func (the *consumerAxySkAlarm) loadHistoryRecord() { - the.historyStationAlarmMap = map[string]AXY_SK.HistoryAlarm{} -} - func (the *consumerAxySkAlarm) monitorInitial() error { the.monitor = &monitors.CommonMonitor{ MonitorHelper: &monitors.MonitorHelper{}, @@ -160,9 +155,12 @@ func (the *consumerAxySkAlarm) judgeSK() string { onceTriggerStationAlarmMap := map[string]string{} //同factorId_stationId 触发了高等级 不触发低等级(查询已排序) for i, trigger := range the.configAlarmTrigger { - log.Printf("开始处理--> 第[%d] id=%d trigger配置", i, trigger.Id) + log.Printf("开始处理--> 第[%d]配置 id=%d trigger", i, trigger.Id) + + the.updateEsAlarmTriggerHistory(trigger.StructId) + //配置的结构物的监测因素 去查询 - esSql := the.getEsAlarmTriggerQueryStr(trigger.StructId) + esSql := the.getEsAlarmTriggerPartQueryStr(trigger.StructId) alarms, err := the.OutEs.SearchAlarm(the.Info.IoConfig.Out.Es.Index, esSql) if err != nil { log.Printf("es查询异常err -> %s", err.Error()) @@ -193,7 +191,7 @@ func (the *consumerAxySkAlarm) judgeSK() string { continue } isAlarm, level, detail, alarmTime := the.isRuleAlarm(trigger, stationAlarmInfo) - println(isAlarm, level, detail) + var alarmInfoTemplate *models.EsAlarm if stationAlarmInfo.Alarm3007 != nil { alarmInfoTemplate = stationAlarmInfo.Alarm3007 @@ -207,8 +205,8 @@ func (the *consumerAxySkAlarm) judgeSK() string { onceTriggerStationAlarmMap[sid] = conditionStr //判断历史有没有 if v, ok := the.historyStationAlarmMap[sid]; ok { - if v.AlarmLevel > level { //低等级告警过滤 - log.Printf("测点[%s]本次触发 level=[%d] > 历史有效等级%d,不再重复触发", sid, v.AlarmLevel, level) + if v.AlarmLevel < level { //低等级告警过滤 + log.Printf("测点[%s]本次触发 level=[%d] > 历史有效等级%d,不再重复触发", sid, level, v.AlarmLevel) continue } @@ -219,15 +217,13 @@ func (the *consumerAxySkAlarm) judgeSK() string { } } //纪录历史告警 - payload := the.skAlarmInfo(alarmInfoTemplate, level, detail, alarmTime) - the.historyStationAlarmMap[sid] = AXY_SK.HistoryAlarm{ SourceId: sid, - Condition: conditionStr, AlarmLevel: level, Time: alarmTime, } + payload := the.skAlarmInfo(alarmInfoTemplate, level, detail, alarmTime) the.InKafka.Publish(the.Info.IoConfig.In.Kafka.AlarmTopic, payload) } else { payload := the.skAlarmElimination(alarmInfoTemplate, level, detail) @@ -360,10 +356,63 @@ func (the *consumerAxySkAlarm) isRuleAlarm(trigger AXY_SK.AlarmTrigger, stationA return false, level, detail, dt } -func (the *consumerAxySkAlarm) getEsAlarmTriggerQueryStr(structId int) string { +func (the *consumerAxySkAlarm) updateEsAlarmTriggerHistory(structId int) { + esSql := the.getEsAlarmTriggerHistoryQueryStr(structId) + alarms, err := the.OutEs.SearchAlarm(the.Info.IoConfig.Out.Es.Index, esSql) + if err != nil { + log.Printf("结构物[%d] 查询历史有效AlarmTrigger 异常=>%s", structId, err.Error()) + } + for _, alarm := range alarms { + the.historyStationAlarmMap[alarm.SourceId] = AXY_SK.HistoryAlarm{ + SourceId: alarm.SourceId, + Time: alarm.EndTime, + AlarmLevel: alarm.CurrentLevel, + } + } +} +func (the *consumerAxySkAlarm) getEsAlarmTriggerHistoryQueryStr(structId int) string { + + esQuery := fmt.Sprintf(` +{ + "size": 100, + "query": { + "bool": { + "must": [ + { + "term": { + "structure_id": { + "value": %d + } + } + }, + { + "term": { + "alarm_type_code": { + "value": "3077" + } + } + }, + { + "terms": { + "state": [ + 0, + 1, + 2 + ] + } + } + ] + } + } +}`, structId) + return esQuery +} + +func (the *consumerAxySkAlarm) getEsAlarmTriggerPartQueryStr(structId int) string { esQuery := fmt.Sprintf(` { + "size": 100, "query": { "bool": { "must": [