Browse Source

Master的Node管理器

dev
yfh 1 month ago
parent
commit
18715f86c9
  1. 112
      master/node_manager/connection_pool_grpc.go
  2. 128
      master/node_manager/load_balancer.go
  3. 186
      master/node_manager/node_connection_grpc.go
  4. 58
      master/node_manager/node_manager.go
  5. 56
      master/node_manager/node_selector.go

112
master/node_manager/connection_pool_grpc.go

@ -0,0 +1,112 @@
package node_manager
import (
"context"
"et_rpc/pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health/grpc_health_v1"
"log"
"time"
pool "github.com/jolestar/go-commons-pool"
)
type GRPCPoolObject struct {
Conn *grpc.ClientConn // 保存 gRPC 连接
Client pb.NodeServiceClient // gRPC 客户端
}
type GRPCClientFactory struct {
address string
}
// NewGRPCClientFactory 创建新的 gRPC 连接工厂
func NewGRPCClientFactory(address string) *GRPCClientFactory {
return &GRPCClientFactory{
address: address,
}
}
func (f *GRPCClientFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 定义重试策略
serviceConfig := `{
"methodConfig": [{
"name": [{"service": "NodeService", "method": "*"}],
"retryPolicy": {
"maxAttempts": 2,
"initialBackoff": "1s",
"maxBackoff": "10s",
"backoffMultiplier": 2,
"retryableStatusCodes": ["UNAVAILABLE", "DEADLINE_EXCEEDED"]
}
}]
}`
conn, err := grpc.NewClient(
f.address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(serviceConfig),
)
if err != nil {
return nil, err // 如果3次都失败,返回错误
}
client := pb.NewNodeServiceClient(conn)
return pool.NewPooledObject(
&GRPCPoolObject{
Conn: conn,
Client: client,
},
), nil
}
// 销毁 gRPC 连接
func (f *GRPCClientFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error {
grpcPoolObj := object.Object.(*GRPCPoolObject)
if grpcPoolObj.Client != nil {
// 关闭连接
grpcPoolObj.Conn.Close() // gRPC 客户端连接关闭
}
return nil
}
// 验证 gRPC 连接的有效性
func (f *GRPCClientFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool {
grpcPoolObj := object.Object.(*GRPCPoolObject)
select {
case <-ctx.Done():
return false // 如果上下文已经取消,返回无效
default:
// 继续进行有效性检查
}
healthClient := grpc_health_v1.NewHealthClient(grpcPoolObj.Conn)
resp, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{
Service: "NodeService",
})
if err != nil || resp.Status != grpc_health_v1.HealthCheckResponse_SERVING {
log.Println("ValidateObject failed:", err)
return false
}
return true
}
// 激活 gRPC 连接
func (f *GRPCClientFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error {
// 可以在这里发送心跳请求以确保连接有效
return nil
}
// 非激活 gRPC 连接
func (f *GRPCClientFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error {
// 可以在这里进行连接的重置,例如清除状态或缓存
return nil
}

128
master/node_manager/load_balancer.go

@ -0,0 +1,128 @@
package node_manager
import (
"et_rpc"
"fmt"
"sync"
"time"
)
const (
Node_Load_Change_Threshold = 200
Node_Load_Threshold = 200 // 节点数据积压阈值
Node_Refresh_Interval = 120 * time.Second // 节点信息刷新间隔
)
type LoadBalancer struct {
nodes []*NodeConnection
nodeSelector INodeSelector // 节点选择器
mu sync.RWMutex
}
func NewLoadBalancer(selector INodeSelector) *LoadBalancer {
lb := &LoadBalancer{
nodes: make([]*NodeConnection, 0),
nodeSelector: selector,
mu: sync.RWMutex{},
}
// TODO 启动健康度检查
//go lb.HealthCheck()
return lb
}
func (b *LoadBalancer) AddNode(node *NodeConnection) {
b.mu.Lock()
defer b.mu.Unlock()
b.nodes = append(b.nodes, node)
}
func (b *LoadBalancer) RemoveNode(addr string) bool {
b.mu.Lock()
defer b.mu.Unlock()
for i, node := range b.nodes {
if node.Addr == addr {
b.nodes = append(b.nodes[:i], b.nodes[i+1:]...)
return true
}
}
return false
}
func (b *LoadBalancer) SetSelector(selector INodeSelector) {
b.mu.Lock()
defer b.mu.Unlock()
b.nodeSelector = selector
}
// SelectNode 通过节点选择器选择节点
func (b *LoadBalancer) SelectNode() (*NodeConnection, error) {
b.mu.RLock()
defer b.mu.RUnlock()
return b.nodeSelector.Select(b.nodes)
}
// UpdateNode 事件驱动更新节点,isError 立即更新
func (b *LoadBalancer) UpdateNode(nodeArgs *et_rpc.NodeArgs, isError bool) error {
b.mu.Lock()
defer b.mu.Unlock()
// 不健康:status = NodeState_Unhealthy 或者 Load 超阈值
for _, node := range b.nodes {
if node.Addr == nodeArgs.Addr {
isOverThreshold := abs(node.NArgs.Load-nodeArgs.Load) > Node_Load_Change_Threshold // 荷载变化超阈值
isTimeout := time.Since(node.lastUpdate) > Node_Refresh_Interval // 超刷新间隔
if isError || isOverThreshold || isTimeout {
node.NArgs.Load = nodeArgs.Load
node.NArgs.Status = et_rpc.NodeState_Healthy //TODO node.GetHealthStatus(nodeArgs)
node.lastUpdate = time.Now()
}
return nil
}
}
return fmt.Errorf("未注册的节点: %s", nodeArgs.Addr)
}
func (b *LoadBalancer) NodeExists(nodeAddr string) bool {
b.mu.RLock()
defer b.mu.RUnlock()
for _, node := range b.nodes {
if node.Addr == nodeAddr {
return true
}
}
return false
}
func abs(x int) int {
if x < 0 {
return -x
}
return x
}
// 定时健康检查、更新节点状态
//func (b *LoadBalancer) HealthCheck() {
// for {
// b.mu.Lock()
// reply := new(et_rpc.NodeArgs)
// for _, node := range b.nodes {
// result := b.checkNodeHealth(node, reply)
// b.UpdateNode(reply, result)
// }
// b.mu.Unlock()
//
// time.Sleep(5 * time.Minute)
// }
//}
//
//// 健康检查的具体实现
//func (b *LoadBalancer) checkNodeHealth(conn *NodeConnection, reply *et_rpc.NodeArgs) bool {
// // 健康检查,例如发送心跳请求等
// err := conn.Call(context.Background(), et_rpc.RPCService_Node_Ping, &et_rpc.NodeArgs{}, reply)
// return err == nil
//
// // TODO 根据返回信息:节点的CPU、内存、硬盘使用情况、数据积压情况来判断节点的健康情况
//}

186
master/node_manager/node_connection_grpc.go

@ -0,0 +1,186 @@
package node_manager
import (
"context"
"et_rpc"
"et_rpc/pb"
"fmt"
"gitea.anxinyun.cn/container/common_models"
pool "github.com/jolestar/go-commons-pool"
"google.golang.org/grpc/health/grpc_health_v1"
"log"
"sync"
"time"
)
type NodeConnection struct {
Addr string
NArgs *et_rpc.NodeArgs
rpcPool *pool.ObjectPool
lastUpdate time.Time // 节点信息更新时间
ctx context.Context
mu sync.Mutex
}
// NewNodeConnection 创建一个 NodeConnection
// TODO NewNodeConnection从配置文件中获取 pool 参数
func NewNodeConnection(args *et_rpc.NodeArgs) (*NodeConnection, error) {
ctx := context.Background()
factory := NewGRPCClientFactory(args.Addr)
p := pool.NewObjectPoolWithDefaultConfig(ctx, factory)
p.Config.MaxTotal = 400
p.Config.MinIdle = 200
p.Config.TestOnBorrow = true
p.Config.TestOnReturn = false
p.Config.TestWhileIdle = true // 是否在空闲时检查连接有效性
p.Config.MinEvictableIdleTime = 30 * time.Minute //空闲连接最小可驱逐时间
//p.Config.SoftMinEvictableIdleTime = 15 * time.Minute //空闲连接软最小可驱逐时间
nodeConn := &NodeConnection{
ctx: ctx,
Addr: args.Addr,
rpcPool: p,
NArgs: args,
}
// 获取连接进行简单的测试
obj, err := nodeConn.rpcPool.BorrowObject(ctx)
if err != nil {
return nil, fmt.Errorf("建立RPC连接失败:%w", err)
}
defer nodeConn.rpcPool.ReturnObject(ctx, obj)
grpcPoolObj, ok := obj.(*GRPCPoolObject)
if !ok {
log.Fatalf("类型断言失败,obj 不是 *GRPCPoolObject 类型")
}
// 健康检查
healthClient := grpc_health_v1.NewHealthClient(grpcPoolObj.Conn)
resp, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{
Service: "NodeService",
})
if err != nil || resp.Status != grpc_health_v1.HealthCheckResponse_SERVING {
return nil, fmt.Errorf("健康检查失败: %v, 状态: %v", err, resp.Status)
}
return nodeConn, nil
}
func (n *NodeConnection) GetHealthStatus(args *common_models.NodeArgs) et_rpc.NodeState {
// TODO CPU/Memory/Disk 使用是否超过阈值
//resourcesIsOver := true
return et_rpc.NodeState_Healthy
// 荷载是否超过阈值、访问RPC失败
//if args.Load > Node_Load_Threshold || args.Status == et_rpc.NodeState_Unhealthy {
// return et_rpc.NodeState_Unhealthy
//} else {
// return et_rpc.NodeState_Healthy
//}
}
// 更新节点信息
//func (n *NodeConnection) UpdateNodeArgs(args *et_rpc.NodeArgs, forceUpdate bool) {
// n.mu.Lock()
// defer n.mu.Unlock()
//
// // 检查是否需要更新节点信息
// isOverThreshold := abs(n.NArgs.Load-args.Load) > Node_Load_Change_Threshold // 荷载变化超阈值
// isTimeout := time.Since(n.lastUpdate) > Node_Refresh_Interval // 超刷新间隔
//
// if forceUpdate || isOverThreshold || isTimeout {
// // 更新节点信息
// n.NArgs.Load = args.Load
// n.NArgs.Status = n.GetHealthStatus(args)
// n.lastUpdate = time.Now()
// }
//}
func (n *NodeConnection) CallHandleIotaData(id string, messages []string) error {
// 创建新的上下文并设置超时
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// 从连接池中借用一个连接
obj1, err := n.rpcPool.BorrowObject(ctx)
if err != nil {
return fmt.Errorf("gRPC[HandleIotaData] 借用对象错误: %w", err)
}
// 使用连接相关处理
rpcPoolObj, ok := obj1.(*GRPCPoolObject)
if !ok {
log.Fatalf("类型断言失败,obj1 不是 *GRPCPoolObject 类型")
}
defer func() {
if err := n.rpcPool.ReturnObject(ctx, obj1); err != nil {
log.Printf("gRPC[HandleIotaData] 归还对象到连接池失败: %v", err)
}
}()
// 进行 RPC 调用
request := &pb.HandleDataRequest{
Id: id,
Messages: messages,
}
startTime := time.Now()
_, err = rpcPoolObj.Client.HandleIotaData(ctx, request)
duration := time.Since(startTime)
if err != nil {
log.Printf("调用失败。gRPC[HandleIotaData] 错误: %v, 耗时: %v", err, duration)
return fmt.Errorf("调用失败。gRPC[HandleIotaData] 错误: %w", err)
}
//log.Printf("调用成功。gRPC[HandleIotaData] resp=%+v, 耗时: %v", resp, duration)
return nil
}
func (n *NodeConnection) CallHandleAggData(id string, messages []string) error {
// 创建新的上下文并设置超时
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// 从连接池中借用一个连接
obj1, err := n.rpcPool.BorrowObject(ctx)
if err != nil {
return fmt.Errorf("gRPC[HandleAggData] 借用对象错误: %w", err)
}
// 使用连接相关处理
rpcPoolObj, ok := obj1.(*GRPCPoolObject)
if !ok {
log.Fatalf("类型断言失败,obj1 不是 *GRPCPoolObject 类型")
}
defer func() {
if err := n.rpcPool.ReturnObject(ctx, obj1); err != nil {
log.Printf("gRPC[HandleAggData] 归还对象到连接池失败: %v", err)
}
}()
// 进行 RPC 调用
request := &pb.HandleDataRequest{
Id: id,
Messages: messages,
}
startTime := time.Now()
_, err = rpcPoolObj.Client.HandleAggData(ctx, request)
duration := time.Since(startTime)
if err != nil {
log.Printf("调用失败。gRPC[HandleAggData] 错误: %v, 耗时: %v", err, duration)
return fmt.Errorf("调用失败。gRPC[HandleAggData] 错误: %w", err)
}
//log.Printf("调用成功。gRPC[HandleAggData] resp=%+v, 耗时: %v", resp, duration)
return nil
}

58
master/node_manager/node_manager.go

@ -0,0 +1,58 @@
package node_manager
import (
"et_rpc"
"log"
)
// NodeManager 和 rpcPool 提供了高效的节点管理和 RPC 连接管理功能,支持高并发和连接复用,提升系统性能和稳定性。
type NodeManager struct {
loadBalancer *LoadBalancer
}
func NewNodeManager(lb *LoadBalancer) *NodeManager {
return &NodeManager{
loadBalancer: lb,
}
}
func (m *NodeManager) AddNode(args *et_rpc.NodeArgs) error {
nodeConn, err := NewNodeConnection(args)
if err != nil {
log.Printf("添加Node节点失败:%s\n", err)
} else {
m.loadBalancer.AddNode(nodeConn)
log.Printf("添加Node节点: %s\n", args.Addr)
}
nodeConn.rpcPool.GetNumIdle()
log.Printf("master共有 %d 个节点", m.NodesCount())
for _, node := range m.loadBalancer.nodes {
log.Printf("master -> Node[%s] 的空闲连接有 %d 个。", node.Addr, node.rpcPool.GetNumIdle())
}
return err
}
func (m *NodeManager) RemoveNode(addr string) bool {
return m.loadBalancer.RemoveNode(addr)
//log.Printf("删除Node节点: %s\n", addr)
}
// UpdateNode 更新节点信息,isError 立即更新
func (m *NodeManager) UpdateNode(nodeArgs *et_rpc.NodeArgs, isError bool) error {
return m.loadBalancer.UpdateNode(nodeArgs, isError)
}
func (m *NodeManager) NodeExists(nodeAddr string) bool {
return m.loadBalancer.NodeExists(nodeAddr)
}
func (m *NodeManager) GetNodeConnection() (*NodeConnection, error) {
return m.loadBalancer.SelectNode()
}
func (m *NodeManager) NodesCount() int {
return len(m.loadBalancer.nodes)
}

56
master/node_manager/node_selector.go

@ -0,0 +1,56 @@
package node_manager
import (
"fmt"
"sync/atomic"
)
type INodeSelector interface {
Select(nodes []*NodeConnection) (*NodeConnection, error)
}
type RoundRobinSelector struct {
index int32
}
func (s *RoundRobinSelector) Select(nodes []*NodeConnection) (*NodeConnection, error) {
if len(nodes) == 0 {
return nil, fmt.Errorf("没有可用的节点")
}
// 原子读取当前索引
currentIndex := atomic.LoadInt32(&s.index)
selectedNode := nodes[currentIndex%int32(len(nodes))]
// TODO 检查节点状态, 暂时先不检查节点健康状态
s.UpdateIndex() // 如果节点健康,更新索引
if selectedNode == nil {
return nil, fmt.Errorf("无此索引的节点。%d", currentIndex)
}
return selectedNode, nil
//if selectedNode.NArgs.Status == et_rpc.NodeState_Healthy {
// s.UpdateIndex() // 如果节点健康,更新索引
// return selectedNode, nil
//}
// 如果当前节点不健康,尝试查找下一个健康节点
//for i := 1; i < len(nodes); i++ { // 从下一个节点开始查找
// nextIndex := (currentIndex + int32(i)) % int32(len(nodes))
// selectedNode = nodes[nextIndex]
// if selectedNode.NArgs.Status == et_rpc.NodeState_Healthy {
// s.UpdateIndex() // 找到健康节点,更新索引
// return selectedNode, nil
// }
//}
//
//// 如果没有健康节点,重置索引并返回错误
//atomic.StoreInt32(&s.index, 0)
//return nil, fmt.Errorf("所有节点都不健康")
}
// 更新索引的单独方法
func (s *RoundRobinSelector) UpdateIndex() {
atomic.AddInt32(&s.index, 1) // 原子增加索引
}
Loading…
Cancel
Save