package app import ( "context" "et_rpc" "et_rpc/pb" "fmt" pool "github.com/jolestar/go-commons-pool" "google.golang.org/grpc/health/grpc_health_v1" "log" "sync" "time" ) type MasterConnection struct { MasterAddr string Addr string NArgs *et_rpc.NodeArgs rpcPool *pool.ObjectPool lastUpdate time.Time // 节点信息更新时间 ctx context.Context mu sync.Mutex factory *MasterGRPCClientFactory } // TODO NewMasterConnection 从配置文件中获取 pool 参数 func NewMasterConnection(masterAddr string) (*MasterConnection, error) { ctx := context.Background() factory := NewMasterGRPCClientFactory(masterAddr) p := pool.NewObjectPoolWithDefaultConfig(ctx, factory) p.Config.MaxTotal = 10 // 最大连接数 p.Config.MaxIdle = 5 // 最大空闲连接数据 p.Config.MinIdle = 1 // 最小空闲连接数据 p.Config.TestOnBorrow = true p.Config.TestOnReturn = false p.Config.TestWhileIdle = true // 是否在空闲时检查连接有效性 p.Config.MinEvictableIdleTime = 30 * time.Minute //空闲连接最小可驱逐时间 //p.Config.SoftMinEvictableIdleTime = 15 * time.Minute //空闲连接软最小可驱逐时间 conn := &MasterConnection{ ctx: ctx, MasterAddr: masterAddr, rpcPool: p, } // 获取连接进行简单的测试 obj, err := conn.rpcPool.BorrowObject(ctx) if err != nil { return nil, fmt.Errorf("建立RPC连接失败:%w", err) } defer conn.rpcPool.ReturnObject(ctx, obj) grpcPoolObj, ok := obj.(*MasterGRPCPoolObject) if !ok { log.Fatalf("类型断言失败,obj 不是 *MasterGRPCPoolObject 类型") } // 健康检查 healthClient := grpc_health_v1.NewHealthClient(grpcPoolObj.Conn) resp, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{ Service: "MasterService", }) if err != nil || resp.Status != grpc_health_v1.HealthCheckResponse_SERVING { return nil, fmt.Errorf("健康检查失败: %v, 状态: %v", err, resp.Status) } conn.factory = factory return conn, nil } func (n *MasterConnection) BorrowValidConnection(ctx context.Context) (*pool.PooledObject, error) { var obj1 interface{} var err error // 尝试借用连接,最多重试 3 次 for attempts := 0; attempts < 3; attempts++ { obj1, err = n.rpcPool.BorrowObject(ctx) if err == nil { break } log.Printf("Attempt %d: Failed to borrow object from pool: %v", attempts+1, err) time.Sleep(1 * time.Second) } if err != nil { return nil, fmt.Errorf("borrow object error after 3 attempts: %w", err) } pooledObject, ok := obj1.(*pool.PooledObject) if !ok { return nil, log.Output(2, "Invalid object type from pool") // 类型不匹配,返回错误 } if !n.factory.ValidateObject(ctx, pooledObject) { err = n.factory.DestroyObject(ctx, pooledObject) if err != nil { return nil, err } obj1, err = n.factory.MakeObject(ctx) if err != nil { return nil, err } pooledObject, ok = obj1.(*pool.PooledObject) if !ok { return nil, log.Output(2, "Invalid object type from pool after recreation") // 类型不匹配,返回错误 } } return pooledObject, nil } func (n *MasterConnection) CallRegister(nodeInfo *et_rpc.NodeArgs) error { n.NArgs = nodeInfo // 创建新的上下文并设置超时 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() // 从连接池中借用一个连接 obj1, err := n.rpcPool.BorrowObject(ctx) if err != nil { return fmt.Errorf("gRPC[CallRegister] 借用对象错误: %w", err) } // 使用连接相关处理 rpcPoolObj := obj1.(*MasterGRPCPoolObject) defer func() { if err := n.rpcPool.ReturnObject(ctx, obj1); err != nil { log.Printf("gRPC[CallRegister] 归还对象到连接池失败: %v", err) } }() // 进行 RPC 调用 request := &pb.NodeRequest{ Id: fmt.Sprintf("master-%s", n.MasterAddr), Address: nodeInfo.Addr, ThingIds: make([]string, 0), } resp, err := rpcPoolObj.Client.RegisterNode(ctx, request) if err != nil { return fmt.Errorf("调用 gRPC[CallRegister] 错误: %w", err) } log.Printf("调用 gRPC[CallRegister] resp=%+v, err=%+v\n", resp, err) // 归还连接 //err = n.rpcPool.ReturnObject(ctx, obj1) //if err != nil { // log.Printf("归还对象到连接池失败: %v", err) // return fmt.Errorf("归还对象错误: %w", err) //} return nil } func (n *MasterConnection) CallUnregister() error { // 创建新的上下文并设置超时 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() // 从连接池中借用一个连接 obj1, err := n.rpcPool.BorrowObject(ctx) if err != nil { return fmt.Errorf("gRPC[CallUnregister] 借用对象错误: %w", err) } // 使用连接相关处理 rpcPoolObj := obj1.(*MasterGRPCPoolObject) defer func() { if err := n.rpcPool.ReturnObject(ctx, obj1); err != nil { log.Printf("gRPC[CallUnregister] 归还对象到连接池失败: %v", err) } }() // 进行 RPC 调用 request := &pb.NodeRequest{ Id: "", Address: n.Addr, ThingIds: make([]string, 0), } resp, err := rpcPoolObj.Client.UnregisterNode(ctx, request) if err != nil { return fmt.Errorf("调用 gRPC[CallUnregister] 错误: %w", err) } log.Printf("调用 gRPC[CallUnregister] resp=%+v, err=%+v\n", resp, err) // 归还连接 err = n.rpcPool.ReturnObject(ctx, obj1) if err != nil { log.Printf("归还对象到连接池失败: %v", err) return fmt.Errorf("归还对象错误: %w", err) } return nil } func (n *MasterConnection) CallHeartbeatNode(nodeAddr string) error { // 创建新的上下文并设置超时 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() // 从连接池中借用一个连接 obj1, err := n.rpcPool.BorrowObject(ctx) if err != nil { return fmt.Errorf("gRPC[CallHeartbeatNode] 借用对象错误: %w", err) } // 使用连接相关处理 rpcPoolObj := obj1.(*MasterGRPCPoolObject) defer func() { if err := n.rpcPool.ReturnObject(ctx, obj1); err != nil { log.Printf("gRPC[CallHeartbeatNode] 归还对象到连接池失败: %v", err) } log.Printf("gRPC[CallHeartbeatNode] 已归还对象 obj1 。") }() // 进行 RPC 调用 request := &pb.NodeRequest{ Id: "", Address: nodeAddr, ThingIds: make([]string, 0), } resp, err := rpcPoolObj.Client.HeartbeatNode(ctx, request) if err != nil { return fmt.Errorf("调用 gRPC[CallHeartbeatNode] 错误: %w", err) } log.Printf("调用 gRPC[CallHeartbeatNode] resp=%+v, err=%+v\n", resp, err) // 归还连接 //err = n.rpcPool.ReturnObject(ctx, obj1) //if err != nil { // log.Printf("归还对象到连接池失败: %v", err) // return fmt.Errorf("归还对象错误: %w", err) //} return nil }