Browse Source

update 更新组合告警判断

dev
lucas 3 days ago
parent
commit
067cc102f9
  1. 89
      adaptors/知物云es告警组合聚集to触发爆闪.go
  2. 7
      configFiles/config_知物云_组合告警.yaml
  3. 28
      consumers/AlarmCombination/alarmAggResp.go
  4. 27
      consumers/AlarmCombination/dataModel.go
  5. 177
      consumers/consumerAlarmCombination.go
  6. 17
      models/kafkaMsg.go

89
adaptors/知物云es告警组合聚集to触发爆闪.go

@ -0,0 +1,89 @@
package adaptors
import (
"encoding/json"
"fmt"
"goInOut/consumers/AlarmCombination"
"goInOut/models"
"log"
"strconv"
"strings"
"time"
)
// Adaptor_ZWY_AlarmCombin 知物云的告警数据 组合统计 触发后 发到 kafka
type Adaptor_ZWY_AlarmCombin struct {
//一些必要信息
Info map[string]string
}
func (the Adaptor_ZWY_AlarmCombin) Transform(config AlarmCombination.CombinationInfo, rawMsg string) []NeedPush {
log.Printf("解析数据")
configItems := config.ConfigItems
esAggData := AlarmCombination.EsAggAlarm{}
esAggData.Aggregations.GroupBySensor.Buckets = []AlarmCombination.BucketsSensorDataCount{}
var needPush []NeedPush
err := json.Unmarshal([]byte(rawMsg), &esAggData)
if err != nil {
log.Printf("解析 es proxy 数据异常: %s", err.Error())
return nil
}
pointsCount := 0
for _, item := range configItems {
pointsCount += len(item.StationIds)
}
esAggPointsCount := len(esAggData.Aggregations.GroupBySensor.Buckets)
if esAggPointsCount < pointsCount {
log.Printf("es 聚集查询告警数=%d < 配置测点数 %d", esAggPointsCount, pointsCount)
return nil
}
msg := fmt.Sprintf("组合告警[%s]生效,存在%d个不同测点的告警", config.Name, esAggPointsCount)
log.Println(msg)
prefix := "zh-"
sourceId := prefix + strconv.Itoa(config.Id)
alarmMsg := models.KafkaAlarm{
MessageMode: "AlarmGeneration",
StructureId: config.StructId,
StructureName: config.StructName,
SourceId: sourceId,
SourceName: config.Name,
AlarmTypeCode: "8003",
AlarmCode: "80030001",
Content: msg,
Time: time.Now().Format("2006-01-02T15:04:05+0800"),
SourceTypeId: 4, // 0:DTU, 1:传感器, 2:测点
Sponsor: "goInOut_zhgj",
Extras: nil,
SubDevices: nil,
}
Payload, _ := json.Marshal(alarmMsg)
needPush = append(needPush, NeedPush{
Payload: Payload,
})
return needPush
}
func (the Adaptor_ZWY_AlarmCombin) GetPointCodeFromLabel(label string) int64 {
//解析label {13010600001}
pointUniqueCode := int64(0)
if len(label) > 2 {
newLabel := strings.TrimLeft(label, "{")
str := strings.TrimRight(newLabel, "}")
codeInt64, err := strconv.ParseInt(str, 10, 64)
if err != nil {
log.Printf("测点标签转换异常[%s]", label)
}
pointUniqueCode = codeInt64
}
return pointUniqueCode
}

7
configFiles/config_知物云_组合告警.yaml

@ -10,14 +10,9 @@ ioConfig:
topics: topics:
- zuhe_alarm - zuhe_alarm
monitor: monitor:
cron_pg: 23 0/1 * * * #6/10 * * * * cron: 31 0/1 * * * #6/10 * * * *
#普通类型 特征数据
cron_redis: 20 0/1 * * *
info: info:
rc4key: sK3kJttzZyf7aZI94zSYgytBYCrfZRt6yil2
queryComponent: queryComponent:
redis:
address: 10.8.30.160:30379
postgres: postgres:
connect: "host=10.8.30.166 port=5432 user=FashionAdmin password=123456 dbname=SavoirCloud sslmode=disable" connect: "host=10.8.30.166 port=5432 user=FashionAdmin password=123456 dbname=SavoirCloud sslmode=disable"
#点位id对应信息 #点位id对应信息

