package common_utils

import (
	"context"
	//"encoding/json"
	"errors"
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	"gitea.anxinyun.cn/container/common_models"
	"gitea.anxinyun.cn/container/common_models/constant/redisKey"
	"gitea.anxinyun.cn/container/common_models/constant/settlementParam"
	json "github.com/bytedance/sonic"
	"github.com/eko/gocache/lib/v4/store"
)

// Process-local lookup tables consulted before Redis. They are package-level
// variables, so every ConfigHelper in the process shares them.
//
// NOTE(review): apart from GetDeviceInfo, the methods below read and write
// these maps without taking a lock — confirm how callers synchronise access.
var (
	ProtoCache            map[string]common_models.Proto
	FormulaCache          map[int]common_models.Formula
	DeviceInfoCache       map[string]common_models.DeviceInfo
	DeviceMetaCache       map[string]common_models.DeviceMeta
	deviceStationIdsCache map[string][]int
	deviceFactorProtoMap  map[string]common_models.DeviceFactorProto
	stationCache          map[int]common_models.Station

	// TODO: new types.
	IotaDeviceCache  map[string]common_models.IotaDevice
	ThingStructCache map[string]common_models.ThingStruct
	DeviceNodeCache  map[string]common_models.IotaInstances
	AlarmCodeCache   map[string]common_models.AlarmCode
	StructureCache   map[int]common_models.Structure
)

// var stationThreshold map[int]common_models.Threshold
// var aggThreshold map[string]common_models.AggThreshold
// var taskTime *time.Ticker

// ConfigHelper reads and writes configuration objects through the
// package-level maps, a RedisHelper and a ChainedCache.
type ConfigHelper struct {
	redisHelper  *RedisHelper // used for plain (non-chained) cache access
	chainedCache *ChainedCache
	ctx          context.Context
	mu           sync.RWMutex
}

// NewConfigHelper makes sure the package-level maps exist and returns a
// helper whose Redis helper and chained cache both use redisAddr.
func NewConfigHelper(redisAddr string) *ConfigHelper {
	initDeviceInfoMapCache()
	return &ConfigHelper{
		redisHelper:  NewRedisHelper("", redisAddr),
		chainedCache: NewChainedCache(redisAddr),
		ctx:          context.Background(),
	}
}

// initDeviceInfoMapCache allocates every package-level map that is still
// nil; maps that already exist are left untouched.
func initDeviceInfoMapCache() {
	if DeviceInfoCache == nil {
		DeviceInfoCache = make(map[string]common_models.DeviceInfo)
	}
	if IotaDeviceCache == nil {
		IotaDeviceCache = make(map[string]common_models.IotaDevice)
	}
	if ThingStructCache == nil {
		ThingStructCache = make(map[string]common_models.ThingStruct)
	}
	if DeviceMetaCache == nil {
		DeviceMetaCache = make(map[string]common_models.DeviceMeta)
	}
	if deviceStationIdsCache == nil {
		deviceStationIdsCache = make(map[string][]int)
	}
	if stationCache == nil {
		stationCache = make(map[int]common_models.Station)
	}
	if deviceFactorProtoMap == nil {
		deviceFactorProtoMap = make(map[string]common_models.DeviceFactorProto)
	}
	if FormulaCache == nil {
		FormulaCache = make(map[int]common_models.Formula)
	}
	if ProtoCache == nil {
		ProtoCache = make(map[string]common_models.Proto)
	}
	if AlarmCodeCache == nil {
		AlarmCodeCache = make(map[string]common_models.AlarmCode)
	}
	if StructureCache == nil {
		StructureCache = make(map[int]common_models.Structure)
	}
	// Allocated like the other exported maps so that a write to it cannot
	// panic on a nil map.
	if DeviceNodeCache == nil {
		DeviceNodeCache = make(map[string]common_models.IotaInstances)
	}
}

