Browse Source

定时更新设备信息

dev
yfh 3 weeks ago
parent
commit
a9d1c2d0f0
  1. 298
      configHelper.go

298
configHelper.go

@ -19,21 +19,17 @@ import (
var ProtoCache map[string]common_models.Proto
var FormulaCache map[int]common_models.Formula
var DeviceInfoCache map[string]common_models.DeviceInfo
var DeviceMetaCache map[string]common_models.DeviceMeta
var deviceStationIdsCache map[string][]int
var deviceFactorProtoMap map[string]common_models.DeviceFactorProto
var stationCache map[int]common_models.Station //todo 新类型
var IotaDeviceCache map[string]common_models.IotaDevice
var ThingStructCache map[string]common_models.ThingStruct
var DeviceNodeCache map[string]common_models.IotaInstances
var AlarmCodeCache map[string]common_models.AlarmCode
var StructureCache map[int]common_models.Structure
// var stationThreshold map[int]common_models.Threshold
// var aggThreshold map[string]common_models.AggThreshold
// var taskTime *time.Ticker
type ConfigHelper struct {
redisHelper *RedisHelper //普通缓存用
chainedCache *ChainedCache
@ -59,14 +55,6 @@ func initDeviceInfoMapCache() {
IotaDeviceCache = make(map[string]common_models.IotaDevice)
}
if ThingStructCache == nil {
ThingStructCache = make(map[string]common_models.ThingStruct)
}
if DeviceMetaCache == nil {
DeviceMetaCache = make(map[string]common_models.DeviceMeta)
}
if deviceStationIdsCache == nil {
deviceStationIdsCache = make(map[string][]int)
}
@ -94,25 +82,61 @@ func initDeviceInfoMapCache() {
}
}
func (the *ConfigHelper) GetDeviceInfo(deviceId string) (*common_models.DeviceInfo, error) {
the.mu.Lock()
defer the.mu.Unlock()
func (the *ConfigHelper) GetDeviceInfo(deviceId string, forceUpdate bool) (*common_models.DeviceInfo, error) {
if !forceUpdate {
the.mu.RLock()
deviceInfo, ok := DeviceInfoCache[deviceId]
the.mu.RUnlock()
deviceInfo, ok := DeviceInfoCache[deviceId]
if !ok { //去redis查询
device, err := the.GetIotaDevice(deviceId)
if err != nil {
return nil, err
}
thingStruct, err := the.GetThingStruct(device.ThingId)
if err != nil {
return nil, err
}
iotaMeta, err := the.GetIotaMeta(device.DeviceMetaId)
if err != nil {
return nil, err
if ok {
return &deviceInfo, nil
}
s := common_models.Structure{
}
// 首先获取 device
device, err := the.GetIotaDevice(deviceId)
if err != nil {
return nil, err
}
if device.ThingId == "" || device.DeviceMetaId == "" {
return nil, errors.New("IotaDevice 的 ThingId 或者 DeviceMetaId 为空")
}
// 使用 goroutines 并发加载数据
var wg sync.WaitGroup
var thingStruct common_models.ThingStruct
var iotaMeta common_models.DeviceMeta
var thingErr, metaErr error
// 获取 thingStruct
wg.Add(1)
go func() {
defer wg.Done()
thingStruct, thingErr = the.GetThingStruct(device.ThingId)
}()
// 获取 iotaMeta
wg.Add(1)
go func() {
defer wg.Done()
iotaMeta, metaErr = the.GetIotaMeta(device.DeviceMetaId)
}()
wg.Wait()
// 检查是否有错误
if thingErr != nil {
return nil, thingErr
}
if metaErr != nil {
return nil, metaErr
}
// 构建 DeviceInfo
result := common_models.DeviceInfo{
Id: device.Id,
Name: device.Name,
Structure: common_models.Structure{
ThingId: thingStruct.ThingId,
Id: thingStruct.Id,
Name: thingStruct.Name,
@ -120,18 +144,101 @@ func (the *ConfigHelper) GetDeviceInfo(deviceId string) (*common_models.DeviceIn
OrgId: thingStruct.OrgId,
Latitude: 0,
Longitude: 0,
},
DeviceMeta: iotaMeta,
}
//缓存 deviceInfo
the.mu.Lock()
DeviceInfoCache[deviceId] = result
the.mu.Unlock()
return &result, nil
}
// 定时更新设备信息
func (the *ConfigHelper) StartUpdateDeviceInfo(interval time.Duration, batchSize int) {
log.Printf("==== 启动定时更新 DeviceInfo 服务,interval = %v, batchSize = %d", interval, batchSize)
ticker := time.NewTicker(interval)
go func() {
for {
select {
case <-ticker.C:
func() {
defer func() {
if r := recover(); r != nil {
fmt.Println("[StartUpdateDeviceInfo] Recovered from panic:", r)
}
}()
the.updateDeviceInfoCache(batchSize)
}()
}
}
}()
}
var updatesPool = sync.Pool{
New: func() interface{} {
return make(map[string]common_models.DeviceInfo)
},
}
func (the *ConfigHelper) updateDeviceInfoCache(batchSize int) {
if batchSize < 0 {
panic("batchSize 不能为负数。")
}
deviceIds := make([]string, 0, len(DeviceInfoCache))
for deviceId := range DeviceInfoCache {
deviceIds = append(deviceIds, deviceId)
}
// 批次处理设备
for i := 0; i < len(deviceIds); i += batchSize {
end := i + batchSize
if end > len(deviceIds) {
end = len(deviceIds)
}
batch := deviceIds[i:end]
deviceInfo = common_models.DeviceInfo{
Id: device.Id,
Name: device.Name,
Structure: s,
DeviceMeta: iotaMeta,
var wg sync.WaitGroup
updates := updatesPool.Get().(map[string]common_models.DeviceInfo)
for k := range updates {
delete(updates, k)
}
//缓存deviceInfo
DeviceInfoCache[deviceId] = deviceInfo
var mu sync.Mutex
for _, deviceId := range batch {
wg.Add(1)
go func(id string) {
defer wg.Done()
// 强制更新设备信息
deviceInfo, err := the.GetDeviceInfo(id, true)
if err == nil {
mu.Lock()
updates[id] = *deviceInfo
mu.Unlock()
} else {
fmt.Printf("Failed to update cache for deviceId %s: %v\n", id, err)
}
}(deviceId)
}
wg.Wait()
// 批量更新 DeviceInfoCache
the.mu.Lock()
for id, info := range updates {
DeviceInfoCache[id] = info // 更新缓存
}
the.mu.Unlock()
// 将 updates 放回对象池
updatesPool.Put(updates)
}
return &deviceInfo, nil
}
func (the *ConfigHelper) GetFormulaInfo(formulaId int) (common_models.Formula, error) {
@ -511,27 +618,75 @@ func (the *ConfigHelper) GetAggThreshold(structId int, factorId int) (*common_mo
}
func (the *ConfigHelper) GetIotaMeta(deviceId string) (common_models.DeviceMeta, error) {
var err error
result, ok := DeviceMetaCache[deviceId]
if !ok { //去redis查询
//iota_meta:003540d0-616c-4611-92c1-1cd31005eabf
k := fmt.Sprintf("%s:%s", redisKey.Iota_meta, deviceId)
//var deviceMeta common_models.DeviceMeta
err = the.redisHelper.GetObj(k, &result)
//iota_meta:003540d0-616c-4611-92c1-1cd31005eabf
k := fmt.Sprintf("%s:%s", redisKey.Iota_meta, deviceId)
result := common_models.DeviceMeta{}
value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
if err != nil {
return result, err
}
if value == nil || value == "" {
log.Printf("LoadableChinCache[%s]=空", k)
return result, errors.New("无缓存")
}
if v, ok := value.(string); ok {
if len(v) > 0 {
err := json.Unmarshal([]byte(v), &result)
if err != nil {
log.Printf("json unmarshal error:%s", err.Error())
}
}
}
return result, err
//var err error
//result, ok := DeviceMetaCache[deviceId]
//if !ok { //去redis查询
// //iota_meta:003540d0-616c-4611-92c1-1cd31005eabf
// k := fmt.Sprintf("%s:%s", redisKey.Iota_meta, deviceId)
// //var deviceMeta common_models.DeviceMeta
// err = the.redisHelper.GetObj(k, &result)
//}
//return result, err
}
func (the *ConfigHelper) GetIotaDevice(deviceId string) (common_models.IotaDevice, error) {
var err error
result, ok := IotaDeviceCache[deviceId]
if !ok { //去redis查询
//iota_device:1b06d870-09a8-45e0-a86e-dba539d5edd0
k := fmt.Sprintf("%s:%s", redisKey.Iota_device, deviceId)
k := fmt.Sprintf("%s:%s", redisKey.Iota_device, deviceId)
err = the.redisHelper.GetObj(k, &result)
result := common_models.IotaDevice{}
value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
if err != nil {
return result, err
}
if value == nil || value == "" {
log.Printf("LoadableChinCache[%s]=空", k)
return result, errors.New("无缓存")
}
if v, ok := value.(string); ok {
if len(v) > 0 {
err := json.Unmarshal([]byte(v), &result)
if err != nil {
log.Printf("json unmarshal error:%s", err.Error())
}
}
}
return result, err
//var err error
//result, ok := IotaDeviceCache[deviceId]
//if !ok { //去redis查询
// //iota_device:1b06d870-09a8-45e0-a86e-dba539d5edd0
// k := fmt.Sprintf("%s:%s", redisKey.Iota_device, deviceId)
// err = the.redisHelper.GetObj(k, &result)
//}
//return result, err
}
func (the *ConfigHelper) GetDeviceFactorProto(factorProtoId, deviceMetaId string) (common_models.DeviceFactorProto, error) {
@ -547,16 +702,41 @@ func (the *ConfigHelper) GetDeviceFactorProto(factorProtoId, deviceMetaId string
}
func (the *ConfigHelper) GetThingStruct(deviceId string) (common_models.ThingStruct, error) {
var err error
result, ok := ThingStructCache[deviceId]
if !ok { //去redis查询
//thing_struct:5da9aa1b-05b7-4943-be57-dedb34f7a1bd
k := fmt.Sprintf("%s:%s", redisKey.Thing_struct, deviceId)
//thing_struct:5da9aa1b-05b7-4943-be57-dedb34f7a1bd
k := fmt.Sprintf("%s:%s", redisKey.Thing_struct, deviceId)
err = the.redisHelper.GetObj(k, &result)
ThingStructCache[deviceId] = result
result := common_models.ThingStruct{}
value, err := the.chainedCache.LoadableChinCache.Get(the.ctx, k)
if err != nil {
return result, err
}
if value == nil || value == "" {
log.Printf("LoadableChinCache[%s]=空", k)
return result, errors.New("无缓存")
}
if v, ok := value.(string); ok {
if len(v) > 0 {
err := json.Unmarshal([]byte(v), &result)
if err != nil {
log.Printf("json unmarshal error:%s", err.Error())
}
}
}
return result, err
//var err error
//result, ok := ThingStructCache[deviceId]
//if !ok { //去redis查询
// //thing_struct:5da9aa1b-05b7-4943-be57-dedb34f7a1bd
// k := fmt.Sprintf("%s:%s", redisKey.Thing_struct, deviceId)
//
// err = the.redisHelper.GetObj(k, &result)
// ThingStructCache[deviceId] = result
//}
//return result, err
}
func (the *ConfigHelper) GetDataUnit() ([]common_models.DataUnit, error) {

Loading…
Cancel
Save