From 8ee59c1985e51a7cdc0de1250cfd7d71a1850252 Mon Sep 17 00:00:00 2001 From: lucas Date: Thu, 20 Mar 2025 16:49:26 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E6=B7=BB=E5=8A=A0=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E6=89=93=E5=8D=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- adaptors/安心云最新设备数据toES.go | 15 +++++++++++++-- consumers/consumerAXYraw.go | 5 +++-- main_test.go | 8 ++++---- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/adaptors/安心云最新设备数据toES.go b/adaptors/安心云最新设备数据toES.go index b300d35..0a26405 100644 --- a/adaptors/安心云最新设备数据toES.go +++ b/adaptors/安心云最新设备数据toES.go @@ -14,12 +14,20 @@ import ( // Adaptor_AXY_LastRAW 安心云 kafka iota数据 转换 es设备数据 type Adaptor_AXY_LastRAW struct { AXYraw.Info - Redis *dbOperate.RedisHelper + Redis *dbOperate.RedisHelper + TagDeviceId string } func (the Adaptor_AXY_LastRAW) Transform(topic, rawMsg string) *models.EsRaw { iotaData := models.IotaData{} - json.Unmarshal([]byte(rawMsg), &iotaData) + err := json.Unmarshal([]byte(rawMsg), &iotaData) + if err != nil { + log.Printf("反序列化 异常 dev数据=%s", rawMsg) + return nil + } + if iotaData.DeviceId == the.TagDeviceId { + log.Printf("--> Transform 标记设备数据 [%s] %s ", iotaData.DeviceId, rawMsg) + } return the.raw2es(iotaData) } @@ -60,6 +68,9 @@ func (the Adaptor_AXY_LastRAW) raw2es(iotaData models.IotaData) *models.EsRaw { DimensionId: iotaData.DimensionId, DataType: dataType, } + if iotaData.DeviceId == the.TagDeviceId { + log.Printf("--> raw2es 标记设备[%s]数据 %v ", iotaData.DeviceId, iotaData.Data.Data) + } EsRaws := toEsRaw(devdata) return EsRaws } diff --git a/consumers/consumerAXYraw.go b/consumers/consumerAXYraw.go index 2cda1cd..a0c61dd 100644 --- a/consumers/consumerAXYraw.go +++ b/consumers/consumerAXYraw.go @@ -36,7 +36,7 @@ func (the *consumerAXYraw) LoadConfigJson(cfgStr string) { func (the *consumerAXYraw) Initial(cfg string) error { the.sinkRawMap = sync.Map{} - the.dataCache = make(chan *models.EsRaw, 200) + the.dataCache = make(chan *models.EsRaw, 500) the.LoadConfigJson(cfg) err := the.inputInitial() @@ -147,7 +147,8 @@ func (the *consumerAXYraw) onData(topic string, msg string) bool { // log.Printf("recv:[%s]:%s ...", topic, msg[:80]) //} adaptor := adaptors.Adaptor_AXY_LastRAW{ - Redis: the.infoRedis, + Redis: the.infoRedis, + TagDeviceId: logTagDeviceId, } needPush := adaptor.Transform(topic, msg) diff --git a/main_test.go b/main_test.go index c4cded0..10b51ea 100644 --- a/main_test.go +++ b/main_test.go @@ -6,7 +6,7 @@ import ( "encoding/hex" "encoding/json" "fmt" - "goInOut/consumers/GZG2ZJHL/protoFiles_zjhl" + "goInOut/consumers/GZG2ZJHL/protoFiles_zjhl/protoFiles_zjhl_v3" "goInOut/utils" "google.golang.org/protobuf/proto" "log" @@ -97,7 +97,7 @@ func TestDeRC4(b *testing.T) { println("无crc", hex.EncodeToString(RawBytesNoCrc)) println("=========") - complexData := &protoFiles_zjhl.ComplexData{} + complexData := &protoFiles_zjhl_v3.ComplexData{} err := proto.Unmarshal(RawBytesNoCrc, complexData) if err != nil { println(err.Error()) @@ -128,7 +128,7 @@ func TestDeRC4_2(b *testing.T) { println("无crc", hex.EncodeToString(RawBytesNoCrc)) println("=========") - complexData := &protoFiles_zjhl.ComplexData{} + complexData := &protoFiles_zjhl_v3.ComplexData{} err := proto.Unmarshal(RawBytesNoCrc, complexData) if err != nil { println(err.Error()) @@ -138,7 +138,7 @@ func TestDeRC4_2(b *testing.T) { func TestProtobufDe(t *testing.T) { s := "0a3508011a1c10c7e84318e6b4e3b6d332220f0dcdcc8c3f15333353401dcdcc0c40721331313131313131313131313131313131313131" bs, _ := hex.DecodeString(s) - complexData := &protoFiles_zjhl.ComplexData{} + complexData := &protoFiles_zjhl_v3.ComplexData{} err := proto.Unmarshal(bs, complexData) if err != nil { println(err.Error())