// GetDeviceInfo returns the DeviceInfo for deviceId. On a DeviceInfoCache
// miss it is assembled from the iota device, its thing struct and its device
// meta, and then stored in DeviceInfoCache.
//
// the.mu is held for the whole call, so concurrent calls on the same helper
// are serialised. The first failing lookup is returned as the error.
func (the *ConfigHelper) GetDeviceInfo(deviceId string) (*common_models.DeviceInfo, error) {
	the.mu.Lock()
	defer the.mu.Unlock()

	if cached, ok := DeviceInfoCache[deviceId]; ok {
		return &cached, nil
	}

	// Cache miss: query Redis.
	device, err := the.GetIotaDevice(deviceId)
	if err != nil {
		return nil, err
	}
	thingStruct, err := the.GetThingStruct(device.ThingId)
	if err != nil {
		return nil, err
	}
	iotaMeta, err := the.GetIotaMeta(device.DeviceMetaId)
	if err != nil {
		return nil, err
	}

	deviceInfo := common_models.DeviceInfo{
		Id:   device.Id,
		Name: device.Name,
		Structure: common_models.Structure{
			ThingId:   thingStruct.ThingId,
			Id:        thingStruct.Id,
			Name:      thingStruct.Name,
			SType:     thingStruct.Type,
			OrgId:     thingStruct.OrgId,
			Latitude:  0,
			Longitude: 0,
		},
		DeviceMeta: iotaMeta,
	}
	// Cache the assembled deviceInfo.
	DeviceInfoCache[deviceId] = deviceInfo
	return &deviceInfo, nil
}

// GetFormulaInfo returns the formula for formulaId from FormulaCache, or
// reads key "<redisKey.Formula>:<formulaId>" from Redis on a miss.
func (the *ConfigHelper) GetFormulaInfo(formulaId int) (common_models.Formula, error) {
	result, ok := FormulaCache[formulaId]
	if ok {
		return result, nil
	}
	k := fmt.Sprintf("%s:%d", redisKey.Formula, formulaId)
	err := the.redisHelper.GetObj(k, &result)
	return result, err
}

// GetProto returns the proto for protoCode from ProtoCache, or reads key
// "<redisKey.Proto>:<protoCode>" (e.g. proto:4004) from Redis on a miss.
func (the *ConfigHelper) GetProto(protoCode string) (common_models.Proto, error) {
	resultProto, ok := ProtoCache[protoCode]
	if ok {
		return resultProto, nil
	}
	k := fmt.Sprintf("%s:%s", redisKey.Proto, protoCode)
	err := the.redisHelper.GetObj(k, &resultProto)
	return resultProto, err
}

// GetDeviceStationIds returns the station ids recorded for deviceId, from
// deviceStationIdsCache or from Redis key
// "<redisKey.Device_stationIds>:<deviceId>". An empty Redis value yields an
// empty, non-nil slice together with an error.
func (the *ConfigHelper) GetDeviceStationIds(deviceId string) ([]int, error) {
	if ids, ok := deviceStationIdsCache[deviceId]; ok {
		return ids, nil
	}

	// Cache miss: query Redis.
	result := make([]int, 0)
	k := fmt.Sprintf("%s:%s", redisKey.Device_stationIds, deviceId)
	s := the.redisHelper.Get(k)
	if s == "" {
		return result, fmt.Errorf("redis 中无key=[%s] 缓存", k)
	}
	err := json.Unmarshal([]byte(s), &result)
	return result, err
}

// SetChainedCacheObj stores obj under k in the chained cache. Strings are
// stored as-is, byte slices are converted to a string, and anything else is
// JSON-encoded first.
func (the *ConfigHelper) SetChainedCacheObj(k string, obj any) error {
	var value string
	switch v := obj.(type) {
	case string:
		value = v
	case []byte:
		value = string(v)
	default:
		bs, err := json.Marshal(obj)
		if err != nil {
			return err
		}
		value = string(bs)
	}
	return the.chainedCache.LoadableChinCache.Set(the.ctx, k, value)
}

// SetChainedCacheObjWithExpiration stores obj under k in the chained cache
// with the given expiration. Strings are stored as-is; anything else is
// JSON-encoded first.
//
// NOTE(review): unlike SetChainedCacheObj, a []byte argument is JSON-encoded
// here rather than stored verbatim — confirm that this is intended.
func (the *ConfigHelper) SetChainedCacheObjWithExpiration(k string, obj any, duration time.Duration) error {
	value, isString := obj.(string)
	if !isString {
		bs, err := json.Marshal(obj)
		if err != nil {
			return err
		}
		value = string(bs)
	}
	return the.chainedCache.LoadableChinCache.Set(the.ctx, k, value, store.WithExpiration(duration))
}

// DeleteChainedCacheObj removes k from the chained cache.
func (the *ConfigHelper) DeleteChainedCacheObj(k string) error {
	return the.chainedCache.LoadableChinCache.Delete(the.ctx, k)
}

