Compare commits

...

3 Commits

  1. 22
      config.yaml
  2. 4
      dataSource/kafka/aggData.go
  3. 65
      dataSource/kafka/aggData_test.go

22
config.yaml

@ -1,17 +1,19 @@
pprof:
enable: false
kafka:
groupId: lucas_et_go3
groupId: consumer_group_et_go3
brokers:
- 10.8.30.142:30992
- 10.8.30.160:30992
topics:
data_theme: native_theme
# 输入流
data_raw: RawData
data_agg: native_agg
# 输出流
data_theme: native_theme
alarm_iota: Alert
alarm_anxinyun: native_alarm
redis:
address: 10.8.30.142:30379
address: 10.8.30.160:30379
es:
enable: true
addresses:
@ -19,10 +21,10 @@ es:
user: ""
pwd: ""
index:
raw: go_native_raws
vib: go_native_vbraws
theme: go_native_themes
group: go_native_group_themes
raw: native_raws
vib: native_vbraws
theme: native_themes
group: native_group_themes
influxDB:
enable: true
address: http://10.8.30.160:30086
@ -40,7 +42,7 @@ prometheus:
push:
mqtt:
enable: false
host: 10.8.30.142
host: 10.8.30.160
port: 30883
clientIdPrefix: push_et_go
kafka:
@ -53,6 +55,6 @@ master:
port: 50000
hostNameTag: "0" #多状态副本 master 节点的 hostName 标记
node:
remoteMasterHost: 10.8.30.110 #用于 node Register -> master
remoteMasterHost: 10.8.30.104 #用于 node Register -> master
hostIpPrefix: 10.8. #多网卡筛选ip网段
port: 40000

4
dataSource/kafka/aggData.go

@ -29,8 +29,10 @@ func (h AggDataHandler) HandleMessage(message string) bool {
// 在进行 json.Unmarshal() 时报错
// 解决方案:先将 +0000 -> Z,然后再将 UTC 时间转换为中国时区时间("Asia/Shanghai")
// 2024-09-28T23:59:59.999+0800
// 将 2024-04-19T01:10:59.999+0000 -> 2024-04-19T01:10:59.999Z
utcTimeStr := strings.Replace(message, "+0000", "Z", 1)
utcTimeStr := strings.Replace(message, "+0800", "+08:00", 1)
utcTimeStr = strings.Replace(utcTimeStr, "+0000", "Z", 1)
aggData := common_models.AggData{}
err := json.Unmarshal([]byte(utcTimeStr), &aggData)

65
dataSource/kafka/aggData_test.go

@ -1,14 +1,77 @@
package kafka
import (
"encoding/json"
"fmt"
"testing"
"time"
)
func TestAggDataHandler_HandleMessage(t *testing.T) {
h := AggDataHandler{}
aggDataMsg := `
{"date":"2024-09-19T09:39:59.999+0000","sensorId":106,"structId":1,"factorId":11,"aggTypeId":2006,"aggMethodId":3004,"agg":{"strain":-19.399999618530273},"changed":{"strain":-3}}
{"date":"2024-09-19T09:39:59.999+0800","sensorId":106,"structId":1,"factorId":11,"aggTypeId":2006,"aggMethodId":3004,"agg":{"strain":-19.399999618530273},"changed":{"strain":-3}}
`
h.HandleMessage(aggDataMsg)
}
func TestFormatTime(t *testing.T) {
now := time.Now()
nowWithSecond := now.Truncate(time.Second)
fmt.Println(nowWithSecond)
fmt.Println("nowWithSecond.UnixMilli", nowWithSecond.UnixMilli())
fmt.Println(nowWithSecond.Nanosecond())
fmt.Println(nowWithSecond.Second())
nowWithMill := nowWithSecond.Add(time.Millisecond)
fmt.Println(nowWithMill.Nanosecond())
fmt.Printf(" %v\n %v\n %v\n ", now, nowWithSecond, nowWithMill)
// 假设 collectTime 是一个 time.Time 类型的时间变量
collectTime := time.Date(2024, time.December, 1, 16, 30, 0, 123, time.UTC)
fmt.Println("collectTime: ", collectTime)
// 将时间截断到毫秒级别
truncatedTime := collectTime.Truncate(time.Millisecond)
fmt.Println("collectTime truncate millisecond : ", truncatedTime)
// 手动添加毫秒
if truncatedTime.Nanosecond() == 0 {
truncatedTime = truncatedTime.Add(time.Millisecond)
}
fmt.Println("保持毫秒部分为0的时间格式:", truncatedTime)
//自定义的 MarshalJSON 方法来实现,在将 EsTheme 结构体序列化为JSON时,格式化时间的输出格式为 2024-10-01T10:36:10.226+08:00。
now = time.Now()
fmt.Println(now)
theme := ThemeForTest{
CollectTime: collectTime,
Sensor: 0,
CreateTime: now,
}
themeBytes, err := theme.MarshalJSON()
if err != nil {
fmt.Printf("theme序列化异常,err=%s", err.Error())
}
fmt.Println(string(themeBytes))
}
type ThemeForTest struct {
CollectTime time.Time `json:"collect_time"`
Sensor int `json:"sensor"`
CreateTime time.Time `json:"create_time"`
}
func (e ThemeForTest) MarshalJSON() ([]byte, error) {
type Alias ThemeForTest
data := struct {
CollectTime string `json:"collect_time"`
CreateTime string `json:"create_time"`
*Alias
}{
CollectTime: e.CollectTime.Format("2006-01-02T15:04:05.000+08:00"),
CreateTime: e.CreateTime.Format("2006-01-02T15:04:05.000+08:00"),
Alias: (*Alias)(&e),
}
return json.Marshal(&data)
}

Loading…
Cancel
Save