Browse Source

update 更新处理

dev
lucas 13 hours ago
parent
commit
7e43bf57f5
  1. 4
      configFiles/config_安心云告警双控.yaml
  2. 1
      consumers/AXY_SK/dataModel.go
  3. 79
      consumers/consumerAxySkAlarm.go

4
configFiles/config_安心云告警双控.yaml

@ -7,12 +7,12 @@ ioConfig:
groupId: axy_sk_alarm_inout groupId: axy_sk_alarm_inout
alarmTopic: anxinyun_alarm #推送告警的主题 alarmTopic: anxinyun_alarm #推送告警的主题
topics: topics:
- szbb - no
out: out:
es: es:
address: address:
- "http://10.8.30.160:30092" - "http://10.8.30.160:30092"
index: "native_alarms" #推送告警索引 index: native_sk_alarms #推送告警索引
auth: auth:
userName: post userName: post
password: 123 password: 123

1
consumers/AXY_SK/dataModel.go

@ -30,7 +30,6 @@ type StationAlarmGroup struct {
type HistoryAlarm struct { type HistoryAlarm struct {
SourceId string SourceId string
Condition string
Time time.Time Time time.Time
AlarmLevel int AlarmLevel int
} }

79
consumers/consumerAxySkAlarm.go

@ -60,8 +60,7 @@ func (the *consumerAxySkAlarm) Initial(cfg string) error {
if err != nil { if err != nil {
return err return err
} }
the.loadHistoryRecord() the.historyStationAlarmMap = make(map[string]AXY_SK.HistoryAlarm)
err = the.monitorInitial() err = the.monitorInitial()
return err return err
} }
@ -97,10 +96,6 @@ func (the *consumerAxySkAlarm) infoComponentInitial() error {
return nil return nil
} }
func (the *consumerAxySkAlarm) loadHistoryRecord() {
the.historyStationAlarmMap = map[string]AXY_SK.HistoryAlarm{}
}
func (the *consumerAxySkAlarm) monitorInitial() error { func (the *consumerAxySkAlarm) monitorInitial() error {
the.monitor = &monitors.CommonMonitor{ the.monitor = &monitors.CommonMonitor{
MonitorHelper: &monitors.MonitorHelper{}, MonitorHelper: &monitors.MonitorHelper{},
@ -160,9 +155,12 @@ func (the *consumerAxySkAlarm) judgeSK() string {
onceTriggerStationAlarmMap := map[string]string{} //同factorId_stationId 触发了高等级 不触发低等级(查询已排序) onceTriggerStationAlarmMap := map[string]string{} //同factorId_stationId 触发了高等级 不触发低等级(查询已排序)
for i, trigger := range the.configAlarmTrigger { for i, trigger := range the.configAlarmTrigger {
log.Printf("开始处理--> 第[%d] id=%d trigger配置", i, trigger.Id) log.Printf("开始处理--> 第[%d]配置 id=%d trigger", i, trigger.Id)
the.updateEsAlarmTriggerHistory(trigger.StructId)
//配置的结构物的监测因素 去查询 //配置的结构物的监测因素 去查询
esSql := the.getEsAlarmTriggerQueryStr(trigger.StructId) esSql := the.getEsAlarmTriggerPartQueryStr(trigger.StructId)
alarms, err := the.OutEs.SearchAlarm(the.Info.IoConfig.Out.Es.Index, esSql) alarms, err := the.OutEs.SearchAlarm(the.Info.IoConfig.Out.Es.Index, esSql)
if err != nil { if err != nil {
log.Printf("es查询异常err -> %s", err.Error()) log.Printf("es查询异常err -> %s", err.Error())
@ -193,7 +191,7 @@ func (the *consumerAxySkAlarm) judgeSK() string {
continue continue
} }
isAlarm, level, detail, alarmTime := the.isRuleAlarm(trigger, stationAlarmInfo) isAlarm, level, detail, alarmTime := the.isRuleAlarm(trigger, stationAlarmInfo)
println(isAlarm, level, detail)
var alarmInfoTemplate *models.EsAlarm var alarmInfoTemplate *models.EsAlarm
if stationAlarmInfo.Alarm3007 != nil { if stationAlarmInfo.Alarm3007 != nil {
alarmInfoTemplate = stationAlarmInfo.Alarm3007 alarmInfoTemplate = stationAlarmInfo.Alarm3007
@ -207,8 +205,8 @@ func (the *consumerAxySkAlarm) judgeSK() string {
onceTriggerStationAlarmMap[sid] = conditionStr onceTriggerStationAlarmMap[sid] = conditionStr
//判断历史有没有 //判断历史有没有
if v, ok := the.historyStationAlarmMap[sid]; ok { if v, ok := the.historyStationAlarmMap[sid]; ok {
if v.AlarmLevel > level { //低等级告警过滤 if v.AlarmLevel < level { //低等级告警过滤
log.Printf("测点[%s]本次触发 level=[%d] > 历史有效等级%d,不再重复触发", sid, v.AlarmLevel, level) log.Printf("测点[%s]本次触发 level=[%d] > 历史有效等级%d,不再重复触发", sid, level, v.AlarmLevel)
continue continue
} }
@ -219,15 +217,13 @@ func (the *consumerAxySkAlarm) judgeSK() string {
} }
} }
//纪录历史告警 //纪录历史告警
payload := the.skAlarmInfo(alarmInfoTemplate, level, detail, alarmTime)
the.historyStationAlarmMap[sid] = AXY_SK.HistoryAlarm{ the.historyStationAlarmMap[sid] = AXY_SK.HistoryAlarm{
SourceId: sid, SourceId: sid,
Condition: conditionStr,
AlarmLevel: level, AlarmLevel: level,
Time: alarmTime, Time: alarmTime,
} }
payload := the.skAlarmInfo(alarmInfoTemplate, level, detail, alarmTime)
the.InKafka.Publish(the.Info.IoConfig.In.Kafka.AlarmTopic, payload) the.InKafka.Publish(the.Info.IoConfig.In.Kafka.AlarmTopic, payload)
} else { } else {
payload := the.skAlarmElimination(alarmInfoTemplate, level, detail) payload := the.skAlarmElimination(alarmInfoTemplate, level, detail)
@ -360,10 +356,63 @@ func (the *consumerAxySkAlarm) isRuleAlarm(trigger AXY_SK.AlarmTrigger, stationA
return false, level, detail, dt return false, level, detail, dt
} }
func (the *consumerAxySkAlarm) getEsAlarmTriggerQueryStr(structId int) string { func (the *consumerAxySkAlarm) updateEsAlarmTriggerHistory(structId int) {
esSql := the.getEsAlarmTriggerHistoryQueryStr(structId)
alarms, err := the.OutEs.SearchAlarm(the.Info.IoConfig.Out.Es.Index, esSql)
if err != nil {
log.Printf("结构物[%d] 查询历史有效AlarmTrigger 异常=>%s", structId, err.Error())
}
for _, alarm := range alarms {
the.historyStationAlarmMap[alarm.SourceId] = AXY_SK.HistoryAlarm{
SourceId: alarm.SourceId,
Time: alarm.EndTime,
AlarmLevel: alarm.CurrentLevel,
}
}
}
func (the *consumerAxySkAlarm) getEsAlarmTriggerHistoryQueryStr(structId int) string {
esQuery := fmt.Sprintf(`
{
"size": 100,
"query": {
"bool": {
"must": [
{
"term": {
"structure_id": {
"value": %d
}
}
},
{
"term": {
"alarm_type_code": {
"value": "3077"
}
}
},
{
"terms": {
"state": [
0,
1,
2
]
}
}
]
}
}
}`, structId)
return esQuery
}
func (the *consumerAxySkAlarm) getEsAlarmTriggerPartQueryStr(structId int) string {
esQuery := fmt.Sprintf(` esQuery := fmt.Sprintf(`
{ {
"size": 100,
"query": { "query": {
"bool": { "bool": {
"must": [ "must": [

Loading…
Cancel
Save