You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
173 lines
6.5 KiB
173 lines
6.5 KiB
1 month ago
|
package et_analyze
|
||
|
|
||
|
import (
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"gitea.anxinyun.cn/container/common_models"
|
||
|
"gitea.anxinyun.cn/container/common_utils"
|
||
|
"gitea.anxinyun.cn/container/common_utils/configLoad"
|
||
|
"gitea.anxinyun.cn/container/common_utils/kafkaHelper"
|
||
|
"log"
|
||
|
"node/stages"
|
||
|
"strconv"
|
||
|
)
|
||
|
|
||
|
type ThresholdHandler struct {
|
||
|
alarmTopic string // 本模块业务字段
|
||
|
stage *stages.Stage // 必须
|
||
|
configHelper *common_utils.ConfigHelper // 可选
|
||
|
kafkaAsyncProducer *kafkaHelper.KafkaAsyncProducer // 可选
|
||
|
kafkaAlarmTopic string // 可选
|
||
|
}
|
||
|
|
||
|
func NewThresholdHandler() *ThresholdHandler {
|
||
|
configYaml := configLoad.LoadConfig()
|
||
|
redisAdd := configYaml.GetString("redis.address")
|
||
|
kafkaBrokers := configYaml.GetStringSlice("kafka.brokers")
|
||
|
alarmTopic := configYaml.GetString("kafka.topics.alarm_anxinyun")
|
||
|
|
||
|
model := &ThresholdHandler{
|
||
|
alarmTopic: alarmTopic,
|
||
|
stage: stages.NewStage("阈值判断"),
|
||
|
configHelper: common_utils.NewConfigHelper(redisAdd),
|
||
|
kafkaAsyncProducer: kafkaHelper.NewKafkaAsyncProducer(kafkaBrokers),
|
||
|
kafkaAlarmTopic: alarmTopic,
|
||
|
}
|
||
|
|
||
|
model.stage.AddProcess(model.processData)
|
||
|
return model
|
||
|
}
|
||
|
|
||
|
func (the *ThresholdHandler) GetStage() stages.Stage {
|
||
|
return *the.stage
|
||
|
}
|
||
|
|
||
|
// 必须
|
||
|
func (t *ThresholdHandler) processData(resultData *common_models.ProcessData) *common_models.ProcessData {
|
||
|
if resultData == nil || resultData.Stations == nil || len(resultData.Stations) == 0 {
|
||
|
return resultData
|
||
|
}
|
||
|
|
||
|
for i := range resultData.Stations {
|
||
|
station := &resultData.Stations[i]
|
||
|
alarmMsg := t.judgeThreshold(station)
|
||
|
|
||
|
//fmt.Printf("测点[%d-%s]kafka[topic:%s][content:%s]\n", station.Info.Id, station.Info.Name, t.alarmTopic, alarmMsg.Content)
|
||
|
if alarmMsg != nil {
|
||
|
// 设置发起者
|
||
|
alarmMsg.Sponsor = common_models.Alarm_Sponsor_Threshold
|
||
|
jsonData, err := json.Marshal(alarmMsg)
|
||
|
if err != nil {
|
||
|
log.Printf("测点[%d-%s][kafka-topic:%s][%v]Error marshalling JSON:%s \n", station.Info.Id, station.Info.Name, t.alarmTopic, alarmMsg, err)
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
// 发布 kafka 消息
|
||
|
log.Printf("测点[%d-%s][kafka-topic:%s]%s\n", station.Info.Id, station.Info.Name, t.alarmTopic, jsonData)
|
||
|
t.kafkaAsyncProducer.Publish(t.alarmTopic, jsonData)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return resultData
|
||
|
}
|
||
|
|
||
|
func (t *ThresholdHandler) judgeThreshold(station *common_models.Station) *common_models.AlarmMsg {
|
||
|
// 检查测点是否有阈值配置信息
|
||
|
if station.Threshold == nil || station.Threshold.Items == nil {
|
||
|
log.Printf("测点[%d-%s]未配置阈值,无须进行阈值判断\n", station.Info.Id, station.Info.Name)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// 获取测点数据的阈值配置
|
||
|
filteredItems := station.Threshold.GetThresholdsByTime(station.Data.CollectTime)
|
||
|
if len(filteredItems) == 0 {
|
||
|
log.Printf("测点[%d-%s][%s]无对应阈值项,无须进行阈值判断\n", station.Info.Id, station.Info.Name, station.Data.CollectTime)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
var ls []common_models.ThresholdAlarmDetail
|
||
|
for fieldName, themeVal := range station.Data.ThemeData {
|
||
|
if val, ok := themeVal.(float64); ok {
|
||
|
protoItem := station.Info.Proto.GetProtoItem(fieldName)
|
||
|
if protoItem == nil {
|
||
|
log.Printf("测点[%d-%s][%s][%s]themeData[%v],但是Redis中没有对应的[protoItem:%s]。\n",
|
||
|
station.Info.Id, station.Info.Name, station.Data.CollectTime, fieldName, station.Data.ThemeData, fieldName)
|
||
|
} else {
|
||
|
protoItemThs := station.Threshold.GetThresholdsByItem(filteredItems, protoItem.Id)
|
||
|
overThresholdItem := station.Threshold.FindThresholdInRange(protoItemThs, val)
|
||
|
if overThresholdItem != nil {
|
||
|
// content 格式如:1:湿度采集值:13.50%RH,超1级阈值[6~16]
|
||
|
content := fmt.Sprintf("%s采集值:%.2f%s,超%d级阈值[%s]", protoItem.Name, val, protoItem.UnitName, overThresholdItem.Level, overThresholdItem.RangeText())
|
||
|
ls = append(ls, common_models.ThresholdAlarmDetail{Level: overThresholdItem.Level, Content: content})
|
||
|
}
|
||
|
//fmt.Printf("[threshold judgeThreshold] 测点[%d-%s] fieldName: %s, ThemeVal (float64): %f\n", station.Info.Id, station.Info.Name, fieldName, val)
|
||
|
}
|
||
|
} else {
|
||
|
log.Printf("[threshold judgeThreshold] 测点[%d-%s] fieldName: %s, ThemeVal cannot be converted to float64\n", station.Info.Id, station.Info.Name, fieldName)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// []ThresholdAlarmDetail -> AlarmMsg
|
||
|
alarmMsg := t.getAndCacheAlarmMsg(station, ls)
|
||
|
return alarmMsg
|
||
|
}
|
||
|
|
||
|
// GetAndCacheAlarmMsg 获取和缓存 AlarmMsg
|
||
|
func (t *ThresholdHandler) getAndCacheAlarmMsg(station *common_models.Station, ls []common_models.ThresholdAlarmDetail) *common_models.AlarmMsg {
|
||
|
if ls != nil && len(ls) > 0 {
|
||
|
// 超阈值告警
|
||
|
alarm := common_models.NewAlarmOverThreshold(findMinLevel(ls), stringifyThresholds(ls))
|
||
|
if alarm == nil {
|
||
|
log.Printf("over-threshold [%d-%s] level:%d code:%s content:%s time:%s ERROR: 未找到对应告警等级的告警码。\n",
|
||
|
station.Info.Id, station.Info.Name, alarm.Level, alarm.Code, alarm.Content, station.Data.CollectTime)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// 给测点数据设置告警等级
|
||
|
station.Data.AlarmLevel = alarm.Level
|
||
|
log.Printf("over-threshold [%d-%s] level:%d code:%s content:%s time:%s\n",
|
||
|
station.Info.Id, station.Info.Name, alarm.Level, alarm.Code, alarm.Content, station.Data.CollectTime)
|
||
|
|
||
|
// 将测点信息 -> 告警信息
|
||
|
alarmMsg := alarm.ToAlarmMsg(station.Info, station.Data.CollectTime)
|
||
|
|
||
|
// Redis 中添加告警记录, key = alarm:2:100
|
||
|
redisKey := common_models.AlarmRedisKey(common_models.ALARM_SOURCE_STATION, strconv.Itoa(station.Info.Id))
|
||
|
t.configHelper.SAddAlarm(redisKey, alarm.AlarmType)
|
||
|
|
||
|
return &alarmMsg
|
||
|
|
||
|
} else {
|
||
|
// Redis 中删除告警记录, key = alarm:2:100
|
||
|
redisKey := common_models.AlarmRedisKey(common_models.ALARM_SOURCE_STATION, strconv.Itoa(station.Info.Id))
|
||
|
affects := t.configHelper.SRemAlarm(redisKey, common_models.Alarm_Type_Over_Threshold)
|
||
|
if affects > 0 {
|
||
|
// 如果Redis中存在告警,则要发送恢复告警
|
||
|
alarm := common_models.NewAlarmRecoverThreshold()
|
||
|
alarmMsg := alarm.ToAlarmMsg(station.Info, station.Data.CollectTime)
|
||
|
return &alarmMsg
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// 既没有产生新告警,也没有需要恢复的告警
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func findMinLevel(ls []common_models.ThresholdAlarmDetail) int {
|
||
|
minLevel := ls[0].Level
|
||
|
for _, detail := range ls {
|
||
|
if detail.Level < minLevel {
|
||
|
minLevel = detail.Level
|
||
|
}
|
||
|
}
|
||
|
return minLevel
|
||
|
}
|
||
|
|
||
|
func stringifyThresholds(ls []common_models.ThresholdAlarmDetail) string {
|
||
|
str := ""
|
||
|
for _, detail := range ls {
|
||
|
str += fmt.Sprintf("%d:%s;", detail.Level, detail.Content)
|
||
|
}
|
||
|
return str
|
||
|
}
|