diff --git a/consumers/consumerAXYThemeToES.go b/consumers/consumerAXYThemeToES.go index bfb292b..ca9046f 100644 --- a/consumers/consumerAXYThemeToES.go +++ b/consumers/consumerAXYThemeToES.go @@ -244,6 +244,10 @@ func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { needPush := adaptor.Transform(topic, msg) if needPush != nil && needPush.Data != nil { + // === 修改:在数据通过时效性验证后,立即更新全局缓存 === + the.updateLatestData(needPush) + // === 结束 === + the.dataCache <- needPush } else { s, _ := json.Marshal(needPush) @@ -293,6 +297,20 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", acqTime, newErr) return time.Time{}, time.Time{}, newErr } + + // === 修改:在判断通过后立即更新缓存(即使稍后会在onData中用完整对象覆盖)=== + if newAtime.After(esAtime) || newAtime.Equal(esAtime) { + // 构建一个临时对象用于缓存时间信息 + tempTheme := &models.EsTheme{ + Sensor: sensorId, + Structure: structId, + CollectTime: acqTime, // 注意:这里使用原始字符串,格式需与缓存读取时一致 + } + the.updateLatestData(tempTheme) + log.Printf("测点[%d_%d] 缓存时效检查通过,已立即更新全局缓存时间为: %s", structId, sensorId, acqTime) + } + // === 结束 === + return esAtime, newAtime, nil } @@ -327,6 +345,19 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, the.updateLatestData(&TimeTheme[0]) } + // === 修改:在判断通过后立即更新缓存(即使稍后会在onData中用完整对象覆盖)=== + if newAtime.After(esAtime) || newAtime.Equal(esAtime) { + // 构建一个临时对象用于缓存时间信息 + tempTheme := &models.EsTheme{ + Sensor: sensorId, + Structure: structId, + CollectTime: acqTime, // 注意:这里使用原始字符串,格式需与缓存读取时一致 + } + the.updateLatestData(tempTheme) + log.Printf("测点[%d_%d] ES时效检查通过,已立即更新全局缓存时间为: %s", structId, sensorId, acqTime) + } + // === 结束 === + return esAtime, newAtime, nil } else { // ES中没有数据,使用新数据的时间 @@ -338,6 +369,17 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", acqTime, newErr) return time.Time{}, time.Time{}, newErr } + + // === 修改:ES中没有数据时,也要更新缓存 === + tempTheme := &models.EsTheme{ + Sensor: sensorId, + Structure: structId, + CollectTime: acqTime, + } + the.updateLatestData(tempTheme) + log.Printf("测点[%d_%d] ES中无数据,已更新全局缓存时间为: %s", structId, sensorId, acqTime) + // === 结束 === + return newAtime, newAtime, nil } }