From b6c565bf81d3f1310ce8ce0ce89e1e12c8c21425 Mon Sep 17 00:00:00 2001 From: lucas Date: Thu, 28 Aug 2025 18:28:24 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E6=B7=BB=E5=8A=A0=20=E5=8F=8C?= =?UTF-8?q?=E6=8E=A7=E5=91=8A=E8=AD=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- configFiles/config_安心云告警双控.yaml | 27 ++ ...测_知物云_轻量化特征数据.yaml | 118 ----- consumers/AXY_SK/config.go | 31 ++ consumers/AXY_SK/dataModel.go | 28 ++ consumers/consumerAxySkAlarm.go | 421 ++++++++++++++++++ consumers/consumerManage.go | 3 + dbOperate/elasticsearchHelper.go | 33 ++ models/esAlarm.go | 46 ++ 8 files changed, 589 insertions(+), 118 deletions(-) create mode 100644 configFiles/config_安心云告警双控.yaml delete mode 100644 configFiles/config_河北省公路基础设施监测_知物云_轻量化特征数据.yaml create mode 100644 consumers/AXY_SK/config.go create mode 100644 consumers/AXY_SK/dataModel.go create mode 100644 consumers/consumerAxySkAlarm.go create mode 100644 models/esAlarm.go diff --git a/configFiles/config_安心云告警双控.yaml b/configFiles/config_安心云告警双控.yaml new file mode 100644 index 0000000..a3eb178 --- /dev/null +++ b/configFiles/config_安心云告警双控.yaml @@ -0,0 +1,27 @@ +consumer: consumerAxySkAlarm +ioConfig: + in: + kafka: + brokers: + - 10.8.30.160:30992 + groupId: axy_sk_alarm_inout + alarmTopic: anxinyun_alarm #推送告警的主题 + topics: + - szbb + out: + es: + address: + - "http://10.8.30.160:30092" + index: "native_alarms" #推送告警索引 + auth: + userName: post + password: 123 + interval: 30 #多久写一次es(秒) + +monitor: + cron: 24 * * * * + +queryComponent: + postgres: + connect: "host=10.8.30.160 port=30432 user=postgres password=example dbname=jilin sslmode=disable" + diff --git a/configFiles/config_河北省公路基础设施监测_知物云_轻量化特征数据.yaml b/configFiles/config_河北省公路基础设施监测_知物云_轻量化特征数据.yaml deleted file mode 100644 index 8bca831..0000000 --- a/configFiles/config_河北省公路基础设施监测_知物云_轻量化特征数据.yaml +++ /dev/null @@ -1,118 +0,0 @@ -consumer: consumerZWYHBJCAS -ioConfig: - in: - http: - url: https://esproxy.anxinyun.cn/savoir_themes/_search - kafka: - brokers: - - 10.8.30.160:30992 - groupId: anxinHebeiGL_01 - topics: - - savoir_alarm - out: - mqtt: - host: 123.249.81.52 - port: 8883 - userName: bs1321 - password: 9$TyND#ec$aZFfcl - clientId: zhangjiakouJTYS - topics: - - t/province/1321 -monitor: - #振动是触发式,数据迟缓 cron10min也改成1小时一次 最多上报6条,不进行10min实时上报 - cron10min: 6/10 * * * * - #普通类型 特征数据 - cron1hour: 45 0/1 * * * - #推送摄像机状态 1小时 - camera1hour: 0 0/1 * * * - #普推送健康度 24小时,每天8点执行 - health24hour: 0 8 * * * -info: - urlIndex: https://hbjcas.hebitt.com/pms/api/v2/bsi/ - secretKey: CrAd7tkSDXKt7dyk6ueY4OeWRnSHJhUa - rc4key: CrAd7tkSDXKt7dyk6ueY4OeWRnSHJhUa3Al3 -systemID: 1321 -queryComponent: - redis: - address: 10.8.30.160:30379 -#结构物id对应 -structInfo: - 8926: 130830 - 8928: 130831 - 8929: 130832 - 8930: 130833 - 8921: 130834 - 8922: 130835 - 8923: 130836 - 8931: 130837 - 8932: 130838 - 8936: 136137 - 8940: 136139 - 8939: 136141 - 8938: 136142 - 8935: 136144 - 8967: 130812 - 8966: 130934 - 8968: 138198 -#点位id对应信息 -pointInfo: #测点类型支持 桥墩倾斜 15 裂缝18 支座位移20 桥面振动28 -#定义http接口用于对外调用 -httpServer: 0.0.0.0:8425 -#需要上报健康度的(桥梁|隧道|边坡)唯一编码 -codeInfo: - - 130812 - - 130830 - - 130831 - - 130832 - - 130833 - - 130834 - - 130835 - - 130836 - - 130837 - - 130838 - - 130934 - - 136137 - - 136138 - - 136139 - - 136140 - - 136141 - - 136142 - - 136143 - - 136144 - - 136145 - - 138198 -cameraInfo: - - 13083099901 - - 13083099902 - - 13083199903 - - 13083199904 - - 13083299905 - - 13083299906 - - 13083399907 - - 13083399908 - - 13083499909 - - 13083599910 - - 13083699911 - - 13083699912 - - 13083799913 - - 13083799914 - - 13083899915 - - 13083899916 - - 13613799917 - - 13613799918 - - 13613899919 - - 13613899920 - - 13613999921 - - 13613999922 - - 13614099923 - - 13614099924 - - 13614199925 - - 13614199926 - - 13614299927 - - 13614299928 - - 13614399929 - - 13614399930 - - 13614499931 - - 13614499932 - - 13614599933 - - 13614599934 \ No newline at end of file diff --git a/consumers/AXY_SK/config.go b/consumers/AXY_SK/config.go new file mode 100644 index 0000000..81d2f51 --- /dev/null +++ b/consumers/AXY_SK/config.go @@ -0,0 +1,31 @@ +package AXY_SK + +import "goInOut/config" + +type ConfigFile struct { + IoConfig ioConfig `yaml:"ioConfig"` + Monitor map[string]string `yaml:"monitor"` + QueryComponent queryComponent `yaml:"queryComponent"` +} +type ioConfig struct { + In in `json:"in"` + Out out `json:"out"` +} +type in struct { + Kafka config.KafkaConfig `json:"kafka"` +} + +type out struct { + Es config.EsConfig `json:"es"` +} + +type Info struct { + //Common map[string]string `json:"common"` + QueryComponent queryComponent `json:"queryComponent"` +} + +type queryComponent struct { + Pg struct { + Connect string `yaml:"connect"` + } `yaml:"postgres"` +} diff --git a/consumers/AXY_SK/dataModel.go b/consumers/AXY_SK/dataModel.go new file mode 100644 index 0000000..e755b77 --- /dev/null +++ b/consumers/AXY_SK/dataModel.go @@ -0,0 +1,28 @@ +package AXY_SK + +import ( + "github.com/lib/pq" + _ "github.com/lib/pq" + "goInOut/models" +) + +type AlarmTrigger struct { + Id int `json:"id" db:"id"` + StructId int `json:"struct_id" db:"struct_id"` + FactorId int `json:"factor_id" db:"factor_id"` + AlarmLevel int `json:"alarm_level" db:"alarm_level"` + ConditionRaw pq.Int32Array `json:"condition" db:"condition"` + ConditionArray []int32 + Rule int `json:"rule" db:"rule"` +} + +type StationAlarmTrigger struct { + AlarmTrigger + StationName string `json:"station_name" db:"station_name"` + StationId int `json:"station_id" db:"station_id"` +} + +type StationAlarmGroup struct { + Alarm3007 *models.EsAlarm + Alarm3008 *models.EsAlarm +} diff --git a/consumers/consumerAxySkAlarm.go b/consumers/consumerAxySkAlarm.go new file mode 100644 index 0000000..6ad6521 --- /dev/null +++ b/consumers/consumerAxySkAlarm.go @@ -0,0 +1,421 @@ +package consumers + +import ( + "encoding/json" + "fmt" + "goInOut/adaptors" + "goInOut/consumers/AXY_SK" + "goInOut/dbOperate" + "goInOut/dbOperate/_kafka" + "goInOut/models" + "goInOut/monitors" + "gopkg.in/yaml.v3" + "log" + "sync" + "time" +) + +type consumerAxySkAlarm struct { + //数据缓存管道 + dataCache chan *models.EsTheme + alarmCache map[string]models.EsAlarm + //具体配置 + Info AXY_SK.ConfigFile + InKafka _kafka.KafkaHelper + OutEs dbOperate.ESHelper + infoPg *dbOperate.DBHelper + sinkMap sync.Map + lock sync.Mutex + logTagId int + monitor *monitors.CommonMonitor + //数据库配置信息 + stationAlarmTrigger []AXY_SK.StationAlarmTrigger + configAlarmTrigger []AXY_SK.AlarmTrigger +} + +func (the *consumerAxySkAlarm) LoadConfigJson(cfgStr string) { + // 将 JSON 格式的数据解析到结构体中 + err := yaml.Unmarshal([]byte(cfgStr), &the.Info) + if err != nil { + log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error()) + panic(err) + } +} + +func (the *consumerAxySkAlarm) Initial(cfg string) error { + the.sinkMap = sync.Map{} + the.dataCache = make(chan *models.EsTheme, 1000) + + the.LoadConfigJson(cfg) + err := the.inputInitial() + if err != nil { + return err + } + err = the.outputInitial() + if err != nil { + return err + } + err = the.infoComponentInitial() + if err != nil { + return err + } + + err = the.monitorInitial() + return err +} +func (the *consumerAxySkAlarm) inputInitial() error { + //数据入口 + the.InKafka = _kafka.KafkaHelper{ + Brokers: the.Info.IoConfig.In.Kafka.Brokers, + GroupId: the.Info.IoConfig.In.Kafka.GroupId, + } + the.InKafka.Initial() + for _, inTopic := range the.Info.IoConfig.In.Kafka.Topics { + the.InKafka.Subscribe(inTopic, the.onData) + } + + the.InKafka.Worker() + return nil +} +func (the *consumerAxySkAlarm) outputInitial() error { + //数据出口 + the.OutEs = *dbOperate.NewESHelper( + the.Info.IoConfig.Out.Es.Address, + the.Info.IoConfig.Out.Es.Auth.UserName, + the.Info.IoConfig.Out.Es.Auth.Password, + ) + + return nil +} + +func (the *consumerAxySkAlarm) infoComponentInitial() error { + //数据出口 + pgConnStr := the.Info.QueryComponent.Pg.Connect + the.infoPg = dbOperate.NewDBHelper("postgres", pgConnStr) + return nil +} + +func (the *consumerAxySkAlarm) monitorInitial() error { + the.monitor = &monitors.CommonMonitor{ + MonitorHelper: &monitors.MonitorHelper{}, + } + + the.monitor.Start() + for taskName, cron := range the.Info.Monitor { + switch taskName { + case "cron": + the.monitor.RegisterTask(cron, the.updateTriggerConfig) + default: + log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron) + } + } + return nil +} +func (the *consumerAxySkAlarm) updateTriggerConfig() { + log.Printf("--> 定时 查询pg 更新 双控配置信息") + sql := `SELECT at.* FROM t_alarm_trigger as at;` + err := the.infoPg.Query(&the.configAlarmTrigger, sql) + if err != nil { + log.Printf("查询数据库异常:err-> %s", err.Error()) + return + } + for i, trigger := range the.configAlarmTrigger { + the.configAlarmTrigger[i].ConditionArray = trigger.ConditionRaw + } + log.Printf("当前共 %d条 启用配置", len(the.configAlarmTrigger)) + + //立即触发 + the.judgeSK() +} +func (the *consumerAxySkAlarm) updateTriggerStationConfig() { + log.Printf("--> 定时 查询pg 更新 双控配置信息") + sql := `SELECT at.*,s.id as station_id,s.name as station_name + FROM t_alarm_trigger as at + left join t_sensor s + on at.struct_id=s.structure + and at.factor_id=s.factor;` + err := the.infoPg.Query(&the.stationAlarmTrigger, sql) + if err != nil { + log.Printf("查询数据库异常:err-> %s", err.Error()) + return + } + for i, trigger := range the.stationAlarmTrigger { + the.stationAlarmTrigger[i].ConditionArray = trigger.ConditionRaw + } + log.Printf("当前共 %d条 启用配置", len(the.stationAlarmTrigger)) + + //立即触发 + the.judgeSK() +} + +func (the *consumerAxySkAlarm) judgeSK() string { + for _, trigger := range the.configAlarmTrigger { + //配置的结构物的监测因素 去查询 + esSql := the.getEsAlarmTriggerQueryStr(trigger.StructId) + alarms, err := the.OutEs.SearchAlarm("native_sk_alarms", esSql) + if err != nil { + log.Printf("es查询异常err -> %s", err.Error()) + continue + } + stationAlarmMap := map[string]AXY_SK.StationAlarmGroup{} + for _, alarm := range alarms { + defaultStationAlarm := AXY_SK.StationAlarmGroup{} + if v, ok := stationAlarmMap[alarm.SourceId]; ok { + defaultStationAlarm = v + } + + switch alarm.AlarmTypeCode { + case "3007": + defaultStationAlarm.Alarm3007 = &alarm + case "3008": + defaultStationAlarm.Alarm3008 = &alarm + } + + stationAlarmMap[alarm.SourceId] = defaultStationAlarm + } + + //判断是否满足告警 + for sid, stationAlarmInfo := range stationAlarmMap { + log.Printf("判断测点[%s] 是否满足双控告警", sid) + isAlarm, level, detail := the.isRuleAlarm(trigger, stationAlarmInfo) + println(isAlarm, level, detail) + var alarmInfoTemplate *models.EsAlarm + if stationAlarmInfo.Alarm3007 != nil { + alarmInfoTemplate = stationAlarmInfo.Alarm3007 + } + if stationAlarmInfo.Alarm3008 != nil { + alarmInfoTemplate = stationAlarmInfo.Alarm3008 + } + if isAlarm && alarmInfoTemplate != nil { + payload := the.skAlarmInfo(alarmInfoTemplate, level, detail) + the.InKafka.Publish(the.Info.IoConfig.In.Kafka.AlarmTopic, payload) + } else { + payload := the.skAlarmElimination(alarmInfoTemplate, level, detail) + the.InKafka.Publish(the.Info.IoConfig.In.Kafka.AlarmTopic, payload) + } + } + } + return "" +} + +func (the *consumerAxySkAlarm) skAlarmInfo(alarmInfoTemplate *models.EsAlarm, level int, detail string) []byte { + alarmMsg := models.KafkaAlarm{ + MessageMode: "AlarmGeneration", + StructureId: alarmInfoTemplate.StructureId, + StructureName: "", + SourceId: alarmInfoTemplate.SourceId, + SourceName: alarmInfoTemplate.SourceName, + AlarmTypeCode: "3077", + AlarmCode: fmt.Sprintf("3077000%d", level), + Content: detail, + Time: time.Now().Format("2006-01-02T15:04:05+0800"), + SourceTypeId: 2, // 0:DTU, 1:传感器, 2:测点 + Sponsor: "goInOut_axySkAlarm", + Extras: nil, + SubDevices: nil, + } + payload, _ := json.Marshal(alarmMsg) + return payload +} +func (the *consumerAxySkAlarm) skAlarmElimination(alarmInfoTemplate *models.EsAlarm, level int, detail string) []byte { + alarmMsg := models.KafkaAlarm{ + MessageMode: "AlarmAutoElimination", + StructureId: alarmInfoTemplate.StructureId, + StructureName: "", + SourceId: alarmInfoTemplate.SourceId, + SourceName: alarmInfoTemplate.SourceName, + AlarmTypeCode: "3077", + AlarmCode: fmt.Sprintf("3077000%d", level), + Content: "", + Time: time.Now().Format("2006-01-02T15:04:05+0800"), + SourceTypeId: 2, // 0:DTU, 1:传感器, 2:测点 + Sponsor: "goInOut_axySkAlarm", + Extras: nil, + SubDevices: nil, + } + payload, _ := json.Marshal(alarmMsg) + return payload +} + +func (the *consumerAxySkAlarm) isRuleAlarm(trigger AXY_SK.AlarmTrigger, stationAlarm AXY_SK.StationAlarmGroup) (bool, int, string) { + level := 0 + detail := "" + //3007和3008都要有 + if trigger.Rule == 0 { + isAlarm := true + for _, conditionInt := range trigger.ConditionArray { + switch conditionInt { + case 0: + if stationAlarm.Alarm3007 == nil || stationAlarm.Alarm3007.CurrentLevel > trigger.AlarmLevel { + isAlarm = false + } + if isAlarm { + + if len(detail) > 0 { + detail += "且 " + } + detail += stationAlarm.Alarm3007.Detail + + } + + case 1: + if stationAlarm.Alarm3008 == nil || stationAlarm.Alarm3008.CurrentLevel > trigger.AlarmLevel { + isAlarm = false + } + if isAlarm { + + if len(detail) > 0 { + detail += "且 " + } + detail += stationAlarm.Alarm3008.Detail + + } + } + + } + if isAlarm { + level = trigger.AlarmLevel + } + return isAlarm, level, detail + } + + //3007和3008 任何一个 + if trigger.Rule == 1 { + isAlarm := false + + for _, conditionInt := range trigger.ConditionArray { + switch conditionInt { + case 0: + if stationAlarm.Alarm3007.CurrentLevel > trigger.AlarmLevel { + isAlarm = true + } + case 1: + if stationAlarm.Alarm3008.CurrentLevel > trigger.AlarmLevel { + isAlarm = true + } + } + + if isAlarm { + level = trigger.AlarmLevel + break + } + } + return isAlarm, level, detail + } + return false, level, detail +} + +func (the *consumerAxySkAlarm) getEsAlarmTriggerQueryStr(structId int) string { + + esQuery := fmt.Sprintf(` +{ + "query": { + "bool": { + "must": [ + { + "term": { + "structure_id": { + "value": %d + } + } + }, + { + "term": { + "isTriggerPart": { + "value": true + } + } + }, + { + "terms": { + "state": [ + 0, + 1, + 2 + ] + } + } + ] + } + } +}`, structId) + return esQuery +} + +func (the *consumerAxySkAlarm) sinkTask() { + intervalSec := the.Info.IoConfig.Out.Es.Interval + ticker := time.NewTicker(time.Duration(intervalSec) * time.Second) + defer ticker.Stop() + for { + <-ticker.C + the.toSink() + } +} + +func (the *consumerAxySkAlarm) toSink() { + var themes []models.EsTheme + the.lock.Lock() + defer the.lock.Unlock() + the.sinkMap.Range(func(key, value any) bool { + if v, ok := value.(*models.EsTheme); ok { + themes = append(themes, *v) + //零时打日志用 + if v.Sensor == the.logTagId { + bs, _ := json.Marshal(v) + log.Printf("toSink -> Range 标记测点数据 [%d] %s ", the.logTagId, string(bs)) + } + return ok + } else { + log.Printf("!!! toSink -> Range 类型转换异常 [%v]", key) + } + return true + }) + if len(themes) > 0 { + index := the.Info.IoConfig.Out.Es.Index + log.Printf("写入es [%s] %d条", index, len(themes)) + the.OutEs.BulkWriteThemes2Es(index, themes) + the.sinkMap.Clear() + } +} + +func (the *consumerAxySkAlarm) Work() { + log.Printf("监控 指定设备 logTagId=[%d]", the.logTagId) + go the.sinkTask() + go func() { + for { + pushEsTheme := <-the.dataCache + + if pushEsTheme.Sensor == the.logTagId { + bs, _ := json.Marshal(pushEsTheme) + log.Printf("存储 标记测点数据 [%d] %s ", the.logTagId, string(bs)) + } + + //有效数据存入缓存 + the.lock.Lock() + the.sinkMap.Store(pushEsTheme.Sensor, pushEsTheme) + the.lock.Unlock() + } + + }() +} +func (the *consumerAxySkAlarm) onData(topic string, msg string) bool { + //if len(msg) > 80 { + // log.Printf("recv:[%s]:%s ...", topic, msg[:80]) + //} + adaptor := adaptors.Adaptor_Savoir_LastTheme{} + + needPush := adaptor.Transform(topic, msg) + + if needPush != nil && needPush.Data != nil { + the.dataCache <- needPush + } else { + s, _ := json.Marshal(needPush) + if needPush != nil { + if needPush.Sensor == the.logTagId { + log.Printf("onData 测点[%d] 异常needPush= %s", needPush.Sensor, s) + } + } + } + + return true +} diff --git a/consumers/consumerManage.go b/consumers/consumerManage.go index bbc7beb..e8fb9df 100644 --- a/consumers/consumerManage.go +++ b/consumers/consumerManage.go @@ -53,6 +53,9 @@ func GetConsumer(name string) (consumer IConsumer) { case "consumerSavoirTheme": consumer = new(consumerSavoirTheme) + case "consumerAxySkAlarm": + consumer = new(consumerAxySkAlarm) + default: consumer = nil } diff --git a/dbOperate/elasticsearchHelper.go b/dbOperate/elasticsearchHelper.go index 08d9921..cdce690 100644 --- a/dbOperate/elasticsearchHelper.go +++ b/dbOperate/elasticsearchHelper.go @@ -221,6 +221,39 @@ func (the *ESHelper) SearchAlarmThemeData(index string, queryBody string) ([]mod return sensors, err } +func (the *ESHelper) SearchAlarm(index string, queryBody string) ([]models.EsAlarm, error) { + var sensors []models.EsAlarm + alarmsResp, err := the.searchAlarm(index, queryBody) + + var sensor models.EsAlarm + for _, hitAlarm := range alarmsResp.Hits.Hits { + sensor = hitAlarm.Source + sensors = append(sensors, sensor) + } + + return sensors, err +} + +func (the *ESHelper) searchAlarm(index, reqBody string) (models.EsAlarmResp, error) { + body := &bytes.Buffer{} + body.WriteString(reqBody) + response, err := the.esClient.Search( + the.esClient.Search.WithIndex(index), + the.esClient.Search.WithBody(body), + ) + defer response.Body.Close() + if err != nil { + //return nil, err + } + log.Println(response.Status()) + r := models.EsAlarmResp{} + // Deserialize the response into a map. + if err := json.NewDecoder(response.Body).Decode(&r); err != nil { + log.Fatalf("Error parsing the response body: %s", err) + } + return r, err +} + func (the *ESHelper) BulkWrite(index, reqBody string) { body := &bytes.Buffer{} diff --git a/models/esAlarm.go b/models/esAlarm.go new file mode 100644 index 0000000..e2d49ab --- /dev/null +++ b/models/esAlarm.go @@ -0,0 +1,46 @@ +package models + +import "time" + +type EsAlarmResp struct { + Took int `json:"took"` + TimedOut bool `json:"timed_out"` + Shards struct { + Total int `json:"total"` + Successful int `json:"successful"` + Skipped int `json:"skipped"` + Failed int `json:"failed"` + } `json:"_shards"` + Hits struct { + Total int `json:"total"` + MaxScore float64 `json:"max_score"` + Hits []HitAlarm `json:"hits"` + } `json:"hits"` +} + +type HitAlarm struct { + Index string `json:"_index"` + Type string `json:"_type"` + Id string `json:"_id"` + Score float64 `json:"_score"` + Source EsAlarm `json:"_source"` +} + +type EsAlarm struct { + StartTime time.Time `json:"start_time"` + AlarmTypeCode string `json:"alarm_type_code"` + State int `json:"state"` //0 新,1次数,2等级提升,3自动恢复, + AlarmCount int `json:"alarm_count"` + AlarmTypeId int `json:"alarm_type_id"` + EndTime time.Time `json:"end_time"` + StructureId int `json:"structure_id"` + SourceTypeId int `json:"source_type_id"` + AlarmContent string `json:"alarm_content"` + SourceName string `json:"source_name"` + SourceId string `json:"source_id"` + InitialLevel int `json:"initial_level"` + CurrentLevel int `json:"current_level"` + Detail string `json:"detail"` + AlarmCode string `json:"alarm_code"` + IsTriggerPart bool `json:"isTriggerPart"` +}