diff --git a/adaptors/知物云es告警组合聚集to触发爆闪.go b/adaptors/知物云es告警组合聚集to触发爆闪.go new file mode 100644 index 0000000..105229c --- /dev/null +++ b/adaptors/知物云es告警组合聚集to触发爆闪.go @@ -0,0 +1,89 @@ +package adaptors + +import ( + "encoding/json" + "fmt" + + "goInOut/consumers/AlarmCombination" + + "goInOut/models" + + "log" + + "strconv" + "strings" + "time" +) + +// Adaptor_ZWY_AlarmCombin 知物云的告警数据 组合统计 触发后 发到 kafka +type Adaptor_ZWY_AlarmCombin struct { + + //一些必要信息 + Info map[string]string +} + +func (the Adaptor_ZWY_AlarmCombin) Transform(config AlarmCombination.CombinationInfo, rawMsg string) []NeedPush { + log.Printf("解析数据") + configItems := config.ConfigItems + esAggData := AlarmCombination.EsAggAlarm{} + esAggData.Aggregations.GroupBySensor.Buckets = []AlarmCombination.BucketsSensorDataCount{} + var needPush []NeedPush + err := json.Unmarshal([]byte(rawMsg), &esAggData) + if err != nil { + log.Printf("解析 es proxy 数据异常: %s", err.Error()) + return nil + } + pointsCount := 0 + for _, item := range configItems { + pointsCount += len(item.StationIds) + } + + esAggPointsCount := len(esAggData.Aggregations.GroupBySensor.Buckets) + + if esAggPointsCount < pointsCount { + log.Printf("es 聚集查询告警数=%d < 配置测点数 %d", esAggPointsCount, pointsCount) + return nil + } + msg := fmt.Sprintf("组合告警[%s]生效,存在%d个不同测点的告警", config.Name, esAggPointsCount) + log.Println(msg) + prefix := "zh-" + sourceId := prefix + strconv.Itoa(config.Id) + alarmMsg := models.KafkaAlarm{ + MessageMode: "AlarmGeneration", + StructureId: config.StructId, + StructureName: config.StructName, + SourceId: sourceId, + SourceName: config.Name, + AlarmTypeCode: "8003", + AlarmCode: "80030001", + Content: msg, + Time: time.Now().Format("2006-01-02T15:04:05+0800"), + SourceTypeId: 4, // 0:DTU, 1:传感器, 2:测点 + Sponsor: "goInOut_zhgj", + Extras: nil, + SubDevices: nil, + } + + Payload, _ := json.Marshal(alarmMsg) + + needPush = append(needPush, NeedPush{ + Payload: Payload, + }) + return needPush +} + +func (the Adaptor_ZWY_AlarmCombin) GetPointCodeFromLabel(label string) int64 { + //解析label {13010600001} + pointUniqueCode := int64(0) + if len(label) > 2 { + newLabel := strings.TrimLeft(label, "{") + str := strings.TrimRight(newLabel, "}") + codeInt64, err := strconv.ParseInt(str, 10, 64) + if err != nil { + log.Printf("测点标签转换异常[%s]", label) + } + pointUniqueCode = codeInt64 + } + + return pointUniqueCode +} diff --git a/configFiles/config_知物云_组合告警.yaml b/configFiles/config_知物云_组合告警.yaml index ad5cfd2..c0902e4 100644 --- a/configFiles/config_知物云_组合告警.yaml +++ b/configFiles/config_知物云_组合告警.yaml @@ -10,14 +10,9 @@ ioConfig: topics: - zuhe_alarm monitor: - cron_pg: 23 0/1 * * * #6/10 * * * * - #普通类型 特征数据 - cron_redis: 20 0/1 * * * + cron: 31 0/1 * * * #6/10 * * * * info: - rc4key: sK3kJttzZyf7aZI94zSYgytBYCrfZRt6yil2 queryComponent: - redis: - address: 10.8.30.160:30379 postgres: connect: "host=10.8.30.166 port=5432 user=FashionAdmin password=123456 dbname=SavoirCloud sslmode=disable" #点位id对应信息 diff --git a/consumers/AlarmCombination/alarmAggResp.go b/consumers/AlarmCombination/alarmAggResp.go new file mode 100644 index 0000000..fb66faa --- /dev/null +++ b/consumers/AlarmCombination/alarmAggResp.go @@ -0,0 +1,28 @@ +package AlarmCombination + +type EsAggAlarm struct { + Took int `json:"took"` + TimedOut bool `json:"timed_out"` + Shards struct { + Total int `json:"total"` + Successful int `json:"successful"` + Skipped int `json:"skipped"` + Failed int `json:"failed"` + } `json:"_shards"` + Hits struct { + Total int `json:"total"` + MaxScore float64 `json:"max_score"` + } `json:"hits"` + Aggregations struct { + GroupBySensor struct { + DocCountErrorUpperBound int `json:"doc_count_error_upper_bound"` + SumOtherDocCount int `json:"sum_other_doc_count"` + Buckets []BucketsSensorDataCount `json:"buckets"` + } `json:"groupBySensor"` + } `json:"aggregations"` +} + +type BucketsSensorDataCount struct { + Key string `json:"key"` + DocCount int `json:"doc_count"` +} diff --git a/consumers/AlarmCombination/dataModel.go b/consumers/AlarmCombination/dataModel.go index 73fd3a0..cc0413e 100644 --- a/consumers/AlarmCombination/dataModel.go +++ b/consumers/AlarmCombination/dataModel.go @@ -1,14 +1,23 @@ package AlarmCombination -import "time" +import ( + "time" +) type CombinationInfo struct { - Id int64 `json:"id" db:"id"` - Name string `json:"name" db:"name"` - Description string `json:"description" db:"description"` - Config any `json:"config" db:"config"` - Enable bool `json:"enable" db:"enable"` - StructId int32 `json:"struct_id" db:"struct_id"` - OrgId int32 `json:"org_id" db:"org_id"` - UpdateTime time.Time `json:"update_time" db:"update_time"` + Id int `json:"id" db:"id"` + Name string `json:"name" db:"name"` + Description string `json:"description" db:"description"` + ConfigStr string `json:"config" db:"config"` + ConfigItems []ConfigItems `json:"ConfigItems" ` + Enable bool `json:"enable" db:"enable"` + StructId int `json:"struct_id" db:"struct_id"` + StructName string `json:"struct_name" db:"struct_name"` + OrgId int `json:"org_id" db:"org_id"` + UpdateTime time.Time `json:"update_time" db:"update_time"` +} + +type ConfigItems struct { + FactorId int `json:"factorId"` + StationIds []int `json:"stationIds"` } diff --git a/consumers/consumerAlarmCombination.go b/consumers/consumerAlarmCombination.go index 23fa041..fd5188e 100644 --- a/consumers/consumerAlarmCombination.go +++ b/consumers/consumerAlarmCombination.go @@ -1,13 +1,13 @@ package consumers import ( + "encoding/json" "fmt" "goInOut/adaptors" "goInOut/consumers/AlarmCombination" "goInOut/dbOperate" "goInOut/dbOperate/_kafka" "goInOut/monitors" - "goInOut/utils" "gopkg.in/yaml.v3" "log" "time" @@ -23,6 +23,8 @@ type consumerAlarmCombination struct { monitor *monitors.CommonMonitor infoRedis *dbOperate.RedisHelper infoPg *dbOperate.DBHelper + //数据库配置信息 + combinationInfo []AlarmCombination.CombinationInfo } func (the *consumerAlarmCombination) LoadConfigJson(cfgStr string) { @@ -58,10 +60,8 @@ func (the *consumerAlarmCombination) InputInitial() error { the.monitor.Start() for taskName, cron := range the.Info.Monitor { switch taskName { - case "cron_pg": + case "cron": the.monitor.RegisterTask(cron, the.updateCombinationInfo) - case "cron_redis": - the.monitor.RegisterTask(cron, the.getEs1HourAggData) default: log.Printf("定时任务[%s],cron=[%s] 无匹配", taskName, cron) } @@ -79,8 +79,6 @@ func (the *consumerAlarmCombination) OutputInitial() error { func (the *consumerAlarmCombination) infoComponentInitial() error { //数据出口 - addr := the.Info.QueryComponent.Redis.Address - the.infoRedis = dbOperate.NewRedisHelper("", addr) pgConnStr := the.Info.QueryComponent.Pg.Connect the.infoPg = dbOperate.NewDBHelper("postgres", pgConnStr) return nil @@ -111,45 +109,71 @@ func (the *consumerAlarmCombination) Work() { }() } -func (the *consumerAlarmCombination) getAdaptor() (adaptor adaptors.Adaptor_AXYES_HBGL) { +func (the *consumerAlarmCombination) getAdaptor() (adaptor adaptors.Adaptor_ZWY_AlarmCombin) { - return adaptors.Adaptor_AXYES_HBGL{ - Redis: the.infoRedis, - } + return adaptors.Adaptor_ZWY_AlarmCombin{} } -func (the *consumerAlarmCombination) getEs1HourAggData() { - start, end := utils.GetTimeRangeByHour(-1) - log.Printf("查询数据时间范围 %s - %s", start, end) - hourFactorIds := []int{15, 18, 20} - structIds := []int64{1, 2} - for _, structId := range structIds { - for _, factorId := range hourFactorIds { - esQuery := the.getESQueryStrByHour(structId, factorId, start, end) - auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} - esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) +func (the *consumerAlarmCombination) getEs1minAlarmData() { + adaptor := the.getAdaptor() - adaptor := the.getAdaptor() - needPushes := adaptor.Transform(structId, factorId, esAggResultStr) + for _, comInfos := range the.combinationInfo { + //满足条件 sourceTypeId: Int = 1, // 0:DTU, 1:传感器, 2:测点 + //state 0 新告警 1次数更新 2 等级提升 3 自动恢复 4 手动恢复 5 已恢复待确认 - if len(needPushes) > 0 { - the.ch <- needPushes - } + esQuery := the.getESAlarmAggQueryStr(comInfos) + auth := map[string]string{"Authorization": "Bear 85a441d4-022b-4613-abba-aaa8e2693bf7"} + esAggResultStr := the.InHttp.HttpGetWithHeader(esQuery, auth) + lenRes := len(esAggResultStr) + if lenRes < 120 { + log.Printf("[s=%d]es agg 返回 无 有效数据,resp=%s ", comInfos.StructId, esAggResultStr) + continue + } + needPushes := adaptor.Transform(comInfos, esAggResultStr) + + if len(needPushes) > 0 { + the.ch <- needPushes } + } } func (the *consumerAlarmCombination) updateCombinationInfo() { log.Printf("更新 数据库 组合配置信息") - sql := `SELECT * FROM "t_alarm_combination"` - var CombinationInfos []AlarmCombination.CombinationInfo - the.infoPg.Query(&CombinationInfos, sql) - println("======") + sql := `SELECT a.*,s."name" as struct_name FROM "t_alarm_combination" as a + left join t_structure as s + ON a.struct_id=s."id"` + sql += " where a.id=1" //测试用 + //var CombinationInfos []AlarmCombination.CombinationInfo + err := the.infoPg.Query(&the.combinationInfo, sql) + if err != nil { + log.Printf("查询数据库异常:err-> %s", err.Error()) + return + } + for i, info := range the.combinationInfo { + err := json.Unmarshal([]byte(info.ConfigStr), &info.ConfigItems) + if err != nil { + return + } + the.combinationInfo[i].ConfigItems = info.ConfigItems + } + log.Printf("共刷新%d条配置", len(the.combinationInfo)) + + //立即触发 + the.getEs1minAlarmData() } -func (the *consumerAlarmCombination) getESQueryStrByHour(structureId int64, factorId int, start, end string) string { - aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId) +func (the *consumerAlarmCombination) getESAlarmAggQueryStr(onceConfig AlarmCombination.CombinationInfo) string { + structureId := onceConfig.StructId + var allStationId []int + for _, item := range onceConfig.ConfigItems { + if len(item.StationIds) > 0 { + allStationId = append(allStationId, item.StationIds...) + } + } + + allStationIdStr, _ := json.Marshal(allStationId) esQuery := fmt.Sprintf(` { "size": 0, @@ -158,106 +182,47 @@ func (the *consumerAlarmCombination) getESQueryStrByHour(structureId int64, fact "must": [ { "term": { - "structure": { - "value": %d - } - } - }, - { - "term": { - "factor": { + "structure_id": { "value": %d } } }, - { - "range": { - "collect_time": { - "gte": "%s", - "lt": "%s" - } - } - } - ] - } - }, - "aggs": { - "groupSensor": { - "terms": { - "field": "sensor" - }, - "aggs": { - "groupDate": { - "date_histogram": { - "field": "collect_time", - "interval": "1h", - "time_zone": "Asia/Shanghai", - "min_doc_count": 1 - }, - "aggs": %s - } - } - } - } -} -`, structureId, factorId, start, end, aggSubSql) - - return esQuery -} - -func (the *consumerAlarmCombination) getESQueryStrBy10min(structureId int64, factorId int, start, end string) string { - aggSubSql := utils.GetEsAggSubSqlByAxyFactorId(factorId) - esQuery := fmt.Sprintf(` -{ - "size": 0, - "query": { - "bool": { - "must": [ { "term": { - "structure": { - "value": %d + "source_type_id": { + "value": 2 } } }, { - "term": { - "factor": { - "value": %d - } + "terms": { + "state": [ + 0, + 1, + 2 + ] } }, { - "range": { - "collect_time": { - "gte": "%s", - "lte": "%s" - } + "terms": { + "source_id": %s } } ] } }, "aggs": { - "groupSensor": { + "groupBySensor": { "terms": { - "field": "sensor" - }, - "aggs": { - "groupDate": { - "date_histogram": { - "field": "collect_time", - "interval": "10m", - "time_zone": "Asia/Shanghai", - "min_doc_count": 1 - }, - "aggs": %s + "field": "source_id", + "order": { + "_count": "DESC" } } } } } -`, structureId, factorId, start, end, aggSubSql) +`, structureId, allStationIdStr) return esQuery } diff --git a/models/kafkaMsg.go b/models/kafkaMsg.go new file mode 100644 index 0000000..0d58004 --- /dev/null +++ b/models/kafkaMsg.go @@ -0,0 +1,17 @@ +package models + +type KafkaAlarm struct { + MessageMode string `json:"messageMode"` + StructureId int `json:"structureId"` + StructureName string `json:"structureName"` + SourceId string `json:"sourceId"` + SourceName string `json:"sourceName"` + AlarmTypeCode string `json:"alarmTypeCode"` + AlarmCode string `json:"alarmCode"` + Content string `json:"content"` + Time string `json:"time"` + SourceTypeId int `json:"sourceTypeId"` + Sponsor string `json:"sponsor"` + Extras any `json:"extras"` + SubDevices []any `json:"subDevices"` +}