重建 common_utils
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

896 lines
25 KiB

package common_utils
import (
"context"
"gitea.anxinyun.cn/container/common_models/constant/settlementParam"
"strings"
"sync"
//"encoding/json"
"errors"
"fmt"
"gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_models/constant/redisKey"
json "github.com/bytedance/sonic"
"github.com/eko/gocache/lib/v4/store"
"log"
"time"
)
// Package-level in-memory caches shared by every ConfigHelper in the process.
var (
	// ProtoCache is looked up by proto code in GetProto.
	ProtoCache map[string]common_models.Proto
	// FormulaCache is looked up by formula id in GetFormulaInfo.
	FormulaCache map[int]common_models.Formula
	// DeviceInfoCache is keyed by device id; GetDeviceInfo reads and fills it.
	DeviceInfoCache map[string]common_models.DeviceInfo
	// deviceStationIdsCache is looked up by device id in GetDeviceStationIds.
	deviceStationIdsCache map[string][]int
	// deviceFactorProtoMap is looked up by device meta id in GetDeviceFactorProto.
	deviceFactorProtoMap map[string]common_models.DeviceFactorProto
	// stationCache is looked up by station id in getStation.
	stationCache map[int]common_models.Station // TODO: new type
	// IotaDeviceCache holds IotaDevice values keyed by string.
	IotaDeviceCache map[string]common_models.IotaDevice
	// DeviceNodeCache is looked up by device id in GetSubDeviceNext / GetSubDeviceAll.
	DeviceNodeCache map[string]common_models.IotaInstances
	// AlarmCodeCache is keyed by alarm code; GetAlarmCode reads and fills it.
	AlarmCodeCache map[string]common_models.AlarmCode
	// StructureCache holds Structure values keyed by int.
	StructureCache map[int]common_models.Structure
)
// ConfigHelper bundles the cache clients and shared state used by the
// configuration lookup methods defined on it.
type ConfigHelper struct {
// redisHelper is the direct Redis accessor, used for ordinary (non-chained) caching.
redisHelper *RedisHelper // used for plain caching
// chainedCache is the chained cache; methods reach it through LoadableChinCache.
chainedCache *ChainedCache
// ctx is the context handed to every chained-cache call.
ctx context.Context
// mu guards reads and writes of the package-level DeviceInfoCache.
mu sync.RWMutex
}
// NewConfigHelper makes sure the package-level cache maps are allocated and
// returns a ConfigHelper whose Redis helper and chained cache are both built
// from redisAddr. The helper uses context.Background() for its cache calls.
func NewConfigHelper(redisAddr string) *ConfigHelper {
	initDeviceInfoMapCache()

	helper := new(ConfigHelper)
	helper.redisHelper = NewRedisHelper("", redisAddr)
	helper.chainedCache = NewChainedCache(redisAddr)
	helper.ctx = context.Background()
	return helper
}
// initDeviceInfoMapCache allocates every package-level cache map that is
// still nil. Maps that were already allocated are left untouched, so calling
// it repeatedly is harmless.
//
// DeviceNodeCache is allocated here as well: it is exported, and writing to a
// nil map panics, so leaving it nil would make any later insert crash.
func initDeviceInfoMapCache() {
	if DeviceInfoCache == nil {
		DeviceInfoCache = make(map[string]common_models.DeviceInfo)
	}
	if IotaDeviceCache == nil {
		IotaDeviceCache = make(map[string]common_models.IotaDevice)
	}
	if DeviceNodeCache == nil {
		DeviceNodeCache = make(map[string]common_models.IotaInstances)
	}
	if deviceStationIdsCache == nil {
		deviceStationIdsCache = make(map[string][]int)
	}
	if stationCache == nil {
		stationCache = make(map[int]common_models.Station)
	}
	if deviceFactorProtoMap == nil {
		deviceFactorProtoMap = make(map[string]common_models.DeviceFactorProto)
	}
	if FormulaCache == nil {
		FormulaCache = make(map[int]common_models.Formula)
	}
	if ProtoCache == nil {
		ProtoCache = make(map[string]common_models.Proto)
	}
	if AlarmCodeCache == nil {
		AlarmCodeCache = make(map[string]common_models.AlarmCode)
	}
	if StructureCache == nil {
		StructureCache = make(map[int]common_models.Structure)
	}
}
// GetDeviceInfo returns the DeviceInfo for deviceId.
//
// Unless forceUpdate is true, a value already present in DeviceInfoCache is
// returned directly. Otherwise the IotaDevice is loaded first, then its
// ThingStruct and DeviceMeta are fetched concurrently, and the assembled
// DeviceInfo is stored in DeviceInfoCache before being returned.
//
// An error is returned when the device cannot be loaded, when its ThingId or
// DeviceMetaId is empty, or when either concurrent lookup fails (the
// ThingStruct error takes precedence).
func (the *ConfigHelper) GetDeviceInfo(deviceId string, forceUpdate bool) (*common_models.DeviceInfo, error) {
	if !forceUpdate {
		the.mu.RLock()
		cached, hit := DeviceInfoCache[deviceId]
		the.mu.RUnlock()
		if hit {
			return &cached, nil
		}
	}

	// The device record supplies the ids needed by the two follow-up lookups.
	iotaDevice, err := the.GetIotaDevice(deviceId)
	if err != nil {
		return nil, err
	}
	if iotaDevice.ThingId == "" || iotaDevice.DeviceMetaId == "" {
		return nil, errors.New("IotaDevice 的 ThingId 或者 DeviceMetaId 为空")
	}

	var (
		pending  sync.WaitGroup
		thing    common_models.ThingStruct
		meta     common_models.DeviceMeta
		thingErr error
		metaErr  error
	)
	pending.Add(2)
	// Each goroutine writes only its own pair of variables.
	go func() {
		defer pending.Done()
		thing, thingErr = the.GetThingStruct(iotaDevice.ThingId)
	}()
	go func() {
		defer pending.Done()
		meta, metaErr = the.GetIotaMeta(iotaDevice.DeviceMetaId)
	}()
	pending.Wait()

	if thingErr != nil {
		return nil, thingErr
	}
	if metaErr != nil {
		return nil, metaErr
	}

	// Latitude and Longitude are left at their zero value.
	structure := common_models.Structure{
		ThingId: thing.ThingId,
		Id:      thing.Id,
		Name:    thing.Name,
		SType:   thing.Type,
		OrgId:   thing.OrgId,
	}
	info := common_models.DeviceInfo{
		Id:         iotaDevice.Id,
		Name:       iotaDevice.Name,
		Structure:  structure,
		DeviceMeta: meta,
	}

	the.mu.Lock()
	DeviceInfoCache[deviceId] = info
	the.mu.Unlock()
	return &info, nil
}
// StartUpdateDeviceInfo starts a background goroutine that refreshes the
// cached device information once per interval, handing batchSize to
// updateDeviceInfoCache. A panic raised during a refresh is recovered and
// printed so the loop keeps running. The ticker is never stopped, so the
// goroutine runs for the rest of the process lifetime.
func (the *ConfigHelper) StartUpdateDeviceInfo(interval time.Duration, batchSize int) {
	log.Printf("==== 启动定时更新 DeviceInfo 服务,interval = %v, batchSize = %d", interval, batchSize)

	// refreshOnce isolates one refresh so that a panic cannot kill the loop.
	refreshOnce := func() {
		defer func() {
			if cause := recover(); cause != nil {
				fmt.Println("[StartUpdateDeviceInfo] Recovered from panic:", cause)
			}
		}()
		the.updateDeviceInfoCache(batchSize)
	}

	ticker := time.NewTicker(interval)
	go func() {
		for range ticker.C {
			refreshOnce()
		}
	}()
}
// updatesPool recycles the scratch maps that updateDeviceInfoCache uses to
// collect one batch of refreshed DeviceInfo values.
var updatesPool = sync.Pool{
	New: func() any {
		scratch := make(map[string]common_models.DeviceInfo)
		return scratch
	},
}
// updateDeviceInfoCache force-refreshes every device currently present in
// DeviceInfoCache, processing the ids in batches of batchSize. Devices within
// a batch are refreshed concurrently; a device whose refresh fails keeps its
// previous cache entry and the failure is printed.
//
// batchSize must be positive. The function panics otherwise: a zero batch
// size would never advance the batch cursor and loop forever.
// StartUpdateDeviceInfo recovers from this panic.
func (the *ConfigHelper) updateDeviceInfoCache(batchSize int) {
	if batchSize <= 0 {
		panic("batchSize 必须大于 0。")
	}

	// Snapshot the keys under the read lock: GetDeviceInfo writes to the map
	// under the.mu, and iterating a map during a concurrent write is a fatal
	// runtime error.
	the.mu.RLock()
	deviceIds := make([]string, 0, len(DeviceInfoCache))
	for deviceId := range DeviceInfoCache {
		deviceIds = append(deviceIds, deviceId)
	}
	the.mu.RUnlock()

	// Process the devices batch by batch.
	for start := 0; start < len(deviceIds); start += batchSize {
		end := start + batchSize
		if end > len(deviceIds) {
			end = len(deviceIds)
		}
		batch := deviceIds[start:end]

		// A pooled map may still hold entries from an earlier batch.
		updates := updatesPool.Get().(map[string]common_models.DeviceInfo)
		for k := range updates {
			delete(updates, k)
		}

		var wg sync.WaitGroup
		var updatesMu sync.Mutex // guards updates while the batch is in flight
		for _, deviceId := range batch {
			wg.Add(1)
			go func(id string) {
				defer wg.Done()
				// Force a reload of the device information.
				deviceInfo, err := the.GetDeviceInfo(id, true)
				if err != nil {
					fmt.Printf("Failed to update cache for deviceId %s: %v\n", id, err)
					return
				}
				updatesMu.Lock()
				updates[id] = *deviceInfo
				updatesMu.Unlock()
			}(deviceId)
		}
		wg.Wait()

		// Publish the whole batch into DeviceInfoCache in one locked section.
		the.mu.Lock()
		for id, info := range updates {
			DeviceInfoCache[id] = info
		}
		the.mu.Unlock()

		// Hand the scratch map back to the pool.
		updatesPool.Put(updates)
	}
}
// GetFormulaInfo returns the formula for formulaId. FormulaCache is checked
// first; on a miss the value is read from Redis under
// "<redisKey.Formula>:<formulaId>" and the Redis error, if any, is returned.
// The in-memory cache is not updated by this method.
func (the *ConfigHelper) GetFormulaInfo(formulaId int) (common_models.Formula, error) {
	if cached, hit := FormulaCache[formulaId]; hit {
		return cached, nil
	}

	var formula common_models.Formula
	key := fmt.Sprintf("%s:%d", redisKey.Formula, formulaId)
	err := the.redisHelper.GetObj(key, &formula)
	return formula, err
}
// GetProto returns the proto for protoCode. ProtoCache is checked first; on a
// miss the value is read from Redis under "<redisKey.Proto>:<protoCode>"
// (e.g. proto:4004) and the Redis error, if any, is returned. The in-memory
// cache is not updated by this method.
func (the *ConfigHelper) GetProto(protoCode string) (common_models.Proto, error) {
	if cached, hit := ProtoCache[protoCode]; hit {
		return cached, nil
	}

	var proto common_models.Proto
	key := fmt.Sprintf("%s:%s", redisKey.Proto, protoCode)
	err := the.redisHelper.GetObj(key, &proto)
	return proto, err
}
// GetDeviceStationIds returns the station ids bound to deviceId.
// deviceStationIdsCache is checked first; on a miss the JSON stored in Redis
// under "<redisKey.Device_stationIds>:<deviceId>" is decoded. An empty Redis
// value yields an empty slice together with an error; a decode failure yields
// the decode error.
func (the *ConfigHelper) GetDeviceStationIds(deviceId string) ([]int, error) {
	if cached, hit := deviceStationIdsCache[deviceId]; hit {
		return cached, nil
	}

	stationIds := make([]int, 0)
	key := fmt.Sprintf("%s:%s", redisKey.Device_stationIds, deviceId)
	raw := the.redisHelper.Get(key)
	if raw == "" {
		return stationIds, fmt.Errorf("redis 中无key=[%s] 缓存", key)
	}
	err := json.Unmarshal([]byte(raw), &stationIds)
	return stationIds, err
}
// SetChainedCacheObj stores obj in the chained cache under key k as a string.
// A string is stored as is, a []byte is converted to a string, and any other
// value is JSON-encoded first. The marshal or cache error is returned.
func (the *ConfigHelper) SetChainedCacheObj(k string, obj any) error {
	var payload string
	switch typed := obj.(type) {
	case string:
		payload = typed
	case []byte:
		payload = string(typed)
	default:
		encoded, err := json.Marshal(obj)
		if err != nil {
			return err
		}
		payload = string(encoded)
	}
	return the.chainedCache.LoadableChinCache.Set(the.ctx, k, payload)
}
// SetChainedCacheObjWithExpiration stores obj in the chained cache under key
// k with the given expiration. A string is stored as is; every other value is
// JSON-encoded first.
// NOTE(review): unlike SetChainedCacheObj, a []byte is not special-cased here
// and goes through json.Marshal — confirm that this is intended.
func (the *ConfigHelper) SetChainedCacheObjWithExpiration(k string, obj any, duration time.Duration) error {
	payload, isString := obj.(string)
	if !isString {
		encoded, err := json.Marshal(obj)
		if err != nil {
			return err
		}
		payload = string(encoded)
	}
	return the.chainedCache.LoadableChinCache.Set(the.ctx, k, payload, store.WithExpiration(duration))
}
// DeleteChainedCacheObj removes key k from the chained cache and returns the
// cache's error, if any.
func (the *ConfigHelper) DeleteChainedCacheObj(k string) error {
	return the.chainedCache.LoadableChinCache.Delete(the.ctx, k)
}
// GetCacheWindowObj loads the cache window saved under key_cacheWindow in the
// chained cache and rebuilds it: the window's ring is re-initialised and every
// saved datum is enqueued again.
//
// An error is returned when the cache read fails, when nothing is stored
// (normal on the first read), when the stored JSON cannot be decoded, or when
// the decoded payload carries no CacheWindow. The last case previously caused
// a nil-pointer dereference.
func (the *ConfigHelper) GetCacheWindowObj(key_cacheWindow string) (common_models.CacheWindow, error) {
	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, key_cacheWindow)
	if err != nil {
		return common_models.CacheWindow{}, err
	}
	// The first regular read finds nothing stored.
	if value == nil || value == "" {
		return common_models.CacheWindow{}, errors.New("无缓存")
	}

	var saved common_models.CacheWinSave
	if text, ok := value.(string); ok && len(text) > 0 {
		if err := json.Unmarshal([]byte(text), &saved); err != nil {
			return common_models.CacheWindow{}, err
		}
	}
	// CacheWindow is a pointer; it stays nil when the payload does not contain
	// it or when the cached value was not a string.
	if saved.CacheWindow == nil {
		return common_models.CacheWindow{}, fmt.Errorf("缓存[%s]中无 CacheWindow 数据", key_cacheWindow)
	}

	// Rebuild the ring from the saved data.
	saved.CacheWindow.ReInitialRing()
	for _, datum := range saved.AllData {
		saved.CacheWindow.EnQueue(datum)
	}
	return *saved.CacheWindow, nil
}
// GetDeviceStationObjs returns the stations stored in the chained cache under
// "<redisKey.Device_stationObjs>:<deviceId>". When the cached value is not a
// string, or is an empty string, no decoding is attempted and the error from
// the cache read is returned. A JSON decode failure is logged and returned.
func (the *ConfigHelper) GetDeviceStationObjs(deviceId string) ([]common_models.Station, error) {
	var stations common_models.StationArrayObj
	key := fmt.Sprintf("%s:%s", redisKey.Device_stationObjs, deviceId)

	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, key)
	text, isString := value.(string)
	if !isString {
		return stations, err
	}
	if text == "" {
		log.Printf("LoadableChinCache[%s]=空", key)
		return stations, err
	}

	if err = json.Unmarshal([]byte(text), &stations); err != nil {
		log.Printf("json unmarshal error:%s", err.Error())
	}
	return stations, err
}
// SetDeviceStationObjs JSON-encodes stations and writes the result to the
// chained cache under "<redisKey.Device_stationObjs>:<deviceId>".
//
// Only the encoding error is returned. The cache write happens in a
// background goroutine so the caller is not blocked; a failed write is logged
// instead of being silently dropped.
func (the *ConfigHelper) SetDeviceStationObjs(deviceId string, stations common_models.StationArrayObj) error {
	k := fmt.Sprintf("%s:%s", redisKey.Device_stationObjs, deviceId)
	payload, err := json.Marshal(stations)
	if err != nil {
		return err
	}

	go func() {
		if setErr := the.SetChainedCacheObj(k, payload); setErr != nil {
			log.Printf("SetDeviceStationObjs: 写入缓存[%s]失败: %v", k, setErr)
		}
	}()
	return nil
}
// SetFactorInfo stores factor in the chained cache under
// "<redisKey.Factor>:<factorId>" (e.g. factor:105). A pointer to the factor
// is what gets handed to the cache.
func (the *ConfigHelper) SetFactorInfo(factorId int, factor common_models.Factor) error {
	key := fmt.Sprintf("%s:%d", redisKey.Factor, factorId)
	cache := the.chainedCache.LoadableChinCache
	return cache.Set(the.ctx, key, &factor)
}
// GetFactorInfo reads "<redisKey.Factor>:<factorId>" from the chained cache.
// A string value is JSON-decoded into a Factor; a decode failure is logged and
// returned. When the cached value is not a string, the zero Factor is returned
// together with the error from the cache read.
func (the *ConfigHelper) GetFactorInfo(factorId int) (common_models.Factor, error) {
	var factor common_models.Factor
	key := fmt.Sprintf("%s:%d", redisKey.Factor, factorId)

	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, key)
	text, isString := value.(string)
	if !isString {
		return factor, err
	}

	if err = json.Unmarshal([]byte(text), &factor); err != nil {
		log.Printf("json unmarshal error:%s \n", err.Error())
	}
	return factor, err
}
// SetStationInfo stores station in the chained cache under
// "<redisKey.Station>:<stationId>" (e.g. station:105). A pointer to the
// station info is what gets handed to the cache.
func (the *ConfigHelper) SetStationInfo(stationId int, station common_models.StationInfo) error {
	key := fmt.Sprintf("%s:%d", redisKey.Station, stationId)
	cache := the.chainedCache.LoadableChinCache
	return cache.Set(the.ctx, key, &station)
}
// GetStationInfo reads "<redisKey.Station>:<stationId>" from the chained
// cache. A string value is JSON-decoded into a StationInfo; a decode failure
// is logged and returned. When the cached value is not a string, the zero
// StationInfo is returned together with the error from the cache read.
func (the *ConfigHelper) GetStationInfo(stationId int) (common_models.StationInfo, error) {
	var info common_models.StationInfo
	key := fmt.Sprintf("%s:%d", redisKey.Station, stationId)

	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, key)
	text, isString := value.(string)
	if !isString {
		return info, err
	}

	if err = json.Unmarshal([]byte(text), &info); err != nil {
		log.Printf("json unmarshal error:%s \n", err.Error())
	}
	return info, err
}
// getStation returns the station for stationId. stationCache is checked
// first; on a miss only the station's Info field is filled, from the Redis
// object stored under "<redisKey.Station>:<stationId>", and the Redis error,
// if any, is returned.
func (the *ConfigHelper) getStation(stationId int) (common_models.Station, error) {
	if cached, hit := stationCache[stationId]; hit {
		return cached, nil
	}

	var station common_models.Station
	key := fmt.Sprintf("%s:%d", redisKey.Station, stationId)
	err := the.redisHelper.GetObj(key, &station.Info)
	return station, err
}
// GetStations assembles one Station per requested id, in the order given.
// Each station is enriched with its factor, proto, threshold, group and
// correlated groups. Every lookup is best effort: lookup errors are discarded,
// the affected field keeps whatever the lookup returned, and the returned
// error is always nil.
func (the *ConfigHelper) GetStations(stationIds ...int) ([]common_models.Station, error) {
	stations := make([]common_models.Station, 0, len(stationIds))
	for _, id := range stationIds {
		station, _ := the.getStation(id)
		station.Info.Factor, _ = the.GetFactorInfo(station.Info.FactorId)
		station.Info.Proto, _ = the.GetProto(station.Info.ProtoCode)
		station.Threshold, _ = the.GetStationThreshold(id)

		// Attach the station's group information.
		groupInfo, _ := the.GetStationGroupInfo(id)
		station.Info.Group, _ = the.GetStationGroup(groupInfo.GroupId)
		station.Info.CorrGroups, _ = the.GetStationCorrGroups(id)

		stations = append(stations, station)
	}
	return stations, nil
}
// SetStationGroup stores group in the chained cache under
// "<redisKey.Group>:<groupId>". A pointer to the group is what gets handed to
// the cache.
func (the *ConfigHelper) SetStationGroup(groupId int, group common_models.StationGroup) error {
	key := fmt.Sprintf("%s:%d", redisKey.Group, groupId)
	cache := the.chainedCache.LoadableChinCache
	return cache.Set(the.ctx, key, &group)
}
// max_corr_depth is the maximum number of cascade levels corrGroups expands.
const max_corr_depth int = 15

