From 62ded4e686643cce0d18ba3bbbffa73cfa58fa18 Mon Sep 17 00:00:00 2001 From: 18209 Date: Fri, 16 Jan 2026 15:42:55 +0800 Subject: [PATCH] =?UTF-8?q?kafka=E5=A2=9E=E5=8A=A0=E6=97=B6=E9=97=B4?= =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E4=BF=9D=E6=8A=A4=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consumers/consumerAXYThemeToES.go | 32 +++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/consumers/consumerAXYThemeToES.go b/consumers/consumerAXYThemeToES.go index ca9046f..47ccdfa 100644 --- a/consumers/consumerAXYThemeToES.go +++ b/consumers/consumerAXYThemeToES.go @@ -231,6 +231,23 @@ func (the *consumerAXYThemeToES) Work() { func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { adaptor := adaptors.Adaptor_Anxinyun_LastTheme{} + // 首先解析消息检查时间格式 + theme := models.AXYSavoirTheme{} + if err := json.Unmarshal([]byte(msg), &theme); err == nil { + // 检查并转换时间为北京时间 + if len(theme.AcqTime) > 0 && theme.AcqTime[len(theme.AcqTime)-1] == 'Z' { + beijingTimeStr, err := convertUTCToBeijing(theme.AcqTime) + if err == nil { + // 更新消息中的时间为北京时间 + theme.AcqTime = beijingTimeStr + // 重新序列化消息 + if newMsg, err := json.Marshal(theme); err == nil { + msg = string(newMsg) + } + } + } + } + esAtime, newAtime, judgeErr := the.judgeTime(msg) judgeBool := false @@ -264,6 +281,21 @@ func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { return true } +// convertUTCToBeijing 将UTC时间转换为北京时间(+8小时) +func convertUTCToBeijing(utcTimeStr string) (string, error) { + // 解析UTC时间 + utcTime, err := time.Parse("2006-01-02T15:04:05.000Z", utcTimeStr) + if err != nil { + return "", err + } + + // 加8小时转换为北京时间 + beijingTime := utcTime.Add(8 * time.Hour) + + // 格式化为北京时间字符串 + return beijingTime.Format("2006-01-02T15:04:05.000+0800"), nil +} + func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, error) { theme := models.AXYSavoirTheme{} err := json.Unmarshal([]byte(rawMsg), &theme)