Compare commits

...

5 Commits
v0.0.12 ... dev

  1. 310
      configHelper.go
  2. 16
      redisHelper.go
  3. 7
      unitHelper.go

310
configHelper.go

@ -4,6 +4,7 @@ import (
"context" "context"
"gitea.anxinyun.cn/container/common_models/constant/settlementParam" "gitea.anxinyun.cn/container/common_models/constant/settlementParam"
"strings" "strings"
"sync"
//"encoding/json" //"encoding/json"
"errors" "errors"
@ -18,31 +19,26 @@ import (
var ProtoCache map[string]common_models.Proto var ProtoCache map[string]common_models.Proto
var FormulaCache map[int]common_models.Formula var FormulaCache map[int]common_models.Formula
var DeviceInfoCache map[string]common_models.DeviceInfo var DeviceInfoCache map[string]common_models.DeviceInfo
var DeviceMetaCache map[string]common_models.DeviceMeta
var deviceStationIdsCache map[string][]int var deviceStationIdsCache map[string][]int
var deviceFactorProtoMap map[string]common_models.DeviceFactorProto var deviceFactorProtoMap map[string]common_models.DeviceFactorProto
var stationCache map[int]common_models.Station //todo 新类型 var stationCache map[int]common_models.Station //todo 新类型
var IotaDeviceCache map[string]common_models.IotaDevice var IotaDeviceCache map[string]common_models.IotaDevice
var ThingStructCache map[string]common_models.ThingStruct
var DeviceNodeCache map[string]common_models.IotaInstances var DeviceNodeCache map[string]common_models.IotaInstances
var AlarmCodeCache map[string]common_models.AlarmCode var AlarmCodeCache map[string]common_models.AlarmCode
var StructureCache map[int]common_models.Structure var StructureCache map[int]common_models.Structure
// var stationThreshold map[int]common_models.Threshold
// var aggThreshold map[string]common_models.AggThreshold
var taskTime *time.Ticker
type ConfigHelper struct { type ConfigHelper struct {
redisHelper *RedisHelper //普通缓存用 redisHelper *RedisHelper //普通缓存用
chainedCache *ChainedCache chainedCache *ChainedCache
ctx context.Context ctx context.Context
mu sync.RWMutex
} }
func NewConfigHelper(redisAddr string) *ConfigHelper { func NewConfigHelper(redisAddr string) *ConfigHelper {
initDeviceInfoMapCache() initDeviceInfoMapCache()
return &ConfigHelper{ return &ConfigHelper{
redisHelper: NewRedisHelper("", redisAddr), redisHelper: NewRedisHelper("", redisAddr),
chainedCache: NewChainedCache(redisAddr), chainedCache: NewChainedCache(redisAddr),
@ -59,14 +55,6 @@ func initDeviceInfoMapCache() {
IotaDeviceCache = make(map[string]common_models.IotaDevice) IotaDeviceCache = make(map[string]common_models.IotaDevice)
} }
if ThingStructCache == nil {
ThingStructCache = make(map[string]common_models.ThingStruct)
}
if DeviceMetaCache == nil {
DeviceMetaCache = make(map[string]common_models.DeviceMeta)
}
if deviceStationIdsCache == nil { if deviceStationIdsCache == nil {
deviceStationIdsCache = make(map[string][]int) deviceStationIdsCache = make(map[string][]int)
} }
@ -94,23 +82,61 @@ func initDeviceInfoMapCache() {
} }
} }
// GetDeviceInfo 通过 func (the *ConfigHelper) GetDeviceInfo(deviceId string, forceUpdate bool) (*common_models.DeviceInfo, error) {
func (the *ConfigHelper) GetDeviceInfo(deviceId string) (*common_models.DeviceInfo, error) { if !forceUpdate {
deviceInfo, ok := DeviceInfoCache[deviceId] the.mu.RLock()
if !ok { //去redis查询 deviceInfo, ok := DeviceInfoCache[deviceId]
device, err := the.GetIotaDevice(deviceId) the.mu.RUnlock()
if err != nil {
return nil, err if ok {
} return &deviceInfo, nil
thingStruct, err := the.GetThingStruct(device.ThingId)
if err != nil {
return nil, err
}
iotaMeta, err := the.GetIotaMeta(device.DeviceMetaId)
if err != nil {
return nil, err
} }
s := common_models.Structure{ }
// 首先获取 device
device, err := the.GetIotaDevice(deviceId)
if err != nil {
return nil, err
}
if device.ThingId == "" || device.DeviceMetaId == "" {
return nil, errors.New("IotaDevice 的 ThingId 或者 DeviceMetaId 为空")
}
// 使用 goroutines 并发加载数据
var wg sync.WaitGroup
var thingStruct common_models.ThingStruct
var iotaMeta common_models.DeviceMeta
var thingErr, metaErr error
// 获取 thingStruct
wg.Add(1)
go func() {
defer wg.Done()
thingStruct, thingErr = the.GetThingStruct(device.ThingId)
}()
// 获取 iotaMeta
wg.Add(1)
go func() {
defer wg.Done()
iotaMeta, metaErr = the.GetIotaMeta(device.DeviceMetaId)
}()
wg.Wait()
// 检查是否有错误
if thingErr != nil {
return nil, thingErr
}
if metaErr != nil {
return nil, metaErr
}
// 构建 DeviceInfo
result := common_models.DeviceInfo{
Id: device.Id,
Name: device.Name,
Structure: common_models.Structure{
ThingId: thingStruct.ThingId, ThingId: thingStruct.ThingId,
Id: thingStruct.Id, Id: thingStruct.Id,
Name: thingStruct.Name, Name: thingStruct.Name,
@ -118,20 +144,103 @@ func (the *ConfigHelper) GetDeviceInfo(deviceId string) (*common_models.DeviceIn
OrgId: thingStruct.OrgId, OrgId: thingStruct.OrgId,
Latitude: 0, Latitude: 0,
Longitude: 0, Longitude: 0,
} },
DeviceMeta: iotaMeta,
}
//缓存 deviceInfo
the.mu.Lock()
DeviceInfoCache[deviceId] = result
the.mu.Unlock()
return &result, nil
}
deviceInfo = common_models.DeviceInfo{ // 定时更新设备信息
Id: device.Id, func (the *ConfigHelper) StartUpdateDeviceInfo(interval time.Duration, batchSize int) {
Name: device.Name, log.Printf("==== 启动定时更新 DeviceInfo 服务,interval = %v, batchSize = %d", interval, batchSize)
Structure: s,
DeviceMeta: iotaMeta, ticker := time.NewTicker(interval)
go func() {
for {
select {
case <-ticker.C:
func() {
defer func() {
if r := recover(); r != nil {
fmt.Println("[StartUpdateDeviceInfo] Recovered from panic:", r)
}
}()
the.updateDeviceInfoCache(batchSize)
}()
}
} }
//缓存deviceInfo }()
DeviceInfoCache[deviceId] = deviceInfo }
var updatesPool = sync.Pool{
New: func() interface{} {
return make(map[string]common_models.DeviceInfo)
},
}
func (the *ConfigHelper) updateDeviceInfoCache(batchSize int) {
if batchSize < 0 {
panic("batchSize 不能为负数。")
} }
return &deviceInfo, nil
deviceIds := make([]string, 0, len(DeviceInfoCache))
for deviceId := range DeviceInfoCache {
deviceIds = append(deviceIds, deviceId)
}
// 批次处理设备
for i := 0; i < len(deviceIds); i += batchSize {
end := i + batchSize
if end > len(deviceIds) {
end = len(deviceIds)
}
batch := deviceIds[i:end]
var wg sync.WaitGroup
updates := updatesPool.Get().(map[string]common_models.DeviceInfo)
for k := range updates {
delete(updates, k)
}
var mu sync.Mutex
for _, deviceId := range batch {
wg.Add(1)
go func(id string) {
defer wg.Done()
// 强制更新设备信息
deviceInfo, err := the.GetDeviceInfo(id, true)
if err == nil {
mu.Lock()
updates[id] = *deviceInfo
mu.Unlock()
} else {
fmt.Printf("Failed to update cache for deviceId %s: %v\n", id, err)
}
}(deviceId)
}
wg.Wait()
// 批量更新 DeviceInfoCache
the.mu.Lock()
for id, info := range updates {
DeviceInfoCache[id] = info // 更新缓存
}
the.mu.Unlock()
// 将 updates 放回对象池
updatesPool.Put(updates)
}
} }
func (the *ConfigHelper) GetFormulaInfo(formulaId int) (common_models.Formula, error) { func (the *ConfigHelper) GetFormulaInfo(formulaId int) (common_models.Formula, error) {
var err error var err error
result, ok := FormulaCache[formulaId] result, ok := FormulaCache[formulaId]
@ -407,7 +516,7 @@ func (the *ConfigHelper) GetStationGroup(groupId int) (common_models.StationGrou
} }
err = json.Unmarshal([]byte(v), &group) err = json.Unmarshal([]byte(v), &group)
if err != nil { if err != nil {
log.Printf("json unmarshal error:%s \n", err.Error()) log.Printf("【redisKey.Group】【%s】json unmarshal error:%s \n", k, err.Error())
} }
} }
@ -429,7 +538,7 @@ func (the *ConfigHelper) GetStationGroupInfo(stationId int) (common_models.Stati
if v, ok := value.(string); ok { if v, ok := value.(string); ok {
err = json.Unmarshal([]byte(v), &info) err = json.Unmarshal([]byte(v), &info)
if err != nil { if err != nil {
log.Printf("json unmarshal error:%s \n", err.Error()) log.Printf("【redisKey.Station_group】【%s】json unmarshal error:%s \n", k, err.Error())
} }
} }
return info, err return info, err
@ -448,7 +557,7 @@ func (the *ConfigHelper) GetStationCorrGroups(stationId int) ([]common_models.St
if v, ok := value.(string); ok { if v, ok := value.(string); ok {
err = json.Unmarshal([]byte(v), &groupIds) err = json.Unmarshal([]byte(v), &groupIds)
if err != nil { if err != nil {
log.Printf("json unmarshal error:%s \n", err.Error()) log.Printf("【redisKey.Station_corr_group】【%s】json unmarshal error:%s \n", k, err.Error())
} }
} }
if err != nil { if err != nil {
@ -509,27 +618,75 @@ func (the *ConfigHelper) GetAggThreshold(structId int, factorId int) (*common_mo
} }
func (the *ConfigHelper) GetIotaMeta(deviceId string) (common_models.DeviceMeta, error) { func (the *ConfigHelper) GetIotaMeta(deviceId string) (common_models.DeviceMeta, error) {
var err error //iota_meta:003540d0-616c-4611-92c1-1cd31005eabf
result, ok := DeviceMetaCache[deviceId] k := fmt.Sprintf("%s:%s", redisKey.Iota_meta, deviceId)
if !ok { //去redis查询
//iota_meta:003540d0-616c-4611-92c1-1cd31005eabf result := common_models.DeviceMeta{}
k := fmt.Sprintf("%s:%s", redisKey.Iota_meta, deviceId) value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
//var deviceMeta common_models.DeviceMeta if err != nil {
err = the.redisHelper.GetObj(k, &result) return result, err
}
if value == nil || value == "" {
log.Printf("LoadableChinCache[%s]=空", k)
return result, errors.New("无缓存")
} }
if v, ok := value.(string); ok {
if len(v) > 0 {
err := json.Unmarshal([]byte(v), &result)
if err != nil {
log.Printf("json unmarshal error:%s", err.Error())
}
}
}
return result, err return result, err
//var err error
//result, ok := DeviceMetaCache[deviceId]
//if !ok { //去redis查询
// //iota_meta:003540d0-616c-4611-92c1-1cd31005eabf
// k := fmt.Sprintf("%s:%s", redisKey.Iota_meta, deviceId)
// //var deviceMeta common_models.DeviceMeta
// err = the.redisHelper.GetObj(k, &result)
//}
//return result, err
} }
func (the *ConfigHelper) GetIotaDevice(deviceId string) (common_models.IotaDevice, error) { func (the *ConfigHelper) GetIotaDevice(deviceId string) (common_models.IotaDevice, error) {
var err error k := fmt.Sprintf("%s:%s", redisKey.Iota_device, deviceId)
result, ok := IotaDeviceCache[deviceId]
if !ok { //去redis查询
//iota_device:1b06d870-09a8-45e0-a86e-dba539d5edd0
k := fmt.Sprintf("%s:%s", redisKey.Iota_device, deviceId)
err = the.redisHelper.GetObj(k, &result) result := common_models.IotaDevice{}
value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
if err != nil {
return result, err
}
if value == nil || value == "" {
log.Printf("LoadableChinCache[%s]=空", k)
return result, errors.New("无缓存")
}
if v, ok := value.(string); ok {
if len(v) > 0 {
err := json.Unmarshal([]byte(v), &result)
if err != nil {
log.Printf("json unmarshal error:%s", err.Error())
}
}
} }
return result, err return result, err
//var err error
//result, ok := IotaDeviceCache[deviceId]
//if !ok { //去redis查询
// //iota_device:1b06d870-09a8-45e0-a86e-dba539d5edd0
// k := fmt.Sprintf("%s:%s", redisKey.Iota_device, deviceId)
// err = the.redisHelper.GetObj(k, &result)
//}
//return result, err
} }
func (the *ConfigHelper) GetDeviceFactorProto(factorProtoId, deviceMetaId string) (common_models.DeviceFactorProto, error) { func (the *ConfigHelper) GetDeviceFactorProto(factorProtoId, deviceMetaId string) (common_models.DeviceFactorProto, error) {
@ -545,16 +702,41 @@ func (the *ConfigHelper) GetDeviceFactorProto(factorProtoId, deviceMetaId string
} }
func (the *ConfigHelper) GetThingStruct(deviceId string) (common_models.ThingStruct, error) { func (the *ConfigHelper) GetThingStruct(deviceId string) (common_models.ThingStruct, error) {
var err error //thing_struct:5da9aa1b-05b7-4943-be57-dedb34f7a1bd
result, ok := ThingStructCache[deviceId] k := fmt.Sprintf("%s:%s", redisKey.Thing_struct, deviceId)
if !ok { //去redis查询
//thing_struct:5da9aa1b-05b7-4943-be57-dedb34f7a1bd
k := fmt.Sprintf("%s:%s", redisKey.Thing_struct, deviceId)
err = the.redisHelper.GetObj(k, &result) result := common_models.ThingStruct{}
ThingStructCache[deviceId] = result value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
if err != nil {
return result, err
}
if value == nil || value == "" {
log.Printf("LoadableChinCache[%s]=空", k)
return result, errors.New("无缓存")
}
if v, ok := value.(string); ok {
if len(v) > 0 {
err := json.Unmarshal([]byte(v), &result)
if err != nil {
log.Printf("json unmarshal error:%s", err.Error())
}
}
} }
return result, err return result, err
//var err error
//result, ok := ThingStructCache[deviceId]
//if !ok { //去redis查询
// //thing_struct:5da9aa1b-05b7-4943-be57-dedb34f7a1bd
// k := fmt.Sprintf("%s:%s", redisKey.Thing_struct, deviceId)
//
// err = the.redisHelper.GetObj(k, &result)
// ThingStructCache[deviceId] = result
//}
//return result, err
} }
func (the *ConfigHelper) GetDataUnit() ([]common_models.DataUnit, error) { func (the *ConfigHelper) GetDataUnit() ([]common_models.DataUnit, error) {

16
redisHelper.go

@ -31,7 +31,23 @@ func NewRedisHelper(master string, address ...string) *RedisHelper {
} }
func (the *RedisHelper) InitialCluster(master string, address ...string) { func (the *RedisHelper) InitialCluster(master string, address ...string) {
var opts *redis.UniversalOptions
if master != "" {
opts = &redis.UniversalOptions{Addrs: address, MasterName: "mymaster", PoolSize: 10}
} else {
opts = &redis.UniversalOptions{Addrs: address, PoolSize: 10}
}
the.rdb = redis.NewUniversalClient(opts)
if the.rdb == nil {
log.Fatal("Failed to initialize Redis client")
}
log.Printf("Redis client initialized with addresses: %s", address)
the.isReady = true
}
func (the *RedisHelper) InitialCluster_old(master string, address ...string) {
if master != "" { if master != "" {
the.rdb = redis.NewUniversalClient(&redis.UniversalOptions{ the.rdb = redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: address, Addrs: address,

7
unitHelper.go

@ -2,7 +2,6 @@ package common_utils
import ( import (
"gitea.anxinyun.cn/container/common_models" "gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"log" "log"
"strings" "strings"
) )
@ -12,10 +11,10 @@ type UnitHelper struct {
unitsGroup map[string][]common_models.DataUnit unitsGroup map[string][]common_models.DataUnit
} }
func NewUnitHelper() *UnitHelper { func NewUnitHelper(configHelper *ConfigHelper) *UnitHelper {
redisAddr := configLoad.LoadConfig().GetString("redis.address") //redisAddr := configLoad.LoadConfig().GetString("redis.address")
unitHelper := &UnitHelper{ unitHelper := &UnitHelper{
configHelper: NewConfigHelper(redisAddr), configHelper: configHelper, //NewConfigHelper(redisAddr),
unitsGroup: map[string][]common_models.DataUnit{}, unitsGroup: map[string][]common_models.DataUnit{},
} }
unitHelper.getUnitsGroup() unitHelper.getUnitsGroup()

Loading…
Cancel
Save