diff --git a/consumers/AXY_SK/dataModel.go b/consumers/AXY_SK/dataModel.go index d484238..24ed8c0 100644 --- a/consumers/AXY_SK/dataModel.go +++ b/consumers/AXY_SK/dataModel.go @@ -15,6 +15,7 @@ type AlarmTrigger struct { ConditionRaw pq.Int32Array `json:"condition" db:"condition"` ConditionArray []int32 Rule int `json:"rule" db:"rule"` + StationIds []int } type StationAlarmTrigger struct { diff --git a/consumers/consumerAxySkAlarm.go b/consumers/consumerAxySkAlarm.go index 033760a..b8a8ba9 100644 --- a/consumers/consumerAxySkAlarm.go +++ b/consumers/consumerAxySkAlarm.go @@ -101,13 +101,13 @@ func (the *consumerAxySkAlarm) monitorInitial() error { for taskName, cron := range the.Info.Monitor { switch taskName { case "cron": - the.monitor.RegisterTask(cron, the.updateTriggerConfig) + the.monitor.RegisterTask(cron, the.updateTriggerStationConfig) default: log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron) } } //测试用 - //the.updateTriggerConfig() + //the.updateTriggerStationConfig() return nil } func (the *consumerAxySkAlarm) updateTriggerConfig() { @@ -138,10 +138,29 @@ func (the *consumerAxySkAlarm) updateTriggerStationConfig() { log.Printf("查询数据库异常:err-> %s", err.Error()) return } + previousTag := "" for i, trigger := range the.stationAlarmTrigger { the.stationAlarmTrigger[i].ConditionArray = trigger.ConditionRaw + triggerTag := fmt.Sprintf("%d_%d_%d", trigger.StructId, trigger.FactorId, trigger.AlarmLevel) + if previousTag != triggerTag { + previousTag = triggerTag + the.configAlarmTrigger = append(the.configAlarmTrigger, AXY_SK.AlarmTrigger{ + Id: trigger.Id, + StructId: trigger.StructId, + FactorId: trigger.FactorId, + AlarmLevel: trigger.AlarmLevel, + ConditionRaw: trigger.ConditionRaw, + ConditionArray: trigger.ConditionRaw, + Rule: trigger.Rule, + StationIds: []int{trigger.StationId}, + }) + } else { + the.configAlarmTrigger[len(the.configAlarmTrigger)-1].StationIds = + append(the.configAlarmTrigger[len(the.configAlarmTrigger)-1].StationIds, trigger.StationId) + } + } - log.Printf("当前共 %d条 启用配置", len(the.stationAlarmTrigger)) + log.Printf("当前共 %d条 启用配置", len(the.configAlarmTrigger)) //立即触发 the.judgeSK() @@ -152,16 +171,19 @@ func (the *consumerAxySkAlarm) judgeSK() string { for i, trigger := range the.configAlarmTrigger { log.Printf("开始处理--> 第[%d]配置 id=%d trigger", i, trigger.Id) - + if len(trigger.StationIds) == 0 { + continue + } the.updateEsAlarmTriggerHistory(trigger.StructId) //配置的结构物的监测因素 去查询 - esSql := the.getEsAlarmTriggerPartQueryStr(trigger.StructId) + esSql := the.getEsAlarmTriggerPartQueryStrByStationIds(trigger.StructId, trigger.StationIds) alarms, err := the.InEs.SearchAlarm(the.Info.IoConfig.In.Es.Index, esSql) if err != nil { log.Printf("es查询异常err -> %s", err.Error()) continue } + stationAlarmMap := map[string]AXY_SK.StationAlarmGroup{} for _, alarm := range alarms { defaultStationAlarm := AXY_SK.StationAlarmGroup{} @@ -200,15 +222,19 @@ func (the *consumerAxySkAlarm) judgeSK() string { trigger.StructId, trigger.FactorId, level) onceTriggerStationAlarmMap[sid] = conditionStr //判断历史有没有 - if v, ok := the.historyStationAlarmMap[sid]; ok { - if v.AlarmLevel < level { //低等级告警过滤 - log.Printf("测点[%s]本次触发 level=[%d] > 历史有效等级%d,不再重复触发", sid, level, v.AlarmLevel) + if hv, ok := the.historyStationAlarmMap[sid]; ok { + if hv.AlarmLevel < level { //低等级告警过滤 + log.Printf("测点[%s]本次触发 level=[%d] > 历史有效等级%d,不再重复触发", sid, level, hv.AlarmLevel) continue } - if !v.Time.After(alarmTime) { - log.Printf("测点[%s]本次触发时刻[%s] 对比历史有效时刻[%s] 非新", sid, - v.Time.Format("2006-01-02 15:04:05"), alarmTime.Format("2006-01-02 15:04:05")) + if hv.Time.After(alarmTime) { + log.Printf("测点[%s]本次触发时刻[%s]level=%d 对比历史有效时刻[%s]level=%d 非新", sid, + alarmTime.Format("2006-01-02 15:04:05"), + level, + hv.Time.Format("2006-01-02 15:04:05"), + hv.AlarmLevel, + ) continue } } @@ -330,7 +356,7 @@ func (the *consumerAxySkAlarm) isRuleAlarm(trigger AXY_SK.AlarmTrigger, stationA for _, conditionInt := range trigger.ConditionArray { switch conditionInt { case 0: - if stationAlarm.Alarm3007.CurrentLevel > trigger.AlarmLevel { + if stationAlarm.Alarm3007.CurrentLevel <= trigger.AlarmLevel { isAlarm = true detail += stationAlarm.Alarm3007.Detail if stationAlarm.Alarm3007.EndTime.After(dt) { @@ -338,7 +364,7 @@ func (the *consumerAxySkAlarm) isRuleAlarm(trigger AXY_SK.AlarmTrigger, stationA } } case 1: - if stationAlarm.Alarm3008.CurrentLevel > trigger.AlarmLevel { + if stationAlarm.Alarm3008.CurrentLevel <= trigger.AlarmLevel { isAlarm = true detail += stationAlarm.Alarm3008.Detail if stationAlarm.Alarm3008.EndTime.After(dt) { @@ -446,6 +472,49 @@ func (the *consumerAxySkAlarm) getEsAlarmTriggerPartQueryStr(structId int) strin }`, structId) return esQuery } +func (the *consumerAxySkAlarm) getEsAlarmTriggerPartQueryStrByStationIds(structId int, stationIds []int) string { + s, _ := json.Marshal(stationIds) + sourceIds := string(s) + esQuery := fmt.Sprintf(` +{ + "size": 100, + "query": { + "bool": { + "must": [ + { + "term": { + "structure_id": { + "value": %d + } + } + }, + { + "term": { + "isTriggerPart": { + "value": true + } + } + }, + { + "terms": { + "state": [ + 0, + 1, + 2 + ] + } + }, + { + "terms": { + "source_id": %s + } + } + ] + } + } +}`, structId, sourceIds) + return esQuery +} func (the *consumerAxySkAlarm) sinkTask() { intervalSec := the.Info.IoConfig.In.Es.Interval