You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
186 lines
5.3 KiB
186 lines
5.3 KiB
package node_manager
|
|
|
|
import (
|
|
"context"
|
|
"et_rpc"
|
|
"et_rpc/pb"
|
|
"fmt"
|
|
"gitea.anxinyun.cn/container/common_models"
|
|
pool "github.com/jolestar/go-commons-pool"
|
|
"google.golang.org/grpc/health/grpc_health_v1"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type NodeConnection struct {
|
|
Addr string
|
|
NArgs *et_rpc.NodeArgs
|
|
rpcPool *pool.ObjectPool
|
|
lastUpdate time.Time // 节点信息更新时间
|
|
ctx context.Context
|
|
mu sync.Mutex
|
|
}
|
|
|
|
// NewNodeConnection 创建一个 NodeConnection
|
|
// TODO NewNodeConnection从配置文件中获取 pool 参数
|
|
func NewNodeConnection(args *et_rpc.NodeArgs) (*NodeConnection, error) {
|
|
ctx := context.Background()
|
|
factory := NewGRPCClientFactory(args.Addr)
|
|
p := pool.NewObjectPoolWithDefaultConfig(ctx, factory)
|
|
p.Config.MaxTotal = 400
|
|
p.Config.MinIdle = 200
|
|
p.Config.TestOnBorrow = true
|
|
p.Config.TestOnReturn = false
|
|
p.Config.TestWhileIdle = true // 是否在空闲时检查连接有效性
|
|
p.Config.MinEvictableIdleTime = 30 * time.Minute //空闲连接最小可驱逐时间
|
|
//p.Config.SoftMinEvictableIdleTime = 15 * time.Minute //空闲连接软最小可驱逐时间
|
|
|
|
nodeConn := &NodeConnection{
|
|
ctx: ctx,
|
|
Addr: args.Addr,
|
|
rpcPool: p,
|
|
NArgs: args,
|
|
}
|
|
|
|
// 获取连接进行简单的测试
|
|
obj, err := nodeConn.rpcPool.BorrowObject(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("建立RPC连接失败:%w", err)
|
|
}
|
|
defer nodeConn.rpcPool.ReturnObject(ctx, obj)
|
|
|
|
grpcPoolObj, ok := obj.(*GRPCPoolObject)
|
|
if !ok {
|
|
log.Fatalf("类型断言失败,obj 不是 *GRPCPoolObject 类型")
|
|
}
|
|
|
|
// 健康检查
|
|
healthClient := grpc_health_v1.NewHealthClient(grpcPoolObj.Conn)
|
|
resp, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{
|
|
Service: "NodeService",
|
|
})
|
|
|
|
if err != nil || resp.Status != grpc_health_v1.HealthCheckResponse_SERVING {
|
|
return nil, fmt.Errorf("健康检查失败: %v, 状态: %v", err, resp.Status)
|
|
}
|
|
|
|
return nodeConn, nil
|
|
}
|
|
|
|
func (n *NodeConnection) GetHealthStatus(args *common_models.NodeArgs) et_rpc.NodeState {
|
|
// TODO CPU/Memory/Disk 使用是否超过阈值
|
|
//resourcesIsOver := true
|
|
|
|
return et_rpc.NodeState_Healthy
|
|
|
|
// 荷载是否超过阈值、访问RPC失败
|
|
//if args.Load > Node_Load_Threshold || args.Status == et_rpc.NodeState_Unhealthy {
|
|
// return et_rpc.NodeState_Unhealthy
|
|
//} else {
|
|
// return et_rpc.NodeState_Healthy
|
|
//}
|
|
}
|
|
|
|
// 更新节点信息
|
|
//func (n *NodeConnection) UpdateNodeArgs(args *et_rpc.NodeArgs, forceUpdate bool) {
|
|
// n.mu.Lock()
|
|
// defer n.mu.Unlock()
|
|
//
|
|
// // 检查是否需要更新节点信息
|
|
// isOverThreshold := abs(n.NArgs.Load-args.Load) > Node_Load_Change_Threshold // 荷载变化超阈值
|
|
// isTimeout := time.Since(n.lastUpdate) > Node_Refresh_Interval // 超刷新间隔
|
|
//
|
|
// if forceUpdate || isOverThreshold || isTimeout {
|
|
// // 更新节点信息
|
|
// n.NArgs.Load = args.Load
|
|
// n.NArgs.Status = n.GetHealthStatus(args)
|
|
// n.lastUpdate = time.Now()
|
|
// }
|
|
//}
|
|
|
|
func (n *NodeConnection) CallHandleIotaData(id string, messages []string) error {
|
|
// 创建新的上下文并设置超时
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
|
defer cancel()
|
|
|
|
// 从连接池中借用一个连接
|
|
obj1, err := n.rpcPool.BorrowObject(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("gRPC[HandleIotaData] 借用对象错误: %w", err)
|
|
}
|
|
|
|
// 使用连接相关处理
|
|
rpcPoolObj, ok := obj1.(*GRPCPoolObject)
|
|
if !ok {
|
|
log.Fatalf("类型断言失败,obj1 不是 *GRPCPoolObject 类型")
|
|
}
|
|
|
|
defer func() {
|
|
if err := n.rpcPool.ReturnObject(ctx, obj1); err != nil {
|
|
log.Printf("gRPC[HandleIotaData] 归还对象到连接池失败: %v", err)
|
|
}
|
|
}()
|
|
|
|
// 进行 RPC 调用
|
|
request := &pb.HandleDataRequest{
|
|
Id: id,
|
|
Messages: messages,
|
|
}
|
|
|
|
startTime := time.Now()
|
|
_, err = rpcPoolObj.Client.HandleIotaData(ctx, request)
|
|
duration := time.Since(startTime)
|
|
|
|
if err != nil {
|
|
log.Printf("调用失败。gRPC[HandleIotaData] 错误: %v, 耗时: %v", err, duration)
|
|
return fmt.Errorf("调用失败。gRPC[HandleIotaData] 错误: %w", err)
|
|
}
|
|
|
|
//log.Printf("调用成功。gRPC[HandleIotaData] resp=%+v, 耗时: %v", resp, duration)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (n *NodeConnection) CallHandleAggData(id string, messages []string) error {
|
|
// 创建新的上下文并设置超时
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
|
defer cancel()
|
|
|
|
// 从连接池中借用一个连接
|
|
obj1, err := n.rpcPool.BorrowObject(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("gRPC[HandleAggData] 借用对象错误: %w", err)
|
|
}
|
|
|
|
// 使用连接相关处理
|
|
rpcPoolObj, ok := obj1.(*GRPCPoolObject)
|
|
if !ok {
|
|
log.Fatalf("类型断言失败,obj1 不是 *GRPCPoolObject 类型")
|
|
}
|
|
|
|
defer func() {
|
|
if err := n.rpcPool.ReturnObject(ctx, obj1); err != nil {
|
|
log.Printf("gRPC[HandleAggData] 归还对象到连接池失败: %v", err)
|
|
}
|
|
}()
|
|
|
|
// 进行 RPC 调用
|
|
request := &pb.HandleDataRequest{
|
|
Id: id,
|
|
Messages: messages,
|
|
}
|
|
|
|
startTime := time.Now()
|
|
_, err = rpcPoolObj.Client.HandleAggData(ctx, request)
|
|
duration := time.Since(startTime)
|
|
|
|
if err != nil {
|
|
log.Printf("调用失败。gRPC[HandleAggData] 错误: %v, 耗时: %v", err, duration)
|
|
return fmt.Errorf("调用失败。gRPC[HandleAggData] 错误: %w", err)
|
|
}
|
|
|
|
//log.Printf("调用成功。gRPC[HandleAggData] resp=%+v, 耗时: %v", resp, duration)
|
|
|
|
return nil
|
|
}
|
|
|