|
|
@ -71,6 +71,7 @@ func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) { |
|
|
|
//数据类型注册
|
|
|
|
gob.Register([]interface{}{}) |
|
|
|
for { |
|
|
|
log.Println("L74 nodeCount: %d", the.nodeMapCount()) |
|
|
|
if the.nodeMapCount() == 0 { |
|
|
|
log.Printf("nodeList is empty!") |
|
|
|
time.Sleep(time.Second * 10) |
|
|
@ -79,6 +80,7 @@ func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) { |
|
|
|
|
|
|
|
select { |
|
|
|
case stopEnable := <-the.sleepCH: |
|
|
|
log.Println("L83 nodeCount: %d", the.nodeMapCount()) |
|
|
|
if stopEnable { |
|
|
|
stopTime := time.Second * 10 |
|
|
|
log.Printf("node 处理积压,%v,master 暂停 %v", stopEnable, stopTime) |
|
|
@ -91,8 +93,10 @@ func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) { |
|
|
|
|
|
|
|
select { |
|
|
|
case data := <-dataChannels.RawDataChan: |
|
|
|
log.Println("L96 nodeCount: %d", the.nodeMapCount()) |
|
|
|
the.notifyData(&data, the.callNodeService) |
|
|
|
case data := <-dataChannels.AggDataChan: |
|
|
|
log.Println("L99 nodeCount: %d", the.nodeMapCount()) |
|
|
|
the.notifyData(&data, the.callNodeService) |
|
|
|
//default:
|
|
|
|
// time.Sleep(100 * time.Millisecond)
|
|
|
@ -175,7 +179,7 @@ func (the *EtMaster) callNodeService(node *NodeRpc, data common_models.IDataTrac |
|
|
|
|
|
|
|
// RPC调用结果
|
|
|
|
errorCode := 0 |
|
|
|
timeoutMills := 5 * 1000 * time.Millisecond |
|
|
|
timeoutMills := 300 * 1000 * time.Millisecond // 5分组
|
|
|
|
select { |
|
|
|
case reply := <-resultCH: |
|
|
|
// reply 0=false(RPC访问结果返回false),1=true(RPC访问结果返回true),2访问RPC网络异常
|
|
|
@ -349,7 +353,7 @@ func (the *EtMaster) printNodes() { |
|
|
|
return true |
|
|
|
}) |
|
|
|
countInfo := fmt.Sprintf("共[%d]个节点:\n ", count) |
|
|
|
log.Printf("%s %s", countInfo, info) |
|
|
|
log.Printf("%s %s\n", countInfo, info) |
|
|
|
} |
|
|
|
func (the *EtMaster) errorHandle(errCode int, address string, dataDesc string) { |
|
|
|
val, ok := the.nodeMap.Load(address) |
|
|
|