// GetCacheWindowObj loads the cache window saved under key_cacheWindow,
// re-initialises its ring and re-enqueues every saved datum.
//
// An error is returned when the cache read fails, when nothing is cached,
// when the payload cannot be decoded, or when the decoded payload carries no
// window.
func (the *ConfigHelper) GetCacheWindowObj(key_cacheWindow string) (common_models.CacheWindow, error) {
	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, key_cacheWindow)
	if err != nil {
		return common_models.CacheWindow{}, err
	}
	// The very first read of a key is normally empty.
	if value == nil || value == "" {
		return common_models.CacheWindow{}, errors.New("无缓存")
	}

	var redisCacheWin common_models.CacheWinSave
	if v, ok := value.(string); ok && len(v) > 0 {
		if err := json.Unmarshal([]byte(v), &redisCacheWin); err != nil {
			return common_models.CacheWindow{}, err
		}
	}
	// The window pointer stays nil when the cached value is not a string or
	// the JSON holds no window; report that instead of dereferencing nil.
	if redisCacheWin.CacheWindow == nil {
		return common_models.CacheWindow{}, fmt.Errorf("cache window [%s] has no window payload", key_cacheWindow)
	}

	// Re-initialise the ring and refill it from the saved data.
	redisCacheWin.CacheWindow.ReInitialRing()
	for _, datum := range redisCacheWin.AllData {
		redisCacheWin.CacheWindow.EnQueue(datum)
	}
	return *redisCacheWin.CacheWindow, nil
}

// GetDeviceStationObjs returns the stations cached under
// "<redisKey.Device_stationObjs>:<deviceId>" in the chained cache. An empty
// cached string is logged and yields an empty result.
func (the *ConfigHelper) GetDeviceStationObjs(deviceId string) ([]common_models.Station, error) {
	var result common_models.StationArrayObj
	k := fmt.Sprintf("%s:%s", redisKey.Device_stationObjs, deviceId)
	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
	if v, ok := value.(string); ok {
		if v == "" {
			log.Printf("LoadableChinCache[%s]=空", k)
			return result, err
		}
		err = json.Unmarshal([]byte(v), &result)
		if err != nil {
			log.Printf("json unmarshal error:%s", err.Error())
		}
	}
	return result, err
}

// SetDeviceStationObjs JSON-encodes stations and writes them to the chained
// cache under "<redisKey.Device_stationObjs>:<deviceId>".
//
// Only an encoding failure is returned: the cache write runs in its own
// goroutine, and a failure there is logged rather than silently dropped.
func (the *ConfigHelper) SetDeviceStationObjs(deviceId string, stations common_models.StationArrayObj) error {
	k := fmt.Sprintf("%s:%s", redisKey.Device_stationObjs, deviceId)
	payload, err := json.Marshal(stations)
	if err != nil {
		return err
	}
	go func() {
		if err := the.SetChainedCacheObj(k, payload); err != nil {
			log.Printf("SetDeviceStationObjs[%s] async cache write error:%s", k, err.Error())
		}
	}()
	return nil
}

// SetFactorInfo stores factor in the chained cache under
// "<redisKey.Factor>:<factorId>" (e.g. factor:105).
func (the *ConfigHelper) SetFactorInfo(factorId int, factor common_models.Factor) error {
	k := fmt.Sprintf("%s:%d", redisKey.Factor, factorId)
	return the.chainedCache.LoadableChinCache.Set(the.ctx, k, &factor)
}

// GetFactorInfo reads "<redisKey.Factor>:<factorId>" from the chained cache
// and JSON-decodes it when the cached value is a string.
func (the *ConfigHelper) GetFactorInfo(factorId int) (common_models.Factor, error) {
	var result common_models.Factor
	k := fmt.Sprintf("%s:%d", redisKey.Factor, factorId)
	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
	if v, ok := value.(string); ok {
		err = json.Unmarshal([]byte(v), &result)
		if err != nil {
			log.Printf("json unmarshal error:%s \n", err.Error())
		}
	}
	return result, err
}

// SetStationInfo stores station in the chained cache under
// "<redisKey.Station>:<stationId>" (e.g. station:105).
func (the *ConfigHelper) SetStationInfo(stationId int, station common_models.StationInfo) error {
	k := fmt.Sprintf("%s:%d", redisKey.Station, stationId)
	return the.chainedCache.LoadableChinCache.Set(the.ctx, k, &station)
}

