et-go 20240919重建
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

173 lines
6.5 KiB

1 month ago
package et_analyze
import (
"encoding/json"
"fmt"
"gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_utils"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"gitea.anxinyun.cn/container/common_utils/kafkaHelper"
"log"
"node/stages"
"strconv"
)
type ThresholdHandler struct {
alarmTopic string // 本模块业务字段
stage *stages.Stage // 必须
configHelper *common_utils.ConfigHelper // 可选
kafkaAsyncProducer *kafkaHelper.KafkaAsyncProducer // 可选
kafkaAlarmTopic string // 可选
}
func NewThresholdHandler() *ThresholdHandler {
configYaml := configLoad.LoadConfig()
redisAdd := configYaml.GetString("redis.address")
kafkaBrokers := configYaml.GetStringSlice("kafka.brokers")
alarmTopic := configYaml.GetString("kafka.topics.alarm_anxinyun")
model := &ThresholdHandler{
alarmTopic: alarmTopic,
stage: stages.NewStage("阈值判断"),
configHelper: common_utils.NewConfigHelper(redisAdd),
kafkaAsyncProducer: kafkaHelper.NewKafkaAsyncProducer(kafkaBrokers),
kafkaAlarmTopic: alarmTopic,
}
model.stage.AddProcess(model.processData)
return model
}
func (t *ThresholdHandler) GetStage() stages.Stage {
return *t.stage
1 month ago
}
// 必须
func (t *ThresholdHandler) processData(resultData *common_models.ProcessData) *common_models.ProcessData {
if resultData == nil || resultData.Stations == nil || len(resultData.Stations) == 0 {
return resultData
}
for i := range resultData.Stations {
station := &resultData.Stations[i]
alarmMsg := t.judgeThreshold(station)
//fmt.Printf("测点[%d-%s]kafka[topic:%s][content:%s]\n", station.Info.Id, station.Info.Name, t.alarmTopic, alarmMsg.Content)
if alarmMsg != nil {
// 设置发起者
alarmMsg.Sponsor = common_models.Alarm_Sponsor_Threshold
jsonData, err := json.Marshal(alarmMsg)
if err != nil {
log.Printf("测点[%d-%s][kafka-topic:%s][%v]Error marshalling JSON:%s \n", station.Info.Id, station.Info.Name, t.alarmTopic, alarmMsg, err)
continue
}
// 发布 kafka 消息
log.Printf("测点[%d-%s][kafka-topic:%s]%s\n", station.Info.Id, station.Info.Name, t.alarmTopic, jsonData)
t.kafkaAsyncProducer.Publish(t.alarmTopic, jsonData)
}
}
return resultData
}
func (t *ThresholdHandler) judgeThreshold(station *common_models.Station) *common_models.AlarmMsg {
// 检查测点是否有阈值配置信息
if station.Threshold == nil || station.Threshold.Items == nil {
log.Printf("测点[%d-%s]未配置阈值,无须进行阈值判断\n", station.Info.Id, station.Info.Name)
return nil
}
// 获取测点数据的阈值配置
filteredItems := station.Threshold.GetThresholdsByTime(station.Data.CollectTime)
if len(filteredItems) == 0 {
log.Printf("测点[%d-%s][%s]无对应阈值项,无须进行阈值判断\n", station.Info.Id, station.Info.Name, station.Data.CollectTime)
return nil
}
var ls []common_models.ThresholdAlarmDetail
for fieldName, themeVal := range station.Data.ThemeData {
if val, ok := themeVal.(float64); ok {
protoItem := station.Info.Proto.GetProtoItem(fieldName)
if protoItem == nil {
log.Printf("测点[%d-%s][%s][%s]themeData[%v],但是Redis中没有对应的[protoItem:%s]。\n",
station.Info.Id, station.Info.Name, station.Data.CollectTime, fieldName, station.Data.ThemeData, fieldName)
} else {
protoItemThs := station.Threshold.GetThresholdsByItem(filteredItems, protoItem.Id)
overThresholdItem := station.Threshold.FindThresholdInRange(protoItemThs, val)
if overThresholdItem != nil {
// content 格式如:1:湿度采集值:13.50%RH,超1级阈值[6~16]
content := fmt.Sprintf("%s采集值:%.2f%s,超%d级阈值[%s]", protoItem.Name, val, protoItem.UnitName, overThresholdItem.Level, overThresholdItem.RangeText())
ls = append(ls, common_models.ThresholdAlarmDetail{Level: overThresholdItem.Level, Content: content})
}
//fmt.Printf("[threshold judgeThreshold] 测点[%d-%s] fieldName: %s, ThemeVal (float64): %f\n", station.Info.Id, station.Info.Name, fieldName, val)
}
} else {
log.Printf("[threshold judgeThreshold] 测点[%d-%s] fieldName: %s, ThemeVal cannot be converted to float64\n", station.Info.Id, station.Info.Name, fieldName)
}
}
// []ThresholdAlarmDetail -> AlarmMsg
alarmMsg := t.getAndCacheAlarmMsg(station, ls)
return alarmMsg
}
// GetAndCacheAlarmMsg 获取和缓存 AlarmMsg
func (t *ThresholdHandler) getAndCacheAlarmMsg(station *common_models.Station, ls []common_models.ThresholdAlarmDetail) *common_models.AlarmMsg {
if ls != nil && len(ls) > 0 {
// 超阈值告警
alarm := common_models.NewAlarmOverThreshold(findMinLevel(ls), stringifyThresholds(ls))
if alarm == nil {
//log.Printf("over-threshold [%d-%s] level:%d code:%s content:%s time:%s ERROR: 未找到对应告警等级的告警码。\n",
// station.Info.Id, station.Info.Name, alarm.Level, alarm.Code, alarm.Content, station.Data.CollectTime)
1 month ago
return nil
}
// 给测点数据设置告警等级
station.Data.AlarmLevel = alarm.Level
log.Printf("over-threshold [%d-%s] level:%d code:%s content:%s time:%s\n",
station.Info.Id, station.Info.Name, alarm.Level, alarm.Code, alarm.Content, station.Data.CollectTime)
// 将测点信息 -> 告警信息
alarmMsg := alarm.ToAlarmMsg(station.Info, station.Data.CollectTime)
// Redis 中添加告警记录, key = alarm:2:100
redisKey := common_models.AlarmRedisKey(common_models.ALARM_SOURCE_STATION, strconv.Itoa(station.Info.Id))
t.configHelper.SAddAlarm(redisKey, alarm.AlarmType)
return &alarmMsg
} else {
// Redis 中删除告警记录, key = alarm:2:100
redisKey := common_models.AlarmRedisKey(common_models.ALARM_SOURCE_STATION, strconv.Itoa(station.Info.Id))
affects := t.configHelper.SRemAlarm(redisKey, common_models.Alarm_Type_Over_Threshold)
if affects > 0 {
// 如果Redis中存在告警,则要发送恢复告警
alarm := common_models.NewAlarmRecoverThreshold()
alarmMsg := alarm.ToAlarmMsg(station.Info, station.Data.CollectTime)
return &alarmMsg
}
}
// 既没有产生新告警,也没有需要恢复的告警
return nil
}
func findMinLevel(ls []common_models.ThresholdAlarmDetail) int {
minLevel := ls[0].Level
for _, detail := range ls {
if detail.Level < minLevel {
minLevel = detail.Level
}
}
return minLevel
}
func stringifyThresholds(ls []common_models.ThresholdAlarmDetail) string {
str := ""
for _, detail := range ls {
str += fmt.Sprintf("%d:%s;", detail.Level, detail.Content)
}
return str
}