You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
210 lines
4.2 KiB
210 lines
4.2 KiB
package common_models
|
|
|
|
import (
|
|
"container/ring"
|
|
"encoding/json"
|
|
"log"
|
|
"time"
|
|
)
|
|
|
|
// AnalyzeData is the element type stored in a CacheWindow by
// EnQueueAnalyzeData and read back by the DeQueueAll* helpers.
type AnalyzeData struct {
	// Raw is the value collected by DeQueueAllRaw.
	Raw float64 `json:"raw"`
	// IsValid is carried alongside the sample; nothing in this file reads it.
	IsValid bool `json:"isValid"`
	// Data is the value collected by DeQueueAllData.
	Data float64 `json:"data"`
}
|
|
type CacheWinSave struct {
|
|
*CacheWindow
|
|
AllData []AnalyzeData `json:"allData"`
|
|
}
|
|
|
|
// expirationInfo holds the state CheckExpiration uses to decide whether a
// CacheWindow has expired.
type expirationInfo struct {
	// UpdateTime is the instant from which elapsed time is measured.
	UpdateTime time.Time
	// Duration is the lifetime in seconds.
	Duration float64
	// Expired is set to true by CheckExpiration once Duration has elapsed.
	Expired bool
}
|
|
|
|
// CacheWindow
|
|
// 存储大量测点缓存数据
|
|
type CacheWindow struct {
|
|
Id string
|
|
windowLen int //窗体长度
|
|
WindowSize int //窗体大小
|
|
MethodId int //滑窗方法
|
|
LatestData any
|
|
ring *ring.Ring
|
|
Params FilterParams
|
|
//过期控制
|
|
Expire expirationInfo
|
|
}
|
|
|
|
func (c *CacheWindow) Size() int {
|
|
return c.WindowSize
|
|
}
|
|
|
|
// Len 获取窗口里有效长度(数据数量)
|
|
func (c *CacheWindow) Len() int {
|
|
return c.windowLen
|
|
}
|
|
func (c *CacheWindow) CheckExpiration() bool {
|
|
if time.Now().Sub(c.Expire.UpdateTime).Seconds() > c.Expire.Duration {
|
|
c.Expire.Expired = true
|
|
}
|
|
return c.Expire.Expired
|
|
}
|
|
|
|
func (c *CacheWindow) ToSaveCache() CacheWinSave {
|
|
return CacheWinSave{
|
|
CacheWindow: c,
|
|
AllData: c.DeQueueAllAnalyzeData(), //追加存储的中间字段
|
|
}
|
|
}
|
|
|
|
// redis序列化
|
|
func (c *CacheWindow) MarshalBinary() (data []byte, err error) {
|
|
// 序列化时包含新字段
|
|
sCache := CacheWinSave{
|
|
CacheWindow: c,
|
|
AllData: c.DeQueueAllAnalyzeData(), //追加存储的中间字段
|
|
}
|
|
return json.Marshal(sCache)
|
|
}
|
|
|
|
// redis序列化
|
|
func (c *CacheWindow) UnmarshalBinary(data []byte) error {
|
|
return json.Unmarshal(data, c)
|
|
}
|
|
func NewCacheWindow(id string, size, methodId int, params FilterParams) CacheWindow {
|
|
limit := 10
|
|
if size > limit {
|
|
size = 10
|
|
}
|
|
return CacheWindow{
|
|
Id: id,
|
|
WindowSize: size,
|
|
MethodId: methodId,
|
|
ring: ring.New(size),
|
|
Params: params,
|
|
Expire: expirationInfo{
|
|
UpdateTime: time.Now(),
|
|
Duration: 60 * 2,
|
|
Expired: false,
|
|
},
|
|
}
|
|
}
|
|
|
|
func (c *CacheWindow) ReInitialRing() {
|
|
defer func(*CacheWindow) {
|
|
if r := recover(); r != nil {
|
|
log.Println(r)
|
|
log.Println(c.WindowSize)
|
|
}
|
|
}(c)
|
|
|
|
if c == nil {
|
|
log.Printf("c=nil")
|
|
return
|
|
}
|
|
c.ring = ring.New(c.WindowSize)
|
|
}
|
|
|
|
func (c *CacheWindow) EnQueue(d any) {
|
|
if c.windowLen < c.WindowSize {
|
|
c.windowLen += 1
|
|
}
|
|
c.LatestData = d
|
|
c.ring.Value = d
|
|
c.ring = c.ring.Next()
|
|
if c.MethodId != 0 {
|
|
log.Printf("[%s]测点缓存[m=%d] %d/%d", c.Id, c.MethodId, c.WindowSize, c.ring.Len())
|
|
}
|
|
}
|
|
func (c *CacheWindow) EnQueueAnalyzeData(d AnalyzeData) {
|
|
if c.windowLen < c.WindowSize {
|
|
c.windowLen += 1
|
|
}
|
|
c.LatestData = d
|
|
c.ring.Value = d
|
|
c.ring = c.ring.Next()
|
|
log.Printf("[%s]测点缓存[m=%d] %d/%d", c.Id, c.MethodId, c.WindowSize, c.ring.Len())
|
|
}
|
|
func (c *CacheWindow) DeQueue() any {
|
|
return c.ring.Prev().Value
|
|
}
|
|
func (c *CacheWindow) Latest() any {
|
|
return c.ring.Prev().Value
|
|
}
|
|
func (c *CacheWindow) LatestByAnalyzeData() (AnalyzeData, bool) {
|
|
var d AnalyzeData
|
|
var valid bool
|
|
if v, ok := c.ring.Prev().Value.(AnalyzeData); ok {
|
|
d = v
|
|
valid = true
|
|
}
|
|
return d, valid
|
|
}
|
|
func (c *CacheWindow) LatestByRange(size int) []any {
|
|
var resp []any
|
|
if size <= c.WindowSize {
|
|
pre := c.ring.Prev()
|
|
for i := 0; i < size; i++ {
|
|
resp = append(resp, pre.Value)
|
|
pre = pre.Prev()
|
|
}
|
|
}
|
|
return resp
|
|
}
|
|
func (c *CacheWindow) DeQueueAll() []any {
|
|
var all []any
|
|
c.ring.Do(func(d any) {
|
|
if d != nil {
|
|
all = append(all, d)
|
|
}
|
|
})
|
|
return all
|
|
}
|
|
func (c *CacheWindow) DeQueueAllAnalyzeData() []AnalyzeData {
|
|
Objs := c.DeQueueAll()
|
|
var all []AnalyzeData
|
|
for _, obj := range Objs {
|
|
if obj == nil {
|
|
continue
|
|
}
|
|
if v, ok := obj.(AnalyzeData); ok {
|
|
all = append(all, v)
|
|
} else { //类型不对 立即返回
|
|
return all
|
|
}
|
|
}
|
|
|
|
return all
|
|
}
|
|
func (c *CacheWindow) DeQueueAllData() ([]float64, bool) {
|
|
Objs := c.DeQueueAll()
|
|
var all []float64
|
|
for _, obj := range Objs {
|
|
if obj == nil {
|
|
continue
|
|
}
|
|
if v, ok := obj.(AnalyzeData); ok {
|
|
all = append(all, v.Data)
|
|
} else {
|
|
return all, false
|
|
}
|
|
}
|
|
|
|
return all, true
|
|
}
|
|
func (c *CacheWindow) DeQueueAllRaw() ([]float64, bool) {
|
|
Objs := c.DeQueueAll()
|
|
var all []float64
|
|
for _, obj := range Objs {
|
|
if v, ok := obj.(AnalyzeData); ok {
|
|
all = append(all, v.Raw)
|
|
} else {
|
|
return all, false
|
|
}
|
|
}
|
|
|
|
return all, true
|
|
}
|
|
|