Browse Source

update 添加 双控告警

dev
lucas 1 day ago
parent
commit
b6c565bf81
  1. 27
      configFiles/config_安心云告警双控.yaml
  2. 118
      configFiles/config_河北省公路基础设施监测_知物云_轻量化特征数据.yaml
  3. 31
      consumers/AXY_SK/config.go
  4. 28
      consumers/AXY_SK/dataModel.go
  5. 421
      consumers/consumerAxySkAlarm.go
  6. 3
      consumers/consumerManage.go
  7. 33
      dbOperate/elasticsearchHelper.go
  8. 46
      models/esAlarm.go

27
configFiles/config_安心云告警双控.yaml

@ -0,0 +1,27 @@
consumer: consumerAxySkAlarm
ioConfig:
in:
kafka:
brokers:
- 10.8.30.160:30992
groupId: axy_sk_alarm_inout
alarmTopic: anxinyun_alarm #推送告警的主题
topics:
- szbb
out:
es:
address:
- "http://10.8.30.160:30092"
index: "native_alarms" #推送告警索引
auth:
userName: post
password: 123
interval: 30 #多久写一次es(秒)
monitor:
cron: 24 * * * *
queryComponent:
postgres:
connect: "host=10.8.30.160 port=30432 user=postgres password=example dbname=jilin sslmode=disable"

118
configFiles/config_河北省公路基础设施监测_知物云_轻量化特征数据.yaml

@ -1,118 +0,0 @@
consumer: consumerZWYHBJCAS
ioConfig:
in:
http:
url: https://esproxy.anxinyun.cn/savoir_themes/_search
kafka:
brokers:
- 10.8.30.160:30992
groupId: anxinHebeiGL_01
topics:
- savoir_alarm
out:
mqtt:
host: 123.249.81.52
port: 8883
userName: bs1321
password: 9$TyND#ec$aZFfcl
clientId: zhangjiakouJTYS
topics:
- t/province/1321
monitor:
#振动是触发式,数据迟缓 cron10min也改成1小时一次 最多上报6条,不进行10min实时上报
cron10min: 6/10 * * * *
#普通类型 特征数据
cron1hour: 45 0/1 * * *
#推送摄像机状态 1小时
camera1hour: 0 0/1 * * *
#普推送健康度 24小时,每天8点执行
health24hour: 0 8 * * *
info:
urlIndex: https://hbjcas.hebitt.com/pms/api/v2/bsi/
secretKey: CrAd7tkSDXKt7dyk6ueY4OeWRnSHJhUa
rc4key: CrAd7tkSDXKt7dyk6ueY4OeWRnSHJhUa3Al3
systemID: 1321
queryComponent:
redis:
address: 10.8.30.160:30379
#结构物id对应
structInfo:
8926: 130830
8928: 130831
8929: 130832
8930: 130833
8921: 130834
8922: 130835
8923: 130836
8931: 130837
8932: 130838
8936: 136137
8940: 136139
8939: 136141
8938: 136142
8935: 136144
8967: 130812
8966: 130934
8968: 138198
#点位id对应信息
pointInfo: #测点类型支持 桥墩倾斜 15 裂缝18 支座位移20 桥面振动28
#定义http接口用于对外调用
httpServer: 0.0.0.0:8425
#需要上报健康度的(桥梁|隧道|边坡)唯一编码
codeInfo:
- 130812
- 130830
- 130831
- 130832
- 130833
- 130834
- 130835
- 130836
- 130837
- 130838
- 130934
- 136137
- 136138
- 136139
- 136140
- 136141
- 136142
- 136143
- 136144
- 136145
- 138198
cameraInfo:
- 13083099901
- 13083099902
- 13083199903
- 13083199904
- 13083299905
- 13083299906
- 13083399907
- 13083399908
- 13083499909
- 13083599910
- 13083699911
- 13083699912
- 13083799913
- 13083799914
- 13083899915
- 13083899916
- 13613799917
- 13613799918
- 13613899919
- 13613899920
- 13613999921
- 13613999922
- 13614099923
- 13614099924
- 13614199925
- 13614199926
- 13614299927
- 13614299928
- 13614399929
- 13614399930
- 13614499931
- 13614499932
- 13614599933
- 13614599934

31
consumers/AXY_SK/config.go