// GetStationInfo reads "<redisKey.Station>:<stationId>" from the chained
// cache and JSON-decodes it when the cached value is a string.
func (the *ConfigHelper) GetStationInfo(stationId int) (common_models.StationInfo, error) {
	var result common_models.StationInfo
	k := fmt.Sprintf("%s:%d", redisKey.Station, stationId)
	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
	if v, ok := value.(string); ok {
		err = json.Unmarshal([]byte(v), &result)
		if err != nil {
			log.Printf("json unmarshal error:%s \n", err.Error())
		}
	}
	return result, err
}

// getStation returns the station from stationCache; on a miss only its Info
// field is filled, from Redis key "<redisKey.Station>:<stationId>".
func (the *ConfigHelper) getStation(stationId int) (common_models.Station, error) {
	result, ok := stationCache[stationId]
	if ok {
		return result, nil
	}
	// Cache miss: query Redis.
	k := fmt.Sprintf("%s:%d", redisKey.Station, stationId)
	err := the.redisHelper.GetObj(k, &result.Info)
	return result, err
}

// GetStations builds one Station per id in stationIds, in the same order,
// attaching factor, proto, threshold, group and correlated groups.
//
// Every lookup is best-effort: a failed lookup leaves whatever value the
// lookup returned in place, and the returned error is always nil.
func (the *ConfigHelper) GetStations(stationIds ...int) ([]common_models.Station, error) {
	result := make([]common_models.Station, 0, len(stationIds))
	for _, stationId := range stationIds {
		// Errors are deliberately ignored below; see the doc comment.
		sd, _ := the.getStation(stationId)
		sd.Info.Factor, _ = the.GetFactorInfo(sd.Info.FactorId)
		sd.Info.Proto, _ = the.GetProto(sd.Info.ProtoCode)
		sd.Threshold, _ = the.GetStationThreshold(stationId)
		// Attach the station's group information.
		stationGroup, _ := the.GetStationGroupInfo(stationId)
		sd.Info.Group, _ = the.GetStationGroup(stationGroup.GroupId)
		sd.Info.CorrGroups, _ = the.GetStationCorrGroups(stationId)
		result = append(result, sd)
	}
	return result, nil
}

// SetStationGroup stores group in the chained cache under
// "<redisKey.Group>:<groupId>".
func (the *ConfigHelper) SetStationGroup(groupId int, group common_models.StationGroup) error {
	k := fmt.Sprintf("%s:%d", redisKey.Group, groupId)
	return the.chainedCache.LoadableChinCache.Set(the.ctx, k, &group)
}

// max_corr_depth is the maximum cascade depth followed by corrGroups.
const max_corr_depth int = 15

// corrGroups resolves correlated groups (e.g. settlement cascades) for gp.
// It only acts on groups whose GroupType is "202" and that carry exactly two
// params; for those it fills the SubItems of the group's settlement base
// item with the referenced base and the reference point.
//
// depth is the current recursion depth. It guards against a faulty
// configuration in which base points reference each other, and it caps the
// number of cascade levels at max_corr_depth.
func (the *ConfigHelper) corrGroups(gp common_models.StationGroup, depth int) {
	if gp.GroupType != "202" || len(gp.Params) != 2 {
		return
	}

	refBase, ok := gp.Params[settlementParam.Ref_base].(float64)
	if !ok {
		log.Println("ref_base 转换异常。", gp.Params[settlementParam.Ref_base])
		return
	}
	refPoint, ok := gp.Params[settlementParam.Ref_point].(float64)
	if !ok {
		log.Println("ref_point 转换异常。", gp.Params[settlementParam.Ref_point])
		return
	}

	basePtr := gp.GetSettlementBaseItem()
	if basePtr == nil {
		log.Println("无基点。", gp.R())
		return
	}

	refBaseStationId := int(refBase)
	subBase := common_models.GroupItem{
		StationId:   refBaseStationId,
		ParamsValue: map[string]any{"base": true},
		SubItems:    nil,
	}
	if sg, err := the.GetStationGroupInfo(refBaseStationId); err == nil {
		// Load without expanding: going through GetStationGroup would restart
		// the recursion at depth 0 and defeat the depth limit.
		refGroup, err := the.loadStationGroup(sg.GroupId)
		if err == nil && depth+1 <= max_corr_depth {
			the.corrGroups(refGroup, depth+1)
		}
		// Keep the default item when the referenced group has no entry for
		// this station (including when the group failed to load).
		if item := refGroup.GetItem(refBaseStationId); item != nil {
			subBase = *item
		}
	}

	subPoint := common_models.GroupItem{
		StationId:   int(refPoint),
		ParamsValue: map[string]any{"base": false},
		SubItems:    nil,
	}
	basePtr.SubItems = map[string]common_models.GroupItem{
		settlementParam.Ref_base:  subBase,
		settlementParam.Ref_point: subPoint,
	}
}

