diff --git a/configFiles/config_安心云告警双控.yaml b/configFiles/config_安心云告警双控.yaml index f3e2b4f..86ca6d4 100644 --- a/configFiles/config_安心云告警双控.yaml +++ b/configFiles/config_安心云告警双控.yaml @@ -1,6 +1,15 @@ consumer: consumerAxySkAlarm ioConfig: in: + es: + address: + - "http://10.8.30.160:30092" + index: native_sk_alarms #推送告警索引 + auth: + userName: post + password: 123 + interval: 30 #多久写一次es(秒) + out: kafka: brokers: - 10.8.30.160:30992 @@ -8,15 +17,6 @@ ioConfig: alarmTopic: anxinyun_alarm #推送告警的主题 topics: - no - out: - es: - address: - - "http://10.8.30.160:30092" - index: native_sk_alarms #推送告警索引 - auth: - userName: post - password: 123 - interval: 30 #多久写一次es(秒) monitor: cron: 24 * * * * diff --git a/consumers/AXY_SK/config.go b/consumers/AXY_SK/config.go index 81d2f51..e8829ee 100644 --- a/consumers/AXY_SK/config.go +++ b/consumers/AXY_SK/config.go @@ -12,11 +12,11 @@ type ioConfig struct { Out out `json:"out"` } type in struct { - Kafka config.KafkaConfig `json:"kafka"` + Es config.EsConfig `json:"es"` } type out struct { - Es config.EsConfig `json:"es"` + Kafka config.KafkaConfig `json:"kafka"` } type Info struct { diff --git a/consumers/consumerAxySkAlarm.go b/consumers/consumerAxySkAlarm.go index e6abe51..22e06d1 100644 --- a/consumers/consumerAxySkAlarm.go +++ b/consumers/consumerAxySkAlarm.go @@ -21,8 +21,8 @@ type consumerAxySkAlarm struct { alarmCache map[string]models.EsAlarm //具体配置 Info AXY_SK.ConfigFile - InKafka _kafka.KafkaHelper - OutEs dbOperate.ESHelper + InEs dbOperate.ESHelper + OutKafka _kafka.KafkaHelper infoPg *dbOperate.DBHelper sinkMap sync.Map lock sync.Mutex @@ -66,24 +66,24 @@ func (the *consumerAxySkAlarm) Initial(cfg string) error { } func (the *consumerAxySkAlarm) inputInitial() error { //数据入口 - the.InKafka = _kafka.KafkaHelper{ - Brokers: the.Info.IoConfig.In.Kafka.Brokers, - GroupId: the.Info.IoConfig.In.Kafka.GroupId, + the.OutKafka = _kafka.KafkaHelper{ + Brokers: the.Info.IoConfig.Out.Kafka.Brokers, + GroupId: the.Info.IoConfig.Out.Kafka.GroupId, } - the.InKafka.Initial() - for _, inTopic := range the.Info.IoConfig.In.Kafka.Topics { - the.InKafka.Subscribe(inTopic, the.onData) + the.OutKafka.Initial() + for _, inTopic := range the.Info.IoConfig.Out.Kafka.Topics { + the.OutKafka.Subscribe(inTopic, the.onData) } - the.InKafka.Worker() + the.OutKafka.Worker() return nil } func (the *consumerAxySkAlarm) outputInitial() error { //数据出口 - the.OutEs = *dbOperate.NewESHelper( - the.Info.IoConfig.Out.Es.Address, - the.Info.IoConfig.Out.Es.Auth.UserName, - the.Info.IoConfig.Out.Es.Auth.Password, + the.InEs = *dbOperate.NewESHelper( + the.Info.IoConfig.In.Es.Address, + the.Info.IoConfig.In.Es.Auth.UserName, + the.Info.IoConfig.In.Es.Auth.Password, ) return nil @@ -105,13 +105,13 @@ func (the *consumerAxySkAlarm) monitorInitial() error { for taskName, cron := range the.Info.Monitor { switch taskName { case "cron": - //the.monitor.RegisterTask(cron, the.updateTriggerConfig) + the.monitor.RegisterTask(cron, the.updateTriggerConfig) default: log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron) } } //测试用 - the.updateTriggerConfig() + //the.updateTriggerConfig() return nil } func (the *consumerAxySkAlarm) updateTriggerConfig() { @@ -161,7 +161,7 @@ func (the *consumerAxySkAlarm) judgeSK() string { //配置的结构物的监测因素 去查询 esSql := the.getEsAlarmTriggerPartQueryStr(trigger.StructId) - alarms, err := the.OutEs.SearchAlarm(the.Info.IoConfig.Out.Es.Index, esSql) + alarms, err := the.InEs.SearchAlarm(the.Info.IoConfig.In.Es.Index, esSql) if err != nil { log.Printf("es查询异常err -> %s", err.Error()) continue @@ -224,10 +224,10 @@ func (the *consumerAxySkAlarm) judgeSK() string { } payload := the.skAlarmInfo(alarmInfoTemplate, level, detail, alarmTime) - the.InKafka.Publish(the.Info.IoConfig.In.Kafka.AlarmTopic, payload) + the.OutKafka.Publish(the.Info.IoConfig.Out.Kafka.AlarmTopic, payload) } else { payload := the.skAlarmElimination(alarmInfoTemplate, level, detail) - the.InKafka.Publish(the.Info.IoConfig.In.Kafka.AlarmTopic, payload) + the.OutKafka.Publish(the.Info.IoConfig.Out.Kafka.AlarmTopic, payload) delete(the.historyStationAlarmMap, sid) } } @@ -358,7 +358,7 @@ func (the *consumerAxySkAlarm) isRuleAlarm(trigger AXY_SK.AlarmTrigger, stationA func (the *consumerAxySkAlarm) updateEsAlarmTriggerHistory(structId int) { esSql := the.getEsAlarmTriggerHistoryQueryStr(structId) - alarms, err := the.OutEs.SearchAlarm(the.Info.IoConfig.Out.Es.Index, esSql) + alarms, err := the.InEs.SearchAlarm(the.Info.IoConfig.In.Es.Index, esSql) if err != nil { log.Printf("结构物[%d] 查询历史有效AlarmTrigger 异常=>%s", structId, err.Error()) } @@ -447,7 +447,7 @@ func (the *consumerAxySkAlarm) getEsAlarmTriggerPartQueryStr(structId int) strin } func (the *consumerAxySkAlarm) sinkTask() { - intervalSec := the.Info.IoConfig.Out.Es.Interval + intervalSec := the.Info.IoConfig.In.Es.Interval ticker := time.NewTicker(time.Duration(intervalSec) * time.Second) defer ticker.Stop() for { @@ -475,9 +475,9 @@ func (the *consumerAxySkAlarm) toSink() { return true }) if len(themes) > 0 { - index := the.Info.IoConfig.Out.Es.Index + index := the.Info.IoConfig.In.Es.Index log.Printf("写入es [%s] %d条", index, len(themes)) - the.OutEs.BulkWriteThemes2Es(index, themes) + the.InEs.BulkWriteThemes2Es(index, themes) the.sinkMap.Clear() } } @@ -503,9 +503,7 @@ func (the *consumerAxySkAlarm) Work() { }() } func (the *consumerAxySkAlarm) onData(topic string, msg string) bool { - //if len(msg) > 80 { - // log.Printf("recv:[%s]:%s ...", topic, msg[:80]) - //} + adaptor := adaptors.Adaptor_Savoir_LastTheme{} needPush := adaptor.Transform(topic, msg)