From 18715f86c9edbcb0a1f06092f920db6aa4e7986b Mon Sep 17 00:00:00 2001
From: yfh
Date: Mon, 24 Feb 2025 22:09:12 +0800
Subject: [PATCH] Master's Node manager
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 master/node_manager/connection_pool_grpc.go | 112 ++++++++++++
 master/node_manager/load_balancer.go        | 128 ++++++++++++++
 master/node_manager/node_connection_grpc.go | 186 ++++++++++++++++++++
 master/node_manager/node_manager.go         |  58 ++++++
 master/node_manager/node_selector.go        |  56 ++++++
 5 files changed, 540 insertions(+)
 create mode 100644 master/node_manager/connection_pool_grpc.go
 create mode 100644 master/node_manager/load_balancer.go
 create mode 100644 master/node_manager/node_connection_grpc.go
 create mode 100644 master/node_manager/node_manager.go
 create mode 100644 master/node_manager/node_selector.go

diff --git a/master/node_manager/connection_pool_grpc.go b/master/node_manager/connection_pool_grpc.go
new file mode 100644
index 0000000..28e131b
--- /dev/null
+++ b/master/node_manager/connection_pool_grpc.go
@@ -0,0 +1,112 @@
+package node_manager
+
+import (
+    "context"
+    "log"
+    "time"
+
+    "et_rpc/pb"
+
+    pool "github.com/jolestar/go-commons-pool"
+    "google.golang.org/grpc"
+    "google.golang.org/grpc/credentials/insecure"
+    "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+type GRPCPoolObject struct {
+    Conn   *grpc.ClientConn     // underlying gRPC connection
+    Client pb.NodeServiceClient // gRPC client bound to Conn
+}
+
+type GRPCClientFactory struct {
+    address string
+}
+
+// NewGRPCClientFactory creates a gRPC connection factory for the given node address.
+func NewGRPCClientFactory(address string) *GRPCClientFactory {
+    return &GRPCClientFactory{
+        address: address,
+    }
+}
+
+// MakeObject creates a pooled gRPC connection. Note that grpc.NewClient does
+// not dial eagerly, so connection failures surface on first use (or in
+// ValidateObject), not here.
+func (f *GRPCClientFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) {
+    // Retry policy: up to 2 attempts with exponential backoff on
+    // UNAVAILABLE / DEADLINE_EXCEEDED.
+    serviceConfig := `{
+        "methodConfig": [{
+            "name": [{"service": "NodeService", "method": "*"}],
+            "retryPolicy": {
+                "maxAttempts": 2,
+                "initialBackoff": "1s",
+                "maxBackoff": "10s",
+                "backoffMultiplier": 2,
+                "retryableStatusCodes": ["UNAVAILABLE", "DEADLINE_EXCEEDED"]
+            }
+        }]
+    }`
+
+    conn, err := grpc.NewClient(
+        f.address,
+        grpc.WithTransportCredentials(insecure.NewCredentials()),
+        grpc.WithDefaultServiceConfig(serviceConfig),
+    )
+    if err != nil {
+        return nil, err
+    }
+
+    client := pb.NewNodeServiceClient(conn)
+    return pool.NewPooledObject(
+        &GRPCPoolObject{
+            Conn:   conn,
+            Client: client,
+        },
+    ), nil
+}
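+
+// Usage sketch (illustrative only; the address is a placeholder):
+//
+//    factory := NewGRPCClientFactory("127.0.0.1:50051")
+//    p := pool.NewObjectPoolWithDefaultConfig(context.Background(), factory)
+//    obj, err := p.BorrowObject(context.Background())
+//    if err == nil {
+//        client := obj.(*GRPCPoolObject).Client
+//        defer p.ReturnObject(context.Background(), obj)
+//        // ... use client ...
+//    }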
+
+// DestroyObject closes the pooled gRPC connection.
+func (f *GRPCClientFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error {
+    grpcPoolObj := object.Object.(*GRPCPoolObject)
+    if grpcPoolObj.Conn != nil {
+        return grpcPoolObj.Conn.Close()
+    }
+    return nil
+}
+
+// ValidateObject checks connection health via the standard gRPC health service.
+// It assumes the Node server registers that service under the name "NodeService".
+func (f *GRPCClientFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool {
+    grpcPoolObj := object.Object.(*GRPCPoolObject)
+
+    select {
+    case <-ctx.Done():
+        return false // context already cancelled
+    default:
+        // proceed with the health check
+    }
+
+    healthClient := grpc_health_v1.NewHealthClient(grpcPoolObj.Conn)
+    resp, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{
+        Service: "NodeService",
+    })
+    if err != nil || resp.Status != grpc_health_v1.HealthCheckResponse_SERVING {
+        log.Println("ValidateObject failed:", err)
+        return false
+    }
+
+    return true
+}
+
+// ActivateObject is invoked when a connection is borrowed from the pool.
+// A heartbeat request could be sent here to confirm the connection is usable.
+func (f *GRPCClientFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error {
+    return nil
+}
+
+// PassivateObject is invoked when a connection is returned to the pool.
+// Per-connection state or caches could be reset here.
+func (f *GRPCClientFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error {
+    return nil
+}
diff --git a/master/node_manager/load_balancer.go b/master/node_manager/load_balancer.go
new file mode 100644
index 0000000..9b66736
--- /dev/null
+++ b/master/node_manager/load_balancer.go
@@ -0,0 +1,128 @@
+package node_manager
+
+import (
+    "et_rpc"
+    "fmt"
+    "sync"
+    "time"
+)
+
+const (
+    Node_Load_Change_Threshold = 200               // load delta that forces an immediate refresh
+    Node_Load_Threshold        = 200               // data-backlog threshold for a node
+    Node_Refresh_Interval      = 120 * time.Second // maximum age of cached node state
+)
+
+type LoadBalancer struct {
+    nodes        []*NodeConnection
+    nodeSelector INodeSelector // node selection strategy
+    mu           sync.RWMutex
+}
+
+func NewLoadBalancer(selector INodeSelector) *LoadBalancer {
+    lb := &LoadBalancer{
+        nodes:        make([]*NodeConnection, 0),
+        nodeSelector: selector,
+    }
+
+    // TODO start periodic health checks
+    //go lb.HealthCheck()
+
+    return lb
+}
+
+func (b *LoadBalancer) AddNode(node *NodeConnection) {
+    b.mu.Lock()
+    defer b.mu.Unlock()
+    b.nodes = append(b.nodes, node)
+}
+
+func (b *LoadBalancer) RemoveNode(addr string) bool {
+    b.mu.Lock()
+    defer b.mu.Unlock()
+    for i, node := range b.nodes {
+        if node.Addr == addr {
+            b.nodes = append(b.nodes[:i], b.nodes[i+1:]...)
+            return true
+        }
+    }
+    return false
+}
+
+func (b *LoadBalancer) SetSelector(selector INodeSelector) {
+    b.mu.Lock()
+    defer b.mu.Unlock()
+    b.nodeSelector = selector
+}
+
+// SelectNode picks a node via the configured selector.
+func (b *LoadBalancer) SelectNode() (*NodeConnection, error) {
+    b.mu.RLock()
+    defer b.mu.RUnlock()
+    return b.nodeSelector.Select(b.nodes)
+}
+
+// UpdateNode applies an event-driven node update; isError forces an immediate refresh.
+func (b *LoadBalancer) UpdateNode(nodeArgs *et_rpc.NodeArgs, isError bool) error {
+    b.mu.Lock()
+    defer b.mu.Unlock()
+
+    // Unhealthy means status == NodeState_Unhealthy or the load exceeds the threshold.
+    for _, node := range b.nodes {
+        if node.Addr == nodeArgs.Addr {
+            isOverThreshold := abs(node.NArgs.Load-nodeArgs.Load) > Node_Load_Change_Threshold // load changed beyond threshold
+            isTimeout := time.Since(node.lastUpdate) > Node_Refresh_Interval                   // cached state is stale
+            if isError || isOverThreshold || isTimeout {
+                node.NArgs.Load = nodeArgs.Load
+                node.NArgs.Status = et_rpc.NodeState_Healthy // TODO node.GetHealthStatus(nodeArgs)
+                node.lastUpdate = time.Now()
+            }
+            return nil
+        }
+    }
+
+    return fmt.Errorf("unregistered node: %s", nodeArgs.Addr)
+}
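+
+// Worked example of the refresh conditions above (numbers are illustrative):
+// with a cached Load of 100 and an incoming Load of 350, |100-350| = 250
+// exceeds Node_Load_Change_Threshold (200), so the entry refreshes at once;
+// an incoming Load of 150 is ignored until isError is set or the entry is
+// older than Node_Refresh_Interval (120s).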
"context" + "et_rpc" + "et_rpc/pb" + "fmt" + "gitea.anxinyun.cn/container/common_models" + pool "github.com/jolestar/go-commons-pool" + "google.golang.org/grpc/health/grpc_health_v1" + "log" + "sync" + "time" +) + +type NodeConnection struct { + Addr string + NArgs *et_rpc.NodeArgs + rpcPool *pool.ObjectPool + lastUpdate time.Time // 节点信息更新时间 + ctx context.Context + mu sync.Mutex +} + +// NewNodeConnection 创建一个 NodeConnection +// TODO NewNodeConnection从配置文件中获取 pool 参数 +func NewNodeConnection(args *et_rpc.NodeArgs) (*NodeConnection, error) { + ctx := context.Background() + factory := NewGRPCClientFactory(args.Addr) + p := pool.NewObjectPoolWithDefaultConfig(ctx, factory) + p.Config.MaxTotal = 400 + p.Config.MinIdle = 200 + p.Config.TestOnBorrow = true + p.Config.TestOnReturn = false + p.Config.TestWhileIdle = true // 是否在空闲时检查连接有效性 + p.Config.MinEvictableIdleTime = 30 * time.Minute //空闲连接最小可驱逐时间 + //p.Config.SoftMinEvictableIdleTime = 15 * time.Minute //空闲连接软最小可驱逐时间 + + nodeConn := &NodeConnection{ + ctx: ctx, + Addr: args.Addr, + rpcPool: p, + NArgs: args, + } + + // 获取连接进行简单的测试 + obj, err := nodeConn.rpcPool.BorrowObject(ctx) + if err != nil { + return nil, fmt.Errorf("建立RPC连接失败:%w", err) + } + defer nodeConn.rpcPool.ReturnObject(ctx, obj) + + grpcPoolObj, ok := obj.(*GRPCPoolObject) + if !ok { + log.Fatalf("类型断言失败,obj 不是 *GRPCPoolObject 类型") + } + + // 健康检查 + healthClient := grpc_health_v1.NewHealthClient(grpcPoolObj.Conn) + resp, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{ + Service: "NodeService", + }) + + if err != nil || resp.Status != grpc_health_v1.HealthCheckResponse_SERVING { + return nil, fmt.Errorf("健康检查失败: %v, 状态: %v", err, resp.Status) + } + + return nodeConn, nil +} + +func (n *NodeConnection) GetHealthStatus(args *common_models.NodeArgs) et_rpc.NodeState { + // TODO CPU/Memory/Disk 使用是否超过阈值 + //resourcesIsOver := true + + return et_rpc.NodeState_Healthy + + // 荷载是否超过阈值、访问RPC失败 + //if args.Load > Node_Load_Threshold || args.Status == et_rpc.NodeState_Unhealthy { + // return et_rpc.NodeState_Unhealthy + //} else { + // return et_rpc.NodeState_Healthy + //} +} + +// 更新节点信息 +//func (n *NodeConnection) UpdateNodeArgs(args *et_rpc.NodeArgs, forceUpdate bool) { +// n.mu.Lock() +// defer n.mu.Unlock() +// +// // 检查是否需要更新节点信息 +// isOverThreshold := abs(n.NArgs.Load-args.Load) > Node_Load_Change_Threshold // 荷载变化超阈值 +// isTimeout := time.Since(n.lastUpdate) > Node_Refresh_Interval // 超刷新间隔 +// +// if forceUpdate || isOverThreshold || isTimeout { +// // 更新节点信息 +// n.NArgs.Load = args.Load +// n.NArgs.Status = n.GetHealthStatus(args) +// n.lastUpdate = time.Now() +// } +//} + +func (n *NodeConnection) CallHandleIotaData(id string, messages []string) error { + // 创建新的上下文并设置超时 + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + // 从连接池中借用一个连接 + obj1, err := n.rpcPool.BorrowObject(ctx) + if err != nil { + return fmt.Errorf("gRPC[HandleIotaData] 借用对象错误: %w", err) + } + + // 使用连接相关处理 + rpcPoolObj, ok := obj1.(*GRPCPoolObject) + if !ok { + log.Fatalf("类型断言失败,obj1 不是 *GRPCPoolObject 类型") + } + + defer func() { + if err := n.rpcPool.ReturnObject(ctx, obj1); err != nil { + log.Printf("gRPC[HandleIotaData] 归还对象到连接池失败: %v", err) + } + }() + + // 进行 RPC 调用 + request := &pb.HandleDataRequest{ + Id: id, + Messages: messages, + } + + startTime := time.Now() + _, err = rpcPoolObj.Client.HandleIotaData(ctx, request) + duration := time.Since(startTime) + + if err != nil { + log.Printf("调用失败。gRPC[HandleIotaData] 错误: %v, 耗时: %v", err, 
+
+// CallHandleAggData forwards aggregated data to the node over a pooled connection.
+func (n *NodeConnection) CallHandleAggData(id string, messages []string) error {
+    // Fresh context with a generous timeout for the call.
+    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+    defer cancel()
+
+    // Borrow a connection from the pool and make sure it is returned.
+    obj, err := n.rpcPool.BorrowObject(ctx)
+    if err != nil {
+        return fmt.Errorf("gRPC[HandleAggData] borrow error: %w", err)
+    }
+    defer func() {
+        if err := n.rpcPool.ReturnObject(ctx, obj); err != nil {
+            log.Printf("gRPC[HandleAggData] failed to return object to pool: %v", err)
+        }
+    }()
+
+    rpcPoolObj, ok := obj.(*GRPCPoolObject)
+    if !ok {
+        return fmt.Errorf("gRPC[HandleAggData] type assertion failed: obj is not a *GRPCPoolObject")
+    }
+
+    // Make the RPC call.
+    request := &pb.HandleDataRequest{
+        Id:       id,
+        Messages: messages,
+    }
+
+    startTime := time.Now()
+    _, err = rpcPoolObj.Client.HandleAggData(ctx, request)
+    duration := time.Since(startTime)
+
+    if err != nil {
+        log.Printf("gRPC[HandleAggData] call failed: %v, took: %v", err, duration)
+        return fmt.Errorf("gRPC[HandleAggData] call failed: %w", err)
+    }
+
+    //log.Printf("gRPC[HandleAggData] call succeeded, took: %v", duration)
+
+    return nil
+}
diff --git a/master/node_manager/node_manager.go b/master/node_manager/node_manager.go
new file mode 100644
index 0000000..1ea678c
--- /dev/null
+++ b/master/node_manager/node_manager.go
@@ -0,0 +1,58 @@
+package node_manager
+
+import (
+    "et_rpc"
+    "log"
+)
+
+// NodeManager pairs the LoadBalancer with per-node RPC connection pools,
+// giving the master node management and connection reuse under high concurrency.
+type NodeManager struct {
+    loadBalancer *LoadBalancer
+}
+
+func NewNodeManager(lb *LoadBalancer) *NodeManager {
+    return &NodeManager{
+        loadBalancer: lb,
+    }
+}
+
+func (m *NodeManager) AddNode(args *et_rpc.NodeArgs) error {
+    nodeConn, err := NewNodeConnection(args)
+    if err != nil {
+        log.Printf("failed to add node: %s\n", err)
+        return err
+    }
+
+    m.loadBalancer.AddNode(nodeConn)
+    log.Printf("added node: %s\n", args.Addr)
+
+    m.loadBalancer.mu.RLock()
+    log.Printf("master now has %d node(s)", len(m.loadBalancer.nodes))
+    for _, node := range m.loadBalancer.nodes {
+        log.Printf("master -> Node[%s] has %d idle connection(s).", node.Addr, node.rpcPool.GetNumIdle())
+    }
+    m.loadBalancer.mu.RUnlock()
+
+    return nil
+}
+
+func (m *NodeManager) RemoveNode(addr string) bool {
+    //log.Printf("removed node: %s\n", addr)
+    return m.loadBalancer.RemoveNode(addr)
+}
+
+// UpdateNode updates a node's cached state; isError forces an immediate refresh.
+func (m *NodeManager) UpdateNode(nodeArgs *et_rpc.NodeArgs, isError bool) error {
+    return m.loadBalancer.UpdateNode(nodeArgs, isError)
+}
+
+func (m *NodeManager) NodeExists(nodeAddr string) bool {
+    return m.loadBalancer.NodeExists(nodeAddr)
+}
+
+func (m *NodeManager) GetNodeConnection() (*NodeConnection, error) {
+    return m.loadBalancer.SelectNode()
+}
+
+func (m *NodeManager) NodesCount() int {
+    m.loadBalancer.mu.RLock()
+    defer m.loadBalancer.mu.RUnlock()
+    return len(m.loadBalancer.nodes)
+}
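+
+// Wiring sketch (illustrative; the address and ID are placeholders):
+//
+//    lb := NewLoadBalancer(&RoundRobinSelector{})
+//    mgr := NewNodeManager(lb)
+//    _ = mgr.AddNode(&et_rpc.NodeArgs{Addr: "node-1:50051"})
+//    conn, err := mgr.GetNodeConnection()
+//    if err == nil {
+//        _ = conn.CallHandleIotaData("device-001", msgs)
+//    }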
diff --git a/master/node_manager/node_selector.go b/master/node_manager/node_selector.go
new file mode 100644
index 0000000..126db73
--- /dev/null
+++ b/master/node_manager/node_selector.go
@@ -0,0 +1,56 @@
+package node_manager
+
+import (
+    "fmt"
+    "sync/atomic"
+)
+
+// INodeSelector picks a node from the available connections.
+type INodeSelector interface {
+    Select(nodes []*NodeConnection) (*NodeConnection, error)
+}
+
+type RoundRobinSelector struct {
+    index uint32 // uint32 keeps the modulo non-negative even after the counter wraps
+}
+
+func (s *RoundRobinSelector) Select(nodes []*NodeConnection) (*NodeConnection, error) {
+    if len(nodes) == 0 {
+        return nil, fmt.Errorf("no nodes available")
+    }
+
+    // Atomically read the current index and pick the node it maps to.
+    currentIndex := atomic.LoadUint32(&s.index)
+    selectedNode := nodes[currentIndex%uint32(len(nodes))]
+    if selectedNode == nil {
+        return nil, fmt.Errorf("no node at index %d", currentIndex)
+    }
+
+    // TODO check node health; for now every node is treated as healthy.
+    s.UpdateIndex() // advance the index for the next call
+
+    return selectedNode, nil
+
+    //if selectedNode.NArgs.Status == et_rpc.NodeState_Healthy {
+    //    s.UpdateIndex() // node is healthy, advance the index
+    //    return selectedNode, nil
+    //}
+
+    // If the current node is unhealthy, look for the next healthy one.
+    //for i := 1; i < len(nodes); i++ { // start from the next node
+    //    nextIndex := (currentIndex + uint32(i)) % uint32(len(nodes))
+    //    selectedNode = nodes[nextIndex]
+    //    if selectedNode.NArgs.Status == et_rpc.NodeState_Healthy {
+    //        s.UpdateIndex() // found a healthy node, advance the index
+    //        return selectedNode, nil
+    //    }
+    //}
+    //
+    //// No healthy node: reset the index and return an error.
+    //atomic.StoreUint32(&s.index, 0)
+    //return nil, fmt.Errorf("all nodes are unhealthy")
+}
+
+// UpdateIndex atomically advances the round-robin index.
+func (s *RoundRobinSelector) UpdateIndex() {
+    atomic.AddUint32(&s.index, 1)
+}
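+
+// Rotation example (illustrative): with three nodes [A, B, C], successive
+// Select calls read index 0, 1, 2, 3, ... and return A, B, C, A, ... since
+// the node is chosen by the index modulo len(nodes).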