From 23c9459b34fc13ca1e8354b38778610b83d3afde Mon Sep 17 00:00:00 2001 From: lucas <249324454@qq.com> Date: Thu, 19 Sep 2024 23:07:03 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E7=BB=9F=E4=B8=80ch=E7=BC=93=E5=AD=98?= =?UTF-8?q?=E9=95=BF=E5=BA=A6=3D1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- et_cache/cacheHandler.go | 2 +- et_calc/group/groupCalc.go | 2 +- master/app/et_master.go | 2 +- master/main_test.go | 2 +- node/app/et_node.go | 39 +++++------ node/node_test.go | 139 +++++++++++++++++++++++++++++++++++-- node/stages/stage.go | 4 +- node/stages/stage_test.go | 4 +- 8 files changed, 160 insertions(+), 34 deletions(-) diff --git a/et_cache/cacheHandler.go b/et_cache/cacheHandler.go index 6d8ad9a..1940ac1 100644 --- a/et_cache/cacheHandler.go +++ b/et_cache/cacheHandler.go @@ -30,7 +30,7 @@ func (the *CacheHandler) GetStage() stages.Stage { return *the.stage } func (the *CacheHandler) enqueue(p *common_models.ProcessData) *common_models.ProcessData { - + for _, station := range p.Stations { for _, item := range station.Info.Proto.Items { //字符串类型不处理 diff --git a/et_calc/group/groupCalc.go b/et_calc/group/groupCalc.go index 12d5b9b..886546a 100644 --- a/et_calc/group/groupCalc.go +++ b/et_calc/group/groupCalc.go @@ -36,7 +36,7 @@ func NewGroupCalc() *GroupCalc { calcTaskManager := &GroupCalc{ stage: stages.NewStage("测点分组计算"), configHelper: GetConfigHelper(), - signCalc: make(chan bool), + signCalc: make(chan bool, 1), calcTasks: map[GroupCalcTaskKey]CalcTask{}, } diff --git a/master/app/et_master.go b/master/app/et_master.go index c2c8337..c2db4f9 100644 --- a/master/app/et_master.go +++ b/master/app/et_master.go @@ -175,7 +175,7 @@ func (the *EtMaster) call_etNode(node *NodeRpc, args *common_models.IotaData) { } the.exporter.OnIotaData2metricByPrometheus(args) - resultCH := make(chan bool) + resultCH := make(chan bool, 1) go func() { defer timeCost(node.args.ID, args.DeviceId, time.Now()) var rpcResult bool diff --git a/master/main_test.go b/master/main_test.go index 3889d09..2da3a54 100644 --- a/master/main_test.go +++ b/master/main_test.go @@ -194,7 +194,7 @@ func TestJobEnRawData(t *testing.T) { } func TestCH(t *testing.T) { - resultCH := make(chan bool) + resultCH := make(chan bool, 1) log.Printf("写入数据1") resultCH <- true log.Printf("写入数据2") diff --git a/node/app/et_node.go b/node/app/et_node.go index f97a0d7..c9feaf4 100644 --- a/node/app/et_node.go +++ b/node/app/et_node.go @@ -1,7 +1,6 @@ package app import ( - "context" "fmt" "gitea.anxinyun.cn/container/common_models" "gitea.anxinyun.cn/container/common_utils" @@ -27,7 +26,7 @@ type rpcMaster struct { addr string } -const chSize = 2 +const chSize = 1 func NewEtWorker() *EtNode { node := &EtNode{ @@ -71,24 +70,24 @@ func (the *EtNode) ConsumerProcess(iotaData *common_models.IotaData) error { } // 实现源接口 -func (the *EtNode) Process(ctx context.Context) (<-chan any, error) { - source := make(chan any, chSize) - go func() { - defer close(source) - for { - select { - case a := <-the.ch: - source <- a - log.Printf("存储数据=>source out,len=%d,%d", len(source), len(the.ch)) - case <-ctx.Done(): - log.Println("退出[source] EtNode.Process") - return - } - - } - }() - return source, nil -} +//func (the *EtNode) Process(ctx context.Context) (<-chan any, error) { +// source := make(chan any, chSize) +// go func() { +// defer close(source) +// for { +// select { +// case a := <-the.ch: +// source <- a +// log.Printf("存储数据=>source out,len=%d,%d", len(source), len(the.ch)) +// case <-ctx.Done(): +// log.Println("退出[source] EtNode.Process") +// return +// } +// +// } +// }() +// return source, nil +//} // RegisterToMaster 调用 master 发布的RPC服务方法 master.NodeRegister func (the *EtNode) RegisterToMaster() { diff --git a/node/node_test.go b/node/node_test.go index e7d83b0..a5e5978 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -88,9 +88,9 @@ func TestNodeHandler_CZ(t *testing.T) { "taskId": "ec9410fa-a9f1-4ba2-831a-aa52331558d6", "jobId": 3, "jobRepeatId": 1, - "triggerTime": "2024-07-08T11:09:32+08:00", + "triggerTime": "2024-08-08T00:09:32+08:00", "realTime": "0001-01-01T00:00:00Z", - "finishTime": "2024-07-08T11:09:41.34+08:00", + "finishTime": "2024-08-08T00:09:41.34+08:00", "seq": 0, "released": false, "data": { @@ -99,9 +99,9 @@ func TestNodeHandler_CZ(t *testing.T) { "axieSpeed": "71", "axieWeight": "[\"0.9\",\"0.4\",\"0\",\"0\",\"0\",\"0\",\"0\",\"0\",\"0\",\"0\"]", "axisnum": "2", - "cmsLength": "2457", - "crossRoad": "1", - "direction": "2", + "cmsLength": "3100", + "crossRoad": "22", + "direction": "1", "grossWeight": 6.3, "overload": "未超载", "totalBase": "0", @@ -131,6 +131,8 @@ func TestNodeHandler_CZ(t *testing.T) { nodeStageManage.AddStages(infoHandler.GetStage()) calcHandler := et_calc.NewCalcHandler() nodeStageManage.AddStages(calcHandler.GetStage()) + cacheHandler := et_cache.NewCacheHandler() + nodeStageManage.AddStages(cacheHandler.GetStage()) sinkThemeHandler := et_sink.NewSinkThemeHandler() nodeStageManage.AddStages(sinkThemeHandler.GetStage()) publishHandler := et_push.NewPushHandler() @@ -143,6 +145,131 @@ func TestNodeHandler_CZ(t *testing.T) { time.Sleep(time.Second * 20) log.Println("测试结束") } +func TestNodeHandler_CZ2(t *testing.T) { + + rawDataMsg_CZ := `{ + "userId": "77804162-837d-4ff9-96c0-beb8e8888f8e", + "dimensionId": "00000000-0000-0000-0000-000000000000", + "dimCapId": "74f90bfc-7eff-4efd-8f1e-9022626fb672", + "scheduleId": "c3e70765-61be-4068-8d69-6ab5bb3f1240", + "jobId": 1, + "jobRepeatId": 1, + "thingId": "8e3eec71-c924-47fd-ac8b-2f28c49ad4e9", + "deviceId": "1dd0f0d2-e802-487e-91fc-d0d3f24ddbfc", + "taskId": "1f9e5d75-a607-4c44-8188-e10c4ce49fed", + "triggerTime": "2024-09-12T17:34:01.0171735+08:00", + "finishTime": "2024-09-12T17:34:01.6526854+08:00", + "seq": 0, + "result": null, + "data": { + "data": { + "licence": "苏I55526", + "axisnum": "8", + "carType": "7", + "cmsLength": "764", + "crossRoad": "nil", + "direction": "4", + "grossWeight": "433", + "limitWeight": "345", + "limtTotalWeight": "575", + "overload": 1, + "serialNum": "nil", + "height": "nil", + "lane": 1, + "speed": "20", + "temp": "-25", + "totalBase": "792", + "axieSpeed": "100", + "width": "nil", + "weightK": 1, + "axieWeight": "[\"43\",\"31\",\"35\",\"7\",\"20\",\"37\",\"24\",\"41\",\"0\",\"0\"]" + }, + "result": { + "code": 0, + "detail": null, + "msg": null + }, + "type": 0 + } +}` + + rawDataMsg_CZ2 := `{ + "userId": "77804162-837d-4ff9-96c0-beb8e8888f8e", + "dimensionId": "00000000-0000-0000-0000-000000000000", + "dimCapId": "74f90bfc-7eff-4efd-8f1e-9022626fb672", + "scheduleId": "c3e70765-61be-4068-8d69-6ab5bb3f1240", + "jobId": 1, + "jobRepeatId": 1, + "thingId": "8e3eec71-c924-47fd-ac8b-2f28c49ad4e9", + "deviceId": "1dd0f0d2-e802-487e-91fc-d0d3f24ddbfc", + "taskId": "1f9e5d75-a607-4c44-8188-e10c4ce49fed", + "triggerTime": "2024-09-12T18:30:01.0171735+08:00", + "finishTime": "2024-09-12T18:30:01.6526854+08:00", + "seq": 0, + "result": null, + "data": { + "data": { + "licence": "苏I55526", + "axisnum": "8", + "carType": "7", + "cmsLength": "764", + "crossRoad": "nil", + "direction": "5", + "grossWeight": "433", + "limitWeight": "345", + "limtTotalWeight": "575", + "overload": 1, + "serialNum": "nil", + "height": "nil", + "lane": 2, + "speed": "20", + "temp": "-25", + "totalBase": "792", + "axieSpeed": "100", + "width": "nil", + "weightK": 1, + "axieWeight": "[\"43\",\"31\",\"35\",\"7\",\"20\",\"37\",\"24\",\"41\",\"0\",\"0\"]" + }, + "result": { + "code": 0, + "detail": null, + "msg": null + }, + "type": 0 + } +}` + rawData := &common_models.IotaData{} + err := json.Unmarshal([]byte(rawDataMsg_CZ), rawData) + if err != nil { + log.Panicf("测试异常%s", err.Error()) + } + + nodeWorker := app.NewEtWorker() + nodeStageManage := stages.NewStageManager() + nodeStageManage.AddSource(nodeWorker.LoadCh()) + sinkRawHandler := et_sink.NewSinkRawHandler() + nodeStageManage.AddStages(sinkRawHandler.GetStage()) + infoHandler := et_Info.NewInfoHandler() + nodeStageManage.AddStages(infoHandler.GetStage()) + calcHandler := et_calc.NewCalcHandler() + nodeStageManage.AddStages(calcHandler.GetStage()) + cacheHandler := et_cache.NewCacheHandler() + nodeStageManage.AddStages(cacheHandler.GetStage()) + sinkThemeHandler := et_sink.NewSinkThemeHandler() + nodeStageManage.AddStages(sinkThemeHandler.GetStage()) + publishHandler := et_push.NewPushHandler() + nodeStageManage.AddStages(publishHandler.GetStage()) + + nodeStageManage.Run() + + log.Println("测试开始") + nodeWorker.ConsumerProcess(rawData) + time.Sleep(time.Second * 2) + err = json.Unmarshal([]byte(rawDataMsg_CZ2), rawData) + nodeWorker.ConsumerProcess(rawData) + time.Sleep(time.Second * 200) + log.Println("测试结束") +} func TestNodeHandler_Formula303(t *testing.T) { rawDataMsg_YB := `{ @@ -2599,7 +2726,7 @@ func TestStageMode(t *testing.T) { stage1 := stages.NewStage("加") stageManager.AddStages(*stage1) - out := make(chan *common_models.ProcessData) + out := make(chan *common_models.ProcessData, 1) stageManager.AddOut(out) diff --git a/node/stages/stage.go b/node/stages/stage.go index dfec71c..1b4455a 100644 --- a/node/stages/stage.go +++ b/node/stages/stage.go @@ -18,8 +18,8 @@ func NewStage(name string) *Stage { return &Stage{ Name: name, processFuncs: make([]func(*common_models.ProcessData) *common_models.ProcessData, 0), - In: make(<-chan *common_models.ProcessData, 2), - Out: make(chan *common_models.ProcessData, 2), + In: make(<-chan *common_models.ProcessData, 1), + Out: make(chan *common_models.ProcessData, 1), } } diff --git a/node/stages/stage_test.go b/node/stages/stage_test.go index 91e4bb3..4b2d909 100644 --- a/node/stages/stage_test.go +++ b/node/stages/stage_test.go @@ -17,7 +17,7 @@ func TestStageProcess(t *testing.T) { stage2.AddProcess(printDeviceName) sm := NewStageManager() sm.AddStages(*stage1, *stage2) - source := make(chan *common_models.ProcessData, 2) + source := make(chan *common_models.ProcessData, 1) sm.AddSource(source) sm.Run() i := 0 @@ -73,7 +73,7 @@ func TestStageRaw(t *testing.T) { //stage2.AddProcess(Sub1) //stage2.AddProcess(Sub1000) // - //in := make(chan any, 3) + //in := make(chan any, 1) //out := stage2.StageRun(stage1.StageRun(in)) // //time.Sleep(time.Second * 1)