diff --git a/consumers/consumerAXYThemeToES.go b/consumers/consumerAXYThemeToES.go index aa3010c..dca3deb 100644 --- a/consumers/consumerAXYThemeToES.go +++ b/consumers/consumerAXYThemeToES.go @@ -2,6 +2,7 @@ package consumers import ( "encoding/json" + "fmt" "goInOut/adaptors" "goInOut/consumers/AXYTheme" "goInOut/dbOperate" @@ -141,20 +142,84 @@ func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { //} adaptor := adaptors.Adaptor_Anxinyun_LastTheme{} - needPush := adaptor.Transform(topic, msg) + esTimeStr, newTimeStr := the.judgeTime(msg) - if needPush != nil && needPush.Data != nil { - the.dataCache <- needPush - } else { - s, _ := json.Marshal(needPush) + judgeBool := false - if needPush != nil { - log.Printf("onData 测点[%d] needPush= %s", needPush.Sensor, s) - if needPush.Sensor == the.logTagId { - log.Printf("onData 测点[%d] 异常needPush= %s", needPush.Sensor, s) + esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000+0800", esTimeStr) + esAtime := esAtime1.Add(8 * time.Hour) + if esErr != nil { + log.Printf("安心云 esAtime数据时间 %s 解析错误: %v", esTimeStr, esErr) + } + + newAtime, newErr := time.Parse("2006-01-02T15:04:05.000+0800", newTimeStr) + if newErr != nil { + log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", newTimeStr, newErr) + } + + // 只有在两个时间解析成功时才进行比较 + if esErr == nil && newErr == nil && newAtime.After(esAtime) { + judgeBool = true + } + + if judgeBool { + + needPush := adaptor.Transform(topic, msg) + + if needPush != nil && needPush.Data != nil { + the.dataCache <- needPush + } else { + s, _ := json.Marshal(needPush) + + if needPush != nil { + log.Printf("onData 测点[%d] needPush= %s", needPush.Sensor, s) + if needPush.Sensor == the.logTagId { + log.Printf("onData 测点[%d] 异常needPush= %s", needPush.Sensor, s) + } } } } return true } +func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (string, string) { + theme := models.AXYSavoirTheme{} + err := json.Unmarshal([]byte(rawMsg), &theme) + if err != nil { + log.Printf("反序列化 异常 dev数据=%s", rawMsg) + } + + queryStr := the.getESTimeQueryStr(theme.Station.Id, theme.Station.Id) + TimeTheme, err := the.OutEs.SearchThemeData("savoir_last_theme", queryStr) + log.Printf("判断 esTimeStr:%s,newTimeStr:%s", TimeTheme[0].CollectTime, theme.AcqTime) + return TimeTheme[0].CollectTime, theme.AcqTime +} + +func (the *consumerAXYThemeToES) getESTimeQueryStr(structId, sensorId int) string { + + esQuery := fmt.Sprintf(` +{ + "query": { + "bool": { + "must": [ + { + "term": { + "structure": { + "value": %d + } + } + }, + { + "term": { + "sensor": { + "value": %d + } + } + } + ] + } + } +} +`, structId, sensorId) + return esQuery +}