et-go 20240919重建
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

174 lines
6.3 KiB

package et_analyze
import (
"encoding/json"
"errors"
"fmt"
"gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_models/constant/redisKey"
"gitea.anxinyun.cn/container/common_utils"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"gitea.anxinyun.cn/container/common_utils/kafkaHelper"
"log"
"strconv"
)
// AggThresholdHandler checks aggregated data against configured thresholds
// and publishes the resulting alarm messages to Kafka.
type AggThresholdHandler struct {
	// alarmTopic is the Kafka topic alarm messages are published to.
	alarmTopic string
	// configHelper provides the threshold, factor and station lookups as well
	// as the alarm-record add/remove calls used by the handler.
	configHelper *common_utils.ConfigHelper
	// kafkaAsyncProducer publishes the serialized alarm messages.
	kafkaAsyncProducer *kafkaHelper.KafkaAsyncProducer
}
// NewAggThresholdHandler builds an AggThresholdHandler from the loaded
// configuration: "redis.address" feeds the config helper, "kafka.brokers" the
// async Kafka producer, and "kafka.topics.alarm_anxinyun" names the alarm topic.
func NewAggThresholdHandler() *AggThresholdHandler {
	cfg := configLoad.LoadConfig()

	// Read every setting up front, then construct the collaborators.
	redisAddr := cfg.GetString("redis.address")
	brokers := cfg.GetStringSlice("kafka.brokers")
	topic := cfg.GetString("kafka.topics.alarm_anxinyun")

	handler := new(AggThresholdHandler)
	handler.alarmTopic = topic
	handler.configHelper = common_utils.NewConfigHelper(redisAddr)
	handler.kafkaAsyncProducer = kafkaHelper.NewKafkaAsyncProducer(brokers)
	return handler
}
// ProcessData runs threshold analysis on aggData and, when judgeThreshold
// yields an alarm message, publishes it as JSON to the configured Kafka alarm
// topic.
//
// It returns an error when aggData is nil or has no changed values, when the
// station info for aggData.SensorId cannot be loaded (the underlying error is
// wrapped and reachable through errors.Is / errors.As), or when the alarm
// message cannot be marshalled. It returns nil when there is nothing to send
// or once the message has been passed to kafkaAsyncProducer.Publish.
func (t *AggThresholdHandler) ProcessData(aggData *common_models.AggData) error {
	// len of a nil map is 0, so this also rejects a nil Changed.
	if aggData == nil || len(aggData.Changed) == 0 {
		errmsg := fmt.Sprintf("aggData非法数据:[%v]", aggData)
		return errors.New(errmsg)
	}

	// No alarm message means there is nothing to publish.
	alarmMsg := t.judgeThreshold(aggData)
	if alarmMsg == nil {
		return nil
	}

	// From here on an alarm message exists. The station info is only used to
	// label the log lines below.
	stationInfo, err := t.configHelper.GetStationInfo(aggData.SensorId)
	if err != nil {
		key := fmt.Sprintf("%s:%d", redisKey.Station, aggData.SensorId)
		// Keep the cause: the lookup can fail for reasons other than a
		// missing key, and callers need to see which one it was.
		wrapped := fmt.Errorf("[%s] StationId not found in redis: %w", key, err)
		log.Println(wrapped)
		return wrapped
	}

	alarmMsg.Sponsor = common_models.Alarm_Sponsor_Recv
	jsonData, err := json.Marshal(alarmMsg)
	if err != nil {
		// The format already ends in a newline, so Printf is used to avoid
		// the extra blank line Println would add.
		log.Printf("测点[%d-%s][kafka-topic:%s][%v]Error marshalling JSON:%s \n", stationInfo.Id, stationInfo.Name, t.alarmTopic, alarmMsg, err)
		return err
	}

	// Publish the Kafka message.
	fmt.Printf("测点[%d-%s][kafka-topic:%s]%s\n", stationInfo.Id, stationInfo.Name, t.alarmTopic, jsonData)
	t.kafkaAsyncProducer.Publish(t.alarmTopic, jsonData)
	return nil
}
// judgeThreshold compares every changed value in aggData with the aggregation
// thresholds configured for its structure and factor, collects one detail per
// value that falls inside a threshold range, and hands the details to
// getAndCacheAlarmMsg.
//
// It returns nil when the threshold lookup fails or has no item for
// aggData.AggTypeId, when the factor info cannot be loaded or is empty, or
// when getAndCacheAlarmMsg returns nil.
func (t *AggThresholdHandler) judgeThreshold(aggData *common_models.AggData) *common_models.AlarmMsg {
	// Label of the aggregation type as it appears in the alarm content.
	var aggLabel string
	switch aggData.AggTypeId {
	case 2001:
		aggLabel = "日聚集"
	case 2002:
		aggLabel = "周聚集"
	case 2003:
		aggLabel = "月聚集"
	case 2004:
		aggLabel = "年聚集"
	case 2005:
		aggLabel = "时聚集"
	case 2006:
		aggLabel = "10分钟聚集"
	default:
		aggLabel = "其他聚集"
	}

	// Does this structure/factor have any aggregation threshold configured?
	// NOTE(review): a failed lookup is logged with the same "not configured"
	// text as an empty result, and the error itself is not logged.
	aggThreshold, err := t.configHelper.GetAggThreshold(aggData.StructId, aggData.FactorId)
	if err != nil || len(aggThreshold.Items) == 0 {
		log.Printf("未配置变化速率阈值。%s aggTypeId[%d]\n", aggData.R(), aggData.AggTypeId)
		return nil
	}

	// Keep only the threshold items declared for this aggregation type.
	var matched []common_models.AggThresholdItem
	for _, candidate := range aggThreshold.Items {
		if candidate.AggCategory == nil || *candidate.AggCategory != aggData.AggTypeId {
			continue
		}
		matched = append(matched, candidate)
	}
	if len(matched) == 0 {
		log.Printf("未配置变化速率阈值。%s aggTypeId[%d]\n", aggData.R(), aggData.AggTypeId)
		return nil
	}

	factor, factorErr := t.configHelper.GetFactorInfo(aggData.FactorId)
	if factorErr != nil || len(factor.Items) == 0 {
		log.Printf("获取factor异常:[structId:%d factorId:%d]Error: %v \n", aggData.StructId, aggData.FactorId, factorErr)
		return nil
	}

	// Changed maps a field name to its change amount. Go randomizes map
	// iteration order, so the order of the collected details is not stable.
	var details []common_models.ThresholdAlarmDetail
	for field, delta := range aggData.Changed {
		protoItem := factor.GetProtoItem(field)
		if protoItem == nil {
			log.Printf(
				"测点[%d-%d-%d][%s]aggData[%v],但是Redis中没有对应的[protoItem:%s]。\n",
				aggData.StructId, aggData.FactorId, aggData.SensorId, aggData.Date, aggData, field,
			)
			continue
		}

		itemThresholds := aggThreshold.GetThresholdsByItem(matched, protoItem.Id)
		hit := aggThreshold.FindThresholdInRange(itemThresholds, delta)
		if hit == nil {
			continue
		}

		// Content format, taking strain with 10-minute aggregation as the
		// example: "<item>的10分钟聚集变化率:-0.60με,超1级阈值[-1~-0.5]".
		text := fmt.Sprintf("%s的%s变化率:%f%s,超%d级阈值[%s]", protoItem.Name, aggLabel, delta, protoItem.UnitName, hit.Level, hit.RangeText())
		details = append(details, common_models.ThresholdAlarmDetail{Level: hit.Level, Content: text})
	}

	return t.getAndCacheAlarmMsg(aggData, details)
}
// getAndCacheAlarmMsg turns the over-threshold details in ls into an alarm
// message for the sensor in aggData and updates that sensor's alarm record
// through configHelper.
//
//   - ls non-empty: builds an over-change-rate-threshold alarm from
//     findMinLevel(ls) and stringifyThresholds(ls), records its alarm type
//     with SAddAlarm and returns the alarm message.
//   - ls empty: removes the change-rate alarm type with SRemAlarm; a recovery
//     alarm message is returned only when that call reports a positive count.
//
// It returns nil when the station info cannot be loaded, when no alarm could
// be built for the level, or when there is nothing to raise or recover.
func (t *AggThresholdHandler) getAndCacheAlarmMsg(aggData *common_models.AggData, ls []common_models.ThresholdAlarmDetail) *common_models.AlarmMsg {
	stationInfo, err := t.configHelper.GetStationInfo(aggData.SensorId)
	if err != nil {
		log.Printf("[%v]测点不存在\n", aggData)
		return nil
	}

	if len(ls) == 0 {
		// Remove the alarm record from Redis, key e.g. alarm:2:100.
		key := common_models.AlarmRedisKey(common_models.ALARM_SOURCE_STATION, strconv.Itoa(aggData.SensorId))
		affects := t.configHelper.SRemAlarm(key, common_models.Alarm_Type_Over_ChangeRate_Threshold)
		// A recovery alarm is only sent when an alarm record was present.
		if affects <= 0 {
			return nil
		}
		recovered := common_models.NewRecoverChangingRateThreshold()
		recoverMsg := recovered.ToAlarmMsg(stationInfo, aggData.Date)
		return &recoverMsg
	}

	// Over-threshold alarm. Level and content are kept in locals so the
	// failure path can log them without touching a nil alarm.
	level := findMinLevel(ls)
	content := stringifyThresholds(ls)
	alarm := common_models.NewOverChangingRateThreshold(level, content)
	if alarm == nil {
		log.Printf("[%v] over-agg-threshold:[Level:%d] content:[%s]ERROR: 未找到对应告警等级的告警码。", aggData, level, content)
		return nil
	}
	log.Printf("%s[aggTypeId:%d][%s] over-agg-threshold: [%s]\n", aggData.R(), aggData.AggTypeId, aggData.T(), alarm.Content)

	// Combine the station info with the alarm to form the alarm message.
	alarmMsg := alarm.ToAlarmMsg(stationInfo, aggData.Date)

	// Add the alarm record to Redis, key e.g. alarm:2:100.
	key := common_models.AlarmRedisKey(common_models.ALARM_SOURCE_STATION, strconv.Itoa(aggData.SensorId))
	t.configHelper.SAddAlarm(key, alarm.AlarmType)
	return &alarmMsg
}