From 1b6798e301a77d60c00621cc9e649d5823f1c6b6 Mon Sep 17 00:00:00 2001 From: 18209 Date: Tue, 21 Oct 2025 09:56:41 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=89=E5=BF=83=E4=BA=91=E6=9C=80=E5=90=8E?= =?UTF-8?q?=E4=B8=80=E6=9D=A1=E6=B5=8B=E7=82=B9=E6=95=B0=E6=8D=AE=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E6=97=B6=E9=97=B4=E4=BF=9D=E6=8A=A4=EF=BC=88=E6=96=B0?= =?UTF-8?q?=E7=9A=84kafka=E6=95=B0=E6=8D=AE=E6=97=B6=E9=97=B4=E6=97=A9?= =?UTF-8?q?=E4=BA=8E=E5=BD=93=E5=89=8Des=E9=87=8C=E9=9D=A2=E7=9A=84?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=81=9A=E4=B8=8D=E8=A6=86=E7=9B=96=E5=AD=98?= =?UTF-8?q?=E5=85=A5=E5=A4=84=E7=90=86=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumers/consumerAXYThemeToES.go | 83 +++++++++++++++++++++++++++---- 1 file changed, 74 insertions(+), 9 deletions(-) diff --git a/consumers/consumerAXYThemeToES.go b/consumers/consumerAXYThemeToES.go index aa3010c..dca3deb 100644 --- a/consumers/consumerAXYThemeToES.go +++ b/consumers/consumerAXYThemeToES.go @@ -2,6 +2,7 @@ package consumers import ( "encoding/json" + "fmt" "goInOut/adaptors" "goInOut/consumers/AXYTheme" "goInOut/dbOperate" @@ -141,20 +142,84 @@ func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { //} adaptor := adaptors.Adaptor_Anxinyun_LastTheme{} - needPush := adaptor.Transform(topic, msg) + esTimeStr, newTimeStr := the.judgeTime(msg) - if needPush != nil && needPush.Data != nil { - the.dataCache <- needPush - } else { - s, _ := json.Marshal(needPush) + judgeBool := false - if needPush != nil { - log.Printf("onData 测点[%d] needPush= %s", needPush.Sensor, s) - if needPush.Sensor == the.logTagId { - log.Printf("onData 测点[%d] 异常needPush= %s", needPush.Sensor, s) + esAtime1, esErr := time.Parse("2006-01-02T15:04:05.000+0800", esTimeStr) + esAtime := esAtime1.Add(8 * time.Hour) + if esErr != nil { + log.Printf("安心云 esAtime数据时间 %s 解析错误: %v", esTimeStr, esErr) + } + + newAtime, newErr := time.Parse("2006-01-02T15:04:05.000+0800", newTimeStr) + if newErr != nil { + log.Printf("安心云 newAtime数据时间 %s 解析错误: %v", newTimeStr, newErr) + } + + // 只有在两个时间解析成功时才进行比较 + if esErr == nil && newErr == nil && newAtime.After(esAtime) { + judgeBool = true + } + + if judgeBool { + + needPush := adaptor.Transform(topic, msg) + + if needPush != nil && needPush.Data != nil { + the.dataCache <- needPush + } else { + s, _ := json.Marshal(needPush) + + if needPush != nil { + log.Printf("onData 测点[%d] needPush= %s", needPush.Sensor, s) + if needPush.Sensor == the.logTagId { + log.Printf("onData 测点[%d] 异常needPush= %s", needPush.Sensor, s) + } } } } return true } +func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (string, string) { + theme := models.AXYSavoirTheme{} + err := json.Unmarshal([]byte(rawMsg), &theme) + if err != nil { + log.Printf("反序列化 异常 dev数据=%s", rawMsg) + } + + queryStr := the.getESTimeQueryStr(theme.Station.Id, theme.Station.Id) + TimeTheme, err := the.OutEs.SearchThemeData("savoir_last_theme", queryStr) + log.Printf("判断 esTimeStr:%s,newTimeStr:%s", TimeTheme[0].CollectTime, theme.AcqTime) + return TimeTheme[0].CollectTime, theme.AcqTime +} + +func (the *consumerAXYThemeToES) getESTimeQueryStr(structId, sensorId int) string { + + esQuery := fmt.Sprintf(` +{ + "query": { + "bool": { + "must": [ + { + "term": { + "structure": { + "value": %d + } + } + }, + { + "term": { + "sensor": { + "value": %d + } + } + } + ] + } + } +} +`, structId, sensorId) + return esQuery +}