Browse Source

聚集数据阈值分析

dev
yfh 1 month ago
parent
commit
92818de8bf
  1. 42
      dataSource/kafka/aggData.go
  2. 14
      dataSource/kafka/aggData_test.go
  3. 71
      et_analyze/aggThreshold.go

42
dataSource/kafka/aggData.go

@ -4,33 +4,55 @@ import (
"dataSource"
"encoding/json"
"gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_utils"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"log"
"strings"
"time"
)
type AggDataHandler struct{}
type AggDataHandler struct {
configHelper *common_utils.ConfigHelper
}
func NewAggDataHandler() *AggDataHandler {
redisAddr := configLoad.LoadConfig().GetString("redis.address")
return &AggDataHandler{
configHelper: common_utils.NewConfigHelper(redisAddr),
}
}
func (h AggDataHandler) HandleMessage(message string) bool {
// aggDataMsg: {"date":"2024-04-19T01:10:59.999+0000","sensorId":106,"structId":1,"factorId":11,"aggTypeId":2006,"aggMethodId":3004,"agg":{"strain":-19.399999618530273},"changed":{"strain":-3}}
// aggDataMsg: {"date":"2024-09-19T09:39:59.999+0000","sensorId":106,"structId":1,"factorId":11,"aggTypeId":2006,"aggMethodId":3004,"agg":{"strain":-19.399999618530273},"changed":{"strain":-3}}
// aggDataMsg 中的时间为UTC格式 2024-04-19T01:10:59.999+0000,
// 在进行 json.Unmarshal() 时报错
// 解决方案:先将 +0000 -> +00:00,然后再将 UTC 时间转换为 +08:00 时区时间
// 解决方案:先将 +0000 -> Z,然后再将 UTC 时间转换为中国时区时间("Asia/Shanghai")
// 将 "+0000" 替换为 "+00:00"
replacedStr := strings.Replace(message, "+0000", "+00:00", 1)
// 将 2024-04-19T01:10:59.999+0000 -> 2024-04-19T01:10:59.999Z
utcTimeStr := strings.Replace(message, "+0000", "Z", 1)
aggData := common_models.AggData{}
err := json.Unmarshal([]byte(replacedStr), &aggData)
err := json.Unmarshal([]byte(utcTimeStr), &aggData)
if err != nil {
log.Printf("json parse error: %v", err)
return false
}
// 转换为中国时区时间("Asia/Shanghai")
loc, _ := time.LoadLocation("Asia/Shanghai")
chinaTime := aggData.Date.In(loc)
aggData.Date = chinaTime
//log.Printf("message:%v\n, cvt: %+v", message, aggData)
if aggData.ThingId == "" {
structure, err := h.configHelper.GetStructure(aggData.StructId)
if err != nil {
log.Printf("redis 中无 key = structure:%d 的缓存数据.", aggData.StructId)
return false
}
aggData.ThingId = structure.ThingId
}
// 将 UTC 时间加上8小时得到中国的本地时间
aggData.Date = aggData.Date.Add(8 * time.Hour)
log.Printf("handler 处理[%d]消息", aggData.SensorId)
log.Printf("handler 处理sensorId[%d]消息", aggData.SensorId)
dataSource.GetChannels().AggDataChan <- aggData
return true
}

14
dataSource/kafka/aggData_test.go

@ -0,0 +1,14 @@
package kafka
import (
"testing"
)
func TestAggDataHandler_HandleMessage(t *testing.T) {
h := AggDataHandler{}
aggDataMsg := `
{"date":"2024-09-19T09:39:59.999+0000","sensorId":106,"structId":1,"factorId":11,"aggTypeId":2006,"aggMethodId":3004,"agg":{"strain":-19.399999618530273},"changed":{"strain":-3}}
`
h.HandleMessage(aggDataMsg)
}

71
et_analyze/aggThreshold.go

