Compare commits

...

5 Commits
v0.0.12 ... dev

  1. 310
      configHelper.go
  2. 16
      redisHelper.go
  3. 7
      unitHelper.go

310
configHelper.go

@ -4,6 +4,7 @@ import (
"context"
"gitea.anxinyun.cn/container/common_models/constant/settlementParam"
"strings"
"sync"
//"encoding/json"
"errors"
@ -18,31 +19,26 @@ import (
var ProtoCache map[string]common_models.Proto
var FormulaCache map[int]common_models.Formula
var DeviceInfoCache map[string]common_models.DeviceInfo
var DeviceMetaCache map[string]common_models.DeviceMeta
var deviceStationIdsCache map[string][]int
var deviceFactorProtoMap map[string]common_models.DeviceFactorProto
var stationCache map[int]common_models.Station //todo 新类型
var IotaDeviceCache map[string]common_models.IotaDevice
var ThingStructCache map[string]common_models.ThingStruct
var DeviceNodeCache map[string]common_models.IotaInstances
var AlarmCodeCache map[string]common_models.AlarmCode
var StructureCache map[int]common_models.Structure
// var stationThreshold map[int]common_models.Threshold
// var aggThreshold map[string]common_models.AggThreshold
var taskTime *time.Ticker
type ConfigHelper struct {
redisHelper *RedisHelper //普通缓存用
chainedCache *ChainedCache
ctx context.Context
mu sync.RWMutex
}
func NewConfigHelper(redisAddr string) *ConfigHelper {
initDeviceInfoMapCache()
return &ConfigHelper{
redisHelper: NewRedisHelper("", redisAddr),
chainedCache: NewChainedCache(redisAddr),
@ -59,14 +55,6 @@ func initDeviceInfoMapCache() {
IotaDeviceCache = make(map[string]common_models.IotaDevice)
}
if ThingStructCache == nil {
ThingStructCache = make(map[string]common_models.ThingStruct)
}
if DeviceMetaCache == nil {
DeviceMetaCache = make(map[string]common_models.DeviceMeta)
}
if deviceStationIdsCache == nil {
deviceStationIdsCache = make(map[string][]int)
}
@ -94,23 +82,61 @@ func initDeviceInfoMapCache() {
}
}
// GetDeviceInfo 通过
func (the *ConfigHelper) GetDeviceInfo(deviceId string) (*common_models.DeviceInfo, error) {
deviceInfo, ok := DeviceInfoCache[deviceId]
if !ok { //去redis查询
device, err := the.GetIotaDevice(deviceId)
if err != nil {
return nil, err
}
thingStruct, err := the.GetThingStruct(device.ThingId)
if err != nil {
return nil, err
}
iotaMeta, err := the.GetIotaMeta(device.DeviceMetaId)
if err != nil {
return nil, err
func (the *ConfigHelper) GetDeviceInfo(deviceId string, forceUpdate bool) (*common_models.DeviceInfo, error) {
if !forceUpdate {
the.mu.RLock()
deviceInfo, ok := DeviceInfoCache[deviceId]
the.mu.RUnlock()
if ok {
return &deviceInfo, nil
}
s := common_models.Structure{
}
// 首先获取 device
device, err := the.GetIotaDevice(deviceId)
if err != nil {
return nil, err
}
if device.ThingId == "" || device.DeviceMetaId == "" {
return nil, errors.New("IotaDevice 的 ThingId 或者 DeviceMetaId 为空")
}
// 使用 goroutines 并发加载数据
var wg sync.WaitGroup
var thingStruct common_models.ThingStruct
var iotaMeta common_models.DeviceMeta
var thingErr, metaErr error
// 获取 thingStruct
wg.Add(1)
go func() {
defer wg.Done()
thingStruct, thingErr = the.GetThingStruct(device.ThingId)
}()
// 获取 iotaMeta
wg.Add(1)
go func() {
defer wg.Done()
iotaMeta, metaErr = the.GetIotaMeta(device.DeviceMetaId)
}()
wg.Wait()
// 检查是否有错误
if thingErr != nil {
return nil, thingErr
}
if metaErr != nil {
return nil, metaErr
}
// 构建 DeviceInfo
result := common_models.DeviceInfo{
Id: device.Id,
Name: device.Name,
Structure: common_models.Structure{
ThingId: thingStruct.ThingId,
Id: thingStruct.Id,
Name: thingStruct.Name,
@ -118,20 +144,103 @@ func (the *ConfigHelper) GetDeviceInfo(deviceId string) (*common_models.DeviceIn
OrgId: thingStruct.OrgId,
Latitude: 0,
Longitude: 0,
}
},
DeviceMeta: iotaMeta,
}
//缓存 deviceInfo
the.mu.Lock()
DeviceInfoCache[deviceId] = result
the.mu.Unlock()
return &result, nil
}
deviceInfo = common_models.DeviceInfo{
Id: device.Id,
Name: device.Name,
Structure: s,
DeviceMeta: iotaMeta,
// 定时更新设备信息
func (the *ConfigHelper) StartUpdateDeviceInfo(interval time.Duration, batchSize int) {
log.Printf("==== 启动定时更新 DeviceInfo 服务,interval = %v, batchSize = %d", interval, batchSize)
ticker := time.NewTicker(interval)
go func() {
for {
select {
case <-ticker.C:
func() {
defer func() {
if r := recover(); r != nil {
fmt.Println("[StartUpdateDeviceInfo] Recovered from panic:", r)
}
}()
the.updateDeviceInfoCache(batchSize)
}()
}
}
//缓存deviceInfo
DeviceInfoCache[deviceId] = deviceInfo
}()
}
var updatesPool = sync.Pool{
New: func() interface{} {
return make(map[string]common_models.DeviceInfo)
},
}
func (the *ConfigHelper) updateDeviceInfoCache(batchSize int) {
if batchSize < 0 {
panic("batchSize 不能为负数。")
}
return &deviceInfo, nil
deviceIds := make([]string, 0, len(DeviceInfoCache))
for deviceId := range DeviceInfoCache {
deviceIds = append(deviceIds, deviceId)
}
// 批次处理设备
for i := 0; i < len(deviceIds); i += batchSize {
end := i + batchSize
if end > len(deviceIds) {
end = len(deviceIds)
}
batch := deviceIds[i:end]
var wg sync.WaitGroup
updates := updatesPool.Get().(map[string]common_models.DeviceInfo)
for k := range updates {
delete(updates, k)
}
var mu sync.Mutex
for _, deviceId := range batch {
wg.Add(1)
go func(id string) {
defer wg.Done()
// 强制更新设备信息
deviceInfo, err := the.GetDeviceInfo(id, true)
if err == nil {
mu.Lock()
updates[id] = *deviceInfo
mu.Unlock()
} else {
fmt.Printf("Failed to update cache for deviceId %s: %v\n", id, err)
}
}(deviceId)
}
wg.Wait()
// 批量更新 DeviceInfoCache
the.mu.Lock()
for id, info := range updates {
DeviceInfoCache[id] = info // 更新缓存
}
the.mu.Unlock()
// 将 updates 放回对象池
updatesPool.Put(updates)
}
}
func (the *ConfigHelper) GetFormulaInfo(formulaId int) (common_models.Formula, error) {
var err error
result, ok := FormulaCache[formulaId]
@ -407,7 +516,7 @@ func (the *ConfigHelper) GetStationGroup(groupId int) (common_models.StationGrou
}
err = json.Unmarshal([]byte(v), &group)
if err != nil {
log.Printf("json unmarshal error:%s \n", err.Error())
log.Printf("【redisKey.Group】【%s】json unmarshal error:%s \n", k, err.Error())
}
}
@ -429,7 +538,7 @@ func (the *ConfigHelper) GetStationGroupInfo(stationId int) (common_models.Stati
if v, ok := value.(string); ok {
err = json.Unmarshal([]byte(v), &info)
if err != nil {
log.Printf("json unmarshal error:%s \n", err.Error())
log.Printf("【redisKey.Station_group】【%s】json unmarshal error:%s \n", k, err.Error())
}
}
return info, err
@ -448,7 +557,7 @@ func (the *ConfigHelper) GetStationCorrGroups(stationId int) ([]common_models.St
if v, ok := value.(string); ok {
err = json.Unmarshal([]byte(v), &groupIds)
if err != nil {
log.Printf("json unmarshal error:%s \n", err.Error())
log.Printf("【redisKey.Station_corr_group】【%s】json unmarshal error:%s \n", k, err.Error())
}
}
if err != nil {
@ -509,27 +618,75 @@ func (the *ConfigHelper) GetAggThreshold(structId int, factorId int) (*common_mo
}
func (the *ConfigHelper) GetIotaMeta(deviceId string) (common_models.DeviceMeta, error) {
var err error
result, ok := DeviceMetaCache[deviceId]
if !ok { //去redis查询
//iota_meta:003540d0-616c-4611-92c1-1cd31005eabf
k := fmt.Sprintf("%s:%s", redisKey.Iota_meta, deviceId)
//var deviceMeta common_models.DeviceMeta
err = the.redisHelper.GetObj(k, &result)
//iota_meta:003540d0-616c-4611-92c1-1cd31005eabf
k := fmt.Sprintf("%s:%s", redisKey.Iota_meta, deviceId)
result := common_models.DeviceMeta{}
value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
if err != nil {
return result, err
}
if value == nil || value == "" {
log.Printf("LoadableChinCache[%s]=空", k)
return result, errors.New("无缓存")
}
if v, ok := value.(string); ok {
if len(v) > 0 {
err := json.Unmarshal([]byte(v), &result)
if err != nil {
log.Printf("json unmarshal error:%s", err.Error())
}
}
}
return result, err
//var err error
//result, ok := DeviceMetaCache[deviceId]
//if !ok { //去redis查询
// //iota_meta:003540d0-616c-4611-92c1-1cd31005eabf
// k := fmt.Sprintf("%s:%s", redisKey.Iota_meta, deviceId)
// //var deviceMeta common_models.DeviceMeta
// err = the.redisHelper.GetObj(k, &result)
//}
//return result, err
}
func (the *ConfigHelper) GetIotaDevice(deviceId string) (common_models.IotaDevice, error) {
var err error
result, ok := IotaDeviceCache[deviceId]
if !ok { //去redis查询
//iota_device:1b06d870-09a8-45e0-a86e-dba539d5edd0
k := fmt.Sprintf("%s:%s", redisKey.Iota_device, deviceId)
k := fmt.Sprintf("%s:%s", redisKey.Iota_device, deviceId)
err = the.redisHelper.GetObj(k, &result)
result := common_models.IotaDevice{}
value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
if err != nil {
return result, err
}
if value == nil || value == "" {
log.Printf("LoadableChinCache[%s]=空", k)
return result, errors.New("无缓存")
}
if v, ok := value.(string); ok {
if len(v) > 0 {
err := json.Unmarshal([]byte(v), &result)
if err != nil {
log.Printf("json unmarshal error:%s", err.Error())
}
}
}
return result, err
//var err error
//result, ok := IotaDeviceCache[deviceId]
//if !ok { //去redis查询
// //iota_device:1b06d870-09a8-45e0-a86e-dba539d5edd0
// k := fmt.Sprintf("%s:%s", redisKey.Iota_device, deviceId)
// err = the.redisHelper.GetObj(k, &result)
//}
//return result, err
}
func (the *ConfigHelper) GetDeviceFactorProto(factorProtoId, deviceMetaId string) (common_models.DeviceFactorProto, error) {
@ -545,16 +702,41 @@ func (the *ConfigHelper) GetDeviceFactorProto(factorProtoId, deviceMetaId string
}
func (the *ConfigHelper) GetThingStruct(deviceId string) (common_models.ThingStruct, error) {
var err error
result, ok := ThingStructCache[deviceId]
if !ok { //去redis查询
//thing_struct:5da9aa1b-05b7-4943-be57-dedb34f7a1bd
k := fmt.Sprintf("%s:%s", redisKey.Thing_struct, deviceId)
//thing_struct:5da9aa1b-05b7-4943-be57-dedb34f7a1bd
k := fmt.Sprintf("%s:%s", redisKey.Thing_struct, deviceId)
err = the.redisHelper.GetObj(k, &result)
ThingStructCache[deviceId] = result
result := common_models.ThingStruct{}
value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
if err != nil {
return result, err
}
if value == nil || value == "" {
log.Printf("LoadableChinCache[%s]=空", k)
return result, errors.New("无缓存")
}
if v, ok := value.(string); ok {
if len(v) > 0 {
err := json.Unmarshal([]byte(v), &result)
if err != nil {
log.Printf("json unmarshal error:%s", err.Error())
}
}
}
return result, err
//var err error
//result, ok := ThingStructCache[deviceId]
//if !ok { //去redis查询
// //thing_struct:5da9aa1b-05b7-4943-be57-dedb34f7a1bd
// k := fmt.Sprintf("%s:%s", redisKey.Thing_struct, deviceId)
//
// err = the.redisHelper.GetObj(k, &result)
// ThingStructCache[deviceId] = result
//}
//return result, err
}
func (the *ConfigHelper) GetDataUnit() ([]common_models.DataUnit, error) {

16
redisHelper.go

@ -31,7 +31,23 @@ func NewRedisHelper(master string, address ...string) *RedisHelper {
}
func (the *RedisHelper) InitialCluster(master string, address ...string) {
var opts *redis.UniversalOptions
if master != "" {
opts = &redis.UniversalOptions{Addrs: address, MasterName: "mymaster", PoolSize: 10}
} else {
opts = &redis.UniversalOptions{Addrs: address, PoolSize: 10}
}
the.rdb = redis.NewUniversalClient(opts)
if the.rdb == nil {
log.Fatal("Failed to initialize Redis client")
}
log.Printf("Redis client initialized with addresses: %s", address)
the.isReady = true
}
func (the *RedisHelper) InitialCluster_old(master string, address ...string) {
if master != "" {
the.rdb = redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: address,

7
unitHelper.go

@ -2,7 +2,6 @@ package common_utils
import (
"gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"log"
"strings"
)
@ -12,10 +11,10 @@ type UnitHelper struct {
unitsGroup map[string][]common_models.DataUnit
}
func NewUnitHelper() *UnitHelper {
redisAddr := configLoad.LoadConfig().GetString("redis.address")
func NewUnitHelper(configHelper *ConfigHelper) *UnitHelper {
//redisAddr := configLoad.LoadConfig().GetString("redis.address")
unitHelper := &UnitHelper{
configHelper: NewConfigHelper(redisAddr),
configHelper: configHelper, //NewConfigHelper(redisAddr),
unitsGroup: map[string][]common_models.DataUnit{},
}
unitHelper.getUnitsGroup()

Loading…
Cancel
Save