package et_analyze

import (
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"strconv"

	"gitea.anxinyun.cn/container/common_models"
	"gitea.anxinyun.cn/container/common_models/constant/redisKey"
	"gitea.anxinyun.cn/container/common_utils"
	"gitea.anxinyun.cn/container/common_utils/configLoad"
	"gitea.anxinyun.cn/container/common_utils/kafkaHelper"
)

// AggThresholdHandler checks the change values of aggregated data against the
// configured aggregation thresholds and publishes the resulting alarm
// messages to Kafka.
type AggThresholdHandler struct {
	alarmTopic         string                         // Kafka topic alarm messages are published to
	configHelper       *common_utils.ConfigHelper     // lookup of thresholds, factors, stations and alarm records
	kafkaAsyncProducer *kafkaHelper.KafkaAsyncProducer // producer used to publish alarm messages
}

// NewAggThresholdHandler builds a handler from the loaded configuration,
// reading "redis.address", "kafka.brokers" and "kafka.topics.alarm_anxinyun".
func NewAggThresholdHandler() *AggThresholdHandler {
	configYaml := configLoad.LoadConfig()
	redisAdd := configYaml.GetString("redis.address")
	kafkaBrokers := configYaml.GetStringSlice("kafka.brokers")
	alarmTopic := configYaml.GetString("kafka.topics.alarm_anxinyun")

	return &AggThresholdHandler{
		alarmTopic:         alarmTopic,
		configHelper:       common_utils.NewConfigHelper(redisAdd),
		kafkaAsyncProducer: kafkaHelper.NewKafkaAsyncProducer(kafkaBrokers),
	}
}

// ProcessData runs the threshold analysis on aggData and, when an alarm (or an
// alarm recovery) results, publishes it as JSON to the alarm Kafka topic.
//
// It returns an error when aggData is nil or carries no change values, when
// the station information cannot be loaded, or when the alarm message cannot
// be marshalled. It returns nil when there is nothing to publish.
func (t *AggThresholdHandler) ProcessData(aggData *common_models.AggData) error {
	// len of a nil map is 0, so a single length check covers both cases.
	if aggData == nil || len(aggData.Changed) == 0 {
		return fmt.Errorf("aggData非法数据:[%v]", aggData)
	}

	// No alarm: nothing to publish.
	alarmMsg := t.judgeThreshold(aggData)
	if alarmMsg == nil {
		return nil
	}

	// From here on an alarm message has to be delivered.
	stationInfo, err := t.configHelper.GetStationInfo(aggData.SensorId)
	if err != nil {
		key := fmt.Sprintf("%s:%d", redisKey.Station, aggData.SensorId)
		errmsg := fmt.Sprintf("[%s] StationId not found in redis ", key)
		log.Println(errmsg)
		return errors.New(errmsg)
	}

	alarmMsg.Sponsor = common_models.Alarm_Sponsor_Recv
	jsonData, err := json.Marshal(alarmMsg)
	if err != nil {
		errmsg := fmt.Sprintf("测点[%d-%s][kafka-topic:%s][%v]Error marshalling JSON:%s \n", stationInfo.Id, stationInfo.Name, t.alarmTopic, alarmMsg, err)
		log.Println(errmsg)
		return err
	}

	// Publish the Kafka message.
	fmt.Printf("测点[%d-%s][kafka-topic:%s]%s\n", stationInfo.Id, stationInfo.Name, t.alarmTopic, jsonData)
	t.kafkaAsyncProducer.Publish(t.alarmTopic, jsonData)

	return nil
}