// loadStationGroup reads "<redisKey.Group>:<groupId>" (e.g. group:35) from
// the chained cache and JSON-decodes it, without resolving correlated groups.
func (the *ConfigHelper) loadStationGroup(groupId int) (common_models.StationGroup, error) {
	var group common_models.StationGroup
	k := fmt.Sprintf("%s:%d", redisKey.Group, groupId)
	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
	v, ok := value.(string)
	if !ok || v == "" {
		return group, err
	}
	err = json.Unmarshal([]byte(v), &group)
	if err != nil {
		log.Printf("【redisKey.Group】【%s】json unmarshal error:%s \n", k, err.Error())
	}
	return group, err
}

// GetStationGroup returns the station group stored under
// "<redisKey.Group>:<groupId>" with its correlated groups resolved by
// corrGroups, starting at depth 0.
func (the *ConfigHelper) GetStationGroup(groupId int) (common_models.StationGroup, error) {
	group, err := the.loadStationGroup(groupId)
	// corrGroups ignores groups whose GroupType is not "202", so an empty
	// group passes through unchanged.
	the.corrGroups(group, 0)
	return group, err
}

// SetStationGroupInfo stores info in the chained cache under
// "<redisKey.Station_group>:<stationId>".
func (the *ConfigHelper) SetStationGroupInfo(stationId int, info common_models.StationGroupInfo) error {
	k := fmt.Sprintf("%s:%d", redisKey.Station_group, stationId)
	return the.chainedCache.LoadableChinCache.Set(the.ctx, k, &info)
}

// GetStationGroupInfo reads "<redisKey.Station_group>:<stationId>"
// (e.g. sg:193) from the chained cache and JSON-decodes it when the cached
// value is a string.
func (the *ConfigHelper) GetStationGroupInfo(stationId int) (common_models.StationGroupInfo, error) {
	var info common_models.StationGroupInfo
	k := fmt.Sprintf("%s:%d", redisKey.Station_group, stationId)
	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
	if v, ok := value.(string); ok {
		err = json.Unmarshal([]byte(v), &info)
		if err != nil {
			log.Printf("【redisKey.Station_group】【%s】json unmarshal error:%s \n", k, err.Error())
		}
	}
	return info, err
}

// SetStationCorrGroup stores groupIds in the chained cache under
// "<redisKey.Station_corr_group>:<stationId>".
func (the *ConfigHelper) SetStationCorrGroup(stationId int, groupIds []int) error {
	k := fmt.Sprintf("%s:%d", redisKey.Station_corr_group, stationId)
	return the.chainedCache.LoadableChinCache.Set(the.ctx, k, groupIds)
}

// GetStationCorrGroups reads the group ids stored under
// "<redisKey.Station_corr_group>:<stationId>" (e.g. scg:193) and resolves
// each id with GetStationGroup. A group that fails to load is logged and
// skipped; a failure to read or decode the id list is returned as the error
// together with an empty slice.
func (the *ConfigHelper) GetStationCorrGroups(stationId int) ([]common_models.StationGroup, error) {
	k := fmt.Sprintf("%s:%d", redisKey.Station_corr_group, stationId)
	var groupIds []int
	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
	if v, ok := value.(string); ok {
		err = json.Unmarshal([]byte(v), &groupIds)
		if err != nil {
			log.Printf("【redisKey.Station_corr_group】【%s】json unmarshal error:%s \n", k, err.Error())
		}
	}
	if err != nil {
		return []common_models.StationGroup{}, err
	}

	// id -> StationGroup
	var groups []common_models.StationGroup
	for _, id := range groupIds {
		g, err1 := the.GetStationGroup(id)
		if err1 != nil {
			log.Printf("[ConfigHelper] stationId[%d] corrGroupIds:[%v], get corrGroup[%d] Error: %s\n", stationId, groupIds, id, err1.Error())
			continue
		}
		groups = append(groups, g)
	}
	return groups, nil
}

