et-go 20240919重建
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

58 lines
1.5 KiB

package node_manager
import (
"et_rpc"
"log"
)
// NodeManager 和 rpcPool 提供了高效的节点管理和 RPC 连接管理功能,支持高并发和连接复用,提升系统性能和稳定性。
type NodeManager struct {
loadBalancer *LoadBalancer
}
func NewNodeManager(lb *LoadBalancer) *NodeManager {
return &NodeManager{
loadBalancer: lb,
}
}
func (m *NodeManager) AddNode(args *et_rpc.NodeArgs) error {
nodeConn, err := NewNodeConnection(args)
if err != nil {
log.Printf("添加Node节点失败:%s\n", err)
} else {
m.loadBalancer.AddNode(nodeConn)
log.Printf("添加Node节点: %s\n", args.Addr)
}
nodeConn.rpcPool.GetNumIdle()
log.Printf("master共有 %d 个节点", m.NodesCount())
for _, node := range m.loadBalancer.nodes {
log.Printf("master -> Node[%s] 的空闲连接有 %d 个。", node.Addr, node.rpcPool.GetNumIdle())
}
return err
}
func (m *NodeManager) RemoveNode(addr string) bool {
return m.loadBalancer.RemoveNode(addr)
//log.Printf("删除Node节点: %s\n", addr)
}
// UpdateNode 更新节点信息,isError 立即更新
func (m *NodeManager) UpdateNode(nodeArgs *et_rpc.NodeArgs, isError bool) error {
return m.loadBalancer.UpdateNode(nodeArgs, isError)
}
func (m *NodeManager) NodeExists(nodeAddr string) bool {
return m.loadBalancer.NodeExists(nodeAddr)
}
func (m *NodeManager) GetNodeConnection() (*NodeConnection, error) {
return m.loadBalancer.SelectNode()
}
func (m *NodeManager) NodesCount() int {
return len(m.loadBalancer.nodes)
}