From da861825af029e4ff2c7cca1bd614a603b3f0624 Mon Sep 17 00:00:00 2001 From: yfh Date: Mon, 24 Feb 2025 22:54:25 +0800 Subject: [PATCH] =?UTF-8?q?=E7=8E=AF=E8=8A=82=E5=A4=84=E7=90=86=E6=94=B9?= =?UTF-8?q?=E4=B8=BA=E6=89=B9=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- et_calc/dataCalc.go | 29 +++++++++++++++++++++-------- et_calc/group/groupCalc.go | 36 ++++++++++++++++++++++++++---------- 2 files changed, 47 insertions(+), 18 deletions(-) diff --git a/et_calc/dataCalc.go b/et_calc/dataCalc.go index 19214a9..5b71a61 100644 --- a/et_calc/dataCalc.go +++ b/et_calc/dataCalc.go @@ -17,6 +17,7 @@ import ( "node/stages" "sort" "strings" + "sync" ) type CalcHandler struct { @@ -26,25 +27,37 @@ type CalcHandler struct { stage *stages.Stage } -func NewCalcHandler() *CalcHandler { - redisAddr := configLoad.LoadConfig().GetString("redis.address") - esAddresses := configLoad.LoadConfig().GetStringSlice("es.addresses") - configHp := common_utils.NewConfigHelper(redisAddr) +func NewCalcHandler(configHelper *common_utils.ConfigHelper, cacheServer *cacheSer.CacheServer, esESHelper *dbHelper.ESHelper) *CalcHandler { the := &CalcHandler{ - cacheServer: cacheSer.NewCacheServer(configHp), - unitHelper: common_utils.NewUnitHelper(), - esESHelper: dbHelper.NewESHelper(esAddresses, "", ""), + cacheServer: cacheServer, + esESHelper: esESHelper, + unitHelper: common_utils.NewUnitHelper(configHelper), stage: stages.NewStage("单测点计算"), } - the.stage.AddProcess(the.calcFormula) + the.stage.AddProcess(the.calcFormulaForStations) return the } + func (the *CalcHandler) GetStage() stages.Stage { return *the.stage } +func (the *CalcHandler) calcFormulaForStations(data []*common_models.ProcessData) []*common_models.ProcessData { + var wg sync.WaitGroup // 初始化 WaitGroup + for _, processData := range data { + wg.Add(1) + go func(pd *common_models.ProcessData) { + defer wg.Done() + the.calcFormula(pd) + }(processData) + } + wg.Wait() + return data +} + // 单设备测点 func (the *CalcHandler) calcFormula(p *common_models.ProcessData) *common_models.ProcessData { + for i := range p.Stations { for _, device := range p.Stations[i].Info.Devices { //计算结果 diff --git a/et_calc/group/groupCalc.go b/et_calc/group/groupCalc.go index d6513f1..3ad4cd0 100644 --- a/et_calc/group/groupCalc.go +++ b/et_calc/group/groupCalc.go @@ -62,16 +62,16 @@ type GroupCalc struct { esHandler *et_sink.SinkHandler } -func NewGroupCalc() *GroupCalc { +func NewGroupCalc(configHelper *common_utils.ConfigHelper) *GroupCalc { calcTaskManager := &GroupCalc{ stage: stages.NewStage("测点分组计算"), - configHelper: GetConfigHelper(), + configHelper: configHelper, signCalc: make(chan bool), esHandler: et_sink.NewSinkGroupHandler(), } // 添加到 etNode 处理环节,实现数据加工 (缓存group各分项的主题数据 -> 分组计算、分组数据存储ES) - calcTaskManager.stage.AddProcess(calcTaskManager.processData) + calcTaskManager.stage.AddProcess(calcTaskManager.calcGroups) return calcTaskManager } @@ -79,9 +79,25 @@ func (gc *GroupCalc) GetStage() stages.Stage { return *gc.stage } +func (gc *GroupCalc) calcGroups(data []*common_models.ProcessData) []*common_models.ProcessData { + var wg sync.WaitGroup + result := make([]*common_models.ProcessData, len(data)) + + for i, p := range data { + wg.Add(1) + go func(i int, p *common_models.ProcessData) { + defer wg.Done() + result[i] = gc.processData(p) + }(i, p) + } + + wg.Wait() + return result +} + // processData 的 stations 被改变了 func (gc *GroupCalc) processData(inData *common_models.ProcessData) *common_models.ProcessData { - log.Printf("************* 82行 分组一次处理开始 len(inData.Stations)=%d *************", len(inData.Stations)) + log.Printf("********* 分组一次处理开始 len(inData.Stations)=%d *************", len(inData.Stations)) resultStations := make([]common_models.Station, 0) // 分组超时任务 resultStations = append(resultStations, gc.processTimeoutTasks()...) @@ -95,12 +111,12 @@ func (gc *GroupCalc) processData(inData *common_models.ProcessData) *common_mode calcRet, err := gc.cacheAndCalc(&station, inData.DeviceData.DimensionId, inData.DeviceData.TaskId, inData.DeviceData.AcqTime) if err == nil { - log.Printf("************* 95行 沉降分组计算成功,返回记录数 len(calcRet)={%d} *************", len(calcRet)) + log.Printf("******* 沉降分组计算成功,返回记录数 len(calcRet)={%d} *************", len(calcRet)) resultStations = append(resultStations, calcRet...) } else { println(err) } - log.Printf("************* 98行 分组一次处理,缓存记录数 len(resultStations)={%d} *************", len(resultStations)) + log.Printf("******** 分组一次处理,缓存记录数 len(resultStations)={%d} *************", len(resultStations)) } } @@ -112,22 +128,22 @@ func (gc *GroupCalc) processData(inData *common_models.ProcessData) *common_mode //filterSensorIds := []int{18, 20, 21} //for _, number := range filterSensorIds { // if number == station.Info.Id { - // log.Printf("*************** 110行 沉降分组测点有返回 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data) + // log.Printf("******* 122行 沉降分组测点有返回 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data) // break // } //} //// #TEST END // 需要返回的测点 - //log.Printf("*************** 121行 沉降分组测点有返回 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data) + //log.Printf("******* 128行 沉降分组测点有返回 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data) result = append(result, station) } else { - log.Printf("*************** 119行 分组计算异常的测点数据 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data) + log.Printf("****** 分组计算异常的测点数据 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data) } } // 计算后的 result 可能为空 - log.Printf("************* 124行 分组一次处理结束 len(inData.Stations)=%d *************", len(result)) + log.Printf("****** 分组一次处理结束 len(inData.Stations)=%d *************", len(result)) inData.Stations = result return inData }