diff --git a/configHelper.go b/configHelper.go index 9bc21ed..a293c4f 100644 --- a/configHelper.go +++ b/configHelper.go @@ -19,21 +19,17 @@ import ( var ProtoCache map[string]common_models.Proto var FormulaCache map[int]common_models.Formula + var DeviceInfoCache map[string]common_models.DeviceInfo -var DeviceMetaCache map[string]common_models.DeviceMeta + var deviceStationIdsCache map[string][]int var deviceFactorProtoMap map[string]common_models.DeviceFactorProto var stationCache map[int]common_models.Station //todo 新类型 var IotaDeviceCache map[string]common_models.IotaDevice -var ThingStructCache map[string]common_models.ThingStruct var DeviceNodeCache map[string]common_models.IotaInstances var AlarmCodeCache map[string]common_models.AlarmCode var StructureCache map[int]common_models.Structure -// var stationThreshold map[int]common_models.Threshold -// var aggThreshold map[string]common_models.AggThreshold -// var taskTime *time.Ticker - type ConfigHelper struct { redisHelper *RedisHelper //普通缓存用 chainedCache *ChainedCache @@ -59,14 +55,6 @@ func initDeviceInfoMapCache() { IotaDeviceCache = make(map[string]common_models.IotaDevice) } - if ThingStructCache == nil { - ThingStructCache = make(map[string]common_models.ThingStruct) - } - - if DeviceMetaCache == nil { - DeviceMetaCache = make(map[string]common_models.DeviceMeta) - } - if deviceStationIdsCache == nil { deviceStationIdsCache = make(map[string][]int) } @@ -94,25 +82,61 @@ func initDeviceInfoMapCache() { } } -func (the *ConfigHelper) GetDeviceInfo(deviceId string) (*common_models.DeviceInfo, error) { - the.mu.Lock() - defer the.mu.Unlock() +func (the *ConfigHelper) GetDeviceInfo(deviceId string, forceUpdate bool) (*common_models.DeviceInfo, error) { + if !forceUpdate { + the.mu.RLock() + deviceInfo, ok := DeviceInfoCache[deviceId] + the.mu.RUnlock() - deviceInfo, ok := DeviceInfoCache[deviceId] - if !ok { //去redis查询 - device, err := the.GetIotaDevice(deviceId) - if err != nil { - return nil, err - } - thingStruct, err := the.GetThingStruct(device.ThingId) - if err != nil { - return nil, err - } - iotaMeta, err := the.GetIotaMeta(device.DeviceMetaId) - if err != nil { - return nil, err + if ok { + return &deviceInfo, nil } - s := common_models.Structure{ + } + + // 首先获取 device + device, err := the.GetIotaDevice(deviceId) + if err != nil { + return nil, err + } + if device.ThingId == "" || device.DeviceMetaId == "" { + return nil, errors.New("IotaDevice 的 ThingId 或者 DeviceMetaId 为空") + } + + // 使用 goroutines 并发加载数据 + var wg sync.WaitGroup + var thingStruct common_models.ThingStruct + var iotaMeta common_models.DeviceMeta + var thingErr, metaErr error + + // 获取 thingStruct + wg.Add(1) + go func() { + defer wg.Done() + thingStruct, thingErr = the.GetThingStruct(device.ThingId) + }() + + // 获取 iotaMeta + wg.Add(1) + go func() { + defer wg.Done() + iotaMeta, metaErr = the.GetIotaMeta(device.DeviceMetaId) + }() + + wg.Wait() + + // 检查是否有错误 + if thingErr != nil { + return nil, thingErr + } + if metaErr != nil { + return nil, metaErr + } + + // 构建 DeviceInfo + result := common_models.DeviceInfo{ + Id: device.Id, + Name: device.Name, + Structure: common_models.Structure{ ThingId: thingStruct.ThingId, Id: thingStruct.Id, Name: thingStruct.Name, @@ -120,18 +144,101 @@ func (the *ConfigHelper) GetDeviceInfo(deviceId string) (*common_models.DeviceIn OrgId: thingStruct.OrgId, Latitude: 0, Longitude: 0, + }, + DeviceMeta: iotaMeta, + } + + //缓存 deviceInfo + the.mu.Lock() + DeviceInfoCache[deviceId] = result + the.mu.Unlock() + + return &result, nil +} + +// 定时更新设备信息 +func (the *ConfigHelper) StartUpdateDeviceInfo(interval time.Duration, batchSize int) { + log.Printf("==== 启动定时更新 DeviceInfo 服务,interval = %v, batchSize = %d", interval, batchSize) + + ticker := time.NewTicker(interval) + go func() { + for { + select { + case <-ticker.C: + func() { + defer func() { + if r := recover(); r != nil { + fmt.Println("[StartUpdateDeviceInfo] Recovered from panic:", r) + } + }() + the.updateDeviceInfoCache(batchSize) + }() + + } + } + }() +} + +var updatesPool = sync.Pool{ + New: func() interface{} { + return make(map[string]common_models.DeviceInfo) + }, +} + +func (the *ConfigHelper) updateDeviceInfoCache(batchSize int) { + if batchSize < 0 { + panic("batchSize 不能为负数。") + } + + deviceIds := make([]string, 0, len(DeviceInfoCache)) + for deviceId := range DeviceInfoCache { + deviceIds = append(deviceIds, deviceId) + } + + // 批次处理设备 + for i := 0; i < len(deviceIds); i += batchSize { + end := i + batchSize + if end > len(deviceIds) { + end = len(deviceIds) } + batch := deviceIds[i:end] - deviceInfo = common_models.DeviceInfo{ - Id: device.Id, - Name: device.Name, - Structure: s, - DeviceMeta: iotaMeta, + var wg sync.WaitGroup + updates := updatesPool.Get().(map[string]common_models.DeviceInfo) + for k := range updates { + delete(updates, k) } - //缓存deviceInfo - DeviceInfoCache[deviceId] = deviceInfo + + var mu sync.Mutex + + for _, deviceId := range batch { + wg.Add(1) + go func(id string) { + defer wg.Done() + + // 强制更新设备信息 + deviceInfo, err := the.GetDeviceInfo(id, true) + if err == nil { + mu.Lock() + updates[id] = *deviceInfo + mu.Unlock() + } else { + fmt.Printf("Failed to update cache for deviceId %s: %v\n", id, err) + } + }(deviceId) + } + wg.Wait() + + // 批量更新 DeviceInfoCache + the.mu.Lock() + for id, info := range updates { + DeviceInfoCache[id] = info // 更新缓存 + } + the.mu.Unlock() + + // 将 updates 放回对象池 + updatesPool.Put(updates) } - return &deviceInfo, nil } func (the *ConfigHelper) GetFormulaInfo(formulaId int) (common_models.Formula, error) { @@ -511,27 +618,75 @@ func (the *ConfigHelper) GetAggThreshold(structId int, factorId int) (*common_mo } func (the *ConfigHelper) GetIotaMeta(deviceId string) (common_models.DeviceMeta, error) { - var err error - result, ok := DeviceMetaCache[deviceId] - if !ok { //去redis查询 - //iota_meta:003540d0-616c-4611-92c1-1cd31005eabf - k := fmt.Sprintf("%s:%s", redisKey.Iota_meta, deviceId) - //var deviceMeta common_models.DeviceMeta - err = the.redisHelper.GetObj(k, &result) + //iota_meta:003540d0-616c-4611-92c1-1cd31005eabf + k := fmt.Sprintf("%s:%s", redisKey.Iota_meta, deviceId) + + result := common_models.DeviceMeta{} + value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k) + if err != nil { + return result, err + } + + if value == nil || value == "" { + log.Printf("LoadableChinCache[%s]=空", k) + return result, errors.New("无缓存") + } + + if v, ok := value.(string); ok { + if len(v) > 0 { + err := json.Unmarshal([]byte(v), &result) + if err != nil { + log.Printf("json unmarshal error:%s", err.Error()) + } + } } + return result, err + + //var err error + //result, ok := DeviceMetaCache[deviceId] + //if !ok { //去redis查询 + // //iota_meta:003540d0-616c-4611-92c1-1cd31005eabf + // k := fmt.Sprintf("%s:%s", redisKey.Iota_meta, deviceId) + // //var deviceMeta common_models.DeviceMeta + // err = the.redisHelper.GetObj(k, &result) + //} + //return result, err } func (the *ConfigHelper) GetIotaDevice(deviceId string) (common_models.IotaDevice, error) { - var err error - result, ok := IotaDeviceCache[deviceId] - if !ok { //去redis查询 - //iota_device:1b06d870-09a8-45e0-a86e-dba539d5edd0 - k := fmt.Sprintf("%s:%s", redisKey.Iota_device, deviceId) + k := fmt.Sprintf("%s:%s", redisKey.Iota_device, deviceId) - err = the.redisHelper.GetObj(k, &result) + result := common_models.IotaDevice{} + value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k) + if err != nil { + return result, err + } + + if value == nil || value == "" { + log.Printf("LoadableChinCache[%s]=空", k) + return result, errors.New("无缓存") } + + if v, ok := value.(string); ok { + if len(v) > 0 { + err := json.Unmarshal([]byte(v), &result) + if err != nil { + log.Printf("json unmarshal error:%s", err.Error()) + } + } + } + return result, err + + //var err error + //result, ok := IotaDeviceCache[deviceId] + //if !ok { //去redis查询 + // //iota_device:1b06d870-09a8-45e0-a86e-dba539d5edd0 + // k := fmt.Sprintf("%s:%s", redisKey.Iota_device, deviceId) + // err = the.redisHelper.GetObj(k, &result) + //} + //return result, err } func (the *ConfigHelper) GetDeviceFactorProto(factorProtoId, deviceMetaId string) (common_models.DeviceFactorProto, error) { @@ -547,16 +702,41 @@ func (the *ConfigHelper) GetDeviceFactorProto(factorProtoId, deviceMetaId string } func (the *ConfigHelper) GetThingStruct(deviceId string) (common_models.ThingStruct, error) { - var err error - result, ok := ThingStructCache[deviceId] - if !ok { //去redis查询 - //thing_struct:5da9aa1b-05b7-4943-be57-dedb34f7a1bd - k := fmt.Sprintf("%s:%s", redisKey.Thing_struct, deviceId) + //thing_struct:5da9aa1b-05b7-4943-be57-dedb34f7a1bd + k := fmt.Sprintf("%s:%s", redisKey.Thing_struct, deviceId) - err = the.redisHelper.GetObj(k, &result) - ThingStructCache[deviceId] = result + result := common_models.ThingStruct{} + value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k) + if err != nil { + return result, err + } + + if value == nil || value == "" { + log.Printf("LoadableChinCache[%s]=空", k) + return result, errors.New("无缓存") } + + if v, ok := value.(string); ok { + if len(v) > 0 { + err := json.Unmarshal([]byte(v), &result) + if err != nil { + log.Printf("json unmarshal error:%s", err.Error()) + } + } + } + return result, err + + //var err error + //result, ok := ThingStructCache[deviceId] + //if !ok { //去redis查询 + // //thing_struct:5da9aa1b-05b7-4943-be57-dedb34f7a1bd + // k := fmt.Sprintf("%s:%s", redisKey.Thing_struct, deviceId) + // + // err = the.redisHelper.GetObj(k, &result) + // ThingStructCache[deviceId] = result + //} + //return result, err } func (the *ConfigHelper) GetDataUnit() ([]common_models.DataUnit, error) {