package et_analyze import ( "encoding/json" "fmt" "gitea.anxinyun.cn/container/common_models" "gitea.anxinyun.cn/container/common_utils" "gitea.anxinyun.cn/container/common_utils/configLoad" "gitea.anxinyun.cn/container/common_utils/kafkaHelper" "log" "node/stages" "strconv" ) type ThresholdHandler struct { alarmTopic string // 本模块业务字段 stage *stages.Stage // 必须 configHelper *common_utils.ConfigHelper // 可选 kafkaAsyncProducer *kafkaHelper.KafkaAsyncProducer // 可选 kafkaAlarmTopic string // 可选 } func NewThresholdHandler() *ThresholdHandler { configYaml := configLoad.LoadConfig() redisAdd := configYaml.GetString("redis.address") kafkaBrokers := configYaml.GetStringSlice("kafka.brokers") alarmTopic := configYaml.GetString("kafka.topics.alarm_anxinyun") model := &ThresholdHandler{ alarmTopic: alarmTopic, stage: stages.NewStage("阈值判断"), configHelper: common_utils.NewConfigHelper(redisAdd), kafkaAsyncProducer: kafkaHelper.NewKafkaAsyncProducer(kafkaBrokers), kafkaAlarmTopic: alarmTopic, } model.stage.AddProcess(model.processData) return model } func (t *ThresholdHandler) GetStage() stages.Stage { return *t.stage } // 必须 func (t *ThresholdHandler) processData(resultData *common_models.ProcessData) *common_models.ProcessData { if resultData == nil || resultData.Stations == nil || len(resultData.Stations) == 0 { return resultData } for i := range resultData.Stations { station := &resultData.Stations[i] alarmMsg := t.judgeThreshold(station) //fmt.Printf("测点[%d-%s]kafka[topic:%s][content:%s]\n", station.Info.Id, station.Info.Name, t.alarmTopic, alarmMsg.Content) if alarmMsg != nil { // 设置发起者 alarmMsg.Sponsor = common_models.Alarm_Sponsor_Threshold jsonData, err := json.Marshal(alarmMsg) if err != nil { log.Printf("测点[%d-%s][kafka-topic:%s][%v]Error marshalling JSON:%s \n", station.Info.Id, station.Info.Name, t.alarmTopic, alarmMsg, err) continue } // 发布 kafka 消息 log.Printf("测点[%d-%s][kafka-topic:%s]%s\n", station.Info.Id, station.Info.Name, t.alarmTopic, jsonData) t.kafkaAsyncProducer.Publish(t.alarmTopic, jsonData) } } return resultData } func (t *ThresholdHandler) judgeThreshold(station *common_models.Station) *common_models.AlarmMsg { // 检查测点是否有阈值配置信息 if station.Threshold == nil || station.Threshold.Items == nil { log.Printf("测点[%d-%s]未配置阈值,无须进行阈值判断\n", station.Info.Id, station.Info.Name) return nil } // 获取测点数据的阈值配置 filteredItems := station.Threshold.GetThresholdsByTime(station.Data.CollectTime) if len(filteredItems) == 0 { log.Printf("测点[%d-%s][%s]无对应阈值项,无须进行阈值判断\n", station.Info.Id, station.Info.Name, station.Data.CollectTime) return nil } var ls []common_models.ThresholdAlarmDetail for fieldName, themeVal := range station.Data.ThemeData { if val, ok := themeVal.(float64); ok { protoItem := station.Info.Proto.GetProtoItem(fieldName) if protoItem == nil { log.Printf("测点[%d-%s][%s][%s]themeData[%v],但是Redis中没有对应的[protoItem:%s]。\n", station.Info.Id, station.Info.Name, station.Data.CollectTime, fieldName, station.Data.ThemeData, fieldName) } else { protoItemThs := station.Threshold.GetThresholdsByItem(filteredItems, protoItem.Id) overThresholdItem := station.Threshold.FindThresholdInRange(protoItemThs, val) if overThresholdItem != nil { // content 格式如:1:湿度采集值:13.50%RH,超1级阈值[6~16] content := fmt.Sprintf("%s采集值:%.2f%s,超%d级阈值[%s]", protoItem.Name, val, protoItem.UnitName, overThresholdItem.Level, overThresholdItem.RangeText()) ls = append(ls, common_models.ThresholdAlarmDetail{Level: overThresholdItem.Level, Content: content}) } //fmt.Printf("[threshold judgeThreshold] 测点[%d-%s] fieldName: %s, ThemeVal (float64): %f\n", station.Info.Id, station.Info.Name, fieldName, val) } } else { log.Printf("[threshold judgeThreshold] 测点[%d-%s] fieldName: %s, ThemeVal cannot be converted to float64\n", station.Info.Id, station.Info.Name, fieldName) } } // []ThresholdAlarmDetail -> AlarmMsg alarmMsg := t.getAndCacheAlarmMsg(station, ls) return alarmMsg } // GetAndCacheAlarmMsg 获取和缓存 AlarmMsg func (t *ThresholdHandler) getAndCacheAlarmMsg(station *common_models.Station, ls []common_models.ThresholdAlarmDetail) *common_models.AlarmMsg { if ls != nil && len(ls) > 0 { // 超阈值告警 alarm := common_models.NewAlarmOverThreshold(findMinLevel(ls), stringifyThresholds(ls)) if alarm == nil { //log.Printf("over-threshold [%d-%s] level:%d code:%s content:%s time:%s ERROR: 未找到对应告警等级的告警码。\n", // station.Info.Id, station.Info.Name, alarm.Level, alarm.Code, alarm.Content, station.Data.CollectTime) return nil } // 给测点数据设置告警等级 station.Data.AlarmLevel = alarm.Level log.Printf("over-threshold [%d-%s] level:%d code:%s content:%s time:%s\n", station.Info.Id, station.Info.Name, alarm.Level, alarm.Code, alarm.Content, station.Data.CollectTime) // 将测点信息 -> 告警信息 alarmMsg := alarm.ToAlarmMsg(station.Info, station.Data.CollectTime) // Redis 中添加告警记录, key = alarm:2:100 redisKey := common_models.AlarmRedisKey(common_models.ALARM_SOURCE_STATION, strconv.Itoa(station.Info.Id)) t.configHelper.SAddAlarm(redisKey, alarm.AlarmType) return &alarmMsg } else { // Redis 中删除告警记录, key = alarm:2:100 redisKey := common_models.AlarmRedisKey(common_models.ALARM_SOURCE_STATION, strconv.Itoa(station.Info.Id)) affects := t.configHelper.SRemAlarm(redisKey, common_models.Alarm_Type_Over_Threshold) if affects > 0 { // 如果Redis中存在告警,则要发送恢复告警 alarm := common_models.NewAlarmRecoverThreshold() alarmMsg := alarm.ToAlarmMsg(station.Info, station.Data.CollectTime) return &alarmMsg } } // 既没有产生新告警,也没有需要恢复的告警 return nil } func findMinLevel(ls []common_models.ThresholdAlarmDetail) int { minLevel := ls[0].Level for _, detail := range ls { if detail.Level < minLevel { minLevel = detail.Level } } return minLevel } func stringifyThresholds(ls []common_models.ThresholdAlarmDetail) string { str := "" for _, detail := range ls { str += fmt.Sprintf("%d:%s;", detail.Level, detail.Content) } return str }