From c6f66e56cee16feba67b1bb7cb58cc798bef5a71 Mon Sep 17 00:00:00 2001 From: yfh Date: Mon, 24 Feb 2025 22:55:38 +0800 Subject: [PATCH] =?UTF-8?q?=E7=8E=AF=E8=8A=82=E5=A4=84=E7=90=86=E6=94=B9?= =?UTF-8?q?=E4=B8=BA=E6=89=B9=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- et_cache/cacheHandler.go | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/et_cache/cacheHandler.go b/et_cache/cacheHandler.go index 4d5fc7e..3741025 100644 --- a/et_cache/cacheHandler.go +++ b/et_cache/cacheHandler.go @@ -5,10 +5,9 @@ import ( "fmt" "gitea.anxinyun.cn/container/common_models" "gitea.anxinyun.cn/container/common_models/constant/redisKey" - "gitea.anxinyun.cn/container/common_utils" - "gitea.anxinyun.cn/container/common_utils/configLoad" "log" "node/stages" + "sync" ) type CacheHandler struct { @@ -16,19 +15,33 @@ type CacheHandler struct { stage *stages.Stage } -func NewCacheHandler() *CacheHandler { - redisAddr := configLoad.LoadConfig().GetString("redis.address") - configHelper := common_utils.NewConfigHelper(redisAddr) +func NewCacheHandler(cacheServer *cacheSer.CacheServer) *CacheHandler { + //redisAddr := configLoad.LoadConfig().GetString("redis.address") + //configHelper := common_utils.NewConfigHelper(redisAddr) the := &CacheHandler{ - stage: stages.NewStage("测点数据缓存"), - cacheServer: cacheSer.NewCacheServer(configHelper), + stage: stages.NewStage("滑窗过滤"), + cacheServer: cacheServer, //cacheSer.NewCacheServer(configHelper), } - the.stage.AddProcess(the.enqueue) + the.stage.AddProcess(the.enqueueForStations) return the } func (the *CacheHandler) GetStage() stages.Stage { return *the.stage } + +func (the *CacheHandler) enqueueForStations(data []*common_models.ProcessData) []*common_models.ProcessData { + var wg sync.WaitGroup // 初始化 WaitGroup + for _, processData := range data { + wg.Add(1) + go func(pd *common_models.ProcessData) { + defer wg.Done() + the.enqueue(pd) + }(processData) + } + wg.Wait() + return data +} + func (the *CacheHandler) enqueue(p *common_models.ProcessData) *common_models.ProcessData { for _, station := range p.Stations {