@ -0,0 +1,31 @@
package AXY_SK
import "goInOut/config"
type ConfigFile struct {
IoConfig ioConfig `yaml:"ioConfig"`
Monitor map[string]string `yaml:"monitor"`
QueryComponent queryComponent `yaml:"queryComponent"`
}
type ioConfig struct {
In in `json:"in"`
Out out `json:"out"`
}
type in struct {
Kafka config.KafkaConfig `json:"kafka"`
}
type out struct {
Es config.EsConfig `json:"es"`
}
type Info struct {
//Common map[string]string `json:"common"`
QueryComponent queryComponent `json:"queryComponent"`
}
type queryComponent struct {
Pg struct {
Connect string `yaml:"connect"`
} `yaml:"postgres"`
}

28
consumers/AXY_SK/dataModel.go

@ -0,0 +1,28 @@
package AXY_SK
import (
"github.com/lib/pq"
_ "github.com/lib/pq"
"goInOut/models"
)
type AlarmTrigger struct {
Id int `json:"id" db:"id"`
StructId int `json:"struct_id" db:"struct_id"`
FactorId int `json:"factor_id" db:"factor_id"`
AlarmLevel int `json:"alarm_level" db:"alarm_level"`
ConditionRaw pq.Int32Array `json:"condition" db:"condition"`
ConditionArray []int32
Rule int `json:"rule" db:"rule"`
}
type StationAlarmTrigger struct {
AlarmTrigger
StationName string `json:"station_name" db:"station_name"`
StationId int `json:"station_id" db:"station_id"`
}
type StationAlarmGroup struct {
Alarm3007 *models.EsAlarm
Alarm3008 *models.EsAlarm
}

421
consumers/consumerAxySkAlarm.go

