Browse Source

kafka增加时间格式保护机制

dev
18209 2 weeks ago
parent
commit
62ded4e686
  1. 32
      consumers/consumerAXYThemeToES.go

32
consumers/consumerAXYThemeToES.go

@ -231,6 +231,23 @@ func (the *consumerAXYThemeToES) Work() {
func (the *consumerAXYThemeToES) onData(topic string, msg string) bool { func (the *consumerAXYThemeToES) onData(topic string, msg string) bool {
adaptor := adaptors.Adaptor_Anxinyun_LastTheme{} adaptor := adaptors.Adaptor_Anxinyun_LastTheme{}
// 首先解析消息检查时间格式
theme := models.AXYSavoirTheme{}
if err := json.Unmarshal([]byte(msg), &theme); err == nil {
// 检查并转换时间为北京时间
if len(theme.AcqTime) > 0 && theme.AcqTime[len(theme.AcqTime)-1] == 'Z' {
beijingTimeStr, err := convertUTCToBeijing(theme.AcqTime)
if err == nil {
// 更新消息中的时间为北京时间
theme.AcqTime = beijingTimeStr
// 重新序列化消息
if newMsg, err := json.Marshal(theme); err == nil {
msg = string(newMsg)
}
}
}
}
esAtime, newAtime, judgeErr := the.judgeTime(msg) esAtime, newAtime, judgeErr := the.judgeTime(msg)
judgeBool := false judgeBool := false
@ -264,6 +281,21 @@ func (the *consumerAXYThemeToES) onData(topic string, msg string) bool {
return true return true
} }
// convertUTCToBeijing 将UTC时间转换为北京时间(+8小时)
func convertUTCToBeijing(utcTimeStr string) (string, error) {
// 解析UTC时间
utcTime, err := time.Parse("2006-01-02T15:04:05.000Z", utcTimeStr)
if err != nil {
return "", err
}
// 加8小时转换为北京时间
beijingTime := utcTime.Add(8 * time.Hour)
// 格式化为北京时间字符串
return beijingTime.Format("2006-01-02T15:04:05.000+0800"), nil
}
func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, error) { func (the *consumerAXYThemeToES) judgeTime(rawMsg string) (time.Time, time.Time, error) {
theme := models.AXYSavoirTheme{} theme := models.AXYSavoirTheme{}
err := json.Unmarshal([]byte(rawMsg), &theme) err := json.Unmarshal([]byte(rawMsg), &theme)

Loading…
Cancel
Save