28
consumers/AlarmCombination/alarmAggResp.go

@ -0,0 +1,28 @@
package AlarmCombination
type EsAggAlarm struct {
Took int `json:"took"`
TimedOut bool `json:"timed_out"`
Shards struct {
Total int `json:"total"`
Successful int `json:"successful"`
Skipped int `json:"skipped"`
Failed int `json:"failed"`
} `json:"_shards"`
Hits struct {
Total int `json:"total"`
MaxScore float64 `json:"max_score"`
} `json:"hits"`
Aggregations struct {
GroupBySensor struct {
DocCountErrorUpperBound int `json:"doc_count_error_upper_bound"`
SumOtherDocCount int `json:"sum_other_doc_count"`
Buckets []BucketsSensorDataCount `json:"buckets"`
} `json:"groupBySensor"`
} `json:"aggregations"`
}
type BucketsSensorDataCount struct {
Key string `json:"key"`
DocCount int `json:"doc_count"`
}

27
consumers/AlarmCombination/dataModel.go

@ -1,14 +1,23 @@
package AlarmCombination package AlarmCombination
import "time" import (
"time"
)
type CombinationInfo struct { type CombinationInfo struct {
Id int64 `json:"id" db:"id"` Id int `json:"id" db:"id"`
Name string `json:"name" db:"name"` Name string `json:"name" db:"name"`
Description string `json:"description" db:"description"` Description string `json:"description" db:"description"`
Config any `json:"config" db:"config"` ConfigStr string `json:"config" db:"config"`
Enable bool `json:"enable" db:"enable"` ConfigItems []ConfigItems `json:"ConfigItems" `
StructId int32 `json:"struct_id" db:"struct_id"` Enable bool `json:"enable" db:"enable"`
OrgId int32 `json:"org_id" db:"org_id"` StructId int `json:"struct_id" db:"struct_id"`
UpdateTime time.Time `json:"update_time" db:"update_time"` StructName string `json:"struct_name" db:"struct_name"`
OrgId int `json:"org_id" db:"org_id"`
UpdateTime time.Time `json:"update_time" db:"update_time"`
}
type ConfigItems struct {
FactorId int `json:"factorId"`
StationIds []int `json:"stationIds"`
} }

177
consumers/consumerAlarmCombination.go