@ -0,0 +1,421 @@
package consumers
import (
"encoding/json"
"fmt"
"goInOut/adaptors"
"goInOut/consumers/AXY_SK"
"goInOut/dbOperate"
"goInOut/dbOperate/_kafka"
"goInOut/models"
"goInOut/monitors"
"gopkg.in/yaml.v3"
"log"
"sync"
"time"
)
type consumerAxySkAlarm struct {
//数据缓存管道
dataCache chan *models.EsTheme
alarmCache map[string]models.EsAlarm
//具体配置
Info AXY_SK.ConfigFile
InKafka _kafka.KafkaHelper
OutEs dbOperate.ESHelper
infoPg *dbOperate.DBHelper
sinkMap sync.Map
lock sync.Mutex
logTagId int
monitor *monitors.CommonMonitor
//数据库配置信息
stationAlarmTrigger []AXY_SK.StationAlarmTrigger
configAlarmTrigger []AXY_SK.AlarmTrigger
}
func (the *consumerAxySkAlarm) LoadConfigJson(cfgStr string) {
// 将 JSON 格式的数据解析到结构体中
err := yaml.Unmarshal([]byte(cfgStr), &the.Info)
if err != nil {
log.Printf("读取配置文件[%s]异常 err=%v", cfgStr, err.Error())
panic(err)
}
}
func (the *consumerAxySkAlarm) Initial(cfg string) error {
the.sinkMap = sync.Map{}
the.dataCache = make(chan *models.EsTheme, 1000)
the.LoadConfigJson(cfg)
err := the.inputInitial()
if err != nil {
return err
}
err = the.outputInitial()
if err != nil {
return err
}
err = the.infoComponentInitial()
if err != nil {
return err
}
err = the.monitorInitial()
return err
}
func (the *consumerAxySkAlarm) inputInitial() error {
//数据入口
the.InKafka = _kafka.KafkaHelper{
Brokers: the.Info.IoConfig.In.Kafka.Brokers,
GroupId: the.Info.IoConfig.In.Kafka.GroupId,
}
the.InKafka.Initial()
for _, inTopic := range the.Info.IoConfig.In.Kafka.Topics {
the.InKafka.Subscribe(inTopic, the.onData)
}
the.InKafka.Worker()
return nil
}
func (the *consumerAxySkAlarm) outputInitial() error {
//数据出口
the.OutEs = *dbOperate.NewESHelper(
the.Info.IoConfig.Out.Es.Address,
the.Info.IoConfig.Out.Es.Auth.UserName,
the.Info.IoConfig.Out.Es.Auth.Password,
)
return nil
}
func (the *consumerAxySkAlarm) infoComponentInitial() error {
//数据出口
pgConnStr := the.Info.QueryComponent.Pg.Connect
the.infoPg = dbOperate.NewDBHelper("postgres", pgConnStr)
return nil
}
func (the *consumerAxySkAlarm) monitorInitial() error {
the.monitor = &monitors.CommonMonitor{
MonitorHelper: &monitors.MonitorHelper{},
}
the.monitor.Start()
for taskName, cron := range the.Info.Monitor {
switch taskName {
case "cron":
the.monitor.RegisterTask(cron, the.updateTriggerConfig)
default:
log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron)
}
}
return nil
}
func (the *consumerAxySkAlarm) updateTriggerConfig() {
log.Printf("--> 定时 查询pg 更新 双控配置信息")
sql := `SELECT at.* FROM t_alarm_trigger as at;`
err := the.infoPg.Query(&the.configAlarmTrigger, sql)
if err != nil {
log.Printf("查询数据库异常:err-> %s", err.Error())
return
}
for i, trigger := range the.configAlarmTrigger {
the.configAlarmTrigger[i].ConditionArray = trigger.ConditionRaw
}
log.Printf("当前共 %d条 启用配置", len(the.configAlarmTrigger))
//立即触发
the.judgeSK()
}
func (the *consumerAxySkAlarm) updateTriggerStationConfig() {
log.Printf("--> 定时 查询pg 更新 双控配置信息")
sql := `SELECT at.*,s.id as station_id,s.name as station_name
FROM t_alarm_trigger as at
left join t_sensor s
on at.struct_id=s.structure
and at.factor_id=s.factor;`
err := the.infoPg.Query(&the.stationAlarmTrigger, sql)
if err != nil {
log.Printf("查询数据库异常:err-> %s", err.Error())
return
}
for i, trigger := range the.stationAlarmTrigger {
the.stationAlarmTrigger[i].ConditionArray = trigger.ConditionRaw
}
log.Printf("当前共 %d条 启用配置", len(the.stationAlarmTrigger))
//立即触发
the.judgeSK()
}
func (the *consumerAxySkAlarm) judgeSK() string {
for _, trigger := range the.configAlarmTrigger {
//配置的结构物的监测因素 去查询
esSql := the.getEsAlarmTriggerQueryStr(trigger.StructId)
alarms, err := the.OutEs.SearchAlarm("native_sk_alarms", esSql)
if err != nil {
log.Printf("es查询异常err -> %s", err.Error())
continue
}
stationAlarmMap := map[string]AXY_SK.StationAlarmGroup{}
for _, alarm := range alarms {
defaultStationAlarm := AXY_SK.StationAlarmGroup{}
if v, ok := stationAlarmMap[alarm.SourceId]; ok {
defaultStationAlarm = v
}
switch alarm.AlarmTypeCode {
case "3007":
defaultStationAlarm.Alarm3007 = &alarm
case "3008":
defaultStationAlarm.Alarm3008 = &alarm
}
stationAlarmMap[alarm.SourceId] = defaultStationAlarm
}
//判断是否满足告警
for sid, stationAlarmInfo := range stationAlarmMap {
log.Printf("判断测点[%s] 是否满足双控告警", sid)
isAlarm, level, detail := the.isRuleAlarm(trigger, stationAlarmInfo)
println(isAlarm, level, detail)
var alarmInfoTemplate *models.EsAlarm
if stationAlarmInfo.Alarm3007 != nil {
alarmInfoTemplate = stationAlarmInfo.Alarm3007
}
if stationAlarmInfo.Alarm3008 != nil {
alarmInfoTemplate = stationAlarmInfo.Alarm3008
}
if isAlarm && alarmInfoTemplate != nil {
payload := the.skAlarmInfo(alarmInfoTemplate, level, detail)
the.InKafka.Publish(the.Info.IoConfig.In.Kafka.AlarmTopic, payload)
} else {
payload := the.skAlarmElimination(alarmInfoTemplate, level, detail)
the.InKafka.Publish(the.Info.IoConfig.In.Kafka.AlarmTopic, payload)
}
}
}
return ""
}
func (the *consumerAxySkAlarm) skAlarmInfo(alarmInfoTemplate *models.EsAlarm, level int, detail string) []byte {
alarmMsg := models.KafkaAlarm{
MessageMode: "AlarmGeneration",
StructureId: alarmInfoTemplate.StructureId,
StructureName: "",
SourceId: alarmInfoTemplate.SourceId,
SourceName: alarmInfoTemplate.SourceName,
AlarmTypeCode: "3077",
AlarmCode: fmt.Sprintf("3077000%d", level),
Content: detail,
Time: time.Now().Format("2006-01-02T15:04:05+0800"),
SourceTypeId: 2, // 0:DTU, 1:传感器, 2:测点
Sponsor: "goInOut_axySkAlarm",
Extras: nil,
SubDevices: nil,
}
payload, _ := json.Marshal(alarmMsg)
return payload
}
func (the *consumerAxySkAlarm) skAlarmElimination(alarmInfoTemplate *models.EsAlarm, level int, detail string) []byte {
alarmMsg := models.KafkaAlarm{
MessageMode: "AlarmAutoElimination",
StructureId: alarmInfoTemplate.StructureId,
StructureName: "",
SourceId: alarmInfoTemplate.SourceId,
SourceName: alarmInfoTemplate.SourceName,
AlarmTypeCode: "3077",
AlarmCode: fmt.Sprintf("3077000%d", level),
Content: "",
Time: time.Now().Format("2006-01-02T15:04:05+0800"),
SourceTypeId: 2, // 0:DTU, 1:传感器, 2:测点
Sponsor: "goInOut_axySkAlarm",
Extras: nil,
SubDevices: nil,
}
payload, _ := json.Marshal(alarmMsg)
return payload
}
func (the *consumerAxySkAlarm) isRuleAlarm(trigger AXY_SK.AlarmTrigger, stationAlarm AXY_SK.StationAlarmGroup) (bool, int, string) {
level := 0
detail := ""
//3007和3008都要有
if trigger.Rule == 0 {
isAlarm := true
for _, conditionInt := range trigger.ConditionArray {
switch conditionInt {
case 0:
if stationAlarm.Alarm3007 == nil || stationAlarm.Alarm3007.CurrentLevel > trigger.AlarmLevel {
isAlarm = false
}
if isAlarm {
if len(detail) > 0 {
detail += "且 "
}
detail += stationAlarm.Alarm3007.Detail
}
case 1:
if stationAlarm.Alarm3008 == nil || stationAlarm.Alarm3008.CurrentLevel > trigger.AlarmLevel {
isAlarm = false
}
if isAlarm {
if len(detail) > 0 {
detail += "且 "
}
detail += stationAlarm.Alarm3008.Detail
}
}
}
if isAlarm {
level = trigger.AlarmLevel
}
return isAlarm, level, detail
}
//3007和3008 任何一个
if trigger.Rule == 1 {
isAlarm := false
for _, conditionInt := range trigger.ConditionArray {
switch conditionInt {
case 0:
if stationAlarm.Alarm3007.CurrentLevel > trigger.AlarmLevel {
isAlarm = true
}
case 1:
if stationAlarm.Alarm3008.CurrentLevel > trigger.AlarmLevel {
isAlarm = true
}
}
if isAlarm {
level = trigger.AlarmLevel
break
}
}
return isAlarm, level, detail
}
return false, level, detail
}
func (the *consumerAxySkAlarm) getEsAlarmTriggerQueryStr(structId int) string {
esQuery := fmt.Sprintf(`
{
"query": {
"bool": {
"must": [
{
"term": {
"structure_id": {
"value": %d
}
}
},
{
"term": {
"isTriggerPart": {
"value": true
}
}
},
{
"terms": {
"state": [
0,
1,
2
]
}
}
]
}
}
}`, structId)
return esQuery
}
func (the *consumerAxySkAlarm) sinkTask() {
intervalSec := the.Info.IoConfig.Out.Es.Interval
ticker := time.NewTicker(time.Duration(intervalSec) * time.Second)
defer ticker.Stop()
for {
<-ticker.C
the.toSink()
}
}
func (the *consumerAxySkAlarm) toSink() {
var themes []models.EsTheme
the.lock.Lock()
defer the.lock.Unlock()
the.sinkMap.Range(func(key, value any) bool {
if v, ok := value.(*models.EsTheme); ok {
themes = append(themes, *v)
//零时打日志用
if v.Sensor == the.logTagId {
bs, _ := json.Marshal(v)
log.Printf("toSink -> Range 标记测点数据 [%d] %s ", the.logTagId, string(bs))
}
return ok
} else {
log.Printf("!!! toSink -> Range 类型转换异常 [%v]", key)
}
return true
})
if len(themes) > 0 {
index := the.Info.IoConfig.Out.Es.Index
log.Printf("写入es [%s] %d条", index, len(themes))
the.OutEs.BulkWriteThemes2Es(index, themes)
the.sinkMap.Clear()
}
}
func (the *consumerAxySkAlarm) Work() {
log.Printf("监控 指定设备 logTagId=[%d]", the.logTagId)
go the.sinkTask()
go func() {
for {
pushEsTheme := <-the.dataCache
if pushEsTheme.Sensor == the.logTagId {
bs, _ := json.Marshal(pushEsTheme)
log.Printf("存储 标记测点数据 [%d] %s ", the.logTagId, string(bs))
}
//有效数据存入缓存
the.lock.Lock()
the.sinkMap.Store(pushEsTheme.Sensor, pushEsTheme)
the.lock.Unlock()
}
}()
}
func (the *consumerAxySkAlarm) onData(topic string, msg string) bool {
//if len(msg) > 80 {
// log.Printf("recv:[%s]:%s ...", topic, msg[:80])
//}
adaptor := adaptors.Adaptor_Savoir_LastTheme{}
needPush := adaptor.Transform(topic, msg)
if needPush != nil && needPush.Data != nil {
the.dataCache <- needPush
} else {
s, _ := json.Marshal(needPush)
if needPush != nil {
if needPush.Sensor == the.logTagId {
log.Printf("onData 测点[%d] 异常needPush= %s", needPush.Sensor, s)
}
}
}
return true
}

