et-go 20240919重建
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

398 lines
13 KiB

package app
import (
"errors"
"et_rpc/pb"
"fmt"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"github.com/panjf2000/ants/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"log"
"master/data_source"
"master/node_manager"
"net"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
)
// SendStatus tracks back-pressure state for one publishing pipeline
// (raw or agg). inProgressCount and receiving are accessed with
// sync/atomic; limitThreshold is set once before the pipeline starts.
type SendStatus struct {
inProgressCount int32 // number of messages currently being processed (atomic)
limitThreshold int32 // in-flight count above which intake is paused
receiving int32 // whether intake is enabled (1 = receiving, 0 = paused; atomic)
}
// ETMaster manages the Master's core logic: node registration via gRPC,
// consuming raw/agg data from Kafka, and publishing it to worker nodes.
type ETMaster struct {
nodeManager *node_manager.NodeManager
grpcServer *grpc.Server
masterRPCService *MasterRPCService
dataSource *data_source.KafkaDataSource
aggDataHandlers sync.Map // aggregated-data handlers, keyed "<topic>_<partition>" (see InitKafkaDataSource)
rawDataHandlers sync.Map // raw-data handlers, keyed "<topic>_<partition>"
aggSendStatus SendStatus // back-pressure state for aggregated-data publishing
rawSendStatus SendStatus // back-pressure state for raw-data publishing
errRawChan chan []string // NOTE(review): appears unused in this file — confirm before removing
errMessagesKafkaProducer *data_source.KafkaProducer // Kafka producer used to park messages that failed to send
}
// NewETMaster wires up a Master instance: a round-robin load balancer,
// the node manager, the gRPC server with the Master RPC service, and a
// gRPC health service reporting "MasterService" as SERVING.
func NewETMaster() *ETMaster {
	balancer := node_manager.NewLoadBalancer(&node_manager.RoundRobinSelector{})
	manager := node_manager.NewNodeManager(balancer)

	server := grpc.NewServer()
	rpcService := NewMasterRPCService(manager)
	pb.RegisterMasterServiceServer(server, rpcService)

	// Expose the standard gRPC health-check protocol for this service.
	hs := health.NewServer()
	grpc_health_v1.RegisterHealthServer(server, hs)
	hs.SetServingStatus("MasterService", grpc_health_v1.HealthCheckResponse_SERVING)

	return &ETMaster{
		nodeManager:      manager,
		grpcServer:       server,
		masterRPCService: rpcService,
	}
}
// StartRPCServer listens on the configured TCP port (config key
// "master.port") and blocks serving gRPC requests. It panics (via
// log.Panicf) if the listener cannot be created or Serve returns an
// unexpected error.
func (mm *ETMaster) StartRPCServer() {
	port := configLoad.LoadConfig().GetUint16("master.port")
	listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		log.Panicf("启动 Master RPC 服务失败: %v", err)
	}
	log.Printf("启动 Master RPC 服务成功,服务端口:%d", port)

	// Start the gRPC server. Serve takes ownership of the listener and
	// closes it when it returns, so the previous deferred listener.Close()
	// was a double-close that always logged a spurious error; it has been
	// removed.
	if err := mm.grpcServer.Serve(listener); err != nil {
		log.Panicf("gRPC 服务器服务失败: %v", err)
	}
}
// InitKafkaDataSource loads the Kafka configuration, creates the
// dead-letter producer, and registers one message handler per partition
// of the raw ("data_raw") and aggregated ("data_agg") topics, plus one
// handler each for the DLP_DATA_RAW / DLP_DATA_AGG dead-letter topics.
// The handler maps are then handed to the Kafka data source.
func (mm *ETMaster) InitKafkaDataSource() {
	ds := data_source.NewKafkaDataSource() // loads the Kafka-related configuration

	// Kafka producer used to park messages that failed to be dispatched.
	producer, err := data_source.NewKafkaProducer(ds.Brokers)
	if err != nil {
		log.Fatalf("创建 Kafka 生产者失败: %v", err)
	}
	mm.errMessagesKafkaProducer = producer

	// One rawData handler per partition of the data_raw topic.
	if ds.Master_kafkaConsumer_config.RawData != nil {
		topicCfg := ds.Topics["data_raw"]
		for partId := 0; partId < topicCfg.Partitions; partId++ {
			key := fmt.Sprintf("%s_%d", topicCfg.Topic, partId)
			dataHandler := data_source.NewRawDataHandler(key, topicCfg.Topic, partId)
			mm.rawDataHandlers.Store(key, dataHandler)
		}
		// Failed raw messages are parked in the DLP_DATA_RAW topic.
		dlpKey := "DLP_DATA_RAW"
		mm.rawDataHandlers.Store(dlpKey, data_source.NewRawDataHandler(dlpKey, dlpKey, 0))
	}

	// One aggData handler per partition of the data_agg topic.
	if ds.Master_kafkaConsumer_config.AggData != nil {
		topicCfg := ds.Topics["data_agg"]
		for partId := 0; partId < topicCfg.Partitions; partId++ {
			key := fmt.Sprintf("%s_%d", topicCfg.Topic, partId)
			dataHandler := data_source.NewAggDataHandler(key, topicCfg.Topic, partId)
			mm.aggDataHandlers.Store(key, dataHandler)
		}
		// BUG FIX: the DLP_DATA_AGG handler was previously stored in
		// rawDataHandlers as a raw handler (copy-paste error). It belongs
		// in aggDataHandlers as an agg handler — MonitorShutdown drains
		// aggDataHandlers into the DLP_DATA_AGG topic.
		dlpKey := "DLP_DATA_AGG"
		mm.aggDataHandlers.Store(dlpKey, data_source.NewAggDataHandler(dlpKey, dlpKey, 0))
	}

	ds.RawDataHandlers = &mm.rawDataHandlers
	ds.AggDataHandlers = &mm.aggDataHandlers
	mm.dataSource = ds
}
// WaitNodeRegister blocks until at least one Node has registered with
// the master, polling the node manager every 10 seconds.
func (mm *ETMaster) WaitNodeRegister() {
	log.Println("==== 等待 Node 注册 ====")
	for {
		if mm.masterRPCService.nodeManager.NodesCount() > 0 {
			return
		}
		time.Sleep(10 * time.Second)
	}
}
// AggDataPublishing starts the aggregated-data pipeline: it primes the
// agg send status with the configured in-flight limit (config key
// "performance.master.rpc.concurrency"), launches its back-pressure
// monitor, and begins forwarding batches from the agg handlers.
func (mm *ETMaster) AggDataPublishing() {
	limit := configLoad.LoadConfig().GetInt32("performance.master.rpc.concurrency")
	mm.initSendStatus(&mm.aggSendStatus, limit)
	go mm.monitorSendStatus(&mm.aggSendStatus, "aggSendStatus")
	mm.startDataPublishing(&mm.aggDataHandlers, "AggData", mm.sendAggData, &mm.aggSendStatus)
}
// RawDataPublishing starts the raw-data pipeline: it primes the raw
// send status with the configured in-flight limit (config key
// "performance.master.rpc.concurrency"), launches its back-pressure
// monitor, and begins forwarding batches from the raw handlers.
func (mm *ETMaster) RawDataPublishing() {
	limit := configLoad.LoadConfig().GetInt32("performance.master.rpc.concurrency")
	mm.initSendStatus(&mm.rawSendStatus, limit)
	go mm.monitorSendStatus(&mm.rawSendStatus, "rawSendStatus")
	mm.startDataPublishing(&mm.rawDataHandlers, "RawData", mm.sendRawData, &mm.rawSendStatus)
}
// initSendStatus primes a SendStatus: records the in-flight limit and
// marks the pipeline as accepting messages.
func (mm *ETMaster) initSendStatus(s *SendStatus, limit int32) {
	atomic.StoreInt32(&s.receiving, 1)
	s.limitThreshold = limit
}
// startDataPublishing launches one forwarding goroutine per handler in
// the given map. Each goroutine drains its handler's data channel and
// submits batches to a shared ants pool (max 500 concurrent tasks)
// which calls sendFunc(id, messages); failed batches are re-submitted
// to Kafka under topic "DLP_<handlerType>". Intake pauses while
// status.receiving == 0 (toggled by monitorSendStatus). A goroutine
// exits only when its data channel is closed.
func (mm *ETMaster) startDataPublishing(handlers *sync.Map, handlerType string, sendFunc func(string, []string) error, status *SendStatus) {
// Create a goroutine pool with a maximum of 500 concurrent tasks.
pool, err := ants.NewPool(500)
if err != nil {
log.Fatalf("创建 Goroutine 池失败: %v", err)
}
var wg sync.WaitGroup
index := 0
handlers.Range(func(key, value any) bool {
handler := value.(data_source.IMessageHandler)
dataChannel := handler.GetDataChannel()
log.Printf("启动[%s-Publishing]协程,Handler%d,dataChannel[%p] 容量:%d", handlerType, index, dataChannel, cap(dataChannel))
wg.Add(1)
go func(idx int) {
defer wg.Done()
for {
// Back-pressure gate: pause intake while receiving == 0.
if atomic.LoadInt32(&status.receiving) == 0 {
log.Printf("%sHandler%d: 接收已暂停,等待未完成的消息处理", handlerType, idx)
time.Sleep(100 * time.Millisecond)
continue
}
select {
case d, ok := <-dataChannel: // ok is false once dataChannel has been closed
if !ok {
log.Printf("%sHandler%d: dataChannel 已关闭,退出 Goroutine", handlerType, idx)
return // exit the goroutine
}
data := d
atomic.AddInt32(&status.inProgressCount, 1)
log.Printf("[%s-Publishing] inProgressCount=%d. Handler%d 预备发送[%d]条数据,dataChannel[%p] 当前长度: %d/%d",
handlerType, atomic.LoadInt32(&status.inProgressCount), idx, len(data.Messages), dataChannel, len(dataChannel), cap(dataChannel))
// Submit the send as an ants pool task.
poolErr := pool.Submit(func() {
startTime := time.Now()
defer atomic.AddInt32(&status.inProgressCount, -1) // decrement once the task finishes
if err := sendFunc(data.Id, data.Messages); err != nil {
log.Printf("%sHandler%d: 发送数据失败: %v. 耗时:%v", handlerType, idx, err, time.Since(startTime))
// Forward the failed batch to the dead-letter Kafka topic
// (submitted as another pool task).
// NOTE(review): Submit from inside a pool worker can stall if
// the pool is saturated — confirm ants pool sizing/blocking mode.
_ = pool.Submit(func() {
mm.errMessagesKafkaProducer.SendStringArrayMessage(fmt.Sprintf("DLP_%s", handlerType), data.Id, data.Messages)
})
} else {
log.Printf("[%s-Publishing]协程,Handler%d 成功发送[%d]条数据。耗时:%v,dataChannel[%p] 当前长度: %d/%d",
handlerType, idx, len(data.Messages), time.Since(startTime), dataChannel, len(dataChannel), cap(dataChannel))
}
})
if poolErr != nil {
log.Printf("%sHandler%d: 提交任务到 Goroutine 池失败: %v", handlerType, idx, poolErr)
atomic.AddInt32(&status.inProgressCount, -1) // undo the increment on submit failure
}
default:
// Channel currently empty: back off briefly before polling again.
time.Sleep(10 * time.Millisecond)
}
}
}(index)
index++
return true
})
// Blocks until every handler goroutine has exited (channels closed), so
// the deferred Release below runs only at full shutdown.
wg.Wait()
defer pool.Release() // release the pool when the function returns
}
// sendRawData dispatches one thing's batch of raw messages to a healthy
// node via RPC (Node.HandleIotaData).
//
// It retries acquiring a node connection up to 3 times with exponential
// backoff (4s/8s/16s) and then performs the RPC under a 5-minute
// timeout. Returns the RPC error, a timeout error, or the last
// connection error.
func (mm *ETMaster) sendRawData(thingId string, data []string) error {
	dataLog := fmt.Sprintf("thingId[%s]共[%d]条数据。", thingId, len(data))

	var nodeConn *node_manager.NodeConnection
	var err error
	// Acquire a healthy NodeConnection, retrying with exponential backoff.
	for retry := 0; retry < 3; {
		startTime := time.Now()
		nodeConn, err = mm.nodeManager.GetNodeConnection()
		log.Printf("[sendRawData]1.获取 NodeConnection 耗时: %v", time.Since(startTime))
		if err == nil {
			break
		}
		log.Printf("[sendRawData]1.获取 NodeConnection 失败,错误: %v", err)
		retry++
		time.Sleep(time.Duration(2<<retry) * time.Second) // exponential backoff: 4s, 8s, 16s
	}
	if err != nil || nodeConn == nil {
		log.Printf("[sendRawData]1. 达到最大重试次数,无法获取健康节点连接,错误: %v", err)
		return err
	}

	// BUG FIX: the RPC used to be invoked synchronously before the select,
	// so the buffered result channel was always ready and the 5-minute
	// time.After branch could never fire. Running the call in a goroutine
	// makes the timeout effective. The buffer keeps a late result from
	// blocking the goroutine forever after a timeout.
	resultChan := make(chan error, 1)
	log.Printf("[sendRawData]2.开始调用 RPC[Node.HandleRawData] %s", dataLog)
	callStartTime := time.Now()
	go func() {
		resultChan <- nodeConn.CallHandleIotaData(thingId, data)
	}()

	select {
	case callErr := <-resultChan:
		log.Printf("<--[sendRawData]3.RPC调用成功。耗时: %v,%s", time.Since(callStartTime), dataLog)
		if callErr != nil {
			log.Printf("[sendRawData]4.RPC调用结束,错误: %+v,%s", callErr, dataLog)
			return callErr
		}
	case <-time.After(5 * time.Minute): // RPC timeout
		log.Printf("[sendRawData]4.请求超过5分钟。%s", dataLog)
		return errors.New("请求超时5m")
	}
	return nil
}
// sendAggData dispatches one structure's batch of aggregated messages to
// a healthy node via RPC (Node.HandleAggData).
//
// It retries acquiring a node connection up to 3 times with exponential
// backoff (4s/8s/16s) and then performs the RPC under a 5-minute
// timeout. Returns the RPC error, a timeout error, or the last
// connection error.
func (mm *ETMaster) sendAggData(structId string, data []string) error {
	dataLog := fmt.Sprintf("structId[%s]共[%d]条数据。", structId, len(data))

	var nodeConn *node_manager.NodeConnection
	var err error
	// Acquire a healthy NodeConnection, retrying with exponential backoff.
	for retry := 0; retry < 3; {
		startTime := time.Now()
		nodeConn, err = mm.nodeManager.GetNodeConnection()
		log.Printf("[AggData-Publishing][sendAggData]2.获取 NodeConnection 耗时: %v", time.Since(startTime))
		if err == nil {
			break
		}
		log.Printf("[AggData-Publishing][sendAggData]2.1获取 NodeConnection 失败,错误: %v", err)
		retry++
		time.Sleep(time.Duration(2<<retry) * time.Second) // exponential backoff: 4s, 8s, 16s
	}
	if err != nil || nodeConn == nil {
		log.Printf("[AggData-Publishing][sendAggData]2.2 达到最大重试次数,无法获取健康节点连接,错误: %v", err)
		return err
	}

	// BUG FIX: the RPC used to be invoked synchronously before the select,
	// so the buffered result channel was always ready and the 5-minute
	// time.After branch could never fire. Running the call in a goroutine
	// makes the timeout effective. The buffer keeps a late result from
	// blocking the goroutine forever after a timeout.
	resultChan := make(chan error, 1)
	log.Printf("[AggData-Publishing][sendAggData]3.开始调用 RPC[Node.HandleAggData] %s", dataLog)
	callStartTime := time.Now()
	go func() {
		resultChan <- nodeConn.CallHandleAggData(structId, data)
	}()

	select {
	case callErr := <-resultChan:
		log.Printf("[AggData-Publishing][sendAggData]4.RPC调用耗时: %v,%s", time.Since(callStartTime), dataLog)
		if callErr != nil {
			log.Printf("[AggData-Publishing][sendAggData]4.RPC调用结束,错误: %+v,%s", callErr, dataLog)
			return callErr
		}
	case <-time.After(5 * time.Minute): // RPC timeout
		log.Printf("[AggData-Publishing][sendAggData]请求超过5分钟。%s", dataLog)
		return errors.New("请求超时5m")
	}
	return nil
}
// monitorSendStatus gates message intake for one SendStatus: every
// 500ms it pauses receiving when the number of in-flight messages
// exceeds the configured threshold, and re-enables it otherwise.
// Runs forever; intended to be started as a goroutine.
func (mm *ETMaster) monitorSendStatus(status *SendStatus, statusName string) {
	for {
		inProgressCount := atomic.LoadInt32(&status.inProgressCount)
		if inProgressCount > status.limitThreshold {
			atomic.StoreInt32(&status.receiving, 0)
			// BUG FIX: logging the struct with %+v read the atomically
			// updated fields non-atomically via reflection (a data race
			// under -race). Log the atomically loaded values instead.
			log.Printf("[%s] 未完成消息数量超过阈值,暂停接收新的消息。inProgressCount=%d, limitThreshold=%d\n",
				statusName, inProgressCount, status.limitThreshold)
		} else {
			atomic.StoreInt32(&status.receiving, 1)
		}
		time.Sleep(500 * time.Millisecond)
	}
}
// MonitorShutdown blocks until SIGINT or SIGTERM arrives, then drains
// the raw/agg handlers into their dead-letter topics, closes the
// error-message Kafka producer, and gracefully stops the gRPC server.
func (mm *ETMaster) MonitorShutdown() {
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

	received := <-quit
	log.Printf("************ 接收到信号: %s,正在关闭服务器...", received)

	mm.closeDataHandlers(&mm.rawDataHandlers, "DLP_DATA_RAW")
	mm.closeDataHandlers(&mm.aggDataHandlers, "DLP_DATA_AGG")
	mm.errMessagesKafkaProducer.Close()
	mm.grpcServer.GracefulStop()
	log.Println("************ 服务器已成功关闭")
}
// closeDataHandlers closes every handler's data channel in the given map
// and forwards any messages still buffered in a channel to dlpTopic via
// the error-message Kafka producer, so queued data is not lost on
// shutdown.
//
// NOTE(review): this closes channels that the Kafka-side handlers write
// into; it is only safe if all writers have stopped first (a send on a
// closed channel panics) — confirm the shutdown ordering in
// MonitorShutdown. Errors from SendStringArrayMessage are ignored here.
func (mm *ETMaster) closeDataHandlers(handlers *sync.Map, dlpTopic string) {
handlers.Range(func(key, value any) bool {
handler := value.(data_source.IMessageHandler)
ch := handler.GetDataChannel()
close(ch)
// Drain whatever remains in the closed channel into the DLP topic.
for data := range ch {
mm.errMessagesKafkaProducer.SendStringArrayMessage(dlpTopic, data.Id, data.Messages)
}
return true
})
}