From 6864b5e94ee8de23384eb7f1a6be4309ab84d4bc Mon Sep 17 00:00:00 2001 From: yfh Date: Tue, 12 Nov 2024 15:13:33 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=BE=E5=AE=BD=E8=B6=85=E6=97=B6=E6=97=B6?= =?UTF-8?q?=E9=95=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- master/app/et_master.go | 8 ++++++-- node/stages/stage.go | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/master/app/et_master.go b/master/app/et_master.go index 5ef31f7..4f9db91 100644 --- a/master/app/et_master.go +++ b/master/app/et_master.go @@ -71,6 +71,7 @@ func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) { //数据类型注册 gob.Register([]interface{}{}) for { + log.Println("L74 nodeCount: %d", the.nodeMapCount()) if the.nodeMapCount() == 0 { log.Printf("nodeList is empty!") time.Sleep(time.Second * 10) @@ -79,6 +80,7 @@ func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) { select { case stopEnable := <-the.sleepCH: + log.Println("L83 nodeCount: %d", the.nodeMapCount()) if stopEnable { stopTime := time.Second * 10 log.Printf("node 处理积压,%v,master 暂停 %v", stopEnable, stopTime) @@ -91,8 +93,10 @@ func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) { select { case data := <-dataChannels.RawDataChan: + log.Println("L96 nodeCount: %d", the.nodeMapCount()) the.notifyData(&data, the.callNodeService) case data := <-dataChannels.AggDataChan: + log.Println("L99 nodeCount: %d", the.nodeMapCount()) the.notifyData(&data, the.callNodeService) //default: // time.Sleep(100 * time.Millisecond) @@ -175,7 +179,7 @@ func (the *EtMaster) callNodeService(node *NodeRpc, data common_models.IDataTrac // RPC调用结果 errorCode := 0 - timeoutMills := 5 * 1000 * time.Millisecond + timeoutMills := 300 * 1000 * time.Millisecond // 5分组 select { case reply := <-resultCH: // reply 0=false(RPC访问结果返回false),1=true(RPC访问结果返回true),2访问RPC网络异常 @@ -349,7 +353,7 @@ func (the *EtMaster) printNodes() { return true }) countInfo := fmt.Sprintf("共[%d]个节点:\n ", count) - log.Printf("%s %s", countInfo, info) + log.Printf("%s %s\n", countInfo, info) } func (the *EtMaster) errorHandle(errCode int, address string, dataDesc string) { val, ok := the.nodeMap.Load(address) diff --git a/node/stages/stage.go b/node/stages/stage.go index 85ca6a6..3ca1860 100644 --- a/node/stages/stage.go +++ b/node/stages/stage.go @@ -61,7 +61,7 @@ func (s *Stage) process(data *common_models.ProcessData) *common_models.ProcessD return data } func (s *Stage) handlerTimeOutCheck(stageName, deviceId string) { - defaultTimeout := 10 * time.Second + defaultTimeout := 240 * time.Second // 4分钟 select { case <-s.execOver: case <-time.After(defaultTimeout):