3
consumers/consumerManage.go

@ -53,6 +53,9 @@ func GetConsumer(name string) (consumer IConsumer) {
case "consumerSavoirTheme": case "consumerSavoirTheme":
consumer = new(consumerSavoirTheme) consumer = new(consumerSavoirTheme)
case "consumerAxySkAlarm":
consumer = new(consumerAxySkAlarm)
default: default:
consumer = nil consumer = nil
} }

33
dbOperate/elasticsearchHelper.go

@ -221,6 +221,39 @@ func (the *ESHelper) SearchAlarmThemeData(index string, queryBody string) ([]mod
return sensors, err return sensors, err
} }
func (the *ESHelper) SearchAlarm(index string, queryBody string) ([]models.EsAlarm, error) {
var sensors []models.EsAlarm
alarmsResp, err := the.searchAlarm(index, queryBody)
var sensor models.EsAlarm
for _, hitAlarm := range alarmsResp.Hits.Hits {
sensor = hitAlarm.Source
sensors = append(sensors, sensor)
}
return sensors, err
}
func (the *ESHelper) searchAlarm(index, reqBody string) (models.EsAlarmResp, error) {
body := &bytes.Buffer{}
body.WriteString(reqBody)
response, err := the.esClient.Search(
the.esClient.Search.WithIndex(index),
the.esClient.Search.WithBody(body),
)
defer response.Body.Close()
if err != nil {
//return nil, err
}
log.Println(response.Status())
r := models.EsAlarmResp{}
// Deserialize the response into a map.
if err := json.NewDecoder(response.Body).Decode(&r); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
}
return r, err
}
func (the *ESHelper) BulkWrite(index, reqBody string) { func (the *ESHelper) BulkWrite(index, reqBody string) {
body := &bytes.Buffer{} body := &bytes.Buffer{}

46
models/esAlarm.go

@ -0,0 +1,46 @@
package models
import "time"
type EsAlarmResp struct {
Took int `json:"took"`
TimedOut bool `json:"timed_out"`
Shards struct {
Total int `json:"total"`
Successful int `json:"successful"`
Skipped int `json:"skipped"`
Failed int `json:"failed"`
} `json:"_shards"`
Hits struct {
Total int `json:"total"`
MaxScore float64 `json:"max_score"`
Hits []HitAlarm `json:"hits"`
} `json:"hits"`
}
type HitAlarm struct {
Index string `json:"_index"`
Type string `json:"_type"`
Id string `json:"_id"`
Score float64 `json:"_score"`
Source EsAlarm `json:"_source"`
}
type EsAlarm struct {
StartTime time.Time `json:"start_time"`
AlarmTypeCode string `json:"alarm_type_code"`
State int `json:"state"` //0 新,1次数,2等级提升,3自动恢复,
AlarmCount int `json:"alarm_count"`
AlarmTypeId int `json:"alarm_type_id"`
EndTime time.Time `json:"end_time"`
StructureId int `json:"structure_id"`
SourceTypeId int `json:"source_type_id"`
AlarmContent string `json:"alarm_content"`
SourceName string `json:"source_name"`
SourceId string `json:"source_id"`
InitialLevel int `json:"initial_level"`
CurrentLevel int `json:"current_level"`
Detail string `json:"detail"`
AlarmCode string `json:"alarm_code"`
IsTriggerPart bool `json:"isTriggerPart"`
}
Loading…
Cancel
Save