// SetStationThreshold stores threshold in the chained cache under
// "<redisKey.Threshold>:<stationId>".
func (the *ConfigHelper) SetStationThreshold(stationId int, threshold common_models.Threshold) error {
	k := fmt.Sprintf("%s:%d", redisKey.Threshold, stationId)
	return the.chainedCache.LoadableChinCache.Set(the.ctx, k, &threshold)
}

// GetStationThreshold reads "<redisKey.Threshold>:<stationId>"
// (e.g. threshold:198) from the chained cache and decodes a non-empty cached
// string into the Items of the returned threshold. The returned pointer is
// never nil.
func (the *ConfigHelper) GetStationThreshold(stationId int) (*common_models.Threshold, error) {
	var result common_models.Threshold
	k := fmt.Sprintf("%s:%d", redisKey.Threshold, stationId)
	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
	if v, ok := value.(string); ok && v != "" {
		err = json.Unmarshal([]byte(v), &result.Items)
		if err != nil {
			log.Printf("json unmarshal error:%s \n", err.Error())
		}
	}
	return &result, err
}

// SetAggThreshold stores threshold in the chained cache under
// "<redisKey.Agg_threshold>:<structId>:<factorId>".
func (the *ConfigHelper) SetAggThreshold(structId int, factorId int, threshold common_models.AggThreshold) error {
	k := fmt.Sprintf("%s:%d:%d", redisKey.Agg_threshold, structId, factorId)
	return the.chainedCache.LoadableChinCache.Set(the.ctx, k, &threshold)
}

// GetAggThreshold reads "<redisKey.Agg_threshold>:<structId>:<factorId>"
// from the chained cache and decodes a cached string into the Items of the
// returned threshold. The returned pointer is never nil.
func (the *ConfigHelper) GetAggThreshold(structId int, factorId int) (*common_models.AggThreshold, error) {
	var result common_models.AggThreshold
	k := fmt.Sprintf("%s:%d:%d", redisKey.Agg_threshold, structId, factorId)
	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
	if v, ok := value.(string); ok {
		err = json.Unmarshal([]byte(v), &result.Items)
		if err != nil {
			log.Printf("json unmarshal error:%s \n", err.Error())
		}
	}
	return &result, err
}

// GetIotaMeta returns the device meta stored under the given id, from
// DeviceMetaCache or from Redis key "<redisKey.Iota_meta>:<deviceId>"
// (e.g. iota_meta:003540d0-616c-4611-92c1-1cd31005eabf).
func (the *ConfigHelper) GetIotaMeta(deviceId string) (common_models.DeviceMeta, error) {
	result, ok := DeviceMetaCache[deviceId]
	if ok {
		return result, nil
	}
	// Cache miss: query Redis.
	k := fmt.Sprintf("%s:%s", redisKey.Iota_meta, deviceId)
	err := the.redisHelper.GetObj(k, &result)
	return result, err
}

// GetIotaDevice returns the iota device for deviceId, from IotaDeviceCache
// or from Redis key "<redisKey.Iota_device>:<deviceId>"
// (e.g. iota_device:1b06d870-09a8-45e0-a86e-dba539d5edd0).
func (the *ConfigHelper) GetIotaDevice(deviceId string) (common_models.IotaDevice, error) {
	result, ok := IotaDeviceCache[deviceId]
	if ok {
		return result, nil
	}
	// Cache miss: query Redis.
	k := fmt.Sprintf("%s:%s", redisKey.Iota_device, deviceId)
	err := the.redisHelper.GetObj(k, &result)
	return result, err
}

// GetDeviceFactorProto returns the device factor proto, from
// deviceFactorProtoMap or from Redis key
// "<redisKey.Device_proto>:<factorProtoId>:<deviceMetaId>".
//
// NOTE(review): the in-memory map is looked up by deviceMetaId alone while
// the Redis key also contains factorProtoId — confirm that this is intended.
func (the *ConfigHelper) GetDeviceFactorProto(factorProtoId, deviceMetaId string) (common_models.DeviceFactorProto, error) {
	result, ok := deviceFactorProtoMap[deviceMetaId]
	if ok {
		return result, nil
	}
	// Cache miss: query Redis.
	k := fmt.Sprintf("%s:%s:%s", redisKey.Device_proto, factorProtoId, deviceMetaId)
	err := the.redisHelper.GetObj(k, &result)
	return result, err
}

