et-go 20240919重建
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

106 lines
3.1 KiB

1 month ago
package et_cache
import (
"et_cache/cacheSer"
"fmt"
"gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_models/constant/redisKey"
"gitea.anxinyun.cn/container/common_utils"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"log"
"node/stages"
)
// CacheHandler is the handler for the measurement-point data cache stage.
// Its enqueue process keeps one cache window per (station id, field name)
// and applies the window's configured filter to incoming float64 values.
type CacheHandler struct {
// cacheServer reads, creates and updates the cache windows used by enqueue.
cacheServer *cacheSer.CacheServer
// stage is the processing stage on which enqueue is registered.
stage *stages.Stage
}
// NewCacheHandler builds a ready-to-use CacheHandler.
//
// It reads the "redis.address" setting from the loaded configuration, creates
// a config helper from that address, then creates the stage and the cache
// server, and finally registers enqueue as a process of the stage.
func NewCacheHandler() *CacheHandler {
	helper := common_utils.NewConfigHelper(
		configLoad.LoadConfig().GetString("redis.address"),
	)

	handler := new(CacheHandler)
	handler.stage = stages.NewStage("测点数据缓存")
	handler.cacheServer = cacheSer.NewCacheServer(helper)

	// The stage drives the handler through its enqueue method.
	handler.stage.AddProcess(handler.enqueue)
	return handler
}
// GetStage returns the handler's stage by value, i.e. a copy of the Stage
// struct that the handler holds a pointer to.
func (the *CacheHandler) GetStage() stages.Stage {
	stage := the.stage
	return *stage
}
func (the *CacheHandler) enqueue(p *common_models.ProcessData) *common_models.ProcessData {
1 month ago
for _, station := range p.Stations {
for _, item := range station.Info.Proto.Items {
//字符串类型不处理
itemV := station.Data.ThemeData[item.FieldName]
if _, ok := itemV.(string); ok {
continue
}
1 month ago
cacheItemKey := fmt.Sprintf("%s:%d:%s", redisKey.CacheWindow, station.Info.Id, item.FieldName)
cacheWindow, ok := the.cacheServer.ReadCacheMap(station.Info.Id, item.FieldName)
if !ok {
cacheWindow = the.cacheServer.CreatFilterWindow(station.Info.Id, item.FieldName)
}
//todo 这里不能跳过 原因:关联公式 比如温补 就靠这个最新的一条进行存储后关联了...
if cacheWindow.MethodId == 0 {
//continue
}
1 month ago
needItemCache := common_models.AnalyzeData{}
if value, ok := station.Data.ThemeData[item.FieldName]; ok {
//目前只支持float64类型的缓存
if v, ok := value.(float64); ok {
needItemCache = common_models.AnalyzeData{
Raw: v,
IsValid: true,
Data: 0,
}
//滑窗计算
isWinCalcValid := false
needItemCache.Data, isWinCalcValid = the.windowCalc(v, cacheWindow)
if isWinCalcValid {
station.Data.ThemeData[item.FieldName] = needItemCache.Data
station.Data.PhyData[item.FieldName] = needItemCache.Data
1 month ago
}
}
}
//缓存
cacheWindow.EnQueue(needItemCache)
the.cacheServer.UpdateCacheMap(cacheItemKey, cacheWindow)
1 month ago
}
}
return p
}
func (the *CacheHandler) windowCalc(raw float64, window common_models.CacheWindow) (float64, bool) {
if window.Size() == 0 {
return raw, false
}
result := 0.0
switch window.MethodId {
case common_models.Filter_CalcMedian:
result = filterForMedian(raw, window)
case common_models.Filter_LimitAmp:
result = filterForLimitAmp(raw, window)
case common_models.Filter_CalcMeanValue:
result = filterForMean(raw, window)
case common_models.Filter_CalcStvMean:
result = filterForMeanStandardDeviation(raw, window)
case common_models.Filter_CalcWindow:
//result = filterForWave(raw, window)
case common_models.Filter_ExtreAverage:
result = filterForExtreAverage(raw, window)
default:
if window.MethodId != 0 {
log.Printf("不支持滑窗公式id:[%d]", window.MethodId)
}
1 month ago
return raw, false
}
return result, true
}