You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
523 lines
14 KiB
523 lines
14 KiB
package consumers
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"goInOut/adaptors"
|
|
"goInOut/consumers/AXY_SK"
|
|
"goInOut/dbOperate"
|
|
"goInOut/dbOperate/_kafka"
|
|
"goInOut/models"
|
|
"goInOut/monitors"
|
|
"gopkg.in/yaml.v3"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type consumerAxySkAlarm struct {
|
|
//数据缓存管道
|
|
dataCache chan *models.EsTheme
|
|
alarmCache map[string]models.EsAlarm
|
|
//具体配置
|
|
Info AXY_SK.ConfigFile
|
|
InEs dbOperate.ESHelper
|
|
OutKafka _kafka.KafkaHelper
|
|
infoPg *dbOperate.DBHelper
|
|
sinkMap sync.Map
|
|
lock sync.Mutex
|
|
logTagId int
|
|
monitor *monitors.CommonMonitor
|
|
//数据库配置信息
|
|
stationAlarmTrigger []AXY_SK.StationAlarmTrigger
|
|
configAlarmTrigger []AXY_SK.AlarmTrigger
|
|
historyStationAlarmMap map[string]AXY_SK.HistoryAlarm
|
|
}
|
|
|
|
func (the *consumerAxySkAlarm) LoadConfigJson(cfgStr string) {
|
|
// 将 JSON 格式的数据解析到结构体中
|
|
err := yaml.Unmarshal([]byte(cfgStr), &the.Info)
|
|
if err != nil {
|
|
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func (the *consumerAxySkAlarm) Initial(cfg string) error {
|
|
the.sinkMap = sync.Map{}
|
|
the.dataCache = make(chan *models.EsTheme, 1000)
|
|
|
|
the.LoadConfigJson(cfg)
|
|
err := the.inputInitial()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = the.outputInitial()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = the.infoComponentInitial()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
the.historyStationAlarmMap = make(map[string]AXY_SK.HistoryAlarm)
|
|
err = the.monitorInitial()
|
|
return err
|
|
}
|
|
func (the *consumerAxySkAlarm) inputInitial() error {
|
|
//数据入口
|
|
the.OutKafka = _kafka.KafkaHelper{
|
|
Brokers: the.Info.IoConfig.Out.Kafka.Brokers,
|
|
GroupId: the.Info.IoConfig.Out.Kafka.GroupId,
|
|
}
|
|
the.OutKafka.Initial()
|
|
for _, inTopic := range the.Info.IoConfig.Out.Kafka.Topics {
|
|
the.OutKafka.Subscribe(inTopic, the.onData)
|
|
}
|
|
|
|
the.OutKafka.Worker()
|
|
return nil
|
|
}
|
|
func (the *consumerAxySkAlarm) outputInitial() error {
|
|
//数据出口
|
|
the.InEs = *dbOperate.NewESHelper(
|
|
the.Info.IoConfig.In.Es.Address,
|
|
the.Info.IoConfig.In.Es.Auth.UserName,
|
|
the.Info.IoConfig.In.Es.Auth.Password,
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (the *consumerAxySkAlarm) infoComponentInitial() error {
|
|
//数据出口
|
|
pgConnStr := the.Info.QueryComponent.Pg.Connect
|
|
the.infoPg = dbOperate.NewDBHelper("postgres", pgConnStr)
|
|
return nil
|
|
}
|
|
|
|
func (the *consumerAxySkAlarm) monitorInitial() error {
|
|
the.monitor = &monitors.CommonMonitor{
|
|
MonitorHelper: &monitors.MonitorHelper{},
|
|
}
|
|
|
|
the.monitor.Start()
|
|
for taskName, cron := range the.Info.Monitor {
|
|
switch taskName {
|
|
case "cron":
|
|
the.monitor.RegisterTask(cron, the.updateTriggerConfig)
|
|
default:
|
|
log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron)
|
|
}
|
|
}
|
|
//测试用
|
|
//the.updateTriggerConfig()
|
|
return nil
|
|
}
|
|
func (the *consumerAxySkAlarm) updateTriggerConfig() {
|
|
log.Printf("--> 定时 查询pg 更新 双控配置信息")
|
|
sql := `SELECT at.* FROM t_alarm_trigger as at order by at.struct_id, at.factor_id,at.alarm_level asc;`
|
|
err := the.infoPg.Query(&the.configAlarmTrigger, sql)
|
|
if err != nil {
|
|
log.Printf("查询数据库异常:err-> %s", err.Error())
|
|
return
|
|
}
|
|
for i, trigger := range the.configAlarmTrigger {
|
|
the.configAlarmTrigger[i].ConditionArray = trigger.ConditionRaw
|
|
}
|
|
log.Printf("当前共 %d条 启用配置", len(the.configAlarmTrigger))
|
|
|
|
//立即触发
|
|
the.judgeSK()
|
|
}
|
|
func (the *consumerAxySkAlarm) updateTriggerStationConfig() {
|
|
log.Printf("--> 定时 查询pg 更新 双控配置信息")
|
|
sql := `SELECT at.*,s.id as station_id,s.name as station_name
|
|
FROM t_alarm_trigger as at
|
|
left join t_sensor s
|
|
on at.struct_id=s.structure
|
|
and at.factor_id=s.factor;`
|
|
err := the.infoPg.Query(&the.stationAlarmTrigger, sql)
|
|
if err != nil {
|
|
log.Printf("查询数据库异常:err-> %s", err.Error())
|
|
return
|
|
}
|
|
for i, trigger := range the.stationAlarmTrigger {
|
|
the.stationAlarmTrigger[i].ConditionArray = trigger.ConditionRaw
|
|
}
|
|
log.Printf("当前共 %d条 启用配置", len(the.stationAlarmTrigger))
|
|
|
|
//立即触发
|
|
the.judgeSK()
|
|
}
|
|
|
|
func (the *consumerAxySkAlarm) judgeSK() string {
|
|
onceTriggerStationAlarmMap := map[string]string{} //同factorId_stationId 触发了高等级 不触发低等级(查询已排序)
|
|
|
|
for i, trigger := range the.configAlarmTrigger {
|
|
log.Printf("开始处理--> 第[%d]配置 id=%d trigger", i, trigger.Id)
|
|
|
|
the.updateEsAlarmTriggerHistory(trigger.StructId)
|
|
|
|
//配置的结构物的监测因素 去查询
|
|
esSql := the.getEsAlarmTriggerPartQueryStr(trigger.StructId)
|
|
alarms, err := the.InEs.SearchAlarm(the.Info.IoConfig.In.Es.Index, esSql)
|
|
if err != nil {
|
|
log.Printf("es查询异常err -> %s", err.Error())
|
|
continue
|
|
}
|
|
stationAlarmMap := map[string]AXY_SK.StationAlarmGroup{}
|
|
for _, alarm := range alarms {
|
|
defaultStationAlarm := AXY_SK.StationAlarmGroup{}
|
|
if v, ok := stationAlarmMap[alarm.SourceId]; ok {
|
|
defaultStationAlarm = v
|
|
}
|
|
|
|
switch alarm.AlarmTypeCode {
|
|
case "3007":
|
|
defaultStationAlarm.Alarm3007 = &alarm
|
|
case "3008":
|
|
defaultStationAlarm.Alarm3008 = &alarm
|
|
}
|
|
|
|
stationAlarmMap[alarm.SourceId] = defaultStationAlarm
|
|
}
|
|
|
|
//判断是否满足告警
|
|
for sid, stationAlarmInfo := range stationAlarmMap {
|
|
log.Printf("判断测点[%s] 是否满足双控告警", sid)
|
|
if v, ok := onceTriggerStationAlarmMap[sid]; ok {
|
|
log.Printf("测点[%s]本次已经触发过[%s],不再重复触发", sid, v)
|
|
continue
|
|
}
|
|
isAlarm, level, detail, alarmTime := the.isRuleAlarm(trigger, stationAlarmInfo)
|
|
|
|
var alarmInfoTemplate *models.EsAlarm
|
|
if stationAlarmInfo.Alarm3007 != nil {
|
|
alarmInfoTemplate = stationAlarmInfo.Alarm3007
|
|
}
|
|
if stationAlarmInfo.Alarm3008 != nil {
|
|
alarmInfoTemplate = stationAlarmInfo.Alarm3008
|
|
}
|
|
if isAlarm && alarmInfoTemplate != nil {
|
|
conditionStr := fmt.Sprintf("st:%d,f:%d,level:%d",
|
|
trigger.StructId, trigger.FactorId, level)
|
|
onceTriggerStationAlarmMap[sid] = conditionStr
|
|
//判断历史有没有
|
|
if v, ok := the.historyStationAlarmMap[sid]; ok {
|
|
if v.AlarmLevel < level { //低等级告警过滤
|
|
log.Printf("测点[%s]本次触发 level=[%d] > 历史有效等级%d,不再重复触发", sid, level, v.AlarmLevel)
|
|
continue
|
|
}
|
|
|
|
if !v.Time.After(alarmTime) {
|
|
log.Printf("测点[%s]本次触发时刻[%s] 对比历史有效时刻[%s] 非新", sid,
|
|
v.Time.Format("2006-01-02 15:04:05"), alarmTime.Format("2006-01-02 15:04:05"))
|
|
continue
|
|
}
|
|
}
|
|
//纪录历史告警
|
|
the.historyStationAlarmMap[sid] = AXY_SK.HistoryAlarm{
|
|
SourceId: sid,
|
|
AlarmLevel: level,
|
|
Time: alarmTime,
|
|
}
|
|
|
|
payload := the.skAlarmInfo(alarmInfoTemplate, level, detail, alarmTime)
|
|
the.OutKafka.Publish(the.Info.IoConfig.Out.Kafka.AlarmTopic, payload)
|
|
} else {
|
|
payload := the.skAlarmElimination(alarmInfoTemplate, level, detail)
|
|
the.OutKafka.Publish(the.Info.IoConfig.Out.Kafka.AlarmTopic, payload)
|
|
delete(the.historyStationAlarmMap, sid)
|
|
}
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func (the *consumerAxySkAlarm) skAlarmInfo(alarmInfoTemplate *models.EsAlarm, level int, detail string, alarmTime time.Time) []byte {
|
|
alarmMsg := models.KafkaAlarm{
|
|
MessageMode: "AlarmGeneration",
|
|
StructureId: alarmInfoTemplate.StructureId,
|
|
StructureName: "",
|
|
SourceId: alarmInfoTemplate.SourceId,
|
|
SourceName: alarmInfoTemplate.SourceName,
|
|
AlarmTypeCode: "3077",
|
|
AlarmCode: fmt.Sprintf("3077000%d", level),
|
|
Content: detail,
|
|
Time: alarmTime.Format("2006-01-02T15:04:05+0800"),
|
|
SourceTypeId: 2, // 0:DTU, 1:传感器, 2:测点
|
|
Sponsor: "goInOut_axySkAlarm",
|
|
Extras: nil,
|
|
SubDevices: nil,
|
|
}
|
|
payload, _ := json.Marshal(alarmMsg)
|
|
return payload
|
|
}
|
|
func (the *consumerAxySkAlarm) skAlarmElimination(alarmInfoTemplate *models.EsAlarm, level int, detail string) []byte {
|
|
alarmMsg := models.KafkaAlarm{
|
|
MessageMode: "AlarmAutoElimination",
|
|
StructureId: alarmInfoTemplate.StructureId,
|
|
StructureName: "",
|
|
SourceId: alarmInfoTemplate.SourceId,
|
|
SourceName: alarmInfoTemplate.SourceName,
|
|
AlarmTypeCode: "3077",
|
|
AlarmCode: fmt.Sprintf("3077000%d", level),
|
|
Content: "",
|
|
Time: time.Now().Format("2006-01-02T15:04:05+0800"),
|
|
SourceTypeId: 2, // 0:DTU, 1:传感器, 2:测点
|
|
Sponsor: "goInOut_axySkAlarm",
|
|
Extras: nil,
|
|
SubDevices: nil,
|
|
}
|
|
payload, _ := json.Marshal(alarmMsg)
|
|
return payload
|
|
}
|
|
|
|
func (the *consumerAxySkAlarm) isRuleAlarm(trigger AXY_SK.AlarmTrigger, stationAlarm AXY_SK.StationAlarmGroup) (bool, int, string, time.Time) {
|
|
level := 0
|
|
detail := ""
|
|
dt := time.Time{}
|
|
//3007和3008都要有
|
|
if trigger.Rule == 0 {
|
|
isAlarm := true
|
|
for _, conditionInt := range trigger.ConditionArray {
|
|
switch conditionInt {
|
|
case 0:
|
|
if stationAlarm.Alarm3007 == nil || stationAlarm.Alarm3007.CurrentLevel > trigger.AlarmLevel {
|
|
isAlarm = false
|
|
}
|
|
if isAlarm {
|
|
|
|
if len(detail) > 0 {
|
|
detail += "且 "
|
|
}
|
|
if stationAlarm.Alarm3007.EndTime.After(dt) {
|
|
dt = stationAlarm.Alarm3007.EndTime
|
|
}
|
|
detail += stationAlarm.Alarm3007.Detail
|
|
|
|
}
|
|
|
|
case 1:
|
|
if stationAlarm.Alarm3008 == nil || stationAlarm.Alarm3008.CurrentLevel > trigger.AlarmLevel {
|
|
isAlarm = false
|
|
}
|
|
if isAlarm {
|
|
|
|
if len(detail) > 0 {
|
|
detail += "且 "
|
|
}
|
|
detail += stationAlarm.Alarm3008.Detail
|
|
if stationAlarm.Alarm3008.EndTime.After(dt) {
|
|
dt = stationAlarm.Alarm3008.EndTime
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
if isAlarm {
|
|
level = trigger.AlarmLevel
|
|
}
|
|
return isAlarm, level, detail, dt
|
|
}
|
|
|
|
//3007和3008 任何一个
|
|
if trigger.Rule == 1 {
|
|
isAlarm := false
|
|
|
|
for _, conditionInt := range trigger.ConditionArray {
|
|
switch conditionInt {
|
|
case 0:
|
|
if stationAlarm.Alarm3007.CurrentLevel > trigger.AlarmLevel {
|
|
isAlarm = true
|
|
detail += stationAlarm.Alarm3007.Detail
|
|
if stationAlarm.Alarm3007.EndTime.After(dt) {
|
|
dt = stationAlarm.Alarm3007.EndTime
|
|
}
|
|
}
|
|
case 1:
|
|
if stationAlarm.Alarm3008.CurrentLevel > trigger.AlarmLevel {
|
|
isAlarm = true
|
|
detail += stationAlarm.Alarm3008.Detail
|
|
if stationAlarm.Alarm3008.EndTime.After(dt) {
|
|
dt = stationAlarm.Alarm3008.EndTime
|
|
}
|
|
}
|
|
}
|
|
|
|
if isAlarm {
|
|
level = trigger.AlarmLevel
|
|
break
|
|
}
|
|
}
|
|
return isAlarm, level, detail, dt
|
|
}
|
|
return false, level, detail, dt
|
|
}
|
|
|
|
func (the *consumerAxySkAlarm) updateEsAlarmTriggerHistory(structId int) {
|
|
esSql := the.getEsAlarmTriggerHistoryQueryStr(structId)
|
|
alarms, err := the.InEs.SearchAlarm(the.Info.IoConfig.In.Es.Index, esSql)
|
|
if err != nil {
|
|
log.Printf("结构物[%d] 查询历史有效AlarmTrigger 异常=>%s", structId, err.Error())
|
|
}
|
|
for _, alarm := range alarms {
|
|
the.historyStationAlarmMap[alarm.SourceId] = AXY_SK.HistoryAlarm{
|
|
SourceId: alarm.SourceId,
|
|
Time: alarm.EndTime,
|
|
AlarmLevel: alarm.CurrentLevel,
|
|
}
|
|
}
|
|
}
|
|
func (the *consumerAxySkAlarm) getEsAlarmTriggerHistoryQueryStr(structId int) string {
|
|
|
|
esQuery := fmt.Sprintf(`
|
|
{
|
|
"size": 100,
|
|
"query": {
|
|
"bool": {
|
|
"must": [
|
|
{
|
|
"term": {
|
|
"structure_id": {
|
|
"value": %d
|
|
}
|
|
}
|
|
},
|
|
{
|
|
"term": {
|
|
"alarm_type_code": {
|
|
"value": "3077"
|
|
}
|
|
}
|
|
},
|
|
{
|
|
"terms": {
|
|
"state": [
|
|
0,
|
|
1,
|
|
2
|
|
]
|
|
}
|
|
}
|
|
]
|
|
}
|
|
}
|
|
}`, structId)
|
|
return esQuery
|
|
}
|
|
|
|
func (the *consumerAxySkAlarm) getEsAlarmTriggerPartQueryStr(structId int) string {
|
|
|
|
esQuery := fmt.Sprintf(`
|
|
{
|
|
"size": 100,
|
|
"query": {
|
|
"bool": {
|
|
"must": [
|
|
{
|
|
"term": {
|
|
"structure_id": {
|
|
"value": %d
|
|
}
|
|
}
|
|
},
|
|
{
|
|
"term": {
|
|
"isTriggerPart": {
|
|
"value": true
|
|
}
|
|
}
|
|
},
|
|
{
|
|
"terms": {
|
|
"state": [
|
|
0,
|
|
1,
|
|
2
|
|
]
|
|
}
|
|
}
|
|
]
|
|
}
|
|
}
|
|
}`, structId)
|
|
return esQuery
|
|
}
|
|
|
|
func (the *consumerAxySkAlarm) sinkTask() {
|
|
intervalSec := the.Info.IoConfig.In.Es.Interval
|
|
ticker := time.NewTicker(time.Duration(intervalSec) * time.Second)
|
|
defer ticker.Stop()
|
|
for {
|
|
<-ticker.C
|
|
the.toSink()
|
|
}
|
|
}
|
|
|
|
func (the *consumerAxySkAlarm) toSink() {
|
|
var themes []models.EsTheme
|
|
the.lock.Lock()
|
|
defer the.lock.Unlock()
|
|
the.sinkMap.Range(func(key, value any) bool {
|
|
if v, ok := value.(*models.EsTheme); ok {
|
|
themes = append(themes, *v)
|
|
//零时打日志用
|
|
if v.Sensor == the.logTagId {
|
|
bs, _ := json.Marshal(v)
|
|
log.Printf("toSink -> Range 标记测点数据 [%d] %s ", the.logTagId, string(bs))
|
|
}
|
|
return ok
|
|
} else {
|
|
log.Printf("!!! toSink -> Range 类型转换异常 [%v]", key)
|
|
}
|
|
return true
|
|
})
|
|
if len(themes) > 0 {
|
|
index := the.Info.IoConfig.In.Es.Index
|
|
log.Printf("写入es [%s] %d条", index, len(themes))
|
|
the.InEs.BulkWriteThemes2Es(index, themes)
|
|
the.sinkMap.Clear()
|
|
}
|
|
}
|
|
|
|
func (the *consumerAxySkAlarm) Work() {
|
|
log.Printf("监控 指定设备 logTagId=[%d]", the.logTagId)
|
|
go the.sinkTask()
|
|
go func() {
|
|
for {
|
|
pushEsTheme := <-the.dataCache
|
|
|
|
if pushEsTheme.Sensor == the.logTagId {
|
|
bs, _ := json.Marshal(pushEsTheme)
|
|
log.Printf("存储 标记测点数据 [%d] %s ", the.logTagId, string(bs))
|
|
}
|
|
|
|
//有效数据存入缓存
|
|
the.lock.Lock()
|
|
the.sinkMap.Store(pushEsTheme.Sensor, pushEsTheme)
|
|
the.lock.Unlock()
|
|
}
|
|
|
|
}()
|
|
}
|
|
func (the *consumerAxySkAlarm) onData(topic string, msg string) bool {
|
|
|
|
adaptor := adaptors.Adaptor_Savoir_LastTheme{}
|
|
|
|
needPush := adaptor.Transform(topic, msg)
|
|
|
|
if needPush != nil && needPush.Data != nil {
|
|
the.dataCache <- needPush
|
|
} else {
|
|
s, _ := json.Marshal(needPush)
|
|
if needPush != nil {
|
|
if needPush.Sensor == the.logTagId {
|
|
log.Printf("onData 测点[%d] 异常needPush= %s", needPush.Sensor, s)
|
|
}
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|