From c61ea64bc1770b53346947d1238d6fe6adb2abf7 Mon Sep 17 00:00:00 2001 From: yfh Date: Thu, 3 Oct 2024 20:49:17 +0800 Subject: [PATCH] =?UTF-8?q?odeList=E6=94=B9=E4=B8=BAsync.Map;=20=E7=BB=86?= =?UTF-8?q?=E5=8C=96node=E8=AE=BF=E9=97=AE=E5=BC=82=E5=B8=B8=E6=83=85?= =?UTF-8?q?=E5=86=B5:0=E6=AD=A3=E5=B8=B8|100=E5=86=85=E9=83=A8=E9=94=99?= =?UTF-8?q?=E8=AF=AF|200=E7=BD=91=E7=BB=9C=E5=BC=82=E5=B8=B8|300=E8=AE=BF?= =?UTF-8?q?=E9=97=AE=E8=B6=85=E6=97=B65s;=20=E4=BF=AE=E5=A4=8D=E7=9B=B8?= =?UTF-8?q?=E5=90=8C=E5=9C=B0=E5=9D=80Node=E6=B3=A8=E5=86=8C=EF=BC=8Cclien?= =?UTF-8?q?t=E4=B8=8D=E6=9B=B4=E6=96=B0=E9=97=AE=E9=A2=98;?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- master/app/et_master.go | 279 +++++++++++++++++++++++++--------------- 1 file changed, 175 insertions(+), 104 deletions(-) diff --git a/master/app/et_master.go b/master/app/et_master.go index 38a266e..61b1755 100644 --- a/master/app/et_master.go +++ b/master/app/et_master.go @@ -9,21 +9,21 @@ import ( "gitea.anxinyun.cn/container/common_models" "gitea.anxinyun.cn/container/common_utils/configLoad" "log" + "math" "net" "net/rpc" - "sort" + "sync" "time" ) type EtMaster struct { - nodeList []*NodeRpc + nodeMap sync.Map exporter et_prometheus_exporter.PrometheusExporter sleepCH chan bool } func NewEtMaster() *EtMaster { master := EtMaster{ - nodeList: make([]*NodeRpc, 0), exporter: et_prometheus_exporter.NewPrometheusExporter(), sleepCH: make(chan bool, 1), } @@ -31,9 +31,9 @@ func NewEtMaster() *EtMaster { } type NodeRpc struct { - args *common_models.NodeArgs // 注册节点参数:RPC服务名为 master, 服务方法 NodeRegister 的输入参数 - resultCH chan bool // 注册节点参数:RPC服务名为 master, 服务方法 NodeRegister 的输出结果 - aggResultCH chan bool // 聚集数据被处理后的返回结果 对应 Replay 参数 + args *common_models.NodeArgs // 注册节点参数:RPC服务名为master, 服务方法 NodeRegister 的输入参数 + resultCH chan int // 注册节点参数:RPC服务名为master, 服务方法 NodeRegister 的输出结果 + aggResultCH chan int // 聚集数据被处理后的返回结果 对应 Reply 参数 client *rpc.Client } @@ -70,7 +70,7 @@ func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) { //数据类型注册 gob.Register([]interface{}{}) for { - if len(the.nodeList) == 0 { + if the.nodeMapCount() == 0 { log.Printf("nodeList is empty!") time.Sleep(time.Second * 10) continue @@ -101,24 +101,27 @@ func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) { func (the *EtMaster) notifyData(data common_models.IDataTrace, callNodeFunc func(*NodeRpc, common_models.IDataTrace)) { thingId := data.GetThingId() isMatch := false - for _, nodeRpc := range the.nodeList { - if nodeRpc != nil { - if contains(nodeRpc.args.ThingIds, thingId) { - isMatch = true - go callNodeFunc(nodeRpc, data) + the.nodeMap.Range(func(address, value interface{}) bool { + if nodePtr, ok := value.(*NodeRpc); ok { + if nodePtr != nil { + if contains(nodePtr.args.ThingIds, thingId) { + isMatch = true + go callNodeFunc(nodePtr, data) + return false + } } } - } + + return true + }) //无匹配触发 reBalance if !isMatch { - if len(the.nodeList) > 0 { - the.sortNodeListByThingCount() - if the.nodeList[0] != nil { - the.nodeList[0].args.ThingIds = append(the.nodeList[0].args.ThingIds, thingId) - log.Printf("thingId:[%s] 分配到node:[%s]", thingId, the.nodeList[0].args.Addr) - go callNodeFunc(the.nodeList[0], data) - } + nodePtr := the.getNodeWithMinThings() + if nodePtr != nil { + nodePtr.args.ThingIds = append(nodePtr.args.ThingIds, thingId) + log.Printf("thingId:[%s]被分配到node:[%s]", thingId, nodePtr.args.Addr) + go callNodeFunc(nodePtr, data) } } } @@ -131,7 +134,7 @@ func (the *EtMaster) callNodeService(node *NodeRpc, data common_models.IDataTrac } var serviceMethod = "" - var resultCH chan bool + var resultCH chan int var v interface{} switch data.(type) { @@ -145,161 +148,223 @@ func (the *EtMaster) callNodeService(node *NodeRpc, data common_models.IDataTrac serviceMethod = "etNode.AggDataHandler" resultCH = node.aggResultCH default: - log.Printf("Unknown type:%v", v) + log.Printf("Unknown kafka data type:%v", v) return } - log.Printf("RPC[%s]node待处理的数据:%+v, \n", serviceMethod, v) + log.Printf("RPC[%s] node待处理的数据:%+v \n", serviceMethod, v) go func() { defer timeCost(node.args.ID, data.Q(), time.Now()) var reply bool err := node.client.Call(serviceMethod, data, &reply) + result := boolToInt(reply) if err != nil { - log.Printf("rpc 调用node, err:%s", err.Error()) + // rpc 调用node, err:read tcp 10.8.30.104:57230->10.8.30.104:40000: wsarecv: An existing connection was forcibly closed by the remote host. + log.Printf("master调用node异常。Error:%s", err.Error()) + result = 2 } - resultCH <- reply + resultCH <- result }() // RPC调用结果 - var result bool - timeoutMills := 1000 * time.Millisecond + errorCode := 0 + timeoutMills := 5 * 1000 * time.Millisecond select { case reply := <-resultCH: - result = reply - log.Printf("RPC[%s]node处理后回复:%v。已处理的数据:%+v \n", serviceMethod, reply, v) + // reply 0=false(RPC访问结果返回false),1=true(RPC访问结果返回true),2访问RPC网络异常 + if reply == 2 { + log.Printf("RPC[%s]node连接已被关闭。未处理的数据*** %+v *** \n\n", serviceMethod, v) + errorCode = 200 + } else if reply == 0 { + //log.Printf("RPC[%s]node处理后回复false。处理失败的数据*** %+v *** \n\n", serviceMethod, v) + errorCode = 100 + } case <-time.After(timeoutMills): - log.Printf("RPC[%s]node调用超时退出,timeout:%v。需要处理的数据:%+v \n", serviceMethod, timeoutMills, v) - result = false + log.Printf("RPC[%s]node调用超时退出gorutine,timeout:%v。未处理的数据*** %+v *** \n\n", serviceMethod, timeoutMills, v) + errorCode = 300 } - log.Printf("node[%s]处理数据结果:%v。%s %s", node.args.Addr, result, data.R(), data.T()) - if result == false { - //发送 stop 信号 - the.sleepCH <- true - log.Println("=============================================") - log.Printf("node[%s]处理[%s|%s]异常,触发nodesTidy", node.args.Addr, data.R(), data.T()) - time.Sleep(time.Second * 5) - node = nil - the.nodesTidy() + // 100 故障:程序内部问题 + // 200 故障:网络通信问题 + // 300 故障:处理超时 + if errorCode >= 200 { + the.errorHandle(errorCode, node.args.Addr, fmt.Sprintf("%s|%s", data.R(), data.T())) + } else { + //log.Printf("node[%s]node处理后回复true。处理成功的数据*** %+v *** \n\n", node.args.Addr, data.R(), data.T()) + log.Printf("RPC[%s]node已处理的数据errorCode=%d *** %+v *** \n\n", serviceMethod, errorCode, v) } } // NodeRegister 是 RPC 服务方法,由 et_node 远程调用 -func (the *EtMaster) NodeRegister(nodeArgs *common_models.NodeArgs, replay *bool) error { +func (the *EtMaster) NodeRegister(nodeArgs *common_models.NodeArgs, reply *bool) error { node := &NodeRpc{ args: nodeArgs, - resultCH: make(chan bool, 1), - aggResultCH: make(chan bool, 1), + resultCH: make(chan int, 1), + aggResultCH: make(chan int, 1), client: nil, } //master 初始化 node client - client, err := rpc.Dial("tcp", node.args.Addr) + client, err := rpc.Dial("tcp", nodeArgs.Addr) if err != nil { - log.Printf("链接node失败-> node[%v]", node.args.Addr) + log.Printf("链接node失败-> node[%v]", nodeArgs.Addr) return err } - node.client = client - the.nodeList = append(the.nodeList, node) + node.client = client + the.addOrUpdate(nodeArgs.Addr, node) log.Printf("node服务[%v] 注册成功", nodeArgs) - printNodesInfo(the.nodeList) - *replay = true + the.printNodes() + *reply = true return nil } -func (the *EtMaster) NodeHeart(nodeArgs *common_models.NodeArgs, replay *bool) error { - isRegister := false - for _, nodeRpc := range the.nodeList { - if nodeRpc.args.Addr == nodeArgs.Addr { - isRegister = true - } - } - if !isRegister { +func (the *EtMaster) NodeHeart(nodeArgs *common_models.NodeArgs, reply *bool) error { + if !the.clientIsValid(nodeArgs.Addr) { log.Printf("收到-未注册的node[%v] 心跳", nodeArgs) - *replay = false + *reply = false return errors.New("未注册的node") } log.Printf("收到-node[%v] 心跳", nodeArgs) - *replay = true + *reply = true return nil } // NodeUnRegister 节点RPC 注销 -func (the *EtMaster) NodeUnRegister(nodeArgs *common_models.NodeArgs, replay *bool) error { - - for i, node := range the.nodeList { - log.Printf("节点[%s] 注销", node.args.Addr) - if node.args.Addr == nodeArgs.Addr { - err := node.client.Close() - if err != nil { - log.Printf("节点[%s] client关闭异常 %s", node.args.Addr, err.Error()) - } - the.nodeList[i] = nil +func (the *EtMaster) NodeUnRegister(nodeArgs *common_models.NodeArgs, reply *bool) error { + value, ok := the.nodeMap.Load(nodeArgs.Addr) + node := value.(*NodeRpc) + if ok && node.client != nil { + err := node.client.Close() + if err != nil { + log.Printf("节点[%s] client关闭异常 %s", nodeArgs.Addr, err.Error()) } + the.nodeMap.Delete(nodeArgs.Addr) } - the.nodesTidy() + log.Printf("node服务[%v] 注销成功", nodeArgs) - *replay = true + *reply = true return nil } -func (the *EtMaster) nodesTidy() { - the.nodeList = updateNodeList(the.nodeList) - printNodesInfo(the.nodeList) -} - -func (the *EtMaster) sortNodeListByThingCount() { - sort.Slice(the.nodeList, func(i, j int) bool { - if the.nodeList[i] != nil && the.nodeList[j] != nil { - return len(the.nodeList[i].args.ThingIds) < len(the.nodeList[j].args.ThingIds) - } else { - return false - } - }) -} func (the *EtMaster) WaitNodeRegister() { log.Println("等待 node进行注册") for { - if len(the.nodeList) > 0 { + if the.nodeMapCount() > 0 { break } time.Sleep(time.Second * 10) } } + func (the *EtMaster) ConnectNode() { - for i := range the.nodeList { - nodeAddr := the.nodeList[i].args.Addr - if the.nodeList[i].client == nil { + the.nodeMap.Range(func(key, value interface{}) bool { + node := value.(*NodeRpc) + nodeAddr := key.(string) + + if node.client == nil { client, err := rpc.Dial("tcp", nodeAddr) if err != nil { log.Printf("链接node失败-> node[%v]", nodeAddr) - continue + return true } - the.nodeList[i].client = client + + node.client = client + the.nodeMap.Store(nodeAddr, node) } + + return true + }) +} + +func (the *EtMaster) addOrUpdate(key string, newNode *NodeRpc) { + if val, ok := the.nodeMap.Load(key); ok { + hisNode := val.(*NodeRpc) + hisNode.client = newNode.client + the.nodeMap.Store(key, hisNode) + } else { + the.nodeMap.Store(key, newNode) } } +func (the *EtMaster) nodeMapCount() int { + count := 0 + the.nodeMap.Range(func(key, value interface{}) bool { + count++ + return true + }) + return count +} +func (the *EtMaster) clientIsValid(address string) bool { + val, ok := the.nodeMap.Load(address) + if !ok { + return false + } -// app 包内公用方法 -func updateNodeList(nodes []*NodeRpc) []*NodeRpc { - var newNodes []*NodeRpc - for _, node := range nodes { - if node != nil && node.client != nil { - newNodes = append(newNodes, node) - } + if val.(*NodeRpc).client == nil { + return false } - return newNodes + return true +} + +// 获取最少things的节点 +func (the *EtMaster) getNodeWithMinThings() *NodeRpc { + var minNode *NodeRpc + minThings := math.MaxInt64 // 初始化为最大值 + + the.nodeMap.Range(func(key, value interface{}) bool { + node := value.(*NodeRpc) + if len(node.args.ThingIds) < minThings { + minThings = len(node.args.ThingIds) + minNode = node + } + + return true + }) + + return minNode } -func printNodesInfo(nodes []*NodeRpc) { - info := fmt.Sprintf("共[%d]个节点:\n ", len(nodes)) - for _, node := range nodes { +func (the *EtMaster) printNodes() { + count := 0 + info := "" + the.nodeMap.Range(func(key, value interface{}) bool { + count++ + node := value.(*NodeRpc) info += fmt.Sprintf("%s,%s\n", node.args.ID, node.args.Addr) + return true + }) + countInfo := fmt.Sprintf("共[%d]个节点:\n ", count) + log.Printf("%s %s", countInfo, info) +} +func (the *EtMaster) errorHandle(errCode int, address string, dataDesc string) { + val, ok := the.nodeMap.Load(address) + if !ok { + log.Printf("【tidyNodes】Error:不存在的node[%s]\n", address) + return + } + node := val.(*NodeRpc) + + //发送 stop 信号 + the.sleepCH <- true + log.Println("=============================================") + + // 100 故障:程序内部错误 + // 200 故障:网络通信问题 + // 300 故障:处理超时 + if errCode == 200 { + log.Printf("node[%v]连接已中断,休眠5秒后,将删除该节点。消息:%s", node.args.Addr, dataDesc) + time.Sleep(time.Second * 5) + the.nodeMap.Delete(address) + } else if errCode == 300 { + log.Printf("node[%s]处理超时,将休眠5秒后,将删除该节点。消息:%s", address, dataDesc) + time.Sleep(time.Second * 5) + the.nodeMap.Delete(address) } - log.Println(info) + + the.printNodes() } + func contains(arr []string, target string) bool { for _, value := range arr { if value == target { @@ -310,5 +375,11 @@ func contains(arr []string, target string) bool { } func timeCost(nodeId, deviceId string, start time.Time) { tc := time.Since(start) - log.Printf("调用node[%s],[%s]耗时 = %v", nodeId, deviceId, tc) + log.Printf("master调用node[%s],处理[%s]耗时%v", nodeId, deviceId, tc) +} +func boolToInt(b bool) int { + if b { + return 1 + } + return 0 }