diff --git a/consumers/consumerAXYThemeToES.go b/consumers/consumerAXYThemeToES.go index 06296ba..c84a306 100644 --- a/consumers/consumerAXYThemeToES.go +++ b/consumers/consumerAXYThemeToES.go @@ -157,8 +157,8 @@ func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", newTimeStr, newErr) } - // 只有在两个时间解析成功时才进行比较 - if esErr == nil && newErr == nil && newAtime.After(esAtime) { + // 只有在两个时间解析成功时,且new时间在es时间之后,或者相等的情况下,才进行推送 + if esErr == nil && newErr == nil && (newAtime.After(esAtime) || newAtime.Equal(esAtime)) { judgeBool = true } @@ -191,8 +191,14 @@ func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (string, string) { queryStr := the.getESTimeQueryStr(theme.Station.Structure.Id, theme.Station.Id) TimeTheme, err := the.OutEs.SearchThemeData("anxincloud_last_theme", queryStr) - log.Printf("判断 esTimeStr:%s,newTimeStr:%s", TimeTheme[0].CollectTime, theme.AcqTime) - return TimeTheme[0].CollectTime, theme.AcqTime + //如果es里面没有这个数据时间,呢就返回测点的时间 + if len(TimeTheme) > 0 { + log.Printf("判断 esTimeStr:%s,newTimeStr:%s", TimeTheme[0].CollectTime, theme.AcqTime) + return TimeTheme[0].CollectTime, theme.AcqTime + } else { + log.Printf("esTime 为空, 新时间:%s", theme.AcqTime) + return theme.AcqTime, theme.AcqTime + } } func (the *consumerAXYThemeToES) getESTimeQueryStr(structId, sensorId int) string {