Browse Source

环节处理改为批处理

dev
yfh 1 month ago
parent
commit
3a1ef52537
  1. 2
      et_analyze/aggThreshold.go
  2. 19
      et_analyze/threshold.go

2
et_analyze/aggThreshold.go

@ -64,7 +64,7 @@ func (t *AggThresholdHandler) ProcessData(aggData *common_models.AggData) error
}
// 发布 kafka 消息
fmt.Printf("测点[%d-%s][kafka-topic:%s]%s\n", stationInfo.Id, stationInfo.Name, t.alarmTopic, jsonData)
log.Printf("测点[%d-%s][kafka-topic:%s]%s\n", stationInfo.Id, stationInfo.Name, t.alarmTopic, jsonData)
t.kafkaAsyncProducer.Publish(t.alarmTopic, jsonData)
return nil

19
et_analyze/threshold.go

@ -10,6 +10,7 @@ import (
"log"
"node/stages"
"strconv"
"sync"
)
type ThresholdHandler struct {
@ -34,7 +35,7 @@ func NewThresholdHandler() *ThresholdHandler {
kafkaAlarmTopic: alarmTopic,
}
model.stage.AddProcess(model.processData)
model.stage.AddProcess(model.processDatas)
return model
}
@ -43,6 +44,20 @@ func (t *ThresholdHandler) GetStage() stages.Stage {
}
// 必须
func (t *ThresholdHandler) processDatas(data []*common_models.ProcessData) []*common_models.ProcessData {
go func() {
var wg sync.WaitGroup // 初始化 WaitGroup
for _, processData := range data {
wg.Add(1)
go func(pd *common_models.ProcessData) {
defer wg.Done()
t.processData(pd)
}(processData)
}
wg.Wait()
}()
return data
}
func (t *ThresholdHandler) processData(resultData *common_models.ProcessData) *common_models.ProcessData {
if resultData == nil || resultData.Stations == nil || len(resultData.Stations) == 0 {
return resultData
@ -74,7 +89,7 @@ func (t *ThresholdHandler) processData(resultData *common_models.ProcessData) *c
func (t *ThresholdHandler) judgeThreshold(station *common_models.Station) *common_models.AlarmMsg {
// 检查测点是否有阈值配置信息
if station.Threshold == nil || station.Threshold.Items == nil {
log.Printf("测点[%d-%s]未配置阈值,无须进行阈值判断\n", station.Info.Id, station.Info.Name)
log.Printf("测点[%d-%s]未配置阈值,跳过\n", station.Info.Id, station.Info.Name)
return nil
}

Loading…
Cancel
Save