diff --git a/master/app/app.go b/master/app/app.go
index 51bb52f..b82c512 100644
--- a/master/app/app.go
+++ b/master/app/app.go
@@ -1,8 +1,8 @@
 package app
 
 import (
-	"dataSource/kafka"
 	"log"
+	"time"
 )
 
 func init() {
@@ -10,16 +10,25 @@ func init() {
 }
 
 func Start() {
 	// 启动 master 服务
-	master := NewEtMaster()
-	go master.RegisterListen()
+	master := NewETMaster()
+	go master.StartRPCServer()
+
+	// 设置 Kafka 消费者配置信息
+	master.InitKafkaDataSource()
+
 	//等待node注册
 	master.WaitNodeRegister()
 	println("=======")
 
-	// -> 源数据
-	kafkaDataSource := kafka.NewKafkaDataSource()
-	go kafkaDataSource.Producer()
+	// 发布数据
+	go master.AggDataPublishing()
+	go master.RawDataPublishing()
+	time.Sleep(2 * time.Second)
+
+	// Kafka 数据消费与处理
+	go master.dataSource.RawDataProducer()
+	go master.dataSource.AggDataProducer()
 
-	// 将源数据 -> 各类型节点处理
-	master.DistributeData(kafkaDataSource.DataChannels)
+	// 监控系统关闭
+	master.MonitorShutdown()
 }
diff --git a/master/app/et_master.go b/master/app/et_master.go
index 4f9db91..48c0c89 100644
--- a/master/app/et_master.go
+++ b/master/app/et_master.go
@@ -1,403 +1,398 @@
 package app
 
 import (
-	"dataSource"
-	"encoding/gob"
 	"errors"
-	"et_prometheus_exporter"
+	"et_rpc/pb"
 	"fmt"
-	"gitea.anxinyun.cn/container/common_models"
 	"gitea.anxinyun.cn/container/common_utils/configLoad"
+	"github.com/panjf2000/ants/v2"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/health"
+	"google.golang.org/grpc/health/grpc_health_v1"
 	"log"
-	"math"
+	"master/data_source"
+	"master/node_manager"
 	"net"
-	"net/rpc"
-	"strings"
+	"os"
+	"os/signal"
 	"sync"
+	"sync/atomic"
+	"syscall"
 	"time"
 )
 
-type EtMaster struct {
-	nodeMap  sync.Map
-	exporter et_prometheus_exporter.PrometheusExporter
-	sleepCH  chan bool
+type SendStatus struct {
+	inProgressCount int32 // 正在处理的消息计数
+	limitThreshold  int32 // 限流阈值
+	receiving       int32 // 是否接收消息(1表示接收,0表示暂停)
 }
 
-func NewEtMaster() *EtMaster {
-	master := EtMaster{
-		exporter: et_prometheus_exporter.NewPrometheusExporter(),
-		sleepCH:  make(chan bool, 1),
-	}
-	return &master
-}
+// ETMaster 管理 Master 的核心逻辑
+type ETMaster struct {
+	nodeManager      *node_manager.NodeManager
+	grpcServer       *grpc.Server
+	masterRPCService *MasterRPCService
 
-type NodeRpc struct {
-	args        *common_models.NodeArgs // 注册节点参数:RPC服务名为master, 服务方法 NodeRegister 的输入参数
-	resultCH    chan int                // 注册节点参数:RPC服务名为master, 服务方法 NodeRegister 的输出结果
-	aggResultCH chan int                // 聚集数据被处理后的返回结果 对应 Reply 参数
-	client      *rpc.Client
+	dataSource      *data_source.KafkaDataSource
+	aggDataHandlers sync.Map   // 聚合数据处理者
+	rawDataHandlers sync.Map   // 原始数据处理者
+	aggSendStatus   SendStatus // 聚合数据发送状态
+	rawSendStatus   SendStatus // 原始数据发送状态
+
+	errRawChan               chan []string
+	errMessagesKafkaProducer *data_source.KafkaProducer // Kafka 生产者,用于发送失败的消息
 }
 
-// RegisterListen 启动 master RPC服务
-func (the *EtMaster) RegisterListen() {
-	//监听
-	err := rpc.RegisterName("master", the)
-	if err != nil {
-		log.Println("master 提供注册服务异常")
-		return
+// 创建 ETMaster 实例
+func NewETMaster() *ETMaster {
+	lb := node_manager.NewLoadBalancer(&node_manager.RoundRobinSelector{})
+	nodeManager := node_manager.NewNodeManager(lb)
+
+	grpcServer := grpc.NewServer()
+	masterRPCService := NewMasterRPCService(nodeManager)
+	pb.RegisterMasterServiceServer(grpcServer, masterRPCService)
+
+	healthServer := health.NewServer()
+	grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
+	healthServer.SetServingStatus("MasterService", grpc_health_v1.HealthCheckResponse_SERVING)
+
+	return &ETMaster{
+		nodeManager:      nodeManager,
+		grpcServer:       grpcServer,
+		masterRPCService: masterRPCService,
 	}
+}
+
+func (mm *ETMaster) StartRPCServer() {
 	port := configLoad.LoadConfig().GetUint16("master.port")
-
 	listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
 	if err != nil {
-		log.Panic("master 启动 node服务注册功能异常")
+		log.Panicf("启动 Master RPC 服务失败: %v", err)
 	}
-	log.Printf("master 启动 node服务注册功能 :%d", port)
-	for {
-		//log.Println("master 监听新注册链接")
-		conn, err := listener.Accept()
-		if err != nil {
-			log.Println("master rpc Accept异常")
+	defer func() {
+		if err := listener.Close(); err != nil {
+			log.Printf("关闭监听器失败: %v", err)
 		}
-		log.Printf("master Accept注册链接 from node[%s]", conn.RemoteAddr())
-		go rpc.ServeConn(conn)
+	}()
+	log.Printf("启动 Master RPC 服务成功,服务端口:%d", port)
+
+	// 启动 gRPC 服务器
+	if err := mm.grpcServer.Serve(listener); err != nil {
+		log.Panicf("gRPC 服务器服务失败: %v", err)
 	}
 }
 
-// DistributeData 分发数据。
-// 监听两个数据通道RawDataChan和AggDataChan,根据不同类型的数据通道接收到的数据,调用notifyData方法进行相应的处理操作。
-func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) {
-	//数据类型注册
-	gob.Register([]interface{}{})
-	for {
-		log.Println("L74 nodeCount: %d", the.nodeMapCount())
-		if the.nodeMapCount() == 0 {
-			log.Printf("nodeList is empty!")
-			time.Sleep(time.Second * 10)
-			continue
-		}
+// 初始化 Kafka 数据源
+func (mm *ETMaster) InitKafkaDataSource() {
+	ds := data_source.NewKafkaDataSource() // 加载 kafka 相关的配置
 
-		select {
-		case stopEnable := <-the.sleepCH:
-			log.Println("L83 nodeCount: %d", the.nodeMapCount())
-			if stopEnable {
-				stopTime := time.Second * 10
-				log.Printf("node 处理积压,%v,master 暂停 %v", stopEnable, stopTime)
-				time.Sleep(stopTime)
-			} else {
-				log.Printf("node 处理积压,%v,不正常空数据", stopEnable)
-			}
-		default:
+	// 创建 kafka 生产者实例
+	producer, err := data_source.NewKafkaProducer(ds.Brokers)
+	if err != nil {
+		log.Fatalf("创建 Kafka 生产者失败: %v", err)
+	}
+	mm.errMessagesKafkaProducer = producer
+
+	// 设置 rawData 的处理者,每个分区一个处理者
+	if ds.Master_kafkaConsumer_config.RawData != nil {
+		topicCfg := ds.Topics["data_raw"]
+		for partId := 0; partId < topicCfg.Partitions; partId++ {
+			key := fmt.Sprintf("%s_%d", topicCfg.Topic, partId)
+			dataHandler := data_source.NewRawDataHandler(key, topicCfg.Topic, partId)
+			mm.rawDataHandlers.Store(key, dataHandler)
 		}
 
-		select {
-		case data := <-dataChannels.RawDataChan:
-			log.Println("L96 nodeCount: %d", the.nodeMapCount())
-			the.notifyData(&data, the.callNodeService)
-		case data := <-dataChannels.AggDataChan:
-			log.Println("L99 nodeCount: %d", the.nodeMapCount())
-			the.notifyData(&data, the.callNodeService)
-			//default:
-			//	time.Sleep(100 * time.Millisecond)
+		// 发送失败的消息存入 DLP_DATA_RAW 主题
+		dlpKey := "DLP_DATA_RAW"
+		mm.rawDataHandlers.Store(dlpKey, data_source.NewRawDataHandler(dlpKey, dlpKey, 0))
+	}
+
+	// 设置 aggData 的处理者,每个分区一个处理者
+	if ds.Master_kafkaConsumer_config.AggData != nil {
+		topicCfg := ds.Topics["data_agg"]
+		for partId := 0; partId < topicCfg.Partitions; partId++ {
+			key := fmt.Sprintf("%s_%d", topicCfg.Topic, partId)
+			dataHandler := data_source.NewAggDataHandler(key, topicCfg.Topic, partId)
+			mm.aggDataHandlers.Store(key, dataHandler)
 		}
+
+		// 发送失败的消息存入 DLP_DATA_AGG 主题
+		dlpKey := "DLP_DATA_AGG"
+		mm.aggDataHandlers.Store(dlpKey, data_source.NewAggDataHandler(dlpKey, dlpKey, 0))
+	}
+
+	ds.RawDataHandlers = &mm.rawDataHandlers
+	ds.AggDataHandlers = &mm.aggDataHandlers
+	mm.dataSource = ds
+}
+
+// 等待节点注册
+func (mm *ETMaster) WaitNodeRegister() {
+	log.Println("==== 等待 Node 注册 ====")
+	for mm.masterRPCService.nodeManager.NodesCount() == 0 {
+		time.Sleep(time.Second * 10)
 	}
 }
 
-func (the *EtMaster) notifyData(data common_models.IDataTrace, callNodeFunc func(*NodeRpc, common_models.IDataTrace)) {
-	thingId := data.GetThingId()
-	isMatch := false
-	the.nodeMap.Range(func(address, value interface{}) bool {
-		if nodePtr, ok := value.(*NodeRpc); ok {
-			if nodePtr != nil {
-				if contains(nodePtr.args.ThingIds, thingId) {
-					isMatch = true
-					go callNodeFunc(nodePtr, data)
-					return false
+
+// AggDataPublishing 发布聚合数据
+func (mm *ETMaster) AggDataPublishing() {
+	concurrency := configLoad.LoadConfig().GetInt32("performance.master.rpc.concurrency") // 并发请求数 50
+	mm.initSendStatus(&mm.aggSendStatus, concurrency)
+	go mm.monitorSendStatus(&mm.aggSendStatus, "aggSendStatus")
+	mm.startDataPublishing(&mm.aggDataHandlers, "AggData", mm.sendAggData, &mm.aggSendStatus)
+}
+
+// RawDataPublishing 发布原始数据
+func (mm *ETMaster) RawDataPublishing() {
+	concurrency := configLoad.LoadConfig().GetInt32("performance.master.rpc.concurrency") // 并发请求数 50
+	mm.initSendStatus(&mm.rawSendStatus, concurrency)
+	go mm.monitorSendStatus(&mm.rawSendStatus, "rawSendStatus")
+	mm.startDataPublishing(&mm.rawDataHandlers, "RawData", mm.sendRawData, &mm.rawSendStatus)
+}
+
+// initSendStatus 初始化发送状态
+func (mm *ETMaster) initSendStatus(status *SendStatus, threshold int32) {
+	status.limitThreshold = threshold
+	atomic.StoreInt32(&status.receiving, 1)
+}
+
+// startDataPublishing 启动数据发布
+func (mm *ETMaster) startDataPublishing(handlers *sync.Map, handlerType string, sendFunc func(string, []string) error, status *SendStatus) {
+	// 创建一个 Goroutine 池,最大并发数为 500
+	pool, err := ants.NewPool(500)
+	if err != nil {
+		log.Fatalf("创建 Goroutine 池失败: %v", err)
+	}
+
+	var wg sync.WaitGroup
+	index := 0
+	handlers.Range(func(key, value any) bool {
+		handler := value.(data_source.IMessageHandler)
+		dataChannel := handler.GetDataChannel()
+		log.Printf("启动[%s-Publishing]协程,Handler%d,dataChannel[%p] 容量:%d", handlerType, index, dataChannel, cap(dataChannel))
+
+		wg.Add(1)
+		go func(idx int) {
+			defer wg.Done()
+
+			for {
+				// 检查是否暂停接收
+				if atomic.LoadInt32(&status.receiving) == 0 {
+					log.Printf("%sHandler%d: 接收已暂停,等待未完成的消息处理", handlerType, idx)
+					time.Sleep(100 * time.Millisecond)
+					continue
+				}
+
+				select {
+				case d, ok := <-dataChannel: // 检查 dataChannel 是否已关闭
+					if !ok {
+						log.Printf("%sHandler%d: dataChannel 已关闭,退出 Goroutine", handlerType, idx)
+						return // 退出 Goroutine
+					}
+
+					data := d
+					atomic.AddInt32(&status.inProgressCount, 1)
+					log.Printf("[%s-Publishing] inProgressCount=%d. Handler%d 预备发送[%d]条数据,dataChannel[%p] 当前长度: %d/%d",
+						handlerType, atomic.LoadInt32(&status.inProgressCount), idx, len(data.Messages), dataChannel, len(dataChannel), cap(dataChannel))
+
+					// 使用 ants 提交任务
+					poolErr := pool.Submit(func() {
+						startTime := time.Now()
+						defer atomic.AddInt32(&status.inProgressCount, -1) // 任务完成后减少计数
+
+						if err := sendFunc(data.Id, data.Messages); err != nil {
+							log.Printf("%sHandler%d: 发送数据失败: %v. 耗时:%v", handlerType, idx, err, time.Since(startTime))
耗时:%v", handlerType, idx, err, time.Since(startTime)) + // 将失败数据发送到 Kafka(使用 Goroutine 池) + _ = pool.Submit(func() { + mm.errMessagesKafkaProducer.SendStringArrayMessage(fmt.Sprintf("DLP_%s", handlerType), data.Id, data.Messages) + }) + } else { + log.Printf("[%s-Publishing]协程,Handler%d 成功发送[%d]条数据。耗时:%v,dataChannel[%p] 当前长度: %d/%d", + handlerType, idx, len(data.Messages), time.Since(startTime), dataChannel, len(dataChannel), cap(dataChannel)) + } + }) + + if poolErr != nil { + log.Printf("%sHandler%d: 提交任务到 Goroutine 池失败: %v", handlerType, idx, poolErr) + atomic.AddInt32(&status.inProgressCount, -1) // 提交失败时减少计数 + } + + default: + // 如果 dataChannel 为空,则等待一段时间 + time.Sleep(10 * time.Millisecond) } } - } + }(index) + index++ return true }) - //无匹配触发 reBalance - if !isMatch { - nodePtr := the.getNodeWithMinThings() - if nodePtr != nil { - nodePtr.args.ThingIds = append(nodePtr.args.ThingIds, thingId) - log.Printf("thingId:[%s]被分配到node:[%s]", thingId, nodePtr.args.Addr) - go callNodeFunc(nodePtr, data) - } - } + wg.Wait() + defer pool.Release() // 确保在函数结束时释放池 } -// callNodeService 调用 etNode 的RPC服务 -func (the *EtMaster) callNodeService(node *NodeRpc, data common_models.IDataTrace) { - if node.client == nil { - log.Printf("node [%v] client=nil", node.args) - return - } - - var serviceMethod = "" - var resultCH chan int - var v interface{} - - switch data.(type) { - case *common_models.IotaData: - v = data.(*common_models.IotaData) - the.exporter.OnIotaData2metricByPrometheus(data.(*common_models.IotaData)) - serviceMethod = "etNode.IotaDataHandler" - resultCH = node.resultCH - case *common_models.AggData: - v = data.(*common_models.AggData) - serviceMethod = "etNode.AggDataHandler" - resultCH = node.aggResultCH - default: - log.Printf("Unknown kafka data type:%v", v) - return - } - - log.Printf("RPC[%s] node待处理的数据:%+v \n", serviceMethod, v) +func (mm *ETMaster) sendRawData(thingId string, data []string) error { + dataLog := fmt.Sprintf("thingId[%s]共[%d]条数据。", thingId, len(data)) + //log.Printf("[RawData-Publishing][sendRawData]1.开始处理。%s", dataLog) - go func() { - defer timeCost(node.args.ID, data.Q(), time.Now()) - var reply bool - err := node.client.Call(serviceMethod, data, &reply) + var nodeConn *node_manager.NodeConnection + var err error + retry := 0 - result := boolToInt(reply) + // 尝试获取 NodeConnection + for retry < 3 { + startTime := time.Now() + nodeConn, err = mm.nodeManager.GetNodeConnection() + duration := time.Since(startTime) // 计算获取连接的耗时 + log.Printf("[sendRawData]1.获取 NodeConnection 耗时: %v", duration) if err != nil { - isAggParseErr := strings.Contains(err.Error(), "aggData非法数据") - log.Printf("master调用node异常。Error:%s", err.Error()) - if !isAggParseErr { - // rpc 调用node, err:read tcp 10.8.30.104:57230->10.8.30.104:40000: wsarecv: An existing connection was forcibly closed by the remote host. - result = 2 - } + log.Printf("[sendRawData]1.获取 NodeConnection 失败,错误: %v", err) + //m.kafkaDS.StopConsumers() // TODO 暂停消费 Kafka 消息 + //log.Println("============ Kafka 消费已暂停...") + retry++ + time.Sleep(time.Duration(2<= 200 { - the.errorHandle(errorCode, node.args.Addr, fmt.Sprintf("%s|%s", data.R(), data.T())) - } else { - //log.Printf("node[%s]node处理后回复true。处理成功的数据*** %+v *** \n\n", node.args.Addr, data.R(), data.T()) - //log.Printf("RPC[%s]node已处理的数据errorCode=%d *** %+v *** \n\n", serviceMethod, errorCode, v) - log.Printf("****** RPC[%s]node已处理的数据errorCode=%d ****** \n\n", serviceMethod, errorCode) + if err != nil || nodeConn == nil { + log.Printf("[sendRawData]1. 
+		return err
+	}
 
-// NodeRegister 是 RPC 服务方法,由 et_node 远程调用
-func (the *EtMaster) NodeRegister(nodeArgs *common_models.NodeArgs, reply *bool) error {
-	node := &NodeRpc{
-		args:        nodeArgs,
-		resultCH:    make(chan int, 1),
-		aggResultCH: make(chan int, 1),
-		client:      nil,
-	}
-	//master 初始化 node client
-	client, err := rpc.Dial("tcp", nodeArgs.Addr)
-	if err != nil {
-		log.Printf("链接node失败-> node[%v]", nodeArgs.Addr)
-		return err
+	// 记录调用 Node.ProcessData 的时间
+	//defer LogProcessDataTimeCost(nodeConn.NArgs.Addr, "[]aggData", time.Now())
+	// RPC 调用 Node.ProcessData,传递 []*pb.AggData
+	resultChan := make(chan error, 1)
+	log.Printf("[sendRawData]2.开始调用 RPC[Node.HandleRawData] %s", dataLog)
+	callStartTime := time.Now()
+	callErr := nodeConn.CallHandleIotaData(thingId, data)
+	log.Printf("<--[sendRawData]3.RPC调用成功。耗时: %v,%s", time.Since(callStartTime), dataLog)
+	resultChan <- callErr
+
+	// 设置超时
+	select {
+	case callErr := <-resultChan:
+		if callErr != nil {
+			log.Printf("[sendRawData]4.RPC调用结束,错误: %+v,%s", callErr, dataLog)
+			return callErr
+		}
+		//log.Printf("[sendRawData]4.RPC调用成功")
+	case <-time.After(5 * time.Minute): // 设置超时
+		log.Printf("[sendRawData]4.请求超过5分钟。%s", dataLog)
+		return errors.New("请求超时5m")
 	}
-	node.client = client
-	the.addOrUpdate(nodeArgs.Addr, node)
-	log.Printf("node服务[%v] 注册成功", nodeArgs)
-	the.printNodes()
-	*reply = true
 	return nil
 }
 
-func (the *EtMaster) NodeHeart(nodeArgs *common_models.NodeArgs, reply *bool) error {
-	if !the.clientIsValid(nodeArgs.Addr) {
-		log.Printf("收到-未注册的node[%v] 心跳", nodeArgs)
-		*reply = false
-		err := the.NodeRegister(nodeArgs, reply)
+func (mm *ETMaster) sendAggData(structId string, data []string) error {
+	dataLog := fmt.Sprintf("structId[%s]共[%d]条数据。", structId, len(data))
+	//log.Printf("[AggData-Publishing][sendAggData]1.开始处理。%s", dataLog)
+
+	var nodeConn *node_manager.NodeConnection
+	var err error
+	retry := 0
+
+	for retry < 3 {
+		startTime := time.Now()
+		nodeConn, err = mm.nodeManager.GetNodeConnection()
+		duration := time.Since(startTime) // 计算获取连接的耗时
+		log.Printf("[AggData-Publishing][sendAggData]2.获取 NodeConnection 耗时: %v", duration)
+
 		if err != nil {
-			return errors.New("未注册的node")
-		} else {
-			*reply = true
-			log.Printf("收到未注册的node[%v]心跳,master已将node重新注册。", nodeArgs)
-			return nil
+			log.Printf("[AggData-Publishing][sendAggData]2.1获取 NodeConnection 失败,错误: %v", err)
+			//m.kafkaDS.StopConsumers() // TODO 暂停消费 Kafka 消息
+			//log.Println("============ Kafka 消费已暂停...")
+			retry++
+			time.Sleep(time.Duration(2<<retry) * time.Second)
+			continue
 		}
+		break
+	}
 
-func (the *EtMaster) WaitNodeRegister() {
+func (mm *ETMaster) monitorSendStatus(status *SendStatus, statusName string) {
 	for {
-		if the.nodeMapCount() > 0 {
-			break
+		inProgressCount := atomic.LoadInt32(&status.inProgressCount)
+		if inProgressCount > status.limitThreshold {
+			atomic.StoreInt32(&status.receiving, 0)
+			log.Printf("[%s] 未完成消息数量超过阈值,暂停接收新的消息。%+v\n", statusName, status)
+		} else {
+			atomic.StoreInt32(&status.receiving, 1)
 		}
-		time.Sleep(time.Second * 10)
+		time.Sleep(500 * time.Millisecond)
 	}
 }
 
-func (the *EtMaster) ConnectNode() {
-	the.nodeMap.Range(func(key, value interface{}) bool {
-		node := value.(*NodeRpc)
-		nodeAddr := key.(string)
+// MonitorShutdown 监控退出信号
+func (mm *ETMaster) MonitorShutdown() {
+	sigChan := make(chan os.Signal, 1)
+	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
 
-		if node.client == nil {
-			client, err := rpc.Dial("tcp", nodeAddr)
-			if err != nil {
-				log.Printf("链接node失败-> node[%v]", nodeAddr)
-				return true
-			}
+	sig := <-sigChan
+	log.Printf("************ 接收到信号: %s,正在关闭服务器...", sig)
 
-			node.client = client
-			the.nodeMap.Store(nodeAddr, node)
-		}
+	mm.closeDataHandlers(&mm.rawDataHandlers, "DLP_DATA_RAW")
"DLP_DATA_RAW") + mm.closeDataHandlers(&mm.aggDataHandlers, "DLP_DATA_AGG") - return true - }) + mm.errMessagesKafkaProducer.Close() + mm.grpcServer.GracefulStop() + log.Println("************ 服务器已成功关闭") } -func (the *EtMaster) addOrUpdate(key string, newNode *NodeRpc) { - if val, ok := the.nodeMap.Load(key); ok { - hisNode := val.(*NodeRpc) - hisNode.client = newNode.client - the.nodeMap.Store(key, hisNode) - } else { - the.nodeMap.Store(key, newNode) - } -} -func (the *EtMaster) nodeMapCount() int { - count := 0 - the.nodeMap.Range(func(key, value interface{}) bool { - count++ - return true - }) - return count -} -func (the *EtMaster) clientIsValid(address string) bool { - val, ok := the.nodeMap.Load(address) - if !ok { - return false - } +// closeDataHandlers 关闭数据处理器 +func (mm *ETMaster) closeDataHandlers(handlers *sync.Map, dlpTopic string) { + handlers.Range(func(key, value any) bool { + handler := value.(data_source.IMessageHandler) + ch := handler.GetDataChannel() + close(ch) - if val.(*NodeRpc).client == nil { - return false - } - return true -} - -// 获取最少things的节点 -func (the *EtMaster) getNodeWithMinThings() *NodeRpc { - var minNode *NodeRpc - minThings := math.MaxInt64 // 初始化为最大值 - - the.nodeMap.Range(func(key, value interface{}) bool { - node := value.(*NodeRpc) - if len(node.args.ThingIds) < minThings { - minThings = len(node.args.ThingIds) - minNode = node + for data := range ch { + mm.errMessagesKafkaProducer.SendStringArrayMessage(dlpTopic, data.Id, data.Messages) } - return true }) - - return minNode -} -func (the *EtMaster) printNodes() { - count := 0 - info := "" - the.nodeMap.Range(func(key, value interface{}) bool { - count++ - node := value.(*NodeRpc) - info += fmt.Sprintf("%s,%s\n", node.args.ID, node.args.Addr) - return true - }) - countInfo := fmt.Sprintf("共[%d]个节点:\n ", count) - log.Printf("%s %s\n", countInfo, info) -} -func (the *EtMaster) errorHandle(errCode int, address string, dataDesc string) { - val, ok := the.nodeMap.Load(address) - if !ok { - log.Printf("【tidyNodes】Error:不存在的node[%s]\n", address) - return - } - node := val.(*NodeRpc) - - //发送 stop 信号 - the.sleepCH <- true - log.Println("=============================================") - - // 100 故障:程序内部错误 - // 200 故障:网络通信问题 - // 300 故障:处理超时 - if errCode == 200 { - log.Printf("node[%v]连接已中断,休眠5秒后,将删除该节点。消息:%s", node.args.Addr, dataDesc) - time.Sleep(time.Second * 5) - the.nodeMap.Delete(address) - } else if errCode == 300 { - log.Printf("node[%s]处理超时,将休眠5秒后,将删除该节点。消息:%s", address, dataDesc) - time.Sleep(time.Second * 5) - the.nodeMap.Delete(address) - } - - the.printNodes() -} - -func contains(arr []string, target string) bool { - for _, value := range arr { - if value == target { - return true - } - } - return false -} -func timeCost(nodeId, deviceId string, start time.Time) { - tc := time.Since(start) - log.Printf("master调用node[%s],处理[%s]耗时%v", nodeId, deviceId, tc) -} -func boolToInt(b bool) int { - if b { - return 1 - } - return 0 } diff --git a/master/app/master_rpc_service.go b/master/app/master_rpc_service.go new file mode 100644 index 0000000..0f649a1 --- /dev/null +++ b/master/app/master_rpc_service.go @@ -0,0 +1,137 @@ +package app + +import ( + "context" + "et_rpc" + "et_rpc/pb" + "fmt" + "log" + "master/node_manager" +) + +// 实现 gRPC 服务接口 +type MasterRPCService struct { + pb.UnimplementedMasterServiceServer + nodeManager *node_manager.NodeManager // 用于存储节点信息 +} + +func NewMasterRPCService(nodeManager *node_manager.NodeManager) *MasterRPCService { + return &MasterRPCService{ + nodeManager: 
+	}
+}
+
+// 实现 RegisterNode 方法
+func (s *MasterRPCService) RegisterNode(ctx context.Context, req *pb.NodeRequest) (*pb.RpcResponse, error) {
+	// 创建响应对象
+	response := &pb.RpcResponse{
+		Status:       pb.RpcResponse_SUCCESS,
+		ErrorMessage: "",
+	}
+
+	// 检查请求是否有效
+	if req == nil {
+		return s.createErrorResponse(pb.RpcResponse_INVALID_ARGUMENT, "节点注册失败:req 为 nil")
+	}
+
+	// 添加节点
+	nodeArgs := &et_rpc.NodeArgs{
+		ID:           req.Id,
+		Addr:         req.Address,
+		Load:         0,
+		ResourceJson: "",
+		Weight:       0,
+		Status:       0,
+		ErrCode:      0,
+		ErrMessage:   "",
+	}
+
+	if err := s.nodeManager.AddNode(nodeArgs); err != nil {
+		msg := fmt.Sprintf("[%s]节点注册失败:%s", nodeArgs.Addr, err.Error())
+		return s.createErrorResponse(pb.RpcResponse_NOT_FOUND, msg)
+	}
+
+	response.Status = pb.RpcResponse_SUCCESS
+	response.ErrorMessage = fmt.Sprintf("[%s]节点注册成功!!", req.Address)
+	log.Printf(response.ErrorMessage)
+	return response, nil
+}
+
+// 实现 HeartbeatNode 方法
+func (s *MasterRPCService) HeartbeatNode(ctx context.Context, req *pb.NodeRequest) (*pb.RpcResponse, error) {
+	// 创建响应对象
+	response := &pb.RpcResponse{
+		Status:       pb.RpcResponse_SUCCESS,
+		ErrorMessage: "",
+	}
+
+	// 检查请求是否有效
+	if req == nil {
+		return s.createErrorResponse(pb.RpcResponse_INVALID_ARGUMENT, "请求无效: req 为 nil")
+	}
+
+	// 尝试更新节点状态
+	if !s.nodeManager.NodeExists(req.Address) {
+		msg := fmt.Sprintf("未注册的节点: %s", req.Address)
+		return s.createErrorResponse(pb.RpcResponse_NOT_FOUND, msg)
+	}
+
+	log.Printf("收到 Node[%s] 心跳", req.Address)
+	response.Status = pb.RpcResponse_SUCCESS
+	return response, nil
+}
+
+// 实现 UnregisterNode 方法
+func (s *MasterRPCService) UnregisterNode(ctx context.Context, req *pb.NodeRequest) (*pb.RpcResponse, error) {
+	// 创建响应对象
+	response := &pb.RpcResponse{
+		Status:       pb.RpcResponse_SUCCESS,
+		ErrorMessage: "",
+	}
+
+	// 检查请求是否有效
+	if req == nil {
+		return s.createErrorResponse(pb.RpcResponse_INVALID_ARGUMENT, "请求无效: req 为 nil")
+	}
+
+	// 尝试删除节点
+	if !s.nodeManager.RemoveNode(req.Address) {
+		log.Printf("删除节点Node[%s],节点不存在", req.Address)
+		//return s.createErrorResponse(pb.RpcResponse_NOT_FOUND, msg)
+	}
+
+	log.Printf("节点Node[%s]已删除", req.Address)
+	response.Status = pb.RpcResponse_SUCCESS
+	return response, nil
+}
+
+// 实现 CheckMasterStatus 方法
+func (s *MasterRPCService) CheckMasterStatus(ctx context.Context, req *pb.NodeRequest) (*pb.RpcResponse, error) {
+	// 创建响应对象
+	response := &pb.RpcResponse{
+		Status:       pb.RpcResponse_SUCCESS,
+		ErrorMessage: "",
+	}
+
+	// 检查请求是否有效
+	if req == nil {
+		return s.createErrorResponse(pb.RpcResponse_INVALID_ARGUMENT, "请求无效: req 为 nil")
+	}
+
+	// 记录主节点状态检查信息
+	log.Printf("主节点状态被节点检查: ID=%s", req.Address)
+	return response, nil
+}
+
+// mustEmbedUnimplementedMasterServiceServer 是一个占位符方法
+func (s *MasterRPCService) mustEmbedUnimplementedMasterServiceServer() {}
+
+// createErrorResponse 用于创建错误响应
+func (s *MasterRPCService) createErrorResponse(status pb.RpcResponse_Status, message string) (*pb.RpcResponse, error) {
+	response := &pb.RpcResponse{
+		Status:       status,
+		ErrorMessage: message,
+	}
+	log.Printf(message) // 记录错误信息
+	return response, fmt.Errorf(message)
+}