@ -1,13 +1,13 @@
package consumers package consumers
import ( import (
"encoding/json"
"fmt" "fmt"
"goInOut/adaptors" "goInOut/adaptors"
"goInOut/consumers/AlarmCombination" "goInOut/consumers/AlarmCombination"
"goInOut/dbOperate" "goInOut/dbOperate"
"goInOut/dbOperate/_kafka" "goInOut/dbOperate/_kafka"
"goInOut/monitors" "goInOut/monitors"
"goInOut/utils"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
"log" "log"
"time" "time"
@ -23,6 +23,8 @@ type consumerAlarmCombination struct {
monitor *monitors.CommonMonitor monitor *monitors.CommonMonitor
infoRedis *dbOperate.RedisHelper infoRedis *dbOperate.RedisHelper
infoPg *dbOperate.DBHelper infoPg *dbOperate.DBHelper
//数据库配置信息
combinationInfo []AlarmCombination.CombinationInfo
} }
func (the *consumerAlarmCombination) LoadConfigJson(cfgStr string) { func (the *consumerAlarmCombination) LoadConfigJson(cfgStr string) {
@ -58,10 +60,8 @@ func (the *consumerAlarmCombination) InputInitial() error {
the.monitor.Start() the.monitor.Start()
for taskName, cron := range the.Info.Monitor { for taskName, cron := range the.Info.Monitor {
switch taskName { switch taskName {
case "cron_pg": case "cron":
the.monitor.RegisterTask(cron, the.updateCombinationInfo) the.monitor.RegisterTask(cron, the.updateCombinationInfo)
case "cron_redis":
the.monitor.RegisterTask(cron, the.getEs1HourAggData)
default: default:
log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron) log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron)
} }
@ -79,8 +79,6 @@ func (the *consumerAlarmCombination) OutputInitial() error {
func (the *consumerAlarmCombination) infoComponentInitial() error { func (the *consumerAlarmCombination) infoComponentInitial() error {
//数据出口 //数据出口
addr := the.Info.QueryComponent.Redis.Address
the.infoRedis = dbOperate.NewRedisHelper("", addr)
pgConnStr := the.Info.QueryComponent.Pg.Connect pgConnStr := the.Info.QueryComponent.Pg.Connect
the.infoPg = dbOperate.NewDBHelper("postgres", pgConnStr) the.infoPg = dbOperate.NewDBHelper("postgres", pgConnStr)
return nil return nil
@ -111,45 +109,71 @@ func (the *consumerAlarmCombination) Work() {
}() }()
} }
func (the *consumerAlarmCombination) getAdaptor() (adaptor adaptors.Adaptor_AXYES_HBGL) { func (the *consumerAlarmCombination) getAdaptor() (adaptor adaptors.Adaptor_ZWY_AlarmCombin) {
return adaptors.Adaptor_AXYES_HBGL{ return adaptors.Adaptor_ZWY_AlarmCombin{}
Redis: the.infoRedis,
}
} }
func (the *consumerAlarmCombination) getEs1HourAggData() { func (the *consumerAlarmCombination) getEs1minAlarmData() {
start, end := utils.GetTimeRangeByHour(-1) adaptor := the.getAdaptor()
log.Printf("查询数据时间范围 %s - %s", start, end)
hourFactorIds := []int{15, 18, 20}
structIds := []int64{1, 2}
for _, structId := range structIds {
for _, factorId := range hourFactorIds {
esQuery := the.getESQueryStrByHour(structId, factorId, start, end)
auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"}
esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth)
adaptor := the.getAdaptor() for _, comInfos := range the.combinationInfo {
needPushes := adaptor.Transform(structId, factorId, esAggResultStr) //满足条件 sourceTypeId: Int = 1, // 0:DTU, 1:传感器, 2:测点
//state 0 新告警 1次数更新 2 等级提升 3 自动恢复 4 手动恢复 5 已恢复待确认
if len(needPushes) > 0 { esQuery := the.getESAlarmAggQueryStr(comInfos)
the.ch <- needPushes auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"}
} esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth)
lenRes := len(esAggResultStr)
if lenRes < 120 {
log.Printf("[s=%d]es agg 返回 无 有效数据,resp=%s ", comInfos.StructId, esAggResultStr)
continue
}
needPushes := adaptor.Transform(comInfos, esAggResultStr)
if len(needPushes) > 0 {
the.ch <- needPushes
} }
} }
} }
func (the *consumerAlarmCombination) updateCombinationInfo() { func (the *consumerAlarmCombination) updateCombinationInfo() {
log.Printf("更新 数据库 组合配置信息") log.Printf("更新 数据库 组合配置信息")
sql := `SELECT * FROM "t_alarm_combination"` sql := `SELECT a.*,s."name" as struct_name FROM "t_alarm_combination" as a
var CombinationInfos []AlarmCombination.CombinationInfo left join t_structure as s
the.infoPg.Query(&CombinationInfos, sql) ON a.struct_id=s."id"`
println("======") sql += " where a.id=1" //测试用
//var CombinationInfos []AlarmCombination.CombinationInfo
err := the.infoPg.Query(&the.combinationInfo, sql)
if err != nil {
log.Printf("查询数据库异常:err-> %s", err.Error())
return
}
for i, info := range the.combinationInfo {
err := json.Unmarshal([]byte(info.ConfigStr), &info.ConfigItems)
if err != nil {
return
}
the.combinationInfo[i].ConfigItems = info.ConfigItems
}
log.Printf("共刷新%d条配置", len(the.combinationInfo))
//立即触发
the.getEs1minAlarmData()
} }
func (the *consumerAlarmCombination) getESQueryStrByHour(structureId int64, factorId int, start, end string) string { func (the *consumerAlarmCombination) getESAlarmAggQueryStr(onceConfig AlarmCombination.CombinationInfo) string {
aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId) structureId := onceConfig.StructId
var allStationId []int
for _, item := range onceConfig.ConfigItems {
if len(item.StationIds) > 0 {
allStationId = append(allStationId, item.StationIds...)
}
}
allStationIdStr, _ := json.Marshal(allStationId)
esQuery := fmt.Sprintf(` esQuery := fmt.Sprintf(`
{ {
"size": 0, "size": 0,
@ -158,106 +182,47 @@ func (the *consumerAlarmCombination) getESQueryStrByHour(structureId int64, fact
"must": [ "must": [
{ {
"term": { "term": {
"structure": { "structure_id": {
"value": %d
}
}
},
{
"term": {
"factor": {
"value": %d "value": %d
} }
} }
}, },
{
"range": {
"collect_time": {
"gte": "%s",
"lt": "%s"
}
}
}
]
}
},
"aggs": {
"groupSensor": {
"terms": {
"field": "sensor"
},
"aggs": {
"groupDate": {
"date_histogram": {
"field": "collect_time",
"interval": "1h",
"time_zone": "Asia/Shanghai",
"min_doc_count": 1
},
"aggs": %s
}
}
}
}
}
`, structureId, factorId, start, end, aggSubSql)
return esQuery
}
func (the *consumerAlarmCombination) getESQueryStrBy10min(structureId int64, factorId int, start, end string) string {
aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId)
esQuery := fmt.Sprintf(`
{
"size": 0,
"query": {
"bool": {
"must": [
{ {
"term": { "term": {
"structure": { "source_type_id": {
"value": %d "value": 2
} }
} }
}, },
{ {
"term": { "terms": {
"factor": { "state": [
"value": %d 0,
} 1,
2
]
} }
}, },
{ {
"range": { "terms": {
"collect_time": { "source_id": %s
"gte": "%s",
"lte": "%s"
}
} }
} }
] ]
} }
}, },
"aggs": { "aggs": {
"groupSensor": { "groupBySensor": {
"terms": { "terms": {
"field": "sensor" "field": "source_id",
}, "order": {
"aggs": { "_count": "DESC"
"groupDate": {
"date_histogram": {
"field": "collect_time",
"interval": "10m",
"time_zone": "Asia/Shanghai",
"min_doc_count": 1
},
"aggs": %s
} }
} }
} }
} }
} }
`, structureId, factorId, start, end, aggSubSql) `, structureId, allStationIdStr)
return esQuery return esQuery
} }

17
models/kafkaMsg.go

@ -0,0 +1,17 @@
package models
type KafkaAlarm struct {
MessageMode string `json:"messageMode"`
StructureId int `json:"structureId"`
StructureName string `json:"structureName"`
SourceId string `json:"sourceId"`
SourceName string `json:"sourceName"`
AlarmTypeCode string `json:"alarmTypeCode"`
AlarmCode string `json:"alarmCode"`
Content string `json:"content"`
Time string `json:"time"`
SourceTypeId int `json:"sourceTypeId"`
Sponsor string `json:"sponsor"`
Extras any `json:"extras"`
SubDevices []any `json:"subDevices"`
}
Loading…
Cancel
Save