From 0d78f58f2aefe4651756091556ed8bc32461a439 Mon Sep 17 00:00:00 2001 From: lucas Date: Thu, 17 Oct 2024 14:00:06 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E6=9B=B4=E6=96=B0=E6=BB=91=E7=AA=97?= =?UTF-8?q?=E7=BC=93=E5=AD=98=20=E8=AE=A1=E7=AE=97=E9=83=A8=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- et_cache/cacheHandler.go | 14 +++--- et_cache/cacheSer/cacheServer.go | 83 ++++++++++++++++++++++++-------- et_calc/dataCalc.go | 12 ++--- 3 files changed, 77 insertions(+), 32 deletions(-) diff --git a/et_cache/cacheHandler.go b/et_cache/cacheHandler.go index 6f9a46c..4d5fc7e 100644 --- a/et_cache/cacheHandler.go +++ b/et_cache/cacheHandler.go @@ -40,14 +40,14 @@ func (the *CacheHandler) enqueue(p *common_models.ProcessData) *common_models.Pr } cacheItemKey := fmt.Sprintf("%s:%d:%s", redisKey.CacheWindow, station.Info.Id, item.FieldName) - cacheWindow, ok := the.cacheServer.ReadCacheMap(station.Info.Id, item.FieldName) + cacheWindow, ok := the.cacheServer.ReadCacheNew(station.Info.Id, item.FieldName) if !ok { - cacheWindow = the.cacheServer.CreatFilterWindow(station.Info.Id, item.FieldName) - } - - //todo 这里不能跳过 原因:关联公式 比如温补 就靠这个最新的一条进行存储后关联了... - if cacheWindow.MethodId == 0 { - //continue + log.Printf("测点[%d][%s]无滑窗配置,跳过", station.Info.Id, item.FieldName) + if cacheWindow.Len() > 0 { + log.Printf("测点[%d][%s]无滑窗配置,清理缓存的历史滑窗", station.Info.Id, item.FieldName) + the.cacheServer.DeleteCacheMap(cacheItemKey, cacheWindow) + } + continue } needItemCache := common_models.AnalyzeData{} diff --git a/et_cache/cacheSer/cacheServer.go b/et_cache/cacheSer/cacheServer.go index 9657c25..80aaad6 100644 --- a/et_cache/cacheSer/cacheServer.go +++ b/et_cache/cacheSer/cacheServer.go @@ -28,15 +28,15 @@ func NewCacheServer(configHelper *common_utils.ConfigHelper) *CacheServer { } } -func (c *CacheServer) CreatFilterWindow(stationId int, itemName string) common_models.CacheWindow { +func (c *CacheServer) CreatFilterWindow(stationId int, itemName string, reason string) common_models.CacheWindow { //新测点滑窗 size默认1 k := fmt.Sprintf("%d-%s", stationId, itemName) - cacheWindow := common_models.NewCacheWindow(k, 1, 0, common_models.FilterParams{}) + cacheWindow := common_models.NewCacheWindow(k, 1, 0, common_models.FilterParams{}, reason) Filter, err := c.configHelper.GetFilter(stationId) if err == nil { for _, item := range Filter.Items { if itemName == item.FieldName { - cacheWindow = common_models.NewCacheWindow(k, item.WindowSize, item.MethodId, item.Params) + cacheWindow = common_models.NewCacheWindow(k, item.WindowSize, item.MethodId, item.Params, reason) } } } @@ -45,36 +45,81 @@ func (c *CacheServer) CreatFilterWindow(stationId int, itemName string) common_m func (c *CacheServer) UpdateCacheMap(key string, value common_models.CacheWindow) { c.CacheWindowMaps.Store(key, value) + if value.Len() == 0 { + return + } err := c.configHelper.SetChainedCacheObjWithExpiration(key, value.ToSaveCache(), time.Hour*6) if err != nil { log.Printf("updateCacheMap 异常,err=%s", err.Error()) } } +func (c *CacheServer) DeleteCacheMap(key string, value common_models.CacheWindow) { + c.CacheWindowMaps.Delete(key) + err := c.configHelper.DeleteChainedCacheObj(key) + if err != nil { + log.Printf("deleteCacheMap 异常,err=%s", err.Error()) + } +} -// ReadCacheMap -// read map 时 判断时间 超过2分钟 重新读取 -func (c *CacheServer) ReadCacheMap(stationId int, itemName string) (common_models.CacheWindow, bool) { +func (c *CacheServer) readCache(stationId int, itemName string) (common_models.CacheWindow, bool) { key := fmt.Sprintf("%s:%d:%s", redisKey.CacheWindow, stationId, itemName) if obj, ok := c.CacheWindowMaps.Load(key); ok { win, ok2 := obj.(common_models.CacheWindow) return win, ok2 } - if preWindow, err := c.configHelper.GetCacheWindowObj(key); err == nil { - //过期 - if preWindow.CheckExpiration() { - preData := preWindow.DeQueueAll() - reloadWindow := c.CreatFilterWindow(stationId, itemName) - if len(preData) > 0 { - for _, datum := range preData { - reloadWindow.EnQueue(datum) - } - return reloadWindow, true + + Filter, err := c.configHelper.GetFilter(stationId) + + if err != nil { + log.Printf("GetFilter 异常,err=%s", err.Error()) + return common_models.CacheWindow{}, false + } + + if len(Filter.Items) == 0 { + return common_models.CacheWindow{}, false + } + for _, item := range Filter.Items { + if itemName == item.FieldName { + //首次创建,尝试读取缓存 + if preWindow, err := c.configHelper.GetCacheWindowObj(key); err == nil { + log.Printf("CacheWindow读取[%s]", key) + return preWindow, true + } + log.Printf("创建[%s]", key) + newWin := c.CreatFilterWindow(stationId, itemName, "default") + return newWin, true + } + } + return common_models.CacheWindow{}, false +} +func (c *CacheServer) ReadCacheNew(stationId int, itemName string) (common_models.CacheWindow, bool) { + key := fmt.Sprintf("%s:%d:%s", redisKey.CacheWindow, stationId, itemName) + win, ok := c.readCache(stationId, itemName) + if !ok { + log.Printf("无滑窗[%s]", key) + return win, ok + } + + if win.CheckExpiration() { + //过期判定 有没有删除 + filterItem, err := c.configHelper.GetFilterItem(stationId, itemName) + log.Printf("滑窗[%s]过期,err=%v", key, err) + if err != nil || filterItem.WindowSize == 0 { + log.Printf("滑窗[%s]过期,且FilterItem无配置或窗口长度为0", key) + return win, false + } + //重新创建滑窗 + preData := win.DeQueueAll() + reloadWindow := c.CreatFilterWindow(stationId, itemName, "default") + if len(preData) > 0 { + for _, datum := range preData { + reloadWindow.EnQueue(datum) } + log.Printf("滑窗过期,触发重建[%s]", key) + return reloadWindow, true } - return preWindow, true } - v := c.CreatFilterWindow(stationId, itemName) - return v, true + return win, ok } // 获取缓存总数 测试用 diff --git a/et_calc/dataCalc.go b/et_calc/dataCalc.go index 6aafeca..4805a80 100644 --- a/et_calc/dataCalc.go +++ b/et_calc/dataCalc.go @@ -214,7 +214,12 @@ func (the *CalcHandler) strainCompensationByTemperature(device common_models.Sec err = errors.New(errMsg) return } - tempWindow, valid := the.cacheServer.ReadCacheMap(tempStationId, itemName) + tempWindow, valid := the.cacheServer.ReadCacheNew(tempStationId, itemName) + //如果没有温度窗口 创建温度窗口 + if !valid { + log.Printf("应变[%s]创建关联的温补滑窗 [%d]-[%s]", device.IotaDeviceId, tempStationId, itemName) + tempWindow = the.cacheServer.CreatFilterWindow(tempStationId, itemName, "strainRelated") + } if tempWindow.Len() == 0 { //redis 无 再去es中查询 indexName := configLoad.LoadConfig().GetString("es.index.theme") @@ -233,11 +238,6 @@ func (the *CalcHandler) strainCompensationByTemperature(device common_models.Sec } } - if !valid { - errMsg := fmt.Sprintf("[%s]计算公式[%d],无关联测点[%d]温度缓存数据", device.IotaDeviceId, device.FormulaInfo.Id, tempStationId) - err = errors.New(errMsg) - return - } if tempObj, ok := tempWindow.LatestByAnalyzeData(); ok { tempData := tempObj.Data