|
|
|
@ -142,23 +142,23 @@ func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { |
|
|
|
//}
|
|
|
|
adaptor := adaptors.Adaptor_Anxinyun_LastTheme{} |
|
|
|
|
|
|
|
esTimeStr, newTimeStr := the.judgeTime(msg) |
|
|
|
esAtime, newAtime, judgeErr := the.judgeTime(msg) |
|
|
|
|
|
|
|
judgeBool := false |
|
|
|
|
|
|
|
esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000Z", esTimeStr) |
|
|
|
/*esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000-0700", esTimeStr) |
|
|
|
if esErr != nil { |
|
|
|
log.Printf("安心云 esAtime数据时间 %s 解析错误: %v", esTimeStr, esErr) |
|
|
|
} |
|
|
|
esAtime := esAtime1.Add(8 * time.Hour) // 转为北京时间
|
|
|
|
|
|
|
|
newAtime, newErr := time.Parse("2006-01-02T15:04:05.000+0800", newTimeStr) |
|
|
|
newAtime, newErr := time.Parse("2006-01-02T15:04:05.000-0700", newTimeStr) |
|
|
|
if newErr != nil { |
|
|
|
log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", newTimeStr, newErr) |
|
|
|
} |
|
|
|
}*/ |
|
|
|
|
|
|
|
// 只有在两个时间解析成功时,且new时间在es时间之后,或者相等的情况下,才进行推送
|
|
|
|
if esErr == nil && newErr == nil && (newAtime.After(esAtime) || newAtime.Equal(esAtime)) { |
|
|
|
if judgeErr == nil && (newAtime.After(esAtime) || newAtime.Equal(esAtime)) { |
|
|
|
judgeBool = true |
|
|
|
} |
|
|
|
|
|
|
|
@ -182,22 +182,43 @@ func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { |
|
|
|
|
|
|
|
return true |
|
|
|
} |
|
|
|
func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (string, string) { |
|
|
|
func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, error) { |
|
|
|
theme := models.AXYSavoirTheme{} |
|
|
|
err := json.Unmarshal([]byte(rawMsg), &theme) |
|
|
|
if err != nil { |
|
|
|
log.Printf("反序列化 异常 dev数据=%s", rawMsg) |
|
|
|
return time.Time{}, time.Time{}, err |
|
|
|
} |
|
|
|
|
|
|
|
queryStr := the.getESTimeQueryStr(theme.Station.Structure.Id, theme.Station.Id) |
|
|
|
TimeTheme, err := the.OutEs.SearchThemeData("anxincloud_last_theme", queryStr) |
|
|
|
//如果es里面没有这个数据时间,呢就返回测点的时间
|
|
|
|
if len(TimeTheme) > 0 { |
|
|
|
log.Printf("判断 esTimeStr:%s,newTimeStr:%s", TimeTheme[0].CollectTime, theme.AcqTime) |
|
|
|
return TimeTheme[0].CollectTime, theme.AcqTime |
|
|
|
cTime := TimeTheme[0].CollectTime |
|
|
|
acqTime := theme.AcqTime |
|
|
|
log.Printf("判断 esTimeStr:%s,newTimeStr:%s", cTime, acqTime) |
|
|
|
esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000Z", cTime) |
|
|
|
if esErr != nil { |
|
|
|
log.Printf("安心云 esAtime数据时间 %s 解析错误: %v", cTime, esErr) |
|
|
|
return time.Time{}, time.Time{}, esErr |
|
|
|
} |
|
|
|
esAtime := esAtime1.Add(8 * time.Hour) // 转为北京时间
|
|
|
|
|
|
|
|
newAtime, newErr := time.Parse("2006-01-02T15:04:05.000+0800", acqTime) |
|
|
|
if newErr != nil { |
|
|
|
log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", acqTime, newErr) |
|
|
|
return time.Time{}, time.Time{}, newErr |
|
|
|
} |
|
|
|
return esAtime, newAtime, nil |
|
|
|
} else { |
|
|
|
log.Printf("esTime 为空, 新时间:%s", theme.AcqTime) |
|
|
|
return theme.AcqTime, theme.AcqTime |
|
|
|
acqTime := theme.AcqTime |
|
|
|
log.Printf("esTime 为空, 新时间:%s", acqTime) |
|
|
|
newAtime, newErr := time.Parse("2006-01-02T15:04:05.000+0800", acqTime) |
|
|
|
if newErr != nil { |
|
|
|
log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", acqTime, newErr) |
|
|
|
return time.Time{}, time.Time{}, newErr |
|
|
|
} |
|
|
|
return newAtime, newAtime, nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|