et-go 20240919重建
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

36 lines
1.1 KiB

package kafka
import (
"dataSource"
"encoding/json"
"gitea.anxinyun.cn/container/common_models"
"log"
"strings"
"time"
)
type AggDataHandler struct{}
func (h AggDataHandler) HandleMessage(message string) bool {
// aggDataMsg: {"date":"2024-04-19T01:10:59.999+0000","sensorId":106,"structId":1,"factorId":11,"aggTypeId":2006,"aggMethodId":3004,"agg":{"strain":-19.399999618530273},"changed":{"strain":-3}}
// aggDataMsg 中的时间为UTC格式 2024-04-19T01:10:59.999+0000,
// 在进行 json.Unmarshal() 时报错
// 解决方案:先将 +0000 -> +00:00,然后再将 UTC 时间转换为 +08:00 时区时间
// 将 "+0000" 替换为 "+00:00"
replacedStr := strings.Replace(message, "+0000", "+00:00", 1)
aggData := common_models.AggData{}
err := json.Unmarshal([]byte(replacedStr), &aggData)
if err != nil {
log.Printf("json parse error: %v", err)
return false
}
// 将 UTC 时间加上8小时得到中国的本地时间
aggData.Date = aggData.Date.Add(8 * time.Hour)
log.Printf("handler 处理[%d]消息", aggData.SensorId)
dataSource.GetChannels().AggDataChan <- aggData
return true
}