package app

import (
	"errors"
	"fmt"
	"log"
	"net"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"gitea.anxinyun.cn/container/common_utils/configLoad"
	"github.com/panjf2000/ants/v2"
	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	"google.golang.org/grpc/health/grpc_health_v1"

	"et_rpc/pb"
	"master/data_source"
	"master/node_manager"
)

// SendStatus tracks in-flight sends and throttles message intake.
type SendStatus struct {
	inProgressCount int32 // number of messages currently being processed
	limitThreshold  int32 // throttling threshold
	receiving       int32 // whether intake is enabled (1 = receiving, 0 = paused)
}

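// The three fields above drive a simple backpressure loop: startDataPublishing
// increments inProgressCount when it takes a batch off a handler's channel and
// decrements it when the send task completes, while monitorSendStatus flips
// receiving to 0 whenever inProgressCount exceeds limitThreshold, and back to
// 1 once it drops below the threshold again.
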
// ETMaster owns the Master's core logic.
type ETMaster struct {
	nodeManager      *node_manager.NodeManager
	grpcServer       *grpc.Server
	masterRPCService *MasterRPCService

	dataSource      *data_source.KafkaDataSource
	aggDataHandlers sync.Map   // handlers for aggregated data
	rawDataHandlers sync.Map   // handlers for raw data
	aggSendStatus   SendStatus // send status for aggregated data
	rawSendStatus   SendStatus // send status for raw data

	errRawChan               chan []string
	errMessagesKafkaProducer *data_source.KafkaProducer // Kafka producer for messages that failed to send
}

// NewETMaster creates an ETMaster instance.
func NewETMaster() *ETMaster {
	lb := node_manager.NewLoadBalancer(&node_manager.RoundRobinSelector{})
	nodeManager := node_manager.NewNodeManager(lb)

	grpcServer := grpc.NewServer()
	masterRPCService := NewMasterRPCService(nodeManager)
	pb.RegisterMasterServiceServer(grpcServer, masterRPCService)

	healthServer := health.NewServer()
	grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
	healthServer.SetServingStatus("MasterService", grpc_health_v1.HealthCheckResponse_SERVING)

	return &ETMaster{
		nodeManager:      nodeManager,
		grpcServer:       grpcServer,
		masterRPCService: masterRPCService,
	}
}

// StartRPCServer starts the Master's gRPC server and blocks while serving.
func (mm *ETMaster) StartRPCServer() {
	port := configLoad.LoadConfig().GetUint16("master.port")
	listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		log.Panicf("failed to start the Master RPC service: %v", err)
	}
	// grpcServer.Serve closes the listener on return, so this deferred Close
	// is only a fallback and may log a benign "already closed" error.
	defer func() {
		if err := listener.Close(); err != nil {
			log.Printf("failed to close the listener: %v", err)
		}
	}()
	log.Printf("Master RPC service started, listening on port %d", port)

	// Start the gRPC server
	if err := mm.grpcServer.Serve(listener); err != nil {
		log.Panicf("gRPC server failed: %v", err)
	}
}

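// The config keys read here and in the publishing methods (master.port,
// performance.master.rpc.concurrency) imply a configLoad source shaped
// roughly like the YAML below. This is a sketch: the port value and the
// exact nesting are assumptions, only the key paths appear in this file.
//
//	master:
//	  port: 8090
//	performance:
//	  master:
//	    rpc:
//	      concurrency: 50
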
// InitKafkaDataSource initializes the Kafka data source.
func (mm *ETMaster) InitKafkaDataSource() {
	ds := data_source.NewKafkaDataSource() // loads the Kafka-related configuration

	// Create the Kafka producer instance
	producer, err := data_source.NewKafkaProducer(ds.Brokers)
	if err != nil {
		log.Fatalf("failed to create the Kafka producer: %v", err)
	}
	mm.errMessagesKafkaProducer = producer

	// Set up the rawData handlers, one per partition
	if ds.Master_kafkaConsumer_config.RawData != nil {
		topicCfg := ds.Topics["data_raw"]
		for partId := 0; partId < topicCfg.Partitions; partId++ {
			key := fmt.Sprintf("%s_%d", topicCfg.Topic, partId)
			dataHandler := data_source.NewRawDataHandler(key, topicCfg.Topic, partId)
			mm.rawDataHandlers.Store(key, dataHandler)
		}

		// Messages that fail to send go to the DLP_DATA_RAW topic
		dlpKey := "DLP_DATA_RAW"
		mm.rawDataHandlers.Store(dlpKey, data_source.NewRawDataHandler(dlpKey, dlpKey, 0))
	}

	// Set up the aggData handlers, one per partition
	if ds.Master_kafkaConsumer_config.AggData != nil {
		topicCfg := ds.Topics["data_agg"]
		for partId := 0; partId < topicCfg.Partitions; partId++ {
			key := fmt.Sprintf("%s_%d", topicCfg.Topic, partId)
			dataHandler := data_source.NewAggDataHandler(key, topicCfg.Topic, partId)
			mm.aggDataHandlers.Store(key, dataHandler)
		}

		// Messages that fail to send go to the DLP_DATA_AGG topic
		dlpKey := "DLP_DATA_AGG"
		mm.aggDataHandlers.Store(dlpKey, data_source.NewAggDataHandler(dlpKey, dlpKey, 0))
	}

	ds.RawDataHandlers = &mm.rawDataHandlers
	ds.AggDataHandlers = &mm.aggDataHandlers
	mm.dataSource = ds
}

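// Handler keys follow the "<topic>_<partition>" pattern built above. With,
// for example, a 3-partition data_raw topic (the real count comes from
// ds.Topics), rawDataHandlers would hold data_raw_0, data_raw_1 and
// data_raw_2, plus the DLP_DATA_RAW dead-letter handler; aggDataHandlers is
// keyed the same way for data_agg.
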
// WaitNodeRegister blocks until at least one Node has registered.
func (mm *ETMaster) WaitNodeRegister() {
	log.Println("==== waiting for Node registration ====")
	for mm.masterRPCService.nodeManager.NodesCount() == 0 {
		time.Sleep(time.Second * 10)
	}
}

// AggDataPublishing publishes aggregated data.
func (mm *ETMaster) AggDataPublishing() {
	concurrency := configLoad.LoadConfig().GetInt32("performance.master.rpc.concurrency") // concurrent request limit, e.g. 50
	mm.initSendStatus(&mm.aggSendStatus, concurrency)
	go mm.monitorSendStatus(&mm.aggSendStatus, "aggSendStatus")
	mm.startDataPublishing(&mm.aggDataHandlers, "AggData", mm.sendAggData, &mm.aggSendStatus)
}

// RawDataPublishing publishes raw data.
func (mm *ETMaster) RawDataPublishing() {
	concurrency := configLoad.LoadConfig().GetInt32("performance.master.rpc.concurrency") // concurrent request limit, e.g. 50
	mm.initSendStatus(&mm.rawSendStatus, concurrency)
	go mm.monitorSendStatus(&mm.rawSendStatus, "rawSendStatus")
	mm.startDataPublishing(&mm.rawDataHandlers, "RawData", mm.sendRawData, &mm.rawSendStatus)
}

// initSendStatus initializes a send status.
func (mm *ETMaster) initSendStatus(status *SendStatus, threshold int32) {
	status.limitThreshold = threshold
	atomic.StoreInt32(&status.receiving, 1)
}

// startDataPublishing starts data publishing: one goroutine per handler drains
// that handler's data channel and submits send tasks to a shared goroutine pool.
func (mm *ETMaster) startDataPublishing(handlers *sync.Map, handlerType string, sendFunc func(string, []string) error, status *SendStatus) {
	// Create a goroutine pool with a maximum concurrency of 500
	pool, err := ants.NewPool(500)
	if err != nil {
		log.Fatalf("failed to create the goroutine pool: %v", err)
	}
	defer pool.Release() // release the pool when this function returns

	var wg sync.WaitGroup
	index := 0
	handlers.Range(func(key, value any) bool {
		handler := value.(data_source.IMessageHandler)
		dataChannel := handler.GetDataChannel()
		log.Printf("starting [%s-Publishing] goroutine, Handler%d, dataChannel[%p] capacity: %d", handlerType, index, dataChannel, cap(dataChannel))

		wg.Add(1)
		go func(idx int) {
			defer wg.Done()

			for {
				// Check whether intake is paused
				if atomic.LoadInt32(&status.receiving) == 0 {
					log.Printf("%sHandler%d: intake paused, waiting for in-flight messages to finish", handlerType, idx)
					time.Sleep(100 * time.Millisecond)
					continue
				}

				select {
				case d, ok := <-dataChannel: // also detects whether dataChannel has been closed
					if !ok {
						log.Printf("%sHandler%d: dataChannel closed, exiting goroutine", handlerType, idx)
						return // exit the goroutine
					}

					data := d // capture for the closures below
					atomic.AddInt32(&status.inProgressCount, 1)
					log.Printf("[%s-Publishing] inProgressCount=%d. Handler%d about to send [%d] messages, dataChannel[%p] current length: %d/%d",
						handlerType, atomic.LoadInt32(&status.inProgressCount), idx, len(data.Messages), dataChannel, len(dataChannel), cap(dataChannel))

					// Submit the task via ants
					poolErr := pool.Submit(func() {
						startTime := time.Now()
						defer atomic.AddInt32(&status.inProgressCount, -1) // decrement once the task finishes

						if err := sendFunc(data.Id, data.Messages); err != nil {
							log.Printf("%sHandler%d: failed to send data: %v. elapsed: %v", handlerType, idx, err, time.Since(startTime))
							// Forward the failed data to Kafka (via the goroutine pool)
							_ = pool.Submit(func() {
								mm.errMessagesKafkaProducer.SendStringArrayMessage(fmt.Sprintf("DLP_%s", handlerType), data.Id, data.Messages)
							})
						} else {
							log.Printf("[%s-Publishing] goroutine, Handler%d sent [%d] messages. elapsed: %v, dataChannel[%p] current length: %d/%d",
								handlerType, idx, len(data.Messages), time.Since(startTime), dataChannel, len(dataChannel), cap(dataChannel))
						}
					})

					if poolErr != nil {
						log.Printf("%sHandler%d: failed to submit the task to the goroutine pool: %v", handlerType, idx, poolErr)
						atomic.AddInt32(&status.inProgressCount, -1) // decrement on submit failure
					}

				default:
					// dataChannel is empty; wait a little before re-checking
					time.Sleep(10 * time.Millisecond)
				}
			}
		}(index)

		index++
		return true
	})

	wg.Wait()
}

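// From the usage in startDataPublishing, the data_source side presumably looks
// roughly like the sketch below. IMessageHandler and GetDataChannel appear in
// this file; the DataBatch name and the pointer element type are inferred from
// usage alone, not confirmed against the data_source package:
//
//	type DataBatch struct {
//	    Id       string   // routing key: thingId for raw data, structId for agg data
//	    Messages []string // message payloads pulled from Kafka
//	}
//
//	type IMessageHandler interface {
//	    GetDataChannel() chan *DataBatch
//	}
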
func (mm *ETMaster) sendRawData(thingId string, data []string) error {
	dataLog := fmt.Sprintf("thingId[%s] with [%d] messages.", thingId, len(data))
	//log.Printf("[RawData-Publishing][sendRawData]1. start processing. %s", dataLog)

	var nodeConn *node_manager.NodeConnection
	var err error
	retry := 0

	// Try to acquire a NodeConnection
	for retry < 3 {
		startTime := time.Now()
		nodeConn, err = mm.nodeManager.GetNodeConnection()
		duration := time.Since(startTime) // time spent acquiring the connection
		log.Printf("[sendRawData]1. acquiring a NodeConnection took: %v", duration)

		if err != nil {
			log.Printf("[sendRawData]1. failed to acquire a NodeConnection, error: %v", err)
			//m.kafkaDS.StopConsumers() // TODO pause consuming Kafka messages
			//log.Println("============ Kafka consumption paused...")
			retry++
			time.Sleep(time.Duration(2<<retry) * time.Second) // exponential backoff: 4s, 8s, 16s
			continue
		}

		// TODO on success, resume Kafka consumption and leave the loop
		//m.kafkaDS.ResumeConsumers()
		//log.Printf("[sendRawData] acquired NodeConnection: %+v", nodeConn)
		break
	}

	if err != nil || nodeConn == nil {
		log.Printf("[sendRawData]1. maximum retries reached, no healthy node connection available, error: %v", err)
		return err
	}

	// Time the Node.ProcessData call
	//defer LogProcessDataTimeCost(nodeConn.NArgs.Addr, "[]rawData", time.Now())
	// Run the RPC call in a goroutine so the timeout below can actually fire
	resultChan := make(chan error, 1)
	log.Printf("[sendRawData]2. calling RPC [Node.HandleRawData] %s", dataLog)
	callStartTime := time.Now()
	go func() {
		resultChan <- nodeConn.CallHandleIotaData(thingId, data)
	}()

	// Wait for the result, with a timeout
	select {
	case callErr := <-resultChan:
		log.Printf("<--[sendRawData]3. RPC call returned. elapsed: %v, %s", time.Since(callStartTime), dataLog)
		if callErr != nil {
			log.Printf("[sendRawData]4. RPC call finished with error: %+v, %s", callErr, dataLog)
			return callErr
		}
		//log.Printf("[sendRawData]4. RPC call succeeded")
	case <-time.After(5 * time.Minute): // timeout
		log.Printf("[sendRawData]4. request exceeded 5 minutes. %s", dataLog)
		return errors.New("request timed out after 5m")
	}

	return nil
}

func (mm *ETMaster) sendAggData(structId string, data []string) error {
	dataLog := fmt.Sprintf("structId[%s] with [%d] messages.", structId, len(data))
	//log.Printf("[AggData-Publishing][sendAggData]1. start processing. %s", dataLog)

	var nodeConn *node_manager.NodeConnection
	var err error
	retry := 0

	for retry < 3 {
		startTime := time.Now()
		nodeConn, err = mm.nodeManager.GetNodeConnection()
		duration := time.Since(startTime) // time spent acquiring the connection
		log.Printf("[AggData-Publishing][sendAggData]2. acquiring a NodeConnection took: %v", duration)

		if err != nil {
			log.Printf("[AggData-Publishing][sendAggData]2.1 failed to acquire a NodeConnection, error: %v", err)
			//m.kafkaDS.StopConsumers() // TODO pause consuming Kafka messages
			//log.Println("============ Kafka consumption paused...")
			retry++
			time.Sleep(time.Duration(2<<retry) * time.Second) // exponential backoff: 4s, 8s, 16s
			continue
		}

		// TODO on success, resume Kafka consumption and leave the loop
		//m.kafkaDS.ResumeConsumers()
		//log.Printf("[sendAggData] acquired NodeConnection: %+v", nodeConn)

		break
	}

	if err != nil || nodeConn == nil {
		log.Printf("[AggData-Publishing][sendAggData]2.2 maximum retries reached, no healthy node connection available, error: %v", err)
		return err
	}

	// Time the Node.ProcessData call
	//defer LogProcessDataTimeCost(nodeConn.NArgs.Addr, "[]aggData", time.Now())
	// Run the RPC call in a goroutine so the timeout below can actually fire
	resultChan := make(chan error, 1)
	log.Printf("[AggData-Publishing][sendAggData]3. calling RPC [Node.HandleAggData] %s", dataLog)
	callStartTime := time.Now()
	go func() {
		resultChan <- nodeConn.CallHandleAggData(structId, data)
	}()

	select {
	case callErr := <-resultChan:
		log.Printf("[AggData-Publishing][sendAggData]4. RPC call returned. elapsed: %v, %s", time.Since(callStartTime), dataLog)
		if callErr != nil {
			log.Printf("[AggData-Publishing][sendAggData]4. RPC call finished with error: %+v, %s", callErr, dataLog)
			return callErr
		}
		//log.Printf("[sendAggData]4. RPC call succeeded")
	case <-time.After(5 * time.Minute): // timeout
		log.Printf("[AggData-Publishing][sendAggData] request exceeded 5 minutes. %s", dataLog)
		return errors.New("request timed out after 5m")
	}

	return nil
}

// monitorSendStatus watches a send status and toggles intake accordingly.
func (mm *ETMaster) monitorSendStatus(status *SendStatus, statusName string) {
	for {
		inProgressCount := atomic.LoadInt32(&status.inProgressCount)
		if inProgressCount > status.limitThreshold {
			atomic.StoreInt32(&status.receiving, 0)
			log.Printf("[%s] in-flight message count exceeds the threshold, pausing intake of new messages. %+v\n", statusName, status)
		} else {
			atomic.StoreInt32(&status.receiving, 1)
		}
		time.Sleep(500 * time.Millisecond)
	}
}

// MonitorShutdown waits for termination signals and shuts the server down.
func (mm *ETMaster) MonitorShutdown() {
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	sig := <-sigChan
	log.Printf("************ received signal: %s, shutting down the server...", sig)

	mm.closeDataHandlers(&mm.rawDataHandlers, "DLP_DATA_RAW")
	mm.closeDataHandlers(&mm.aggDataHandlers, "DLP_DATA_AGG")

	mm.errMessagesKafkaProducer.Close()
	mm.grpcServer.GracefulStop()
	log.Println("************ server shut down cleanly")
}

// closeDataHandlers closes the data handlers and drains any remaining
// messages to the given dead-letter topic.
func (mm *ETMaster) closeDataHandlers(handlers *sync.Map, dlpTopic string) {
	handlers.Range(func(key, value any) bool {
		handler := value.(data_source.IMessageHandler)
		ch := handler.GetDataChannel()
		close(ch)

		// close(ch) stops new sends; ranging over the closed channel still
		// drains any values that were already buffered.
		for data := range ch {
			mm.errMessagesKafkaProducer.SendStringArrayMessage(dlpTopic, data.Id, data.Messages)
		}
		return true
	})
}
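
// Wiring sketch (an assumption based only on the exported methods above; the
// actual main package is not part of this file, and would call these via
// app.* from outside the package):
//
//	master := NewETMaster()
//	master.InitKafkaDataSource()
//	go master.StartRPCServer()
//	master.WaitNodeRegister()
//	go master.AggDataPublishing()
//	go master.RawDataPublishing()
//	master.MonitorShutdown()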