package et_analyze import ( "encoding/json" "fmt" "gitea.anxinyun.cn/container/common_models" "gitea.anxinyun.cn/container/common_utils" "gitea.anxinyun.cn/container/common_utils/configLoad" "gitea.anxinyun.cn/container/common_utils/kafkaHelper" "log" "strconv" ) type AggThresholdHandler struct { alarmTopic string configHelper *common_utils.ConfigHelper kafkaAsyncProducer *kafkaHelper.KafkaAsyncProducer } func NewAggThresholdHandler() *AggThresholdHandler { configYaml := configLoad.LoadConfig() redisAdd := configYaml.GetString("redis.address") kafkaBrokers := configYaml.GetStringSlice("kafka.brokers") alarmTopic := configYaml.GetString("kafka.topics.alarm_anxinyun") return &AggThresholdHandler{ alarmTopic: alarmTopic, configHelper: common_utils.NewConfigHelper(redisAdd), kafkaAsyncProducer: kafkaHelper.NewKafkaAsyncProducer(kafkaBrokers), } } // ProcessData 进行 aggData 阈值分析,向 kafka 发送 alarm 消息 func (t *AggThresholdHandler) ProcessData(aggData *common_models.AggData) { if aggData == nil || aggData.Changed == nil || len(aggData.Changed) == 0 { log.Printf("aggData 非法数据:[%v]\n", aggData) return } alarmMsg := t.judgeThreshold(aggData) if alarmMsg != nil { stationInfo, _ := t.configHelper.GetStationInfo(aggData.SensorId) alarmMsg.Sponsor = common_models.Alarm_Sponsor_Recv jsonData, err := json.Marshal(alarmMsg) if err != nil { fmt.Printf("测点[%d-%s][kafka-topic:%s][%v]Error marshalling JSON:%s \n", stationInfo.Id, stationInfo.Name, t.alarmTopic, alarmMsg, err) } // 发布 kafka 消息 fmt.Printf("测点[%d-%s][kafka-topic:%s]%s\n", stationInfo.Id, stationInfo.Name, t.alarmTopic, jsonData) t.kafkaAsyncProducer.Publish(t.alarmTopic, jsonData) } } func (t *AggThresholdHandler) judgeThreshold(aggData *common_models.AggData) *common_models.AlarmMsg { var aggTypeMap = map[int]string{ 2001: "日聚集", 2002: "周聚集", 2003: "月聚集", 2004: "年聚集", 2005: "时聚集", 2006: "10分钟聚集", } aggTypeStr := aggTypeMap[aggData.AggTypeId] if aggTypeStr == "" { aggTypeStr = "其他聚集" } // 检查测点是否有聚集阈值配置信息 aggThreshold, err := t.configHelper.GetAggThreshold(aggData.StructId, aggData.FactorId) if err != nil || aggThreshold.Items == nil || len(aggThreshold.Items) == 0 { log.Printf("未配置aggThreshold,无须进行阈值判断:[structId:%d factorId:%d] \n", aggData.StructId, aggData.FactorId) return nil } factor, err := t.configHelper.GetFactorInfo(aggData.FactorId) if err != nil || factor.Items == nil || len(factor.Items) == 0 { log.Printf("获取factor异常:[structId:%d factorId:%d]Error: %v \n", aggData.StructId, aggData.FactorId, err) return nil } var ls []common_models.ThresholdAlarmDetail // Changed map[string]float64 // 变化量 for fieldName, val := range aggData.Changed { protoItem := factor.GetProtoItem(fieldName) if protoItem == nil { log.Printf("测点[%d-%d-%d][%s]aggData[%v],但是Redis中没有对应的[protoItem:%s]。\n", aggData.StructId, aggData.FactorId, aggData.SensorId, aggData.Date, aggData, fieldName) } else { protoItemThs := aggThreshold.GetThresholdsByItem(aggThreshold.Items, protoItem.Id) overThresholdItem := aggThreshold.FindThresholdInRange(protoItemThs, val) if overThresholdItem != nil { // content 格式如:应变的10分钟聚集变化率:-0.60με,超1级阈值[-1~-0.5] content := fmt.Sprintf("%s的%s变化率:%.2f%s,超%d级阈值[%s]", protoItem.Name, aggTypeStr, val, protoItem.UnitName, overThresholdItem.Level, overThresholdItem.RangeText()) ls = append(ls, common_models.ThresholdAlarmDetail{Level: overThresholdItem.Level, Content: content}) } //log.Printf("[aggThreshold judgeThreshold] 测点[sensorId: %d] fieldName: %s, ChangedVal (float64): %f\n", aggData.SensorId, fieldName, val) } } alarmMsg := t.getAndCacheAlarmMsg(aggData, ls) return alarmMsg } func (t *AggThresholdHandler) getAndCacheAlarmMsg(aggData *common_models.AggData, ls []common_models.ThresholdAlarmDetail) *common_models.AlarmMsg { stationInfo, err := t.configHelper.GetStationInfo(aggData.SensorId) if err != nil { log.Printf("[%v]测点不存在\n", aggData) return nil } if ls != nil && len(ls) > 0 { // 超阈值告警 alarm := common_models.NewOverChangingRateThreshold(findMinLevel(ls), stringifyThresholds(ls)) if alarm == nil { //log.Printf("[%v] over-agg-threshold:[Level:%d] content:[%s]ERROR: 未找到对应告警等级的告警码。", aggData, alarm.Level, alarm.Content) return nil } log.Printf("[%v] over-agg-threshold:[Level:%d] content:[%s]", aggData, alarm.Level, alarm.Content) // 将测点信息 -> 告警信息 alarmMsg := alarm.ToAlarmMsg(stationInfo, aggData.Date) // Redis 中添加告警记录, key = alarm:2:100 redisKey := common_models.AlarmRedisKey(common_models.ALARM_SOURCE_STATION, strconv.Itoa(aggData.SensorId)) t.configHelper.SAddAlarm(redisKey, alarm.AlarmType) return &alarmMsg } else { // Redis 中删除告警记录,key = alarm:2:100 redisKey := common_models.AlarmRedisKey(common_models.ALARM_SOURCE_STATION, strconv.Itoa(aggData.SensorId)) affects := t.configHelper.SRemAlarm(redisKey, common_models.Alarm_Type_Over_ChangeRate_Threshold) // 如果Redis中存在告警,则要发送恢复告警 if affects > 0 { alarm := common_models.NewRecoverChangingRateThreshold() alarmMsg := alarm.ToAlarmMsg(stationInfo, aggData.Date) return &alarmMsg } } return nil }