|
|
@ -101,13 +101,13 @@ func (the *consumerAxySkAlarm) monitorInitial() error { |
|
|
|
for taskName, cron := range the.Info.Monitor { |
|
|
|
switch taskName { |
|
|
|
case "cron": |
|
|
|
the.monitor.RegisterTask(cron, the.updateTriggerConfig) |
|
|
|
the.monitor.RegisterTask(cron, the.updateTriggerStationConfig) |
|
|
|
default: |
|
|
|
log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron) |
|
|
|
} |
|
|
|
} |
|
|
|
//测试用
|
|
|
|
//the.updateTriggerConfig()
|
|
|
|
//the.updateTriggerStationConfig()
|
|
|
|
return nil |
|
|
|
} |
|
|
|
func (the *consumerAxySkAlarm) updateTriggerConfig() { |
|
|
@ -138,10 +138,29 @@ func (the *consumerAxySkAlarm) updateTriggerStationConfig() { |
|
|
|
log.Printf("查询数据库异常:err-> %s", err.Error()) |
|
|
|
return |
|
|
|
} |
|
|
|
previousTag := "" |
|
|
|
for i, trigger := range the.stationAlarmTrigger { |
|
|
|
the.stationAlarmTrigger[i].ConditionArray = trigger.ConditionRaw |
|
|
|
triggerTag := fmt.Sprintf("%d_%d_%d", trigger.StructId, trigger.FactorId, trigger.AlarmLevel) |
|
|
|
if previousTag != triggerTag { |
|
|
|
previousTag = triggerTag |
|
|
|
the.configAlarmTrigger = append(the.configAlarmTrigger, AXY_SK.AlarmTrigger{ |
|
|
|
Id: trigger.Id, |
|
|
|
StructId: trigger.StructId, |
|
|
|
FactorId: trigger.FactorId, |
|
|
|
AlarmLevel: trigger.AlarmLevel, |
|
|
|
ConditionRaw: trigger.ConditionRaw, |
|
|
|
ConditionArray: trigger.ConditionRaw, |
|
|
|
Rule: trigger.Rule, |
|
|
|
StationIds: []int{trigger.StationId}, |
|
|
|
}) |
|
|
|
} else { |
|
|
|
the.configAlarmTrigger[len(the.configAlarmTrigger)-1].StationIds = |
|
|
|
append(the.configAlarmTrigger[len(the.configAlarmTrigger)-1].StationIds, trigger.StationId) |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
log.Printf("当前共 %d条 启用配置", len(the.stationAlarmTrigger)) |
|
|
|
log.Printf("当前共 %d条 启用配置", len(the.configAlarmTrigger)) |
|
|
|
|
|
|
|
//立即触发
|
|
|
|
the.judgeSK() |
|
|
@ -152,16 +171,19 @@ func (the *consumerAxySkAlarm) judgeSK() string { |
|
|
|
|
|
|
|
for i, trigger := range the.configAlarmTrigger { |
|
|
|
log.Printf("开始处理--> 第[%d]配置 id=%d trigger", i, trigger.Id) |
|
|
|
|
|
|
|
if len(trigger.StationIds) == 0 { |
|
|
|
continue |
|
|
|
} |
|
|
|
the.updateEsAlarmTriggerHistory(trigger.StructId) |
|
|
|
|
|
|
|
//配置的结构物的监测因素 去查询
|
|
|
|
esSql := the.getEsAlarmTriggerPartQueryStr(trigger.StructId) |
|
|
|
esSql := the.getEsAlarmTriggerPartQueryStrByStationIds(trigger.StructId, trigger.StationIds) |
|
|
|
alarms, err := the.InEs.SearchAlarm(the.Info.IoConfig.In.Es.Index, esSql) |
|
|
|
if err != nil { |
|
|
|
log.Printf("es查询异常err -> %s", err.Error()) |
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
|
stationAlarmMap := map[string]AXY_SK.StationAlarmGroup{} |
|
|
|
for _, alarm := range alarms { |
|
|
|
defaultStationAlarm := AXY_SK.StationAlarmGroup{} |
|
|
@ -200,15 +222,19 @@ func (the *consumerAxySkAlarm) judgeSK() string { |
|
|
|
trigger.StructId, trigger.FactorId, level) |
|
|
|
onceTriggerStationAlarmMap[sid] = conditionStr |
|
|
|
//判断历史有没有
|
|
|
|
if v, ok := the.historyStationAlarmMap[sid]; ok { |
|
|
|
if v.AlarmLevel < level { //低等级告警过滤
|
|
|
|
log.Printf("测点[%s]本次触发 level=[%d] > 历史有效等级%d,不再重复触发", sid, level, v.AlarmLevel) |
|
|
|
if hv, ok := the.historyStationAlarmMap[sid]; ok { |
|
|
|
if hv.AlarmLevel < level { //低等级告警过滤
|
|
|
|
log.Printf("测点[%s]本次触发 level=[%d] > 历史有效等级%d,不再重复触发", sid, level, hv.AlarmLevel) |
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
|
if !v.Time.After(alarmTime) { |
|
|
|
log.Printf("测点[%s]本次触发时刻[%s] 对比历史有效时刻[%s] 非新", sid, |
|
|
|
v.Time.Format("2006-01-02 15:04:05"), alarmTime.Format("2006-01-02 15:04:05")) |
|
|
|
if hv.Time.After(alarmTime) { |
|
|
|
log.Printf("测点[%s]本次触发时刻[%s]level=%d 对比历史有效时刻[%s]level=%d 非新", sid, |
|
|
|
alarmTime.Format("2006-01-02 15:04:05"), |
|
|
|
level, |
|
|
|
hv.Time.Format("2006-01-02 15:04:05"), |
|
|
|
hv.AlarmLevel, |
|
|
|
) |
|
|
|
continue |
|
|
|
} |
|
|
|
} |
|
|
@ -330,7 +356,7 @@ func (the *consumerAxySkAlarm) isRuleAlarm(trigger AXY_SK.AlarmTrigger, stationA |
|
|
|
for _, conditionInt := range trigger.ConditionArray { |
|
|
|
switch conditionInt { |
|
|
|
case 0: |
|
|
|
if stationAlarm.Alarm3007.CurrentLevel > trigger.AlarmLevel { |
|
|
|
if stationAlarm.Alarm3007.CurrentLevel <= trigger.AlarmLevel { |
|
|
|
isAlarm = true |
|
|
|
detail += stationAlarm.Alarm3007.Detail |
|
|
|
if stationAlarm.Alarm3007.EndTime.After(dt) { |
|
|
@ -338,7 +364,7 @@ func (the *consumerAxySkAlarm) isRuleAlarm(trigger AXY_SK.AlarmTrigger, stationA |
|
|
|
} |
|
|
|
} |
|
|
|
case 1: |
|
|
|
if stationAlarm.Alarm3008.CurrentLevel > trigger.AlarmLevel { |
|
|
|
if stationAlarm.Alarm3008.CurrentLevel <= trigger.AlarmLevel { |
|
|
|
isAlarm = true |
|
|
|
detail += stationAlarm.Alarm3008.Detail |
|
|
|
if stationAlarm.Alarm3008.EndTime.After(dt) { |
|
|
@ -446,6 +472,49 @@ func (the *consumerAxySkAlarm) getEsAlarmTriggerPartQueryStr(structId int) strin |
|
|
|
}`, structId) |
|
|
|
return esQuery |
|
|
|
} |
|
|
|
func (the *consumerAxySkAlarm) getEsAlarmTriggerPartQueryStrByStationIds(structId int, stationIds []int) string { |
|
|
|
s, _ := json.Marshal(stationIds) |
|
|
|
sourceIds := string(s) |
|
|
|
esQuery := fmt.Sprintf(` |
|
|
|
{ |
|
|
|
"size": 100, |
|
|
|
"query": { |
|
|
|
"bool": { |
|
|
|
"must": [ |
|
|
|
{ |
|
|
|
"term": { |
|
|
|
"structure_id": { |
|
|
|
"value": %d |
|
|
|
} |
|
|
|
} |
|
|
|
}, |
|
|
|
{ |
|
|
|
"term": { |
|
|
|
"isTriggerPart": { |
|
|
|
"value": true |
|
|
|
} |
|
|
|
} |
|
|
|
}, |
|
|
|
{ |
|
|
|
"terms": { |
|
|
|
"state": [ |
|
|
|
0, |
|
|
|
1, |
|
|
|
2 |
|
|
|
] |
|
|
|
} |
|
|
|
}, |
|
|
|
{ |
|
|
|
"terms": { |
|
|
|
"source_id": %s |
|
|
|
} |
|
|
|
} |
|
|
|
] |
|
|
|
} |
|
|
|
} |
|
|
|
}`, structId, sourceIds) |
|
|
|
return esQuery |
|
|
|
} |
|
|
|
|
|
|
|
func (the *consumerAxySkAlarm) sinkTask() { |
|
|
|
intervalSec := the.Info.IoConfig.In.Es.Interval |
|
|
|