@ -2,8 +2,10 @@ package et_analyze
import (
"encoding/json"
"errors"
"fmt"
"gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_models/constant/redisKey"
"gitea.anxinyun.cn/container/common_utils"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"gitea.anxinyun.cn/container/common_utils/kafkaHelper"
@ -31,25 +33,41 @@ func NewAggThresholdHandler() *AggThresholdHandler {
}
// ProcessData 进行 aggData 阈值分析,向 kafka 发送 alarm 消息
func (t *AggThresholdHandler) ProcessData(aggData *common_models.AggData) {
func (t *AggThresholdHandler) ProcessData(aggData *common_models.AggData) error {
if aggData == nil || aggData.Changed == nil || len(aggData.Changed) == 0 {
log.Printf("aggData 非法数据:[%v]\n", aggData)
return
errmsg := fmt.Sprintf("aggData非法数据:[%v]", aggData)
//log.Println(errmsg)
return errors.New(errmsg)
}
// 无告警
alarmMsg := t.judgeThreshold(aggData)
if alarmMsg != nil {
stationInfo, _ := t.configHelper.GetStationInfo(aggData.SensorId)
alarmMsg.Sponsor = common_models.Alarm_Sponsor_Recv
jsonData, err := json.Marshal(alarmMsg)
if err != nil {
fmt.Printf("测点[%d-%s][kafka-topic:%s][%v]Error marshalling JSON:%s \n", stationInfo.Id, stationInfo.Name, t.alarmTopic, alarmMsg, err)
}
if alarmMsg == nil {
return nil
}
// 以下为有告警产生后的处理
stationInfo, err := t.configHelper.GetStationInfo(aggData.SensorId)
if err != nil {
key := fmt.Sprintf("%s:%d", redisKey.Station, aggData.SensorId)
errmsg := fmt.Sprintf("[%s] StationId not found in redis ", key)
log.Println(errmsg)
return errors.New(errmsg)
}
// 发布 kafka 消息
fmt.Printf("测点[%d-%s][kafka-topic:%s]%s\n", stationInfo.Id, stationInfo.Name, t.alarmTopic, jsonData)
t.kafkaAsyncProducer.Publish(t.alarmTopic, jsonData)
alarmMsg.Sponsor = common_models.Alarm_Sponsor_Recv
jsonData, err := json.Marshal(alarmMsg)
if err != nil {
errmsg := fmt.Sprintf("测点[%d-%s][kafka-topic:%s][%v]Error marshalling JSON:%s \n", stationInfo.Id, stationInfo.Name, t.alarmTopic, alarmMsg, err)
log.Println(errmsg)
return err
}
// 发布 kafka 消息
fmt.Printf("测点[%d-%s][kafka-topic:%s]%s\n", stationInfo.Id, stationInfo.Name, t.alarmTopic, jsonData)
t.kafkaAsyncProducer.Publish(t.alarmTopic, jsonData)
return nil
}
func (t *AggThresholdHandler) judgeThreshold(aggData *common_models.AggData) *common_models.AlarmMsg {
@ -68,9 +86,20 @@ func (t *AggThresholdHandler) judgeThreshold(aggData *common_models.AggData) *co
// 检查测点是否有聚集阈值配置信息
aggThreshold, err := t.configHelper.GetAggThreshold(aggData.StructId, aggData.FactorId)
var filteredThres = make([]common_models.AggThresholdItem, 0)
if err != nil || aggThreshold.Items == nil || len(aggThreshold.Items) == 0 {
log.Printf("未配置aggThreshold,无须进行阈值判断:[structId:%d factorId:%d] \n", aggData.StructId, aggData.FactorId)
log.Printf("未配置变化速率阈值。%s aggTypeId[%d]\n", aggData.R(), aggData.AggTypeId)
return nil
} else {
for _, item := range aggThreshold.Items {
if item.AggCategory != nil && *item.AggCategory == aggData.AggTypeId {
filteredThres = append(filteredThres, item)
}
}
if len(filteredThres) == 0 {
log.Printf("未配置变化速率阈值。%s aggTypeId[%d]\n", aggData.R(), aggData.AggTypeId)
return nil
}
}
factor, err := t.configHelper.GetFactorInfo(aggData.FactorId)
@ -88,7 +117,7 @@ func (t *AggThresholdHandler) judgeThreshold(aggData *common_models.AggData) *co
log.Printf("测点[%d-%d-%d][%s]aggData[%v],但是Redis中没有对应的[protoItem:%s]。\n",
aggData.StructId, aggData.FactorId, aggData.SensorId, aggData.Date, aggData, fieldName)
} else {
protoItemThs := aggThreshold.GetThresholdsByItem(aggThreshold.Items, protoItem.Id)
protoItemThs := aggThreshold.GetThresholdsByItem(filteredThres, protoItem.Id)
overThresholdItem := aggThreshold.FindThresholdInRange(protoItemThs, val)
if overThresholdItem != nil {
// content 格式如:应变的10分钟聚集变化率:-0.60με,超1级阈值[-1~-0.5]
@ -114,24 +143,24 @@ func (t *AggThresholdHandler) getAndCacheAlarmMsg(aggData *common_models.AggData
// 超阈值告警
alarm := common_models.NewOverChangingRateThreshold(findMinLevel(ls), stringifyThresholds(ls))
if alarm == nil {
//log.Printf("[%v] over-agg-threshold:[Level:%d] content:[%s]ERROR: 未找到对应告警等级的告警码。", aggData, alarm.Level, alarm.Content)
log.Printf("[%v] over-agg-threshold:[Level:%d] content:[%s]ERROR: 未找到对应告警等级的告警码。", aggData, alarm.Level, alarm.Content)
return nil
}
log.Printf("[%v] over-agg-threshold:[Level:%d] content:[%s]", aggData, alarm.Level, alarm.Content)
log.Printf("%s[aggTypeId:%d][%s] over-agg-threshold: [%s]\n", aggData.R(), aggData.AggTypeId, aggData.T(), alarm.Content)
// 将测点信息 -> 告警信息
alarmMsg := alarm.ToAlarmMsg(stationInfo, aggData.Date)
// Redis 中添加告警记录, key = alarm:2:100
redisKey := common_models.AlarmRedisKey(common_models.ALARM_SOURCE_STATION, strconv.Itoa(aggData.SensorId))
t.configHelper.SAddAlarm(redisKey, alarm.AlarmType)
key := common_models.AlarmRedisKey(common_models.ALARM_SOURCE_STATION, strconv.Itoa(aggData.SensorId))
t.configHelper.SAddAlarm(key, alarm.AlarmType)
return &alarmMsg
} else {
// Redis 中删除告警记录,key = alarm:2:100
redisKey := common_models.AlarmRedisKey(common_models.ALARM_SOURCE_STATION, strconv.Itoa(aggData.SensorId))
affects := t.configHelper.SRemAlarm(redisKey, common_models.Alarm_Type_Over_ChangeRate_Threshold)
key := common_models.AlarmRedisKey(common_models.ALARM_SOURCE_STATION, strconv.Itoa(aggData.SensorId))
affects := t.configHelper.SRemAlarm(key, common_models.Alarm_Type_Over_ChangeRate_Threshold)
// 如果Redis中存在告警,则要发送恢复告警
if affects > 0 {

Loading…
Cancel
Save