|
|
|
package et_cache
|
|
|
|
|
|
|
|
import (
|
|
|
|
"et_cache/cacheSer"
|
|
|
|
"fmt"
|
|
|
|
"gitea.anxinyun.cn/container/common_models"
|
|
|
|
"gitea.anxinyun.cn/container/common_models/constant/redisKey"
|
|
|
|
"gitea.anxinyun.cn/container/common_utils"
|
|
|
|
"gitea.anxinyun.cn/container/common_utils/configLoad"
|
|
|
|
"log"
|
|
|
|
"node/stages"
|
|
|
|
)
|
|
|
|
|
|
|
|
type CacheHandler struct {
|
|
|
|
cacheServer *cacheSer.CacheServer
|
|
|
|
stage *stages.Stage
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewCacheHandler() *CacheHandler {
|
|
|
|
redisAddr := configLoad.LoadConfig().GetString("redis.address")
|
|
|
|
configHelper := common_utils.NewConfigHelper(redisAddr)
|
|
|
|
the := &CacheHandler{
|
|
|
|
stage: stages.NewStage("测点数据缓存"),
|
|
|
|
cacheServer: cacheSer.NewCacheServer(configHelper),
|
|
|
|
}
|
|
|
|
the.stage.AddProcess(the.enqueue)
|
|
|
|
return the
|
|
|
|
}
|
|
|
|
func (the *CacheHandler) GetStage() stages.Stage {
|
|
|
|
return *the.stage
|
|
|
|
}
|
|
|
|
func (the *CacheHandler) enqueue(p *common_models.ProcessData) *common_models.ProcessData {
|
|
|
|
|
|
|
|
for _, station := range p.Stations {
|
|
|
|
for _, item := range station.Info.Proto.Items {
|
|
|
|
//字符串类型不处理
|
|
|
|
itemV := station.Data.ThemeData[item.FieldName]
|
|
|
|
if _, ok := itemV.(string); ok {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
cacheItemKey := fmt.Sprintf("%s:%d:%s", redisKey.CacheWindow, station.Info.Id, item.FieldName)
|
|
|
|
cacheWindow, ok := the.cacheServer.ReadCacheNew(station.Info.Id, item.FieldName)
|
|
|
|
if !ok {
|
|
|
|
log.Printf("测点[%d][%s]无滑窗配置,跳过", station.Info.Id, item.FieldName)
|
|
|
|
if cacheWindow.Len() > 0 {
|
|
|
|
log.Printf("测点[%d][%s]无滑窗配置,清理缓存的历史滑窗", station.Info.Id, item.FieldName)
|
|
|
|
the.cacheServer.DeleteCacheMap(cacheItemKey, cacheWindow)
|
|
|
|
}
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
needItemCache := common_models.AnalyzeData{}
|
|
|
|
if value, ok := station.Data.ThemeData[item.FieldName]; ok {
|
|
|
|
//目前只支持float64类型的缓存
|
|
|
|
if v, ok := value.(float64); ok {
|
|
|
|
needItemCache = common_models.AnalyzeData{
|
|
|
|
Raw: v,
|
|
|
|
IsValid: true,
|
|
|
|
Data: 0,
|
|
|
|
}
|
|
|
|
//滑窗计算
|
|
|
|
isWinCalcValid := false
|
|
|
|
needItemCache.Data, isWinCalcValid = the.windowCalc(v, cacheWindow)
|
|
|
|
if isWinCalcValid {
|
|
|
|
station.Data.ThemeData[item.FieldName] = needItemCache.Data
|
|
|
|
station.Data.PhyData[item.FieldName] = needItemCache.Data
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
//缓存
|
|
|
|
cacheWindow.EnQueue(needItemCache)
|
|
|
|
the.cacheServer.UpdateCacheMap(cacheItemKey, cacheWindow)
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return p
|
|
|
|
}
|
|
|
|
|
|
|
|
func (the *CacheHandler) windowCalc(raw float64, window common_models.CacheWindow) (float64, bool) {
|
|
|
|
if window.Size() == 0 || window.Len() == 0 {
|
|
|
|
return raw, false
|
|
|
|
}
|
|
|
|
result := 0.0
|
|
|
|
switch window.MethodId {
|
|
|
|
case common_models.Filter_CalcMedian:
|
|
|
|
result = filterForMedian(raw, window)
|
|
|
|
case common_models.Filter_LimitAmp:
|
|
|
|
result = filterForLimitAmp(raw, window)
|
|
|
|
case common_models.Filter_CalcMeanValue:
|
|
|
|
result = filterForMean(raw, window)
|
|
|
|
case common_models.Filter_CalcStvMean:
|
|
|
|
result = filterForMeanStandardDeviation(raw, window)
|
|
|
|
case common_models.Filter_CalcWindow:
|
|
|
|
//result = filterForWave(raw, window)
|
|
|
|
case common_models.Filter_ExtreAverage:
|
|
|
|
result = filterForExtreAverage(raw, window)
|
|
|
|
default:
|
|
|
|
if window.MethodId != 0 {
|
|
|
|
log.Printf("不支持滑窗公式id:[%d]", window.MethodId)
|
|
|
|
}
|
|
|
|
return raw, false
|
|
|
|
}
|
|
|
|
return result, true
|
|
|
|
}
|