// GetThingStruct returns the thing struct stored under the given id, from
// ThingStructCache or from Redis key "<redisKey.Thing_struct>:<deviceId>"
// (e.g. thing_struct:5da9aa1b-05b7-4943-be57-dedb34f7a1bd).
//
// Only a successful lookup is written to ThingStructCache, so that a failed
// lookup is retried by the next call instead of being answered with a cached
// empty value.
//
// NOTE(review): this method writes ThingStructCache without taking the.mu
// itself; GetDeviceInfo calls it while already holding the.mu.
func (the *ConfigHelper) GetThingStruct(deviceId string) (common_models.ThingStruct, error) {
	result, ok := ThingStructCache[deviceId]
	if ok {
		return result, nil
	}
	// Cache miss: query Redis.
	k := fmt.Sprintf("%s:%s", redisKey.Thing_struct, deviceId)
	err := the.redisHelper.GetObj(k, &result)
	if err == nil {
		ThingStructCache[deviceId] = result
	}
	return result, err
}

// GetDataUnit reads the data units stored under redisKey.Transform_units.
func (the *ConfigHelper) GetDataUnit() ([]common_models.DataUnit, error) {
	result := common_models.DataUnitArray{}
	k := fmt.Sprintf("%s", redisKey.Transform_units)
	err := the.redisHelper.GetObj(k, &result)
	return []common_models.DataUnit(result), err
}

// GetFilter reads "<redisKey.Filter>:<stationId>" (e.g. filter:198) from
// Redis into the Items of the returned filter.
func (the *ConfigHelper) GetFilter(stationId int) (common_models.Filter, error) {
	var result common_models.Filter
	k := fmt.Sprintf("%s:%d", redisKey.Filter, stationId)
	err := the.redisHelper.GetObj(k, &result.Items)
	return result, err
}

// GetFilterItem returns the first filter item of stationId whose FieldName
// equals item. When no item matches, the zero FilterItem is returned with a
// nil error; a failure to load the filter is logged and returned.
func (the *ConfigHelper) GetFilterItem(stationId int, item string) (common_models.FilterItem, error) {
	result, err := the.GetFilter(stationId)
	if err != nil {
		log.Printf("获取GetFilterItem err=%s", err.Error())
		return common_models.FilterItem{}, err
	}
	for _, filterItem := range result.Items {
		if filterItem.FieldName == item {
			return filterItem, nil
		}
	}
	return common_models.FilterItem{}, nil
}

// GetAlarmCode returns the alarm code entry for alarmCode, from
// AlarmCodeCache or from Redis key "<redisKey.Alarm_code>:<alarmCode>".
//
// Only a successful lookup is written to AlarmCodeCache, so that a failed
// lookup is retried by the next call instead of being answered with a cached
// empty value.
func (the *ConfigHelper) GetAlarmCode(alarmCode string) (common_models.AlarmCode, error) {
	result, ok := AlarmCodeCache[alarmCode]
	if ok {
		return result, nil
	}
	// Cache miss: query Redis.
	k := fmt.Sprintf("%s:%s", redisKey.Alarm_code, alarmCode)
	err := the.redisHelper.GetObj(k, &result)
	if err == nil {
		AlarmCodeCache[alarmCode] = result
	}
	return result, err
}

// SetIotaScheme stores scheme in the chained cache under
// "<redisKey.Scheme>:<dimensionId>".
func (the *ConfigHelper) SetIotaScheme(dimensionId string, scheme common_models.IotaScheme) error {
	k := fmt.Sprintf("%s:%s", redisKey.Scheme, dimensionId)
	return the.chainedCache.LoadableChinCache.Set(the.ctx, k, &scheme)
}

