package et_cache

import (
	"fmt"
	"log"

	"et_cache/cacheSer"
	"gitea.anxinyun.cn/container/common_models"
	"gitea.anxinyun.cn/container/common_models/constant/redisKey"
	"gitea.anxinyun.cn/container/common_utils"
	"gitea.anxinyun.cn/container/common_utils/configLoad"
	"node/stages"
)

// CacheHandler couples a cache server with the processing stage that feeds
// station data through the sliding-window cache.
type CacheHandler struct {
	cacheServer *cacheSer.CacheServer
	stage       *stages.Stage
}

// NewCacheHandler builds a CacheHandler. It reads the "redis.address" setting
// from the loaded configuration, creates the cache server from it, and
// registers enqueue as a process of the handler's stage.
func NewCacheHandler() *CacheHandler {
	redisAddr := configLoad.LoadConfig().GetString("redis.address")
	configHelper := common_utils.NewConfigHelper(redisAddr)
	the := &CacheHandler{
		stage:       stages.NewStage("测点数据缓存"),
		cacheServer: cacheSer.NewCacheServer(configHelper),
	}
	the.stage.AddProcess(the.enqueue)
	return the
}

// GetStage returns the handler's stage by value (a copy of the struct the
// handler points to).
func (the *CacheHandler) GetStage() stages.Stage {
	return *the.stage
}

// enqueue walks every item of every station in p, applies the configured
// sliding-window filter to float64 values, and pushes the result into the
// item's cache window. When the filter produced a valid result, the station's
// ThemeData and PhyData entries for that field are replaced with the filtered
// value. The same p is returned.
func (the *CacheHandler) enqueue(p *common_models.ProcessData) *common_models.ProcessData {
	for _, station := range p.Stations {
		for _, item := range station.Info.Proto.Items {
			// String-typed values are not processed.
			itemV := station.Data.ThemeData[item.FieldName]
			if _, isString := itemV.(string); isString {
				continue
			}

			cacheItemKey := fmt.Sprintf("%s:%d:%s", redisKey.CacheWindow, station.Info.Id, item.FieldName)
			cacheWindow, ok := the.cacheServer.ReadCacheNew(station.Info.Id, item.FieldName)
			if !ok {
				log.Printf("测点[%d][%s]无滑窗配置,跳过", station.Info.Id, item.FieldName)
				// A window that still holds entries is a leftover from an
				// earlier configuration; drop it from the cache.
				if cacheWindow.Len() > 0 {
					log.Printf("测点[%d][%s]无滑窗配置,清理缓存的历史滑窗", station.Info.Id, item.FieldName)
					the.cacheServer.DeleteCacheMap(cacheItemKey, cacheWindow)
				}
				continue
			}

			// Stays at its zero value (IsValid == false) when the field is
			// missing or is not a float64; it is still enqueued below.
			needItemCache := common_models.AnalyzeData{}
			// Only float64 values are supported by the cache for now. A
			// missing key yields a nil interface, which fails the assertion.
			if v, isFloat := itemV.(float64); isFloat {
				needItemCache = common_models.AnalyzeData{
					Raw:     v,
					IsValid: true,
					Data:    0,
				}
				// Sliding-window calculation.
				isWinCalcValid := false
				needItemCache.Data, isWinCalcValid = the.windowCalc(v, cacheWindow)
				if isWinCalcValid {
					station.Data.ThemeData[item.FieldName] = needItemCache.Data
					station.Data.PhyData[item.FieldName] = needItemCache.Data
				}
			}

			// Cache the new entry.
			cacheWindow.EnQueue(needItemCache)
			the.cacheServer.UpdateCacheMap(cacheItemKey, cacheWindow)
		}
	}
	return p
}

// windowCalc applies the filter selected by window.MethodId to raw.
//
// It returns the filtered value and true when a filter was applied. It returns
// (raw, false) when the window has zero size or holds no entries, when the
// method id is unknown (logged unless the id is 0), or when the method is not
// implemented.
func (the *CacheHandler) windowCalc(raw float64, window common_models.CacheWindow) (float64, bool) {
	if window.Size() == 0 || window.Len() == 0 {
		return raw, false
	}

	switch window.MethodId {
	case common_models.Filter_CalcMedian:
		return filterForMedian(raw, window), true
	case common_models.Filter_LimitAmp:
		return filterForLimitAmp(raw, window), true
	case common_models.Filter_CalcMeanValue:
		return filterForMean(raw, window), true
	case common_models.Filter_CalcStvMean:
		return filterForMeanStandardDeviation(raw, window), true
	case common_models.Filter_CalcWindow:
		// The wave filter (filterForWave) is disabled. Report "no filter
		// applied" so the caller keeps the raw value instead of overwriting
		// the station data with a zero result.
		return raw, false
	case common_models.Filter_ExtreAverage:
		return filterForExtreAverage(raw, window), true
	default:
		if window.MethodId != 0 {
			log.Printf("不支持滑窗公式id:[%d]", window.MethodId)
		}
		return raw, false
	}
}