package app

import (
	"errors"
	"et_rpc/pb"
	"fmt"
	"gitea.anxinyun.cn/container/common_utils/configLoad"
	"github.com/panjf2000/ants/v2"
	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	"google.golang.org/grpc/health/grpc_health_v1"
	"log"
	"master/data_source"
	"master/node_manager"
	"net"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"syscall"
	"time"
)

type SendStatus struct {
	inProgressCount int32 // number of messages currently being processed
	limitThreshold  int32 // throttling threshold
	receiving       int32 // whether new messages are accepted (1 = accepting, 0 = paused)
}

// ETMaster manages the core logic of the Master.
type ETMaster struct {
	nodeManager      *node_manager.NodeManager
	grpcServer       *grpc.Server
	masterRPCService *MasterRPCService
	dataSource       *data_source.KafkaDataSource

	aggDataHandlers sync.Map // handlers for aggregated data
	rawDataHandlers sync.Map // handlers for raw data

	aggSendStatus SendStatus // send status for aggregated data
	rawSendStatus SendStatus // send status for raw data

	errRawChan               chan []string
	errMessagesKafkaProducer *data_source.KafkaProducer // Kafka producer used to publish messages that failed to send
}

// NewETMaster creates an ETMaster instance.
func NewETMaster() *ETMaster {
	lb := node_manager.NewLoadBalancer(&node_manager.RoundRobinSelector{})
	nodeManager := node_manager.NewNodeManager(lb)

	grpcServer := grpc.NewServer()
	masterRPCService := NewMasterRPCService(nodeManager)
	pb.RegisterMasterServiceServer(grpcServer, masterRPCService)

	// Register the gRPC health service and mark MasterService as SERVING.
	healthServer := health.NewServer()
	grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
	healthServer.SetServingStatus("MasterService", grpc_health_v1.HealthCheckResponse_SERVING)

	return &ETMaster{
		nodeManager:      nodeManager,
		grpcServer:       grpcServer,
		masterRPCService: masterRPCService,
	}
}

func (mm *ETMaster) StartRPCServer() {
	port := configLoad.LoadConfig().GetUint16("master.port")
	listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		log.Panicf("failed to start the Master RPC service: %v", err)
	}
	defer func() {
		if err := listener.Close(); err != nil {
			log.Printf("failed to close the listener: %v", err)
		}
	}()

	log.Printf("Master RPC service started, listening on port %d", port)

	// Start the gRPC server.
	if err := mm.grpcServer.Serve(listener); err != nil {
		log.Panicf("gRPC server failed to serve: %v", err)
	}
}

// InitKafkaDataSource initializes the Kafka data source.
func (mm *ETMaster) InitKafkaDataSource() {
	ds := data_source.NewKafkaDataSource() // loads the Kafka-related configuration

	// Create the Kafka producer instance.
	producer, err := data_source.NewKafkaProducer(ds.Brokers)
	if err != nil {
		log.Fatalf("failed to create the Kafka producer: %v", err)
	}
	mm.errMessagesKafkaProducer = producer

	// Set up the rawData handlers, one per partition.
	if ds.Master_kafkaConsumer_config.RawData != nil {
		topicCfg := ds.Topics["data_raw"]
		for partId := 0; partId < topicCfg.Partitions; partId++ {
			key := fmt.Sprintf("%s_%d", topicCfg.Topic, partId)
			dataHandler := data_source.NewRawDataHandler(key, topicCfg.Topic, partId)
			mm.rawDataHandlers.Store(key, dataHandler)
		}

		// Messages that fail to send are stored in the DLP_DATA_RAW topic.
		dlpKey := "DLP_DATA_RAW"
		mm.rawDataHandlers.Store(dlpKey, data_source.NewRawDataHandler(dlpKey, dlpKey, 0))
	}

	// Set up the aggData handlers, one per partition.
	if ds.Master_kafkaConsumer_config.AggData != nil {
		topicCfg := ds.Topics["data_agg"]
		for partId := 0; partId < topicCfg.Partitions; partId++ {
			key := fmt.Sprintf("%s_%d", topicCfg.Topic, partId)
			dataHandler := data_source.NewAggDataHandler(key, topicCfg.Topic, partId)
			mm.aggDataHandlers.Store(key, dataHandler)
		}

		// Messages that fail to send are stored in the DLP_DATA_AGG topic.
		// (The original stored this handler in rawDataHandlers with a
		// NewRawDataHandler, which looks like a copy-paste slip; fixed here.)
		dlpKey := "DLP_DATA_AGG"
		mm.aggDataHandlers.Store(dlpKey, data_source.NewAggDataHandler(dlpKey, dlpKey, 0))
	}

	ds.RawDataHandlers = &mm.rawDataHandlers
	ds.AggDataHandlers = &mm.aggDataHandlers
	mm.dataSource = ds
}

// WaitNodeRegister blocks until at least one Node has registered.
func (mm *ETMaster) WaitNodeRegister() {
	log.Println("==== waiting for Node registration ====")
	for mm.masterRPCService.nodeManager.NodesCount() == 0 {
		time.Sleep(time.Second * 10)
	}
}
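// For orientation, the configuration keys referenced in this file
// (master.port above, and performance.master.rpc.concurrency in the
// publishing methods below) suggest a config file shaped roughly like the
// sketch that follows. The layout and the port value are assumptions; the
// actual schema belongs to configLoad and is not shown in this source.
//
//	master:
//	  port: 8010          # example value only
//	performance:
//	  master:
//	    rpc:
//	      concurrency: 50 # in-flight threshold; the "50" is noted in the comments below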
// AggDataPublishing publishes aggregated data.
func (mm *ETMaster) AggDataPublishing() {
	concurrency := configLoad.LoadConfig().GetInt32("performance.master.rpc.concurrency") // number of concurrent requests, e.g. 50
	mm.initSendStatus(&mm.aggSendStatus, concurrency)
	go mm.monitorSendStatus(&mm.aggSendStatus, "aggSendStatus")
	mm.startDataPublishing(&mm.aggDataHandlers, "AggData", mm.sendAggData, &mm.aggSendStatus)
}

// RawDataPublishing publishes raw data.
func (mm *ETMaster) RawDataPublishing() {
	concurrency := configLoad.LoadConfig().GetInt32("performance.master.rpc.concurrency") // number of concurrent requests, e.g. 50
	mm.initSendStatus(&mm.rawSendStatus, concurrency)
	go mm.monitorSendStatus(&mm.rawSendStatus, "rawSendStatus")
	mm.startDataPublishing(&mm.rawDataHandlers, "RawData", mm.sendRawData, &mm.rawSendStatus)
}

// initSendStatus initializes a send status.
func (mm *ETMaster) initSendStatus(status *SendStatus, threshold int32) {
	status.limitThreshold = threshold
	atomic.StoreInt32(&status.receiving, 1)
}

// startDataPublishing starts one publishing goroutine per handler, draining
// each handler's data channel into the goroutine pool.
func (mm *ETMaster) startDataPublishing(handlers *sync.Map, handlerType string, sendFunc func(string, []string) error, status *SendStatus) {
	// Create a goroutine pool with a maximum of 500 concurrent tasks.
	pool, err := ants.NewPool(500)
	if err != nil {
		log.Fatalf("failed to create the goroutine pool: %v", err)
	}
	defer pool.Release() // release the pool when the function returns (moved before wg.Wait, where the original deferred it too late to be useful)

	var wg sync.WaitGroup
	index := 0
	handlers.Range(func(key, value any) bool {
		handler := value.(data_source.IMessageHandler)
		dataChannel := handler.GetDataChannel()
		log.Printf("starting [%s-Publishing] goroutine, Handler%d, dataChannel[%p] capacity: %d", handlerType, index, dataChannel, cap(dataChannel))

		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			for {
				// Check whether receiving is paused.
				if atomic.LoadInt32(&status.receiving) == 0 {
					log.Printf("%sHandler%d: receiving paused, waiting for in-flight messages to finish", handlerType, idx)
					time.Sleep(100 * time.Millisecond)
					continue
				}

				select {
				case d, ok := <-dataChannel: // check whether dataChannel has been closed
					if !ok {
						log.Printf("%sHandler%d: dataChannel closed, exiting goroutine", handlerType, idx)
						return // exit the goroutine
					}

					data := d
					atomic.AddInt32(&status.inProgressCount, 1)
					log.Printf("[%s-Publishing] inProgressCount=%d. Handler%d about to send [%d] messages, dataChannel[%p] current length: %d/%d",
						handlerType, atomic.LoadInt32(&status.inProgressCount), idx, len(data.Messages), dataChannel, len(dataChannel), cap(dataChannel))

					// Submit the task through the ants pool.
					poolErr := pool.Submit(func() {
						startTime := time.Now()
						defer atomic.AddInt32(&status.inProgressCount, -1) // decrement the counter once the task finishes

						if err := sendFunc(data.Id, data.Messages); err != nil {
							log.Printf("%sHandler%d: failed to send data: %v. elapsed: %v", handlerType, idx, err, time.Since(startTime))
							// Publish the failed messages to Kafka (through the goroutine pool).
							_ = pool.Submit(func() {
								mm.errMessagesKafkaProducer.SendStringArrayMessage(fmt.Sprintf("DLP_%s", handlerType), data.Id, data.Messages)
							})
						} else {
							log.Printf("[%s-Publishing] goroutine, Handler%d sent [%d] messages successfully. elapsed: %v, dataChannel[%p] current length: %d/%d",
								handlerType, idx, len(data.Messages), time.Since(startTime), dataChannel, len(dataChannel), cap(dataChannel))
						}
					})
					if poolErr != nil {
						log.Printf("%sHandler%d: failed to submit the task to the goroutine pool: %v", handlerType, idx, poolErr)
						atomic.AddInt32(&status.inProgressCount, -1) // decrement the counter when submission fails
					}
				default:
					// If dataChannel is empty, wait for a short while.
					time.Sleep(10 * time.Millisecond)
				}
			}
		}(index)
		index++
		return true
	})

	wg.Wait()
}
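// throttleGateSketch is a minimal, self-contained sketch (not part of the
// original flow) of the backpressure pattern used by startDataPublishing
// above and monitorSendStatus below: a monitor goroutine clears `receiving`
// once the in-flight counter passes the threshold, and intake spins until it
// is set again. All names and values here are illustrative.
func throttleGateSketch() {
	var st SendStatus
	st.limitThreshold = 2
	atomic.StoreInt32(&st.receiving, 1)

	done := make(chan struct{})
	defer close(done)

	// Monitor: pause intake while too many tasks are in flight.
	go func() {
		for {
			select {
			case <-done:
				return
			default:
			}
			if atomic.LoadInt32(&st.inProgressCount) > st.limitThreshold {
				atomic.StoreInt32(&st.receiving, 0)
			} else {
				atomic.StoreInt32(&st.receiving, 1)
			}
			time.Sleep(10 * time.Millisecond)
		}
	}()

	// Intake: accept new work only while receiving == 1.
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		for atomic.LoadInt32(&st.receiving) == 0 {
			time.Sleep(10 * time.Millisecond)
		}
		atomic.AddInt32(&st.inProgressCount, 1)
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer atomic.AddInt32(&st.inProgressCount, -1)
			time.Sleep(50 * time.Millisecond) // simulated send
		}()
	}
	wg.Wait()
}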
func (mm *ETMaster) sendRawData(thingId string, data []string) error {
	dataLog := fmt.Sprintf("thingId[%s], [%d] messages.", thingId, len(data))
	// log.Printf("[RawData-Publishing][sendRawData]1. start processing. %s", dataLog)

	var nodeConn *node_manager.NodeConnection
	var err error
	retry := 0

	// Try to acquire a NodeConnection.
	for retry < 3 {
		startTime := time.Now()
		nodeConn, err = mm.nodeManager.GetNodeConnection()
		duration := time.Since(startTime) // time spent acquiring the connection
		log.Printf("[sendRawData]1. acquiring a NodeConnection took: %v", duration)

		if err != nil {
			log.Printf("[sendRawData]1. failed to acquire a NodeConnection, error: %v", err)
			// m.kafkaDS.StopConsumers() // TODO: pause consuming Kafka messages
			// log.Println("============ Kafka consumption paused...")
			retry++
			// The source is truncated from the middle of the next line through the
			// opening of monitorSendStatus below; everything in between is
			// reconstructed from context, and HandleRawData / HandleAggData are
			// hypothetical method names on NodeConnection.
			time.Sleep(time.Duration(2<<retry) * time.Second) // assumed exponential backoff
			continue
		}
		break
	}

	if err != nil || nodeConn == nil {
		return errors.New("no NodeConnection available after 3 retries")
	}

	if err := nodeConn.HandleRawData(thingId, data); err != nil {
		return fmt.Errorf("failed to send raw data: %s error: %w", dataLog, err)
	}
	log.Printf("[RawData-Publishing][sendRawData]2. finished. %s", dataLog)
	return nil
}

// sendAggData mirrors sendRawData for aggregated data. The original body was
// lost in the source; this is a minimal reconstruction under the same
// assumptions (HandleAggData is a hypothetical method name).
func (mm *ETMaster) sendAggData(thingId string, data []string) error {
	nodeConn, err := mm.nodeManager.GetNodeConnection()
	if err != nil {
		return fmt.Errorf("failed to acquire a NodeConnection: %w", err)
	}
	if err := nodeConn.HandleAggData(thingId, data); err != nil {
		return fmt.Errorf("failed to send agg data: thingId[%s], error: %w", thingId, err)
	}
	return nil
}

// monitorSendStatus toggles intake based on the number of in-flight messages.
func (mm *ETMaster) monitorSendStatus(status *SendStatus, statusName string) {
	for {
		if atomic.LoadInt32(&status.inProgressCount) > status.limitThreshold {
			atomic.StoreInt32(&status.receiving, 0)
			log.Printf("[%s] in-flight message count exceeded the threshold, pausing intake of new messages. %+v\n", statusName, status)
		} else {
			atomic.StoreInt32(&status.receiving, 1)
		}
		time.Sleep(500 * time.Millisecond)
	}
}

// MonitorShutdown watches for termination signals.
func (mm *ETMaster) MonitorShutdown() {
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	sig := <-sigChan
	log.Printf("************ received signal: %s, shutting down the server...", sig)

	mm.closeDataHandlers(&mm.rawDataHandlers, "DLP_DATA_RAW")
	mm.closeDataHandlers(&mm.aggDataHandlers, "DLP_DATA_AGG")
	mm.errMessagesKafkaProducer.Close()
	mm.grpcServer.GracefulStop()

	log.Println("************ server shut down successfully")
}

// closeDataHandlers closes each handler's data channel and drains any
// remaining messages into the dead-letter topic.
func (mm *ETMaster) closeDataHandlers(handlers *sync.Map, dlpTopic string) {
	handlers.Range(func(key, value any) bool {
		handler := value.(data_source.IMessageHandler)
		ch := handler.GetDataChannel()
		close(ch)
		for data := range ch {
			mm.errMessagesKafkaProducer.SendStringArrayMessage(dlpTopic, data.Id, data.Messages)
		}
		return true
	})
}
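// exampleMasterStartup sketches one plausible wiring of the methods above.
// The startup order is an assumption inferred from the method names and is
// not taken from the original source.
func exampleMasterStartup() {
	master := NewETMaster()
	master.InitKafkaDataSource()

	go master.StartRPCServer() // serve the Master RPC endpoint
	master.WaitNodeRegister()  // block until at least one Node is available

	go master.AggDataPublishing() // start publishing aggregated data
	go master.RawDataPublishing() // start publishing raw data

	master.MonitorShutdown() // block until SIGINT/SIGTERM, then drain and stop
}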