// loadGroupForCorr reads and decodes the group stored in the chained cache
// under "<redisKey.Group>:<groupId>" WITHOUT expanding its correlated groups.
// corrGroups uses it instead of GetStationGroup, because GetStationGroup
// restarts the expansion at depth 0 and would therefore defeat the depth limit.
func (the *ConfigHelper) loadGroupForCorr(groupId int) (common_models.StationGroup, error) {
	var group common_models.StationGroup
	k := fmt.Sprintf("%s:%d", redisKey.Group, groupId)
	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
	if err != nil {
		return group, err
	}
	text, ok := value.(string)
	if !ok || text == "" {
		return group, fmt.Errorf("no cached group under key %q", k)
	}
	if err := json.Unmarshal([]byte(text), &group); err != nil {
		return group, err
	}
	return group, nil
}

// corrGroups resolves the correlated (cascaded) group of gp, e.g. a settlement
// cascade. It only acts on groups whose GroupType is "202" and whose Params
// hold exactly two entries (ref_base and ref_point, both decoded as float64
// station ids). The group's settlement base item receives SubItems describing
// the referenced base station and the referenced point station.
//
// depth is the current recursion depth. Expansion stops once max_corr_depth is
// reached, which both caps the cascade depth and keeps a misconfiguration in
// which base points reference each other from recursing forever.
func (the *ConfigHelper) corrGroups(gp common_models.StationGroup, depth int) {
	if gp.GroupType != "202" || len(gp.Params) != 2 {
		return
	}

	refBaseValue, ok := gp.Params[settlementParam.Ref_base].(float64)
	if !ok {
		log.Println("ref_base 转换异常。", gp.Params[settlementParam.Ref_base])
		return
	}
	refPointValue, ok := gp.Params[settlementParam.Ref_point].(float64)
	if !ok {
		log.Println("ref_point 转换异常。", gp.Params[settlementParam.Ref_point])
		return
	}

	basePtr := gp.GetSettlementBaseItem()
	if basePtr == nil {
		log.Println("无基点。", gp.R())
		return
	}

	refBaseStationId := int(refBaseValue)
	refPointStationId := int(refPointValue)

	// Default description of the referenced base station; it is replaced by
	// the item from the referenced group when that group can be resolved.
	subBase := common_models.GroupItem{
		StationId:   refBaseStationId,
		ParamsValue: map[string]interface{}{"base": true},
		SubItems:    nil,
	}
	if sg, err := the.GetStationGroupInfo(refBaseStationId); err == nil {
		if refGroup, err := the.loadGroupForCorr(sg.GroupId); err == nil {
			if depth+1 <= max_corr_depth {
				the.corrGroups(refGroup, depth+1)
			}
			// The referenced group may not contain the station at all; keep
			// the default instead of dereferencing a nil item.
			if item := refGroup.GetItem(refBaseStationId); item != nil {
				subBase = *item
			}
		}
	}

	subPoint := common_models.GroupItem{
		StationId:   refPointStationId,
		ParamsValue: map[string]interface{}{"base": false},
		SubItems:    nil,
	}
	basePtr.SubItems = map[string]common_models.GroupItem{
		settlementParam.Ref_base:  subBase,
		settlementParam.Ref_point: subPoint,
	}
}
// GetStationGroup reads "<redisKey.Group>:<groupId>" (e.g. group:35) from the
// chained cache, JSON-decodes it and then runs corrGroups on the result to
// resolve correlated groups. An empty cached string returns the zero group
// immediately, without running corrGroups. A decode failure is logged and
// returned, but corrGroups still runs on whatever was decoded.
func (the *ConfigHelper) GetStationGroup(groupId int) (common_models.StationGroup, error) {
	var group common_models.StationGroup
	key := fmt.Sprintf("%s:%d", redisKey.Group, groupId)

	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, key)
	text, isString := value.(string)
	if isString && text == "" {
		return group, err
	}
	if isString {
		if err = json.Unmarshal([]byte(text), &group); err != nil {
			log.Printf("【redisKey.Group】【%s】json unmarshal error:%s \n", key, err.Error())
		}
	}

	the.corrGroups(group, 0)
	return group, err
}
// SetStationGroupInfo stores info in the chained cache under
// "<redisKey.Station_group>:<stationId>". A pointer to the info is what gets
// handed to the cache.
func (the *ConfigHelper) SetStationGroupInfo(stationId int, info common_models.StationGroupInfo) error {
	key := fmt.Sprintf("%s:%d", redisKey.Station_group, stationId)
	cache := the.chainedCache.LoadableChinCache
	return cache.Set(the.ctx, key, &info)
}
// GetStationGroupInfo reads "<redisKey.Station_group>:<stationId>"
// (e.g. sg:193) from the chained cache. A string value is JSON-decoded; a
// decode failure is logged and returned. When the cached value is not a
// string, the zero value is returned together with the error from the cache
// read.
func (the *ConfigHelper) GetStationGroupInfo(stationId int) (common_models.StationGroupInfo, error) {
	var groupInfo common_models.StationGroupInfo
	key := fmt.Sprintf("%s:%d", redisKey.Station_group, stationId)

	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, key)
	text, isString := value.(string)
	if !isString {
		return groupInfo, err
	}

	if err = json.Unmarshal([]byte(text), &groupInfo); err != nil {
		log.Printf("【redisKey.Station_group】【%s】json unmarshal error:%s \n", key, err.Error())
	}
	return groupInfo, err
}
// SetStationCorrGroup stores the ids of the groups correlated with stationId
// in the chained cache under "<redisKey.Station_corr_group>:<stationId>".
func (the *ConfigHelper) SetStationCorrGroup(stationId int, groupIds []int) error {
	key := fmt.Sprintf("%s:%d", redisKey.Station_corr_group, stationId)
	cache := the.chainedCache.LoadableChinCache
	return cache.Set(the.ctx, key, groupIds)
}
// GetStationCorrGroups reads the correlated group ids stored under
// "<redisKey.Station_corr_group>:<stationId>" (e.g. scg:193) and loads each
// group through GetStationGroup. A cache-read or decode error returns an empty
// slice and that error. A group that fails to load is logged and skipped,
// without failing the call.
func (the *ConfigHelper) GetStationCorrGroups(stationId int) ([]common_models.StationGroup, error) {
	key := fmt.Sprintf("%s:%d", redisKey.Station_corr_group, stationId)

	var groupIds []int
	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, key)
	if text, isString := value.(string); isString {
		if err = json.Unmarshal([]byte(text), &groupIds); err != nil {
			log.Printf("【redisKey.Station_corr_group】【%s】json unmarshal error:%s \n", key, err.Error())
		}
	}
	if err != nil {
		return []common_models.StationGroup{}, err
	}

	// Resolve every id to its StationGroup.
	var groups []common_models.StationGroup
	for _, groupId := range groupIds {
		group, groupErr := the.GetStationGroup(groupId)
		if groupErr != nil {
			log.Printf("[ConfigHelper] stationId[%d] corrGroupIds:[%v], get corrGroup[%d] Error: %s\n", stationId, groupIds, groupId, groupErr.Error())
			continue
		}
		groups = append(groups, group)
	}
	return groups, nil
}
// SetStationThreshold stores threshold in the chained cache under
// "<redisKey.Threshold>:<stationId>". A pointer to the threshold is what gets
// handed to the cache.
func (the *ConfigHelper) SetStationThreshold(stationId int, threshold common_models.Threshold) error {
	key := fmt.Sprintf("%s:%d", redisKey.Threshold, stationId)
	cache := the.chainedCache.LoadableChinCache
	return cache.Set(the.ctx, key, &threshold)
}
// GetStationThreshold reads "<redisKey.Threshold>:<stationId>"
// (e.g. threshold:198) from the chained cache and JSON-decodes a non-empty
// string value into the threshold's Items. The returned pointer is never nil.
// A decode failure is logged and returned; otherwise the error from the cache
// read is returned.
func (the *ConfigHelper) GetStationThreshold(stationId int) (*common_models.Threshold, error) {
	var threshold common_models.Threshold
	key := fmt.Sprintf("%s:%d", redisKey.Threshold, stationId)

	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, key)
	if text, isString := value.(string); isString && text != "" {
		if err = json.Unmarshal([]byte(text), &threshold.Items); err != nil {
			log.Printf("json unmarshal error:%s \n", err.Error())
		}
	}
	return &threshold, err
}
// SetAggThreshold stores threshold in the chained cache under
// "<redisKey.Agg_threshold>:<structId>:<factorId>". A pointer to the
// threshold is what gets handed to the cache.
func (the *ConfigHelper) SetAggThreshold(structId int, factorId int, threshold common_models.AggThreshold) error {
	key := fmt.Sprintf("%s:%d:%d", redisKey.Agg_threshold, structId, factorId)
	cache := the.chainedCache.LoadableChinCache
	return cache.Set(the.ctx, key, &threshold)
}
// GetAggThreshold reads "<redisKey.Agg_threshold>:<structId>:<factorId>" from
// the chained cache and JSON-decodes a string value into the threshold's
// Items. The returned pointer is never nil. A decode failure is logged and
// returned; when the cached value is not a string the error from the cache
// read is returned.
func (the *ConfigHelper) GetAggThreshold(structId int, factorId int) (*common_models.AggThreshold, error) {
	var threshold common_models.AggThreshold
	key := fmt.Sprintf("%s:%d:%d", redisKey.Agg_threshold, structId, factorId)

	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, key)
	if text, isString := value.(string); isString {
		if err = json.Unmarshal([]byte(text), &threshold.Items); err != nil {
			log.Printf("json unmarshal error:%s \n", err.Error())
		}
	}
	return &threshold, err
}
// GetIotaMeta reads the device meta stored in the chained cache under
// "<redisKey.Iota_meta>:<deviceId>"
// (e.g. iota_meta:003540d0-616c-4611-92c1-1cd31005eabf). GetDeviceInfo passes
// the device's DeviceMetaId as deviceId.
//
// An error is returned when the cache read fails, when nothing is cached, or
// when the cached JSON cannot be decoded. The decode error used to be lost
// through a shadowed variable, so callers received a zero DeviceMeta with a
// nil error.
func (the *ConfigHelper) GetIotaMeta(deviceId string) (common_models.DeviceMeta, error) {
	k := fmt.Sprintf("%s:%s", redisKey.Iota_meta, deviceId)
	result := common_models.DeviceMeta{}

	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
	if err != nil {
		return result, err
	}
	if value == nil || value == "" {
		log.Printf("LoadableChinCache[%s]=空", k)
		return result, errors.New("无缓存")
	}

	if v, ok := value.(string); ok {
		// Assign to the outer err so the failure reaches the caller.
		if err = json.Unmarshal([]byte(v), &result); err != nil {
			log.Printf("json unmarshal error:%s", err.Error())
			return result, err
		}
	}
	return result, nil
}
// GetIotaDevice reads the device stored in the chained cache under
// "<redisKey.Iota_device>:<deviceId>"
// (e.g. iota_device:1b06d870-09a8-45e0-a86e-dba539d5edd0).
//
// An error is returned when the cache read fails, when nothing is cached, or
// when the cached JSON cannot be decoded. The decode error used to be lost
// through a shadowed variable, so callers received a zero IotaDevice with a
// nil error.
func (the *ConfigHelper) GetIotaDevice(deviceId string) (common_models.IotaDevice, error) {
	k := fmt.Sprintf("%s:%s", redisKey.Iota_device, deviceId)
	result := common_models.IotaDevice{}

	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
	if err != nil {
		return result, err
	}
	if value == nil || value == "" {
		log.Printf("LoadableChinCache[%s]=空", k)
		return result, errors.New("无缓存")
	}

	if v, ok := value.(string); ok {
		// Assign to the outer err so the failure reaches the caller.
		if err = json.Unmarshal([]byte(v), &result); err != nil {
			log.Printf("json unmarshal error:%s", err.Error())
			return result, err
		}
	}
	return result, nil
}
// GetDeviceFactorProto returns the device/factor proto mapping.
// deviceFactorProtoMap is checked first, keyed by deviceMetaId only; on a miss
// the object is read from Redis under
// "<redisKey.Device_proto>:<factorProtoId>:<deviceMetaId>".
// NOTE(review): the in-memory lookup ignores factorProtoId while the Redis key
// includes it — confirm that this is intended.
func (the *ConfigHelper) GetDeviceFactorProto(factorProtoId, deviceMetaId string) (common_models.DeviceFactorProto, error) {
	if cached, hit := deviceFactorProtoMap[deviceMetaId]; hit {
		return cached, nil
	}

	var mapping common_models.DeviceFactorProto
	key := fmt.Sprintf("%s:%s:%s", redisKey.Device_proto, factorProtoId, deviceMetaId)
	err := the.redisHelper.GetObj(key, &mapping)
	return mapping, err
}
// GetThingStruct reads the thing/structure record stored in the chained cache
// under "<redisKey.Thing_struct>:<deviceId>"
// (e.g. thing_struct:5da9aa1b-05b7-4943-be57-dedb34f7a1bd). GetDeviceInfo
// passes the device's ThingId as deviceId.
//
// An error is returned when the cache read fails, when nothing is cached, or
// when the cached JSON cannot be decoded. The decode error used to be lost
// through a shadowed variable, so callers received a zero ThingStruct with a
// nil error.
func (the *ConfigHelper) GetThingStruct(deviceId string) (common_models.ThingStruct, error) {
	k := fmt.Sprintf("%s:%s", redisKey.Thing_struct, deviceId)
	result := common_models.ThingStruct{}

	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
	if err != nil {
		return result, err
	}
	if value == nil || value == "" {
		log.Printf("LoadableChinCache[%s]=空", k)
		return result, errors.New("无缓存")
	}

	if v, ok := value.(string); ok {
		// Assign to the outer err so the failure reaches the caller.
		if err = json.Unmarshal([]byte(v), &result); err != nil {
			log.Printf("json unmarshal error:%s", err.Error())
			return result, err
		}
	}
	return result, nil
}
// GetDataUnit reads the unit table stored in Redis under
// redisKey.Transform_units and returns it as a plain []DataUnit, together
// with the Redis error, if any.
func (the *ConfigHelper) GetDataUnit() ([]common_models.DataUnit, error) {
	units := common_models.DataUnitArray{}
	key := fmt.Sprintf("%s", redisKey.Transform_units)
	err := the.redisHelper.GetObj(key, &units)
	return []common_models.DataUnit(units), err
}
// GetFilter reads the filter items stored in Redis under
// "<redisKey.Filter>:<stationId>" (e.g. filter:198) into the Items field of
// the returned Filter, and returns the Redis error, if any.
func (the *ConfigHelper) GetFilter(stationId int) (common_models.Filter, error) {
	var filter common_models.Filter
	key := fmt.Sprintf("%s:%d", redisKey.Filter, stationId)
	if err := the.redisHelper.GetObj(key, &filter.Items); err != nil {
		return filter, err
	}
	return filter, nil
}
// GetFilterItem returns the first filter item of stationId whose FieldName
// equals item. A failure to load the filter is logged and returned. When no
// item matches, the zero FilterItem is returned with a nil error.
func (the *ConfigHelper) GetFilterItem(stationId int, item string) (common_models.FilterItem, error) {
	filter, err := the.GetFilter(stationId)
	if err != nil {
		log.Printf("获取GetFilterItem err=%s", err.Error())
		return common_models.FilterItem{}, err
	}

	for _, candidate := range filter.Items {
		if candidate.FieldName == item {
			return candidate, nil
		}
	}
	return common_models.FilterItem{}, nil
}
// GetAlarmCode returns the alarm-code definition for alarmCode.
// AlarmCodeCache is checked first; on a miss the object is read from Redis
// under "<redisKey.Alarm_code>:<alarmCode>" and, only when that read succeeds,
// remembered in AlarmCodeCache.
//
// A failed read is not cached. Previously the zero value was stored even on
// error, so every later call returned an empty AlarmCode with a nil error.
func (the *ConfigHelper) GetAlarmCode(alarmCode string) (common_models.AlarmCode, error) {
	if cached, ok := AlarmCodeCache[alarmCode]; ok {
		return cached, nil
	}

	var result common_models.AlarmCode
	k := fmt.Sprintf("%s:%s", redisKey.Alarm_code, alarmCode)
	if err := the.redisHelper.GetObj(k, &result); err != nil {
		return result, err
	}
	AlarmCodeCache[alarmCode] = result
	return result, nil
}
// SetIotaScheme stores scheme in the chained cache under
// "<redisKey.Scheme>:<dimensionId>". A pointer to the scheme is what gets
// handed to the cache.
func (the *ConfigHelper) SetIotaScheme(dimensionId string, scheme common_models.IotaScheme) error {
	key := fmt.Sprintf("%s:%s", redisKey.Scheme, dimensionId)
	cache := the.chainedCache.LoadableChinCache
	return cache.Set(the.ctx, key, &scheme)
}
func (the *ConfigHelper) GetIotaScheme(dimensionId string) (common_models.IotaScheme, error) {
// scheme:01dc6242-84ab-4690-b640-8c21cdffcf39
k := fmt.Sprintf("%s:%s", redisKey.Scheme, dimensionId)
var scheme common_models.IotaScheme
value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
if v, ok := value.(string); ok {
formattedStr := strings.Replace(v, "+0800", "+08:00", -1)
err = json.Unmarshal([]byte(formattedStr), &scheme)
if err != nil {
5 months ago
log.Printf("【GetIotaScheme】json unmarshal error:%s \n", err.Error())
}
}
return scheme, err
}
// SAddAlarm adds alarmTypes to the alarm cache under key by forwarding to the
// Redis helper's SAdd, and returns that call's result.
func (the *ConfigHelper) SAddAlarm(key string, alarmTypes ...string) int64 {
	added := the.redisHelper.SAdd(key, alarmTypes...)
	return added
}
// SRemAlarm removes alarmTypes from the alarm cache under key by forwarding
// to the Redis helper's SRem, and returns that call's result.
func (the *ConfigHelper) SRemAlarm(key string, alarmTypes ...string) int64 {
	removed := the.redisHelper.SRem(key, alarmTypes...)
	return removed
}
// GetSubDeviceNext returns the cascaded devices below the given device node,
// as reported by the device tree's SearchSub (the original comment describes
// the result as recursive and excluding the node itself).
//
// The deployment instances come from DeviceNodeCache (keyed by deviceId) or,
// on a miss, from Redis under "<redisKey.Deploy>:<thingId>"; a Redis error is
// only logged. A device tree is built from every instance of type "s.iota"
// and searched for deviceId.
// NOTE(review): with more than one "s.iota" root, the result of the root
// visited last wins, and map iteration order is random.
func (the *ConfigHelper) GetSubDeviceNext(deviceId, thingId string) (subDeviceIds []string) {
	deployed, cached := DeviceNodeCache[deviceId]
	if !cached {
		key := fmt.Sprintf("%s:%s", redisKey.Deploy, thingId)
		if err := the.redisHelper.GetObj(key, &deployed); err != nil {
			log.Printf("the.redisHelper.GetObj(%s) error:%s \n", key, err.Error())
		}
	}

	for rootId, instance := range deployed.Instances {
		if instance.Type != "s.iota" {
			continue
		}
		tree := the.mapToTree(deployed.Instances, rootId, 0)
		subDeviceIds = tree.SearchSub(deviceId)
	}
	return subDeviceIds
}
// GetSubDeviceAll returns the devices below the given device node, as reported
// by the device tree's SearchSubAll.
//
// The deployment instances come from DeviceNodeCache (keyed by deviceId) or,
// on a miss, from Redis under "<redisKey.Deploy>:<thingId>"; a Redis error is
// only logged. A device tree is built from every instance of type "s.iota"
// and searched for deviceId.
// NOTE(review): with more than one "s.iota" root, the result of the root
// visited last wins, and map iteration order is random.
func (the *ConfigHelper) GetSubDeviceAll(deviceId, thingId string) (subDeviceIds []string) {
	deployed, cached := DeviceNodeCache[deviceId]
	if !cached {
		key := fmt.Sprintf("%s:%s", redisKey.Deploy, thingId)
		if err := the.redisHelper.GetObj(key, &deployed); err != nil {
			log.Printf("the.redisHelper.GetObj(%s) error:%s \n", key, err.Error())
		}
	}

	for rootId, instance := range deployed.Instances {
		if instance.Type != "s.iota" {
			continue
		}
		tree := the.mapToTree(deployed.Instances, rootId, 0)
		subDeviceIds = tree.SearchSubAll(deviceId)
	}
	return subDeviceIds
}
// mapToTree builds the device tree rooted at pid from the flat instance map.
// An instance is a child of pid when its Instance.To.OwnerSvgId equals pid;
// the child's own id is its Instance.From.OwnerSvgId. Children are built
// recursively with depth+1.
// NOTE(review): there is no cycle guard — presumably the links never form a
// loop; confirm against the data source.
func (the *ConfigHelper) mapToTree(source map[string]common_models.IotaInstance, pid string, depth int) (newArr common_models.DeviceNode) {
	node := common_models.DeviceNode{
		Id:    pid,
		Name:  source[pid].Instance.Name,
		Depth: depth,
	}

	for _, link := range source {
		if link.Instance.To.OwnerSvgId != pid {
			continue
		}
		child := the.mapToTree(source, link.Instance.From.OwnerSvgId, depth+1)
		node.Child = append(node.Child, child)
	}
	return node
}
// GetStructure reads "<redisKey.Structure>:<structId>" (e.g. structure:1)
// from the chained cache. A string value is JSON-decoded into a Structure; a
// decode failure is logged and returned. When the cached value is not a
// string, the zero Structure is returned together with the error from the
// cache read.
func (the *ConfigHelper) GetStructure(structId int) (common_models.Structure, error) {
	var structure common_models.Structure
	key := fmt.Sprintf("%s:%d", redisKey.Structure, structId)

	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, key)
	text, isString := value.(string)
	if !isString {
		return structure, err
	}

	if err = json.Unmarshal([]byte(text), &structure); err != nil {
		log.Printf("json unmarshal error:%s \n", err.Error())
	}
	return structure, err
}