Browse Source

update 更新滑窗缓存 计算部分

dev
lucas 2 weeks ago
parent
commit
0d78f58f2a
  1. 14
      et_cache/cacheHandler.go
  2. 83
      et_cache/cacheSer/cacheServer.go
  3. 12
      et_calc/dataCalc.go

14
et_cache/cacheHandler.go

@ -40,14 +40,14 @@ func (the *CacheHandler) enqueue(p *common_models.ProcessData) *common_models.Pr
} }
cacheItemKey := fmt.Sprintf("%s:%d:%s", redisKey.CacheWindow, station.Info.Id, item.FieldName) cacheItemKey := fmt.Sprintf("%s:%d:%s", redisKey.CacheWindow, station.Info.Id, item.FieldName)
cacheWindow, ok := the.cacheServer.ReadCacheMap(station.Info.Id, item.FieldName) cacheWindow, ok := the.cacheServer.ReadCacheNew(station.Info.Id, item.FieldName)
if !ok { if !ok {
cacheWindow = the.cacheServer.CreatFilterWindow(station.Info.Id, item.FieldName) log.Printf("测点[%d][%s]无滑窗配置,跳过", station.Info.Id, item.FieldName)
} if cacheWindow.Len() > 0 {
log.Printf("测点[%d][%s]无滑窗配置,清理缓存的历史滑窗", station.Info.Id, item.FieldName)
//todo 这里不能跳过 原因:关联公式 比如温补 就靠这个最新的一条进行存储后关联了... the.cacheServer.DeleteCacheMap(cacheItemKey, cacheWindow)
if cacheWindow.MethodId == 0 { }
//continue continue
} }
needItemCache := common_models.AnalyzeData{} needItemCache := common_models.AnalyzeData{}

83
et_cache/cacheSer/cacheServer.go

