You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

210 lines
4.2 KiB

package common_models
import (
"container/ring"
"encoding/json"
"log"
"time"
)
// AnalyzeData is the sample type stored in a CacheWindow by
// EnQueueAnalyzeData and read back by the DeQueueAll* helpers.
// It is serialized to JSON under the keys "raw", "isValid" and "data".
type AnalyzeData struct {
	// Raw is returned by DeQueueAllRaw.
	// NOTE(review): presumably the unprocessed reading — confirm with the producer.
	Raw float64 `json:"raw"`
	// IsValid is not read anywhere in this file.
	// NOTE(review): presumably marks whether the sample passed validation — confirm.
	IsValid bool `json:"isValid"`
	// Data is returned by DeQueueAllData.
	// NOTE(review): presumably the processed/filtered value — confirm.
	Data float64 `json:"data"`
}
// CacheWinSave is the persisted form of a CacheWindow: the shape returned by
// ToSaveCache and JSON-encoded by MarshalBinary.
type CacheWinSave struct {
	// The window is embedded, so encoding/json emits its exported fields at
	// the top level of the same JSON object.
	*CacheWindow
	// AllData is the snapshot taken with DeQueueAllAnalyzeData; it carries the
	// ring contents, which the unexported ring field cannot serialize itself.
	AllData []AnalyzeData `json:"allData"`
}
// expirationInfo holds the expiry state evaluated by CacheWindow.CheckExpiration.
type expirationInfo struct {
	// UpdateTime is the instant from which elapsed time is measured.
	UpdateTime time.Time
	// Duration is the time-to-live, in seconds.
	Duration float64
	// Expired is set by CheckExpiration once the TTL has elapsed; nothing in
	// this file resets it.
	Expired bool
}
// CacheWindow is a fixed-capacity sliding window of cached values backed by
// a container/ring.
// Original note: stores cached data for a large number of measurement points.
type CacheWindow struct {
	// Id labels the window in the log lines written by the EnQueue methods.
	Id string
	// windowLen is the window length: how many slots have been filled so far.
	// It is unexported and therefore not part of the JSON form.
	windowLen int
	// WindowSize is the window size, i.e. the number of slots in the ring.
	WindowSize int
	// MethodId is the sliding-window method; in this file it is only used to
	// decide whether EnQueue logs (non-zero) and as a value in log lines.
	MethodId int
	// LatestData is the value most recently passed to an EnQueue method.
	LatestData any
	// ring holds the values; it always points at the next slot to be written,
	// so ring.Prev() is the newest value. Unexported, hence not serialized.
	ring *ring.Ring
	// Params is stored by NewCacheWindow and not read in this file.
	// NOTE(review): FilterParams is declared elsewhere — confirm its meaning there.
	Params FilterParams
	// Expire is the expiration control state used by CheckExpiration.
	Expire expirationInfo
}
// Size returns the configured capacity of the window (WindowSize). This is
// the number of slots, not the number of values stored; Len reports the latter.
func (c *CacheWindow) Size() int {
	capacity := c.WindowSize
	return capacity
}
// Len returns the effective length of the window: the number of values
// counted so far (windowLen). The EnQueue methods stop incrementing that
// counter once it reaches WindowSize.
func (c *CacheWindow) Len() int {
	filled := c.windowLen
	return filled
}
// CheckExpiration reports whether the window has expired.
//
// The Expired flag is latched: it is set once strictly more than
// Expire.Duration seconds have passed since Expire.UpdateTime, and this
// method never clears it again.
func (c *CacheWindow) CheckExpiration() bool {
	exp := &c.Expire
	elapsed := time.Since(exp.UpdateTime).Seconds()
	if elapsed > exp.Duration {
		exp.Expired = true
	}
	return exp.Expired
}
// ToSaveCache wraps the window in its persisted form: the window itself
// (by pointer, not copied) plus a snapshot of the ring contents taken with
// DeQueueAllAnalyzeData, which is the extra field added for storage.
func (c *CacheWindow) ToSaveCache() CacheWinSave {
	snapshot := c.DeQueueAllAnalyzeData()

	saved := CacheWinSave{CacheWindow: c}
	saved.AllData = snapshot
	return saved
}
// MarshalBinary implements encoding.BinaryMarshaler; per the original comment
// it is the serialization used for Redis.
//
// The output is the JSON encoding of the CacheWinSave built by ToSaveCache:
// the window's exported fields plus "allData", the snapshot of the ring
// contents that the unexported ring field could not serialize on its own.
func (c *CacheWindow) MarshalBinary() (data []byte, err error) {
	return json.Marshal(c.ToSaveCache())
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler; it is the
// deserialization counterpart of MarshalBinary (used for Redis per the
// original comment) and JSON-decodes data into the window's exported fields.
//
// ring and windowLen are unexported, so the JSON decoder cannot populate
// them. Left alone, ring would stay nil after decoding and the next call to
// an EnQueue or Latest method would dereference it and panic. The window is
// therefore given a fresh, empty ring of WindowSize slots and a zero length.
//
// The "allData" samples in the payload are not replayed into the ring here;
// decode into a CacheWinSave when they are needed.
//
// If decoding fails the error is returned and ring/windowLen are untouched.
func (c *CacheWindow) UnmarshalBinary(data []byte) error {
	if err := json.Unmarshal(data, c); err != nil {
		return err
	}
	// ring.New returns nil for WindowSize <= 0, matching ReInitialRing.
	c.ring = ring.New(c.WindowSize)
	c.windowLen = 0
	return nil
}
// NewCacheWindow builds a window labelled id that holds up to size values,
// tagged with the given sliding-window method and filter parameters.
//
// size is clamped to [1, 10]. The upper bound is the existing limit. The
// lower bound exists because ring.New returns nil for size <= 0, and a nil
// ring makes the EnQueue and Latest methods panic on first use.
//
// The expiration clock starts at construction time with a 120-second TTL.
func NewCacheWindow(id string, size, methodId int, params FilterParams) CacheWindow {
	const (
		minSize    = 1
		maxSize    = 10
		ttlSeconds = 2 * 60
	)

	switch {
	case size < minSize:
		size = minSize
	case size > maxSize:
		size = maxSize
	}

	return CacheWindow{
		Id:         id,
		WindowSize: size,
		MethodId:   methodId,
		ring:       ring.New(size),
		Params:     params,
		Expire: expirationInfo{
			UpdateTime: time.Now(),
			Duration:   ttlSeconds,
			Expired:    false,
		},
	}
}
// ReInitialRing discards the stored values and installs a fresh, empty ring
// of WindowSize slots (ring.New yields nil when WindowSize <= 0).
// On a nil receiver it only logs "c=nil" and returns.
//
// windowLen is reset together with the ring so that Len keeps matching what
// is actually stored; LatestData is left as it was.
//
// No recover is needed: the receiver is nil-checked before any field access
// and ring.New does not panic.
func (c *CacheWindow) ReInitialRing() {
	if c == nil {
		log.Printf("c=nil")
		return
	}
	c.ring = ring.New(c.WindowSize)
	c.windowLen = 0
}
// EnQueue writes d into the slot at the ring's write position (overwriting
// the oldest value once every slot has been used) and advances the position.
// It also records d as LatestData and counts it toward Len until the window
// is full. The ring must already exist; a nil ring panics.
//
// When MethodId is non-zero a line "<WindowSize>/<stored count>" is logged.
// The second number is windowLen rather than ring.Len(): a container/ring
// always reports its full slot count, so that value could never differ from
// WindowSize and cost a full traversal on every call.
func (c *CacheWindow) EnQueue(d any) {
	if c.windowLen < c.WindowSize {
		c.windowLen++
	}
	c.LatestData = d

	c.ring.Value = d
	c.ring = c.ring.Next()

	if c.MethodId == 0 {
		return
	}
	log.Printf("[%s]测点缓存[m=%d] %d/%d", c.Id, c.MethodId, c.WindowSize, c.windowLen)
}
// EnQueueAnalyzeData is the AnalyzeData-typed variant of EnQueue: it writes d
// into the slot at the ring's write position (overwriting the oldest value
// once every slot has been used), advances the position, records d as
// LatestData and counts it toward Len until the window is full. The ring
// must already exist; a nil ring panics.
//
// Unlike EnQueue it logs on every call, whatever MethodId is. The line reads
// "<WindowSize>/<stored count>"; the count is windowLen rather than
// ring.Len(), because a container/ring always reports its full slot count.
func (c *CacheWindow) EnQueueAnalyzeData(d AnalyzeData) {
	if c.windowLen < c.WindowSize {
		c.windowLen++
	}
	c.LatestData = d

	c.ring.Value = d
	c.ring = c.ring.Next()

	log.Printf("[%s]测点缓存[m=%d] %d/%d", c.Id, c.MethodId, c.WindowSize, c.windowLen)
}
// DeQueue returns the most recently enqueued value. Despite its name nothing
// is removed from the window; it yields exactly what Latest yields, and nil
// when no value has been written to that slot. A nil ring panics.
func (c *CacheWindow) DeQueue() any {
	return c.Latest()
}
// Latest returns the most recently enqueued value without modifying the
// window. The ring pointer always sits on the next slot to be written, so the
// newest value lives one step behind it. The result is nil when that slot has
// never been written; a nil ring panics.
func (c *CacheWindow) Latest() any {
	newest := c.ring.Prev()
	return newest.Value
}
// LatestByAnalyzeData returns the most recently enqueued value as an
// AnalyzeData. The boolean is false, and the result is the zero AnalyzeData,
// when the newest slot is empty or holds a value of another type.
// A nil ring panics.
func (c *CacheWindow) LatestByAnalyzeData() (AnalyzeData, bool) {
	sample, isSample := c.ring.Prev().Value.(AnalyzeData)
	return sample, isSample
}
// LatestByRange returns the size most recent values, newest first, walking
// backwards from the newest slot. Slots that were never written contribute
// nil entries, so the result has exactly size elements whenever size is
// positive and does not exceed WindowSize.
//
// The result is nil when size exceeds WindowSize or is not positive.
// A nil ring panics unless size exceeds WindowSize.
func (c *CacheWindow) LatestByRange(size int) []any {
	if size > c.WindowSize {
		return nil
	}

	var recent []any
	for node, taken := c.ring.Prev(), 0; taken < size; node, taken = node.Prev(), taken+1 {
		recent = append(recent, node.Value)
	}
	return recent
}
// DeQueueAll returns every non-nil value in the ring without removing
// anything. Traversal starts at the write position and moves forward, so the
// values come out oldest first. The result is nil when nothing is stored or
// the ring itself is nil.
func (c *CacheWindow) DeQueueAll() []any {
	var stored []any
	collect := func(v any) {
		if v == nil {
			return // slot never written
		}
		stored = append(stored, v)
	}
	c.ring.Do(collect)
	return stored
}
// DeQueueAllAnalyzeData returns the values from DeQueueAll as AnalyzeData,
// in the same order. nil entries are skipped. On the first value of any
// other type, collection stops and whatever was gathered so far is returned;
// no error is signalled. The result is nil when nothing was collected.
func (c *CacheWindow) DeQueueAllAnalyzeData() []AnalyzeData {
	var samples []AnalyzeData
	for _, item := range c.DeQueueAll() {
		switch sample := item.(type) {
		case nil:
			continue
		case AnalyzeData:
			samples = append(samples, sample)
		default:
			// wrong type: return immediately
			return samples
		}
	}
	return samples
}
// DeQueueAllData returns the Data field of every value from DeQueueAll, in
// the same order. nil entries are skipped. The boolean is true only when
// every non-nil value was an AnalyzeData; on the first value of another type
// it returns what was gathered so far together with false.
func (c *CacheWindow) DeQueueAllData() ([]float64, bool) {
	var values []float64
	for _, item := range c.DeQueueAll() {
		switch sample := item.(type) {
		case nil:
			continue
		case AnalyzeData:
			values = append(values, sample.Data)
		default:
			return values, false
		}
	}
	return values, true
}
// DeQueueAllRaw returns the Raw field of every value from DeQueueAll, in the
// same order. The boolean is true only when every value was an AnalyzeData;
// on the first value that is not (a nil entry included) it returns what was
// gathered so far together with false.
func (c *CacheWindow) DeQueueAllRaw() ([]float64, bool) {
	var raws []float64
	for _, item := range c.DeQueueAll() {
		sample, isSample := item.(AnalyzeData)
		if !isSample {
			return raws, false
		}
		raws = append(raws, sample.Raw)
	}
	return raws, true
}