// judgeThreshold compares every change value in aggData.Changed with the
// aggregation thresholds configured for the data's structure and factor,
// restricted to the thresholds whose category equals aggData.AggTypeId.
//
// It returns nil when no matching threshold is configured or the factor
// cannot be loaded; otherwise it returns whatever getAndCacheAlarmMsg decides
// for the collected over-threshold details.
func (t *AggThresholdHandler) judgeThreshold(aggData *common_models.AggData) *common_models.AlarmMsg {
	// Label of the aggregation type, used in the alarm text.
	var aggTypeStr string
	switch aggData.AggTypeId {
	case 2001:
		aggTypeStr = "日聚集"
	case 2002:
		aggTypeStr = "周聚集"
	case 2003:
		aggTypeStr = "月聚集"
	case 2004:
		aggTypeStr = "年聚集"
	case 2005:
		aggTypeStr = "时聚集"
	case 2006:
		aggTypeStr = "10分钟聚集"
	default:
		aggTypeStr = "其他聚集"
	}

	// Check whether any aggregation threshold is configured.
	aggThreshold, err := t.configHelper.GetAggThreshold(aggData.StructId, aggData.FactorId)
	if err != nil || len(aggThreshold.Items) == 0 {
		log.Printf("未配置变化速率阈值。%s aggTypeId[%d]\n", aggData.R(), aggData.AggTypeId)
		return nil
	}

	// Keep only the thresholds defined for this aggregation type.
	var filteredThres []common_models.AggThresholdItem
	for _, item := range aggThreshold.Items {
		if item.AggCategory != nil && *item.AggCategory == aggData.AggTypeId {
			filteredThres = append(filteredThres, item)
		}
	}
	if len(filteredThres) == 0 {
		log.Printf("未配置变化速率阈值。%s aggTypeId[%d]\n", aggData.R(), aggData.AggTypeId)
		return nil
	}

	factor, err := t.configHelper.GetFactorInfo(aggData.FactorId)
	if err != nil || len(factor.Items) == 0 {
		log.Printf("获取factor异常:[structId:%d factorId:%d]Error: %v \n", aggData.StructId, aggData.FactorId, err)
		return nil
	}

	var ls []common_models.ThresholdAlarmDetail
	// aggData.Changed holds the change value per field name.
	for fieldName, val := range aggData.Changed {
		protoItem := factor.GetProtoItem(fieldName)
		if protoItem == nil {
			log.Printf("测点[%d-%d-%d][%s]aggData[%v],但是Redis中没有对应的[protoItem:%s]。\n",
				aggData.StructId, aggData.FactorId, aggData.SensorId, aggData.Date, aggData, fieldName)
			continue
		}

		protoItemThs := aggThreshold.GetThresholdsByItem(filteredThres, protoItem.Id)
		overThresholdItem := aggThreshold.FindThresholdInRange(protoItemThs, val)
		if overThresholdItem == nil {
			continue
		}

		// Content reads like: 应变的10分钟聚集变化率:-0.60με,超1级阈值[-1~-0.5]
		content := fmt.Sprintf("%s的%s变化率:%f%s,超%d级阈值[%s]",
			protoItem.Name, aggTypeStr, val, protoItem.UnitName, overThresholdItem.Level, overThresholdItem.RangeText())
		ls = append(ls, common_models.ThresholdAlarmDetail{Level: overThresholdItem.Level, Content: content})
	}

	return t.getAndCacheAlarmMsg(aggData, ls)
}

// getAndCacheAlarmMsg converts the over-threshold details in ls into an alarm
// message for the station of aggData and keeps the station's alarm record in
// sync through the config helper.
//
//   - ls non-empty: an over-threshold alarm is built, its alarm type is added
//     to the station's alarm record, and the alarm message is returned.
//   - ls empty: the change-rate alarm type is removed from the station's alarm
//     record; a recovery alarm message is returned only when a record was
//     actually removed.
//
// It returns nil when the station is unknown, when no alarm could be built
// from ls, or when there is nothing to recover.
func (t *AggThresholdHandler) getAndCacheAlarmMsg(aggData *common_models.AggData, ls []common_models.ThresholdAlarmDetail) *common_models.AlarmMsg {
	stationInfo, err := t.configHelper.GetStationInfo(aggData.SensorId)
	if err != nil {
		log.Printf("[%v]测点不存在\n", aggData)
		return nil
	}

	// Key of the station's alarm record, e.g. alarm:2:100.
	key := common_models.AlarmRedisKey(common_models.ALARM_SOURCE_STATION, strconv.Itoa(aggData.SensorId))

	if len(ls) > 0 {
		// Over-threshold alarm.
		level := findMinLevel(ls)
		content := stringifyThresholds(ls)
		alarm := common_models.NewOverChangingRateThreshold(level, content)
		if alarm == nil {
			// alarm is nil here, so log the inputs rather than dereferencing it.
			log.Printf("[%v] over-agg-threshold:[Level:%d] content:[%s]ERROR: 未找到对应告警等级的告警码。", aggData, level, content)
			return nil
		}
		log.Printf("%s[aggTypeId:%d][%s] over-agg-threshold: [%s]\n", aggData.R(), aggData.AggTypeId, aggData.T(), alarm.Content)

		// Station information -> alarm message.
		alarmMsg := alarm.ToAlarmMsg(stationInfo, aggData.Date)

		// Add the alarm to the station's alarm record.
		t.configHelper.SAddAlarm(key, alarm.AlarmType)
		return &alarmMsg
	}

	// No threshold exceeded: remove the alarm record; if one existed, a
	// recovery alarm has to be sent.
	affects := t.configHelper.SRemAlarm(key, common_models.Alarm_Type_Over_ChangeRate_Threshold)
	if affects > 0 {
		alarm := common_models.NewRecoverChangingRateThreshold()
		alarmMsg := alarm.ToAlarmMsg(stationInfo, aggData.Date)
		return &alarmMsg
	}

	return nil
}