From 3a1ef52537a9df868bb9587f873d88a5c5867117 Mon Sep 17 00:00:00 2001 From: yfh Date: Mon, 24 Feb 2025 22:56:18 +0800 Subject: [PATCH] =?UTF-8?q?=E7=8E=AF=E8=8A=82=E5=A4=84=E7=90=86=E6=94=B9?= =?UTF-8?q?=E4=B8=BA=E6=89=B9=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- et_analyze/aggThreshold.go | 2 +- et_analyze/threshold.go | 19 +++++++++++++++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/et_analyze/aggThreshold.go b/et_analyze/aggThreshold.go index 43f7350..4fed32b 100644 --- a/et_analyze/aggThreshold.go +++ b/et_analyze/aggThreshold.go @@ -64,7 +64,7 @@ func (t *AggThresholdHandler) ProcessData(aggData *common_models.AggData) error } // 发布 kafka 消息 - fmt.Printf("测点[%d-%s][kafka-topic:%s]%s\n", stationInfo.Id, stationInfo.Name, t.alarmTopic, jsonData) + log.Printf("测点[%d-%s][kafka-topic:%s]%s\n", stationInfo.Id, stationInfo.Name, t.alarmTopic, jsonData) t.kafkaAsyncProducer.Publish(t.alarmTopic, jsonData) return nil diff --git a/et_analyze/threshold.go b/et_analyze/threshold.go index f5fb421..6a969f7 100644 --- a/et_analyze/threshold.go +++ b/et_analyze/threshold.go @@ -10,6 +10,7 @@ import ( "log" "node/stages" "strconv" + "sync" ) type ThresholdHandler struct { @@ -34,7 +35,7 @@ func NewThresholdHandler() *ThresholdHandler { kafkaAlarmTopic: alarmTopic, } - model.stage.AddProcess(model.processData) + model.stage.AddProcess(model.processDatas) return model } @@ -43,6 +44,20 @@ func (t *ThresholdHandler) GetStage() stages.Stage { } // 必须 +func (t *ThresholdHandler) processDatas(data []*common_models.ProcessData) []*common_models.ProcessData { + go func() { + var wg sync.WaitGroup // 初始化 WaitGroup + for _, processData := range data { + wg.Add(1) + go func(pd *common_models.ProcessData) { + defer wg.Done() + t.processData(pd) + }(processData) + } + wg.Wait() + }() + return data +} func (t *ThresholdHandler) processData(resultData *common_models.ProcessData) *common_models.ProcessData { if resultData == nil || resultData.Stations == nil || len(resultData.Stations) == 0 { return resultData @@ -74,7 +89,7 @@ func (t *ThresholdHandler) processData(resultData *common_models.ProcessData) *c func (t *ThresholdHandler) judgeThreshold(station *common_models.Station) *common_models.AlarmMsg { // 检查测点是否有阈值配置信息 if station.Threshold == nil || station.Threshold.Items == nil { - log.Printf("测点[%d-%s]未配置阈值,无须进行阈值判断\n", station.Info.Id, station.Info.Name) + log.Printf("测点[%d-%s]未配置阈值,跳过\n", station.Info.Id, station.Info.Name) return nil }