// GetIotaScheme reads "<redisKey.Scheme>:<dimensionId>"
// (e.g. scheme:01dc6242-84ab-4690-b640-8c21cdffcf39) from the chained cache.
// Every "+0800" in the cached text is rewritten to "+08:00" before it is
// JSON-decoded.
func (the *ConfigHelper) GetIotaScheme(dimensionId string) (common_models.IotaScheme, error) {
	var scheme common_models.IotaScheme
	k := fmt.Sprintf("%s:%s", redisKey.Scheme, dimensionId)
	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
	if v, ok := value.(string); ok {
		formattedStr := strings.ReplaceAll(v, "+0800", "+08:00")
		err = json.Unmarshal([]byte(formattedStr), &scheme)
		if err != nil {
			log.Printf("【GetIotaScheme】json unmarshal error:%s \n", err.Error())
		}
	}
	return scheme, err
}

// SAddAlarm adds alarmTypes to the alarm cache stored at key by delegating
// to RedisHelper.SAdd, and returns that call's result.
func (the *ConfigHelper) SAddAlarm(key string, alarmTypes ...string) int64 {
	return the.redisHelper.SAdd(key, alarmTypes...)
}

// SRemAlarm delegates to RedisHelper.SRem with key and alarmTypes, and
// returns that call's result.
func (the *ConfigHelper) SRemAlarm(key string, alarmTypes ...string) int64 {
	return the.redisHelper.SRem(key, alarmTypes...)
}

// GetSubDeviceNext returns the cascaded devices below the given device node
// (recursive, not including the node itself), as found by
// DeviceNode.SearchSub.
func (the *ConfigHelper) GetSubDeviceNext(deviceId, thingId string) (subDeviceIds []string) {
	return the.searchSubDevices(deviceId, thingId, func(tree common_models.DeviceNode) []string {
		return tree.SearchSub(deviceId)
	})
}

// GetSubDeviceAll returns the devices below the given device node as found
// by DeviceNode.SearchSubAll.
func (the *ConfigHelper) GetSubDeviceAll(deviceId, thingId string) (subDeviceIds []string) {
	return the.searchSubDevices(deviceId, thingId, func(tree common_models.DeviceNode) []string {
		return tree.SearchSubAll(deviceId)
	})
}

// searchSubDevices loads the deployment instances (DeviceNodeCache entry for
// deviceId, else Redis key "<redisKey.Deploy>:<thingId>"), builds a device
// tree rooted at every instance of type "s.iota" and applies search to it.
// When several such roots exist, the result for the root visited last wins.
// A failed Redis read is logged and leads to an empty result.
func (the *ConfigHelper) searchSubDevices(deviceId, thingId string, search func(tree common_models.DeviceNode) []string) (subDeviceIds []string) {
	iotaInstances, ok := DeviceNodeCache[deviceId]
	if !ok {
		// Cache miss: query Redis.
		k := fmt.Sprintf("%s:%s", redisKey.Deploy, thingId)
		if err := the.redisHelper.GetObj(k, &iotaInstances); err != nil {
			log.Printf("the.redisHelper.GetObj(%s) error:%s \n", k, err.Error())
		}
	}
	for id, instance := range iotaInstances.Instances {
		if instance.Type == "s.iota" {
			subDeviceIds = search(the.mapToTree(iotaInstances.Instances, id, 0))
		}
	}
	return subDeviceIds
}

// mapToTree builds the device tree rooted at pid. An instance becomes a
// child of pid when its Instance.To.OwnerSvgId equals pid; the child's id is
// that instance's Instance.From.OwnerSvgId. depth is recorded on the node
// and increases by one per level.
func (the *ConfigHelper) mapToTree(source map[string]common_models.IotaInstance, pid string, depth int) common_models.DeviceNode {
	deviceNode := common_models.DeviceNode{
		Id:    pid,
		Name:  source[pid].Instance.Name,
		Depth: depth,
		Child: nil,
	}
	for _, v := range source {
		if v.Instance.To.OwnerSvgId != pid {
			continue
		}
		childId := v.Instance.From.OwnerSvgId
		deviceNode.Child = append(deviceNode.Child, the.mapToTree(source, childId, depth+1))
	}
	return deviceNode
}

// GetStructure reads "<redisKey.Structure>:<structId>" (e.g. structure:1)
// from the chained cache and JSON-decodes it when the cached value is a
// string.
func (the *ConfigHelper) GetStructure(structId int) (common_models.Structure, error) {
	var result common_models.Structure
	k := fmt.Sprintf("%s:%d", redisKey.Structure, structId)
	value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
	if v, ok := value.(string); ok {
		err = json.Unmarshal([]byte(v), &result)
		if err != nil {
			log.Printf("json unmarshal error:%s \n", err.Error())
		}
	}
	return result, err
}