From 92818de8bff236fd3b52723ad29b74542d6114ac Mon Sep 17 00:00:00 2001 From: yfh Date: Fri, 27 Sep 2024 17:08:05 +0800 Subject: [PATCH] =?UTF-8?q?=E8=81=9A=E9=9B=86=E6=95=B0=E6=8D=AE=E9=98=88?= =?UTF-8?q?=E5=80=BC=E5=88=86=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dataSource/kafka/aggData.go | 42 ++++++++++++++----- dataSource/kafka/aggData_test.go | 14 +++++++ et_analyze/aggThreshold.go | 71 ++++++++++++++++++++++---------- 3 files changed, 96 insertions(+), 31 deletions(-) create mode 100644 dataSource/kafka/aggData_test.go diff --git a/dataSource/kafka/aggData.go b/dataSource/kafka/aggData.go index 8e7f0cf..7497802 100644 --- a/dataSource/kafka/aggData.go +++ b/dataSource/kafka/aggData.go @@ -4,33 +4,55 @@ import ( "dataSource" "encoding/json" "gitea.anxinyun.cn/container/common_models" + "gitea.anxinyun.cn/container/common_utils" + "gitea.anxinyun.cn/container/common_utils/configLoad" "log" "strings" "time" ) -type AggDataHandler struct{} +type AggDataHandler struct { + configHelper *common_utils.ConfigHelper +} + +func NewAggDataHandler() *AggDataHandler { + redisAddr := configLoad.LoadConfig().GetString("redis.address") + return &AggDataHandler{ + configHelper: common_utils.NewConfigHelper(redisAddr), + } + +} func (h AggDataHandler) HandleMessage(message string) bool { - // aggDataMsg: {"date":"2024-04-19T01:10:59.999+0000","sensorId":106,"structId":1,"factorId":11,"aggTypeId":2006,"aggMethodId":3004,"agg":{"strain":-19.399999618530273},"changed":{"strain":-3}} + // aggDataMsg: {"date":"2024-09-19T09:39:59.999+0000","sensorId":106,"structId":1,"factorId":11,"aggTypeId":2006,"aggMethodId":3004,"agg":{"strain":-19.399999618530273},"changed":{"strain":-3}} // aggDataMsg 中的时间为UTC格式 2024-04-19T01:10:59.999+0000, // 在进行 json.Unmarshal() 时报错 - // 解决方案:先将 +0000 -> +00:00,然后再将 UTC 时间转换为 +08:00 时区时间 + // 解决方案:先将 +0000 -> Z,然后再将 UTC 时间转换为中国时区时间("Asia/Shanghai") - // 将 "+0000" 替换为 "+00:00" - replacedStr := strings.Replace(message, "+0000", "+00:00", 1) + // 将 2024-04-19T01:10:59.999+0000 -> 2024-04-19T01:10:59.999Z + utcTimeStr := strings.Replace(message, "+0000", "Z", 1) aggData := common_models.AggData{} - err := json.Unmarshal([]byte(replacedStr), &aggData) + err := json.Unmarshal([]byte(utcTimeStr), &aggData) if err != nil { log.Printf("json parse error: %v", err) return false } + // 转换为中国时区时间("Asia/Shanghai") + loc, _ := time.LoadLocation("Asia/Shanghai") + chinaTime := aggData.Date.In(loc) + aggData.Date = chinaTime + //log.Printf("message:%v\n, cvt: %+v", message, aggData) + if aggData.ThingId == "" { + structure, err := h.configHelper.GetStructure(aggData.StructId) + if err != nil { + log.Printf("redis 中无 key = structure:%d 的缓存数据.", aggData.StructId) + return false + } + aggData.ThingId = structure.ThingId + } - // 将 UTC 时间加上8小时得到中国的本地时间 - aggData.Date = aggData.Date.Add(8 * time.Hour) - - log.Printf("handler 处理[%d]消息", aggData.SensorId) + log.Printf("handler 处理sensorId[%d]消息", aggData.SensorId) dataSource.GetChannels().AggDataChan <- aggData return true } diff --git a/dataSource/kafka/aggData_test.go b/dataSource/kafka/aggData_test.go new file mode 100644 index 0000000..66f91c3 --- /dev/null +++ b/dataSource/kafka/aggData_test.go @@ -0,0 +1,14 @@ +package kafka + +import ( + "testing" +) + +func TestAggDataHandler_HandleMessage(t *testing.T) { + h := AggDataHandler{} + + aggDataMsg := ` +{"date":"2024-09-19T09:39:59.999+0000","sensorId":106,"structId":1,"factorId":11,"aggTypeId":2006,"aggMethodId":3004,"agg":{"strain":-19.399999618530273},"changed":{"strain":-3}} +` + h.HandleMessage(aggDataMsg) +} diff --git a/et_analyze/aggThreshold.go b/et_analyze/aggThreshold.go index fb01bbe..a69e4da 100644 --- a/et_analyze/aggThreshold.go +++ b/et_analyze/aggThreshold.go @@ -2,8 +2,10 @@ package et_analyze import ( "encoding/json" + "errors" "fmt" "gitea.anxinyun.cn/container/common_models" + "gitea.anxinyun.cn/container/common_models/constant/redisKey" "gitea.anxinyun.cn/container/common_utils" "gitea.anxinyun.cn/container/common_utils/configLoad" "gitea.anxinyun.cn/container/common_utils/kafkaHelper" @@ -31,25 +33,41 @@ func NewAggThresholdHandler() *AggThresholdHandler { } // ProcessData 进行 aggData 阈值分析,向 kafka 发送 alarm 消息 -func (t *AggThresholdHandler) ProcessData(aggData *common_models.AggData) { +func (t *AggThresholdHandler) ProcessData(aggData *common_models.AggData) error { if aggData == nil || aggData.Changed == nil || len(aggData.Changed) == 0 { - log.Printf("aggData 非法数据:[%v]\n", aggData) - return + errmsg := fmt.Sprintf("aggData非法数据:[%v]", aggData) + //log.Println(errmsg) + return errors.New(errmsg) } + // 无告警 alarmMsg := t.judgeThreshold(aggData) - if alarmMsg != nil { - stationInfo, _ := t.configHelper.GetStationInfo(aggData.SensorId) - alarmMsg.Sponsor = common_models.Alarm_Sponsor_Recv - jsonData, err := json.Marshal(alarmMsg) - if err != nil { - fmt.Printf("测点[%d-%s][kafka-topic:%s][%v]Error marshalling JSON:%s \n", stationInfo.Id, stationInfo.Name, t.alarmTopic, alarmMsg, err) - } + if alarmMsg == nil { + return nil + } + + // 以下为有告警产生后的处理 + stationInfo, err := t.configHelper.GetStationInfo(aggData.SensorId) + if err != nil { + key := fmt.Sprintf("%s:%d", redisKey.Station, aggData.SensorId) + errmsg := fmt.Sprintf("[%s] StationId not found in redis ", key) + log.Println(errmsg) + return errors.New(errmsg) + } - // 发布 kafka 消息 - fmt.Printf("测点[%d-%s][kafka-topic:%s]%s\n", stationInfo.Id, stationInfo.Name, t.alarmTopic, jsonData) - t.kafkaAsyncProducer.Publish(t.alarmTopic, jsonData) + alarmMsg.Sponsor = common_models.Alarm_Sponsor_Recv + jsonData, err := json.Marshal(alarmMsg) + if err != nil { + errmsg := fmt.Sprintf("测点[%d-%s][kafka-topic:%s][%v]Error marshalling JSON:%s \n", stationInfo.Id, stationInfo.Name, t.alarmTopic, alarmMsg, err) + log.Println(errmsg) + return err } + + // 发布 kafka 消息 + fmt.Printf("测点[%d-%s][kafka-topic:%s]%s\n", stationInfo.Id, stationInfo.Name, t.alarmTopic, jsonData) + t.kafkaAsyncProducer.Publish(t.alarmTopic, jsonData) + + return nil } func (t *AggThresholdHandler) judgeThreshold(aggData *common_models.AggData) *common_models.AlarmMsg { @@ -68,9 +86,20 @@ func (t *AggThresholdHandler) judgeThreshold(aggData *common_models.AggData) *co // 检查测点是否有聚集阈值配置信息 aggThreshold, err := t.configHelper.GetAggThreshold(aggData.StructId, aggData.FactorId) + var filteredThres = make([]common_models.AggThresholdItem, 0) if err != nil || aggThreshold.Items == nil || len(aggThreshold.Items) == 0 { - log.Printf("未配置aggThreshold,无须进行阈值判断:[structId:%d factorId:%d] \n", aggData.StructId, aggData.FactorId) + log.Printf("未配置变化速率阈值。%s aggTypeId[%d]\n", aggData.R(), aggData.AggTypeId) return nil + } else { + for _, item := range aggThreshold.Items { + if item.AggCategory != nil && *item.AggCategory == aggData.AggTypeId { + filteredThres = append(filteredThres, item) + } + } + if len(filteredThres) == 0 { + log.Printf("未配置变化速率阈值。%s aggTypeId[%d]\n", aggData.R(), aggData.AggTypeId) + return nil + } } factor, err := t.configHelper.GetFactorInfo(aggData.FactorId) @@ -88,7 +117,7 @@ func (t *AggThresholdHandler) judgeThreshold(aggData *common_models.AggData) *co log.Printf("测点[%d-%d-%d][%s]aggData[%v],但是Redis中没有对应的[protoItem:%s]。\n", aggData.StructId, aggData.FactorId, aggData.SensorId, aggData.Date, aggData, fieldName) } else { - protoItemThs := aggThreshold.GetThresholdsByItem(aggThreshold.Items, protoItem.Id) + protoItemThs := aggThreshold.GetThresholdsByItem(filteredThres, protoItem.Id) overThresholdItem := aggThreshold.FindThresholdInRange(protoItemThs, val) if overThresholdItem != nil { // content 格式如:应变的10分钟聚集变化率:-0.60με,超1级阈值[-1~-0.5] @@ -114,24 +143,24 @@ func (t *AggThresholdHandler) getAndCacheAlarmMsg(aggData *common_models.AggData // 超阈值告警 alarm := common_models.NewOverChangingRateThreshold(findMinLevel(ls), stringifyThresholds(ls)) if alarm == nil { - //log.Printf("[%v] over-agg-threshold:[Level:%d] content:[%s]ERROR: 未找到对应告警等级的告警码。", aggData, alarm.Level, alarm.Content) + log.Printf("[%v] over-agg-threshold:[Level:%d] content:[%s]ERROR: 未找到对应告警等级的告警码。", aggData, alarm.Level, alarm.Content) return nil } - log.Printf("[%v] over-agg-threshold:[Level:%d] content:[%s]", aggData, alarm.Level, alarm.Content) + log.Printf("%s[aggTypeId:%d][%s] over-agg-threshold: [%s]\n", aggData.R(), aggData.AggTypeId, aggData.T(), alarm.Content) // 将测点信息 -> 告警信息 alarmMsg := alarm.ToAlarmMsg(stationInfo, aggData.Date) // Redis 中添加告警记录, key = alarm:2:100 - redisKey := common_models.AlarmRedisKey(common_models.ALARM_SOURCE_STATION, strconv.Itoa(aggData.SensorId)) - t.configHelper.SAddAlarm(redisKey, alarm.AlarmType) + key := common_models.AlarmRedisKey(common_models.ALARM_SOURCE_STATION, strconv.Itoa(aggData.SensorId)) + t.configHelper.SAddAlarm(key, alarm.AlarmType) return &alarmMsg } else { // Redis 中删除告警记录,key = alarm:2:100 - redisKey := common_models.AlarmRedisKey(common_models.ALARM_SOURCE_STATION, strconv.Itoa(aggData.SensorId)) - affects := t.configHelper.SRemAlarm(redisKey, common_models.Alarm_Type_Over_ChangeRate_Threshold) + key := common_models.AlarmRedisKey(common_models.ALARM_SOURCE_STATION, strconv.Itoa(aggData.SensorId)) + affects := t.configHelper.SRemAlarm(key, common_models.Alarm_Type_Over_ChangeRate_Threshold) // 如果Redis中存在告警,则要发送恢复告警 if affects > 0 {