Browse Source

最后一条测点数据全局变量生命周期优化

dev
18209 3 weeks ago
parent
commit
480700153a
  1. 42
      consumers/consumerAXYThemeToES.go

42
consumers/consumerAXYThemeToES.go

@ -244,6 +244,10 @@ func (the *consumerAXYThemeToES) onData(topic string, msg string) bool {
needPush := adaptor.Transform(topic, msg) needPush := adaptor.Transform(topic, msg)
if needPush != nil && needPush.Data != nil { if needPush != nil && needPush.Data != nil {
// === 修改:在数据通过时效性验证后,立即更新全局缓存 ===
the.updateLatestData(needPush)
// === 结束 ===
the.dataCache <- needPush the.dataCache <- needPush
} else { } else {
s, _ := json.Marshal(needPush) s, _ := json.Marshal(needPush)
@ -293,6 +297,20 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time,
log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", acqTime, newErr) log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", acqTime, newErr)
return time.Time{}, time.Time{}, newErr return time.Time{}, time.Time{}, newErr
} }
// === 修改:在判断通过后立即更新缓存(即使稍后会在onData中用完整对象覆盖)===
if newAtime.After(esAtime) || newAtime.Equal(esAtime) {
// 构建一个临时对象用于缓存时间信息
tempTheme := &models.EsTheme{
Sensor: sensorId,
Structure: structId,
CollectTime: acqTime, // 注意:这里使用原始字符串,格式需与缓存读取时一致
}
the.updateLatestData(tempTheme)
log.Printf("测点[%d_%d] 缓存时效检查通过,已立即更新全局缓存时间为: %s", structId, sensorId, acqTime)
}
// === 结束 ===
return esAtime, newAtime, nil return esAtime, newAtime, nil
} }
@ -327,6 +345,19 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time,
the.updateLatestData(&TimeTheme[0]) the.updateLatestData(&TimeTheme[0])
} }
// === 修改:在判断通过后立即更新缓存(即使稍后会在onData中用完整对象覆盖)===
if newAtime.After(esAtime) || newAtime.Equal(esAtime) {
// 构建一个临时对象用于缓存时间信息
tempTheme := &models.EsTheme{
Sensor: sensorId,
Structure: structId,
CollectTime: acqTime, // 注意:这里使用原始字符串,格式需与缓存读取时一致
}
the.updateLatestData(tempTheme)
log.Printf("测点[%d_%d] ES时效检查通过,已立即更新全局缓存时间为: %s", structId, sensorId, acqTime)
}
// === 结束 ===
return esAtime, newAtime, nil return esAtime, newAtime, nil
} else { } else {
// ES中没有数据,使用新数据的时间 // ES中没有数据,使用新数据的时间
@ -338,6 +369,17 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time,
log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", acqTime, newErr) log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", acqTime, newErr)
return time.Time{}, time.Time{}, newErr return time.Time{}, time.Time{}, newErr
} }
// === 修改:ES中没有数据时,也要更新缓存 ===
tempTheme := &models.EsTheme{
Sensor: sensorId,
Structure: structId,
CollectTime: acqTime,
}
the.updateLatestData(tempTheme)
log.Printf("测点[%d_%d] ES中无数据,已更新全局缓存时间为: %s", structId, sensorId, acqTime)
// === 结束 ===
return newAtime, newAtime, nil return newAtime, newAtime, nil
} }
} }

Loading…
Cancel
Save