@ -28,15 +28,15 @@ func NewCacheServer(configHelper *common_utils.ConfigHelper) *CacheServer {
} }
} }
func (c *CacheServer) CreatFilterWindow(stationId int, itemName string) common_models.CacheWindow { func (c *CacheServer) CreatFilterWindow(stationId int, itemName string, reason string) common_models.CacheWindow {
//新测点滑窗 size默认1 //新测点滑窗 size默认1
k := fmt.Sprintf("%d-%s", stationId, itemName) k := fmt.Sprintf("%d-%s", stationId, itemName)
cacheWindow := common_models.NewCacheWindow(k, 1, 0, common_models.FilterParams{}) cacheWindow := common_models.NewCacheWindow(k, 1, 0, common_models.FilterParams{}, reason)
Filter, err := c.configHelper.GetFilter(stationId) Filter, err := c.configHelper.GetFilter(stationId)
if err == nil { if err == nil {
for _, item := range Filter.Items { for _, item := range Filter.Items {
if itemName == item.FieldName { if itemName == item.FieldName {
cacheWindow = common_models.NewCacheWindow(k, item.WindowSize, item.MethodId, item.Params) cacheWindow = common_models.NewCacheWindow(k, item.WindowSize, item.MethodId, item.Params, reason)
} }
} }
} }
@ -45,36 +45,81 @@ func (c *CacheServer) CreatFilterWindow(stationId int, itemName string) common_m
func (c *CacheServer) UpdateCacheMap(key string, value common_models.CacheWindow) { func (c *CacheServer) UpdateCacheMap(key string, value common_models.CacheWindow) {
c.CacheWindowMaps.Store(key, value) c.CacheWindowMaps.Store(key, value)
if value.Len() == 0 {
return
}
err := c.configHelper.SetChainedCacheObjWithExpiration(key, value.ToSaveCache(), time.Hour*6) err := c.configHelper.SetChainedCacheObjWithExpiration(key, value.ToSaveCache(), time.Hour*6)
if err != nil { if err != nil {
log.Printf("updateCacheMap 异常,err=%s", err.Error()) log.Printf("updateCacheMap 异常,err=%s", err.Error())
} }
} }
func (c *CacheServer) DeleteCacheMap(key string, value common_models.CacheWindow) {
c.CacheWindowMaps.Delete(key)
err := c.configHelper.DeleteChainedCacheObj(key)
if err != nil {
log.Printf("deleteCacheMap 异常,err=%s", err.Error())
}
}
// ReadCacheMap func (c *CacheServer) readCache(stationId int, itemName string) (common_models.CacheWindow, bool) {
// read map 时 判断时间 超过2分钟 重新读取
func (c *CacheServer) ReadCacheMap(stationId int, itemName string) (common_models.CacheWindow, bool) {
key := fmt.Sprintf("%s:%d:%s", redisKey.CacheWindow, stationId, itemName) key := fmt.Sprintf("%s:%d:%s", redisKey.CacheWindow, stationId, itemName)
if obj, ok := c.CacheWindowMaps.Load(key); ok { if obj, ok := c.CacheWindowMaps.Load(key); ok {
win, ok2 := obj.(common_models.CacheWindow) win, ok2 := obj.(common_models.CacheWindow)
return win, ok2 return win, ok2
} }
if preWindow, err := c.configHelper.GetCacheWindowObj(key); err == nil {
//过期 Filter, err := c.configHelper.GetFilter(stationId)
if preWindow.CheckExpiration() {
preData := preWindow.DeQueueAll() if err != nil {
reloadWindow := c.CreatFilterWindow(stationId, itemName) log.Printf("GetFilter 异常,err=%s", err.Error())
if len(preData) > 0 { return common_models.CacheWindow{}, false
for _, datum := range preData { }
reloadWindow.EnQueue(datum)
} if len(Filter.Items) == 0 {
return reloadWindow, true return common_models.CacheWindow{}, false
}
for _, item := range Filter.Items {
if itemName == item.FieldName {
//首次创建,尝试读取缓存
if preWindow, err := c.configHelper.GetCacheWindowObj(key); err == nil {
log.Printf("CacheWindow读取[%s]", key)
return preWindow, true
}
log.Printf("创建[%s]", key)
newWin := c.CreatFilterWindow(stationId, itemName, "default")
return newWin, true
}
}
return common_models.CacheWindow{}, false
}
func (c *CacheServer) ReadCacheNew(stationId int, itemName string) (common_models.CacheWindow, bool) {
key := fmt.Sprintf("%s:%d:%s", redisKey.CacheWindow, stationId, itemName)
win, ok := c.readCache(stationId, itemName)
if !ok {
log.Printf("无滑窗[%s]", key)
return win, ok
}
if win.CheckExpiration() {
//过期判定 有没有删除
filterItem, err := c.configHelper.GetFilterItem(stationId, itemName)
log.Printf("滑窗[%s]过期,err=%v", key, err)
if err != nil || filterItem.WindowSize == 0 {
log.Printf("滑窗[%s]过期,且FilterItem无配置或窗口长度为0", key)
return win, false
}
//重新创建滑窗
preData := win.DeQueueAll()
reloadWindow := c.CreatFilterWindow(stationId, itemName, "default")
if len(preData) > 0 {
for _, datum := range preData {
reloadWindow.EnQueue(datum)
} }
log.Printf("滑窗过期,触发重建[%s]", key)
return reloadWindow, true
} }
return preWindow, true
} }
v := c.CreatFilterWindow(stationId, itemName) return win, ok
return v, true
} }
// 获取缓存总数 测试用 // 获取缓存总数 测试用

12
et_calc/dataCalc.go

@ -214,7 +214,12 @@ func (the *CalcHandler) strainCompensationByTemperature(device common_models.Sec
err = errors.New(errMsg) err = errors.New(errMsg)
return return
} }
tempWindow, valid := the.cacheServer.ReadCacheMap(tempStationId, itemName) tempWindow, valid := the.cacheServer.ReadCacheNew(tempStationId, itemName)
//如果没有温度窗口 创建温度窗口
if !valid {
log.Printf("应变[%s]创建关联的温补滑窗 [%d]-[%s]", device.IotaDeviceId, tempStationId, itemName)
tempWindow = the.cacheServer.CreatFilterWindow(tempStationId, itemName, "strainRelated")
}
if tempWindow.Len() == 0 { if tempWindow.Len() == 0 {
//redis 无 再去es中查询 //redis 无 再去es中查询
indexName := configLoad.LoadConfig().GetString("es.index.theme") indexName := configLoad.LoadConfig().GetString("es.index.theme")
@ -233,11 +238,6 @@ func (the *CalcHandler) strainCompensationByTemperature(device common_models.Sec
} }
} }
if !valid {
errMsg := fmt.Sprintf("[%s]计算公式[%d],无关联测点[%d]温度缓存数据", device.IotaDeviceId, device.FormulaInfo.Id, tempStationId)
err = errors.New(errMsg)
return
}
if tempObj, ok := tempWindow.LatestByAnalyzeData(); ok { if tempObj, ok := tempWindow.LatestByAnalyzeData(); ok {
tempData := tempObj.Data tempData := tempObj.Data

Loading…
Cancel
Save