et-go 20240919重建
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

137 lines
3.8 KiB

package app
import (
"context"
"et_rpc"
"et_rpc/pb"
"fmt"
"log"
"master/node_manager"
)
// MasterRPCService implements the gRPC master service interface.
//
// It embeds pb.UnimplementedMasterServiceServer and delegates node
// bookkeeping to a node_manager.NodeManager.
type MasterRPCService struct {
	pb.UnimplementedMasterServiceServer

	// nodeManager is used to store node information.
	nodeManager *node_manager.NodeManager
}
// NewMasterRPCService returns a MasterRPCService that uses the given
// nodeManager for all node operations. The pointer is stored as-is; no
// nil check is performed here.
func NewMasterRPCService(nodeManager *node_manager.NodeManager) *MasterRPCService {
	svc := new(MasterRPCService)
	svc.nodeManager = nodeManager
	return svc
}
// RegisterNode handles a node registration request.
//
// A nil req yields an INVALID_ARGUMENT error response. Otherwise the node is
// added to the node manager using the request's Id and Address; if AddNode
// fails, a NOT_FOUND error response carrying the underlying error text is
// returned. On success the response has status SUCCESS and its ErrorMessage
// field carries the human-readable confirmation, which is also logged.
//
// Error responses are produced by createErrorResponse, which returns both a
// populated response and a non-nil error.
func (s *MasterRPCService) RegisterNode(ctx context.Context, req *pb.NodeRequest) (*pb.RpcResponse, error) {
	// Reject a missing request before touching any of its fields.
	if req == nil {
		return s.createErrorResponse(pb.RpcResponse_INVALID_ARGUMENT, "节点注册失败:req 为 nil")
	}

	// Only the ID and address come from the request; all remaining NodeArgs
	// fields are left at their zero values.
	nodeArgs := &et_rpc.NodeArgs{
		ID:   req.Id,
		Addr: req.Address,
	}
	if err := s.nodeManager.AddNode(nodeArgs); err != nil {
		msg := fmt.Sprintf("[%s]节点注册失败:%s", nodeArgs.Addr, err.Error())
		return s.createErrorResponse(pb.RpcResponse_NOT_FOUND, msg)
	}

	msg := fmt.Sprintf("[%s]节点注册成功!!", req.Address)
	// Use log.Print rather than log.Printf: msg embeds the caller-supplied
	// address and must not be interpreted as a format string.
	log.Print(msg)

	return &pb.RpcResponse{
		Status:       pb.RpcResponse_SUCCESS,
		ErrorMessage: msg,
	}, nil
}
// HeartbeatNode handles a heartbeat from a node.
//
// A nil req yields an INVALID_ARGUMENT error response, and an address the
// node manager does not know yields a NOT_FOUND error response. Otherwise the
// heartbeat is logged and a SUCCESS response is returned. The visible code
// only checks that the node exists; it does not update any node state itself.
func (s *MasterRPCService) HeartbeatNode(ctx context.Context, req *pb.NodeRequest) (*pb.RpcResponse, error) {
	// Cases are evaluated in order, so req is known to be non-nil by the time
	// its Address is read.
	switch {
	case req == nil:
		return s.createErrorResponse(pb.RpcResponse_INVALID_ARGUMENT, "请求无效: req 为 nil")
	case !s.nodeManager.NodeExists(req.Address):
		unknown := fmt.Sprintf("未注册的节点: %s", req.Address)
		return s.createErrorResponse(pb.RpcResponse_NOT_FOUND, unknown)
	}

	log.Printf("收到 Node[%s] 心跳", req.Address)

	ok := &pb.RpcResponse{
		Status:       pb.RpcResponse_SUCCESS,
		ErrorMessage: "",
	}
	return ok, nil
}
// UnregisterNode handles a request to remove a node.
//
// A nil req yields an INVALID_ARGUMENT error response. Otherwise the node
// manager is asked to remove the node identified by req.Address. Removal is
// best-effort: a SUCCESS response is returned whether or not the node was
// present, and the log records which of the two happened.
func (s *MasterRPCService) UnregisterNode(ctx context.Context, req *pb.NodeRequest) (*pb.RpcResponse, error) {
	// Reject a missing request before touching any of its fields.
	if req == nil {
		return s.createErrorResponse(pb.RpcResponse_INVALID_ARGUMENT, "请求无效: req 为 nil")
	}

	// Log exactly one outcome so that a missing node is not also reported as
	// having been deleted.
	if s.nodeManager.RemoveNode(req.Address) {
		log.Printf("节点Node[%s]已删除", req.Address)
	} else {
		// Deliberately not treated as an error: unregistering an unknown
		// node still reports success to the caller.
		log.Printf("删除节点Node[%s],节点不存在", req.Address)
	}

	return &pb.RpcResponse{
		Status:       pb.RpcResponse_SUCCESS,
		ErrorMessage: "",
	}, nil
}
// CheckMasterStatus lets a node probe the master.
//
// A nil req yields an INVALID_ARGUMENT error response. Otherwise the probe is
// logged and a SUCCESS response with an empty message is returned; no other
// state is consulted.
func (s *MasterRPCService) CheckMasterStatus(ctx context.Context, req *pb.NodeRequest) (*pb.RpcResponse, error) {
	if req == nil {
		return s.createErrorResponse(pb.RpcResponse_INVALID_ARGUMENT, "请求无效: req 为 nil")
	}

	// NOTE(review): the log label says "ID" but the value logged is
	// req.Address — confirm which one is intended.
	log.Printf("主节点状态被节点检查: ID=%s", req.Address)

	alive := &pb.RpcResponse{
		Status:       pb.RpcResponse_SUCCESS,
		ErrorMessage: "",
	}
	return alive, nil
}
// mustEmbedUnimplementedMasterServiceServer is a placeholder method with no
// behavior of its own.
func (s *MasterRPCService) mustEmbedUnimplementedMasterServiceServer() {
	// Intentionally empty.
}
// createErrorResponse builds an error response, logs it, and returns it
// together with a matching error.
//
// status is stored in the response's Status field and message in its
// ErrorMessage field. The same message is written to the log and used as the
// text of the returned error, so both return values are always non-nil.
//
// message is treated as literal text, never as a format string: callers embed
// node addresses and underlying error text in it, and any '%' in those must
// survive unchanged.
func (s *MasterRPCService) createErrorResponse(status pb.RpcResponse_Status, message string) (*pb.RpcResponse, error) {
	// Record the error message.
	log.Print(message)

	response := &pb.RpcResponse{
		Status:       status,
		ErrorMessage: message,
	}
	return response, fmt.Errorf("%s", message)
}