3 changed files with 469 additions and 328 deletions
@@ -1,403 +1,398 @@
package app

import (
"dataSource"
"encoding/gob"
"errors"
"et_prometheus_exporter"
"et_rpc/pb"
"fmt"
"gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"github.com/panjf2000/ants/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"log"
"math"
"master/data_source"
"master/node_manager"
"net"
"net/rpc"
"strings"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
)

type EtMaster struct {
nodeMap sync.Map
exporter et_prometheus_exporter.PrometheusExporter
sleepCH chan bool
type SendStatus struct {
inProgressCount int32 // count of messages currently being processed
limitThreshold int32 // throttling threshold
receiving int32 // whether messages are being received (1 = receiving, 0 = paused)
}

func NewEtMaster() *EtMaster {
master := EtMaster{
exporter: et_prometheus_exporter.NewPrometheusExporter(),
sleepCH: make(chan bool, 1),
}
return &master
}
// ETMaster manages the Master's core logic
type ETMaster struct {
nodeManager *node_manager.NodeManager
grpcServer *grpc.Server
masterRPCService *MasterRPCService

type NodeRpc struct {
args *common_models.NodeArgs // node registration args: input parameter of method NodeRegister on RPC service "master"
resultCH chan int // node registration args: result of method NodeRegister on RPC service "master"
aggResultCH chan int // result returned after aggregated data is processed; corresponds to the Reply parameter
client *rpc.Client
dataSource *data_source.KafkaDataSource
aggDataHandlers sync.Map // aggregated-data handlers
rawDataHandlers sync.Map // raw-data handlers
aggSendStatus SendStatus // send status for aggregated data
rawSendStatus SendStatus // send status for raw data

errRawChan chan []string
errMessagesKafkaProducer *data_source.KafkaProducer // Kafka producer used to forward messages that failed to send
}

// RegisterListen starts the master RPC service
func (the *EtMaster) RegisterListen() {
// listen
err := rpc.RegisterName("master", the)
if err != nil {
log.Println("master 提供注册服务异常")
return
// NewETMaster creates an ETMaster instance
func NewETMaster() *ETMaster {
lb := node_manager.NewLoadBalancer(&node_manager.RoundRobinSelector{})
nodeManager := node_manager.NewNodeManager(lb)

grpcServer := grpc.NewServer()
masterRPCService := NewMasterRPCService(nodeManager)
pb.RegisterMasterServiceServer(grpcServer, masterRPCService)

healthServer := health.NewServer()
grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
healthServer.SetServingStatus("MasterService", grpc_health_v1.HealthCheckResponse_SERVING)

return &ETMaster{
nodeManager: nodeManager,
grpcServer: grpcServer,
masterRPCService: masterRPCService,
}
}

func (mm *ETMaster) StartRPCServer() {
port := configLoad.LoadConfig().GetUint16("master.port")

listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
log.Panic("master 启动 node服务注册功能异常")
log.Panicf("启动 Master RPC 服务失败: %v", err)
}
log.Printf("master 启动 node服务注册功能 :%d", port)
for {
//log.Println("master 监听新注册链接")
conn, err := listener.Accept()
if err != nil {
log.Println("master rpc Accept异常")
defer func() {
if err := listener.Close(); err != nil {
log.Printf("关闭监听器失败: %v", err)
}
log.Printf("master Accept注册链接 from node[%s]", conn.RemoteAddr())
go rpc.ServeConn(conn)
}()
log.Printf("启动 Master RPC 服务成功,服务端口:%d", port)

// start the gRPC server
if err := mm.grpcServer.Serve(listener); err != nil {
log.Panicf("gRPC 服务器服务失败: %v", err)
}
}

// DistributeData distributes data.
// It listens on the two data channels, RawDataChan and AggDataChan, and calls notifyData to handle whatever data arrives on either channel.
func (the *EtMaster) DistributeData(dataChannels *dataSource.DataChannels) {
// register data types
gob.Register([]interface{}{})
for {
log.Println("L74 nodeCount: %d", the.nodeMapCount())
if the.nodeMapCount() == 0 {
log.Printf("nodeList is empty!")
time.Sleep(time.Second * 10)
continue
}
// InitKafkaDataSource initializes the Kafka data source
func (mm *ETMaster) InitKafkaDataSource() {
ds := data_source.NewKafkaDataSource() // load the Kafka-related configuration

select {
case stopEnable := <-the.sleepCH:
log.Println("L83 nodeCount: %d", the.nodeMapCount())
if stopEnable {
stopTime := time.Second * 10
log.Printf("node 处理积压,%v,master 暂停 %v", stopEnable, stopTime)
time.Sleep(stopTime)
} else {
log.Printf("node 处理积压,%v,不正常空数据", stopEnable)
}
default:
// create the Kafka producer instance
producer, err := data_source.NewKafkaProducer(ds.Brokers)
if err != nil {
log.Fatalf("创建 Kafka 生产者失败: %v", err)
}
mm.errMessagesKafkaProducer = producer

// set up rawData handlers, one per partition
if ds.Master_kafkaConsumer_config.RawData != nil {
topicCfg := ds.Topics["data_raw"]
for partId := 0; partId < topicCfg.Partitions; partId++ {
key := fmt.Sprintf("%s_%d", topicCfg.Topic, partId)
dataHandler := data_source.NewRawDataHandler(key, topicCfg.Topic, partId)
mm.rawDataHandlers.Store(key, dataHandler)
}

select {
case data := <-dataChannels.RawDataChan:
log.Println("L96 nodeCount: %d", the.nodeMapCount())
the.notifyData(&data, the.callNodeService)
case data := <-dataChannels.AggDataChan:
log.Println("L99 nodeCount: %d", the.nodeMapCount())
the.notifyData(&data, the.callNodeService)
//default:
// time.Sleep(100 * time.Millisecond)
// messages that fail to send are stored in the DLP_DATA_RAW topic
dlpKey := "DLP_DATA_RAW"
mm.rawDataHandlers.Store(dlpKey, data_source.NewRawDataHandler(dlpKey, dlpKey, 0))
}

// set up aggData handlers, one per partition
if ds.Master_kafkaConsumer_config.AggData != nil {
topicCfg := ds.Topics["data_agg"]
for partId := 0; partId < topicCfg.Partitions; partId++ {
key := fmt.Sprintf("%s_%d", topicCfg.Topic, partId)
dataHandler := data_source.NewAggDataHandler(key, topicCfg.Topic, partId)
mm.aggDataHandlers.Store(key, dataHandler)
}

// messages that fail to send are stored in the DLP_DATA_AGG topic
dlpKey := "DLP_DATA_AGG"
mm.rawDataHandlers.Store(dlpKey, data_source.NewRawDataHandler(dlpKey, dlpKey, 0))
}

ds.RawDataHandlers = &mm.rawDataHandlers
ds.AggDataHandlers = &mm.aggDataHandlers
mm.dataSource = ds
}

// WaitNodeRegister waits for nodes to register
func (mm *ETMaster) WaitNodeRegister() {
log.Println("==== 等待 Node 注册 ====")
for mm.masterRPCService.nodeManager.NodesCount() == 0 {
time.Sleep(time.Second * 10)
}
}
func (the *EtMaster) notifyData(data common_models.IDataTrace, callNodeFunc func(*NodeRpc, common_models.IDataTrace)) {
thingId := data.GetThingId()
isMatch := false
the.nodeMap.Range(func(address, value interface{}) bool {
if nodePtr, ok := value.(*NodeRpc); ok {
if nodePtr != nil {
if contains(nodePtr.args.ThingIds, thingId) {
isMatch = true
go callNodeFunc(nodePtr, data)
return false

// AggDataPublishing publishes aggregated data
func (mm *ETMaster) AggDataPublishing() {
concurrency := configLoad.LoadConfig().GetInt32("performance.master.rpc.concurrency") // number of concurrent requests, 50
mm.initSendStatus(&mm.aggSendStatus, concurrency)
go mm.monitorSendStatus(&mm.aggSendStatus, "aggSendStatus")
mm.startDataPublishing(&mm.aggDataHandlers, "AggData", mm.sendAggData, &mm.aggSendStatus)
}

// RawDataPublishing publishes raw data
func (mm *ETMaster) RawDataPublishing() {
concurrency := configLoad.LoadConfig().GetInt32("performance.master.rpc.concurrency") // number of concurrent requests, 50
mm.initSendStatus(&mm.rawSendStatus, concurrency)
go mm.monitorSendStatus(&mm.rawSendStatus, "rawSendStatus")
mm.startDataPublishing(&mm.rawDataHandlers, "RawData", mm.sendRawData, &mm.rawSendStatus)
}

// initSendStatus initializes the send status
func (mm *ETMaster) initSendStatus(status *SendStatus, threshold int32) {
status.limitThreshold = threshold
atomic.StoreInt32(&status.receiving, 1)
}

// startDataPublishing starts data publishing
func (mm *ETMaster) startDataPublishing(handlers *sync.Map, handlerType string, sendFunc func(string, []string) error, status *SendStatus) {
// create a goroutine pool with a maximum of 500 concurrent workers
pool, err := ants.NewPool(500)
if err != nil {
log.Fatalf("创建 Goroutine 池失败: %v", err)
}

var wg sync.WaitGroup
index := 0
handlers.Range(func(key, value any) bool {
handler := value.(data_source.IMessageHandler)
dataChannel := handler.GetDataChannel()
log.Printf("启动[%s-Publishing]协程,Handler%d,dataChannel[%p] 容量:%d", handlerType, index, dataChannel, cap(dataChannel))

wg.Add(1)
go func(idx int) {
defer wg.Done()

for {
// check whether receiving is paused
if atomic.LoadInt32(&status.receiving) == 0 {
log.Printf("%sHandler%d: 接收已暂停,等待未完成的消息处理", handlerType, idx)
time.Sleep(100 * time.Millisecond)
continue
}

select {
case d, ok := <-dataChannel: // check whether dataChannel has been closed
if !ok {
log.Printf("%sHandler%d: dataChannel 已关闭,退出 Goroutine", handlerType, idx)
return // exit the goroutine
}

data := d
atomic.AddInt32(&status.inProgressCount, 1)
log.Printf("[%s-Publishing] inProgressCount=%d. Handler%d 预备发送[%d]条数据,dataChannel[%p] 当前长度: %d/%d",
handlerType, atomic.LoadInt32(&status.inProgressCount), idx, len(data.Messages), dataChannel, len(dataChannel), cap(dataChannel))

// submit the task via ants
poolErr := pool.Submit(func() {
startTime := time.Now()
defer atomic.AddInt32(&status.inProgressCount, -1) // decrement the counter when the task completes

if err := sendFunc(data.Id, data.Messages); err != nil {
log.Printf("%sHandler%d: 发送数据失败: %v. 耗时:%v", handlerType, idx, err, time.Since(startTime))
// send the failed data to Kafka (via the goroutine pool)
_ = pool.Submit(func() {
mm.errMessagesKafkaProducer.SendStringArrayMessage(fmt.Sprintf("DLP_%s", handlerType), data.Id, data.Messages)
})
} else {
log.Printf("[%s-Publishing]协程,Handler%d 成功发送[%d]条数据。耗时:%v,dataChannel[%p] 当前长度: %d/%d",
handlerType, idx, len(data.Messages), time.Since(startTime), dataChannel, len(dataChannel), cap(dataChannel))
}
})

if poolErr != nil {
log.Printf("%sHandler%d: 提交任务到 Goroutine 池失败: %v", handlerType, idx, poolErr)
atomic.AddInt32(&status.inProgressCount, -1) // decrement the counter when submission fails
}

default:
// if dataChannel is empty, wait a while
time.Sleep(10 * time.Millisecond)
}
}
}(index)

index++
return true
})

// no match triggers a reBalance
if !isMatch {
nodePtr := the.getNodeWithMinThings()
if nodePtr != nil {
nodePtr.args.ThingIds = append(nodePtr.args.ThingIds, thingId)
log.Printf("thingId:[%s]被分配到node:[%s]", thingId, nodePtr.args.Addr)
go callNodeFunc(nodePtr, data)
}
}
wg.Wait()
defer pool.Release() // make sure the pool is released when the function returns
}

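For readers unfamiliar with the ants pool pattern used by startDataPublishing above, here is a minimal, self-contained sketch of the same submit/count/release flow. The pool size, task body, and counter names are placeholders for illustration, not code from this PR.

package main

import (
	"log"
	"sync"
	"sync/atomic"
	"time"

	"github.com/panjf2000/ants/v2"
)

func main() {
	// Bounded goroutine pool, mirroring ants.NewPool(500) in startDataPublishing.
	pool, err := ants.NewPool(4)
	if err != nil {
		log.Fatalf("create pool: %v", err)
	}
	defer pool.Release() // release the pool when done

	var wg sync.WaitGroup
	var inProgress int32

	for i := 0; i < 10; i++ {
		i := i
		wg.Add(1)
		atomic.AddInt32(&inProgress, 1)
		// Submit can fail (e.g. if the pool is closed); the counter must be
		// decremented on that path as well, just as the PR does.
		if err := pool.Submit(func() {
			defer wg.Done()
			defer atomic.AddInt32(&inProgress, -1)
			time.Sleep(10 * time.Millisecond) // placeholder for sendFunc(...)
			log.Printf("task %d done, in progress: %d", i, atomic.LoadInt32(&inProgress))
		}); err != nil {
			wg.Done()
			atomic.AddInt32(&inProgress, -1)
			log.Printf("submit failed: %v", err)
		}
	}

	wg.Wait()
}
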
// callNodeService invokes the etNode RPC service
func (the *EtMaster) callNodeService(node *NodeRpc, data common_models.IDataTrace) {
if node.client == nil {
log.Printf("node [%v] client=nil", node.args)
return
}

var serviceMethod = ""
var resultCH chan int
var v interface{}

switch data.(type) {
case *common_models.IotaData:
v = data.(*common_models.IotaData)
the.exporter.OnIotaData2metricByPrometheus(data.(*common_models.IotaData))
serviceMethod = "etNode.IotaDataHandler"
resultCH = node.resultCH
case *common_models.AggData:
v = data.(*common_models.AggData)
serviceMethod = "etNode.AggDataHandler"
resultCH = node.aggResultCH
default:
log.Printf("Unknown kafka data type:%v", v)
return
}

log.Printf("RPC[%s] node待处理的数据:%+v \n", serviceMethod, v)
func (mm *ETMaster) sendRawData(thingId string, data []string) error {
dataLog := fmt.Sprintf("thingId[%s]共[%d]条数据。", thingId, len(data))
//log.Printf("[RawData-Publishing][sendRawData]1.开始处理。%s", dataLog)

go func() {
defer timeCost(node.args.ID, data.Q(), time.Now())
var reply bool
err := node.client.Call(serviceMethod, data, &reply)
var nodeConn *node_manager.NodeConnection
var err error
retry := 0

result := boolToInt(reply)
// try to obtain a NodeConnection
for retry < 3 {
startTime := time.Now()
nodeConn, err = mm.nodeManager.GetNodeConnection()
duration := time.Since(startTime) // measure how long obtaining the connection took
log.Printf("[sendRawData]1.获取 NodeConnection 耗时: %v", duration)

if err != nil {
isAggParseErr := strings.Contains(err.Error(), "aggData非法数据")
log.Printf("master调用node异常。Error:%s", err.Error())
if !isAggParseErr {
// rpc call to node, err: read tcp 10.8.30.104:57230->10.8.30.104:40000: wsarecv: An existing connection was forcibly closed by the remote host.
result = 2
}
log.Printf("[sendRawData]1.获取 NodeConnection 失败,错误: %v", err)
//m.kafkaDS.StopConsumers() // TODO pause consuming Kafka messages
//log.Println("============ Kafka 消费已暂停...")
retry++
time.Sleep(time.Duration(2<<retry) * time.Second) // exponential backoff
continue
}
resultCH <- result
}()

// RPC call result
errorCode := 0
timeoutMills := 300 * 1000 * time.Millisecond // 5 minutes
select {
case reply := <-resultCH:
// reply: 0 = false (RPC returned false), 1 = true (RPC returned true), 2 = RPC network error
if reply == 2 {
log.Printf("RPC[%s]node连接已被关闭。未处理的数据*** %+v *** \n\n", serviceMethod, v)
errorCode = 200
} else if reply == 0 {
//log.Printf("RPC[%s]node处理后回复false。处理失败的数据*** %+v *** \n\n", serviceMethod, v)
errorCode = 100
}
case <-time.After(timeoutMills):
log.Printf("RPC[%s]node调用超时退出gorutine,timeout:%v。未处理的数据*** %+v *** \n\n", serviceMethod, timeoutMills, v)
errorCode = 300
// TODO connection obtained, resume Kafka consumption and exit the loop
//m.kafkaDS.ResumeConsumers()
//log.Printf("[sendAggData] 成功获取 NodeConnection: %+v", nodeConn)
break
}

// 100 failure: internal program error
// 200 failure: network communication problem
// 300 failure: processing timeout
if errorCode >= 200 {
the.errorHandle(errorCode, node.args.Addr, fmt.Sprintf("%s|%s", data.R(), data.T()))
} else {
//log.Printf("node[%s]node处理后回复true。处理成功的数据*** %+v *** \n\n", node.args.Addr, data.R(), data.T())
//log.Printf("RPC[%s]node已处理的数据errorCode=%d *** %+v *** \n\n", serviceMethod, errorCode, v)
log.Printf("****** RPC[%s]node已处理的数据errorCode=%d ****** \n\n", serviceMethod, errorCode)
if err != nil || nodeConn == nil {
log.Printf("[sendRawData]1. 达到最大重试次数,无法获取健康节点连接,错误: %v", err)
return err
}
}

// NodeRegister is an RPC service method invoked remotely by et_node
func (the *EtMaster) NodeRegister(nodeArgs *common_models.NodeArgs, reply *bool) error {
node := &NodeRpc{
args: nodeArgs,
resultCH: make(chan int, 1),
aggResultCH: make(chan int, 1),
client: nil,
}
// master initializes the node client
client, err := rpc.Dial("tcp", nodeArgs.Addr)
if err != nil {
log.Printf("链接node失败-> node[%v]", nodeArgs.Addr)
return err
// record how long the Node.ProcessData call takes
//defer LogProcessDataTimeCost(nodeConn.NArgs.Addr, "[]aggData", time.Now())
// RPC call Node.ProcessData, passing []*pb.AggData
resultChan := make(chan error, 1)
log.Printf("[sendRawData]2.开始调用 RPC[Node.HandleRawData] %s", dataLog)
callStartTime := time.Now()
callErr := nodeConn.CallHandleIotaData(thingId, data)
log.Printf("<--[sendRawData]3.RPC调用成功。耗时: %v,%s", time.Since(callStartTime), dataLog)
resultChan <- callErr

// set a timeout
select {
case callErr := <-resultChan:
if callErr != nil {
log.Printf("[sendRawData]4.RPC调用结束,错误: %+v,%s", callErr, dataLog)
return callErr
}
//log.Printf("[sendRawData]4.RPC调用成功")
case <-time.After(5 * time.Minute): // set a timeout
log.Printf("[sendRawData]4.请求超过5分钟。%s", dataLog)
return errors.New("请求超时5m")
}

node.client = client
the.addOrUpdate(nodeArgs.Addr, node)
log.Printf("node服务[%v] 注册成功", nodeArgs)
the.printNodes()
*reply = true
return nil
}

func (the *EtMaster) NodeHeart(nodeArgs *common_models.NodeArgs, reply *bool) error {
if !the.clientIsValid(nodeArgs.Addr) {
log.Printf("收到-未注册的node[%v] 心跳", nodeArgs)
*reply = false
err := the.NodeRegister(nodeArgs, reply)
func (mm *ETMaster) sendAggData(structId string, data []string) error {
dataLog := fmt.Sprintf("structId[%s]共[%d]条数据。", structId, len(data))
//log.Printf("[AggData-Publishing][sendAggData]1.开始处理。%s", dataLog)

var nodeConn *node_manager.NodeConnection
var err error
retry := 0

for retry < 3 {
startTime := time.Now()
nodeConn, err = mm.nodeManager.GetNodeConnection()
duration := time.Since(startTime) // measure how long obtaining the connection took
log.Printf("[AggData-Publishing][sendAggData]2.获取 NodeConnection 耗时: %v", duration)

if err != nil {
return errors.New("未注册的node")
} else {
*reply = true
log.Printf("收到未注册的node[%v]心跳,master已将node重新注册。", nodeArgs)
return nil
log.Printf("[AggData-Publishing][sendAggData]2.1获取 NodeConnection 失败,错误: %v", err)
//m.kafkaDS.StopConsumers() // TODO pause consuming Kafka messages
//log.Println("============ Kafka 消费已暂停...")
retry++
time.Sleep(time.Duration(2<<retry) * time.Second) // exponential backoff
continue
}

// TODO connection obtained, resume Kafka consumption and exit the loop
//m.kafkaDS.ResumeConsumers()
//log.Printf("[sendAggData] 成功获取 NodeConnection: %+v", nodeConn)

break
}

log.Printf("收到-node[%v] 心跳", nodeArgs)
*reply = true
if err != nil || nodeConn == nil {
log.Printf("[AggData-Publishing][sendAggData]2.2 达到最大重试次数,无法获取健康节点连接,错误: %v", err)
return err
}

return nil
}
// record how long the Node.ProcessData call takes
//defer LogProcessDataTimeCost(nodeConn.NArgs.Addr, "[]aggData", time.Now())
// RPC call Node.ProcessData, passing []*pb.AggData
resultChan := make(chan error, 1)
log.Printf("[AggData-Publishing][sendAggData]3.开始调用 RPC[Node.HandleAggData] %s", dataLog)
callStartTime := time.Now()
callErr := nodeConn.CallHandleAggData(structId, data)
log.Printf("[AggData-Publishing][sendAggData]4.RPC调用耗时: %v,%s", time.Since(callStartTime), dataLog)
resultChan <- callErr

// NodeUnRegister unregisters a node's RPC client
func (the *EtMaster) NodeUnRegister(nodeArgs *common_models.NodeArgs, reply *bool) error {
value, ok := the.nodeMap.Load(nodeArgs.Addr)
node := value.(*NodeRpc)
if ok && node.client != nil {
err := node.client.Close()
if err != nil {
log.Printf("节点[%s] client关闭异常 %s", nodeArgs.Addr, err.Error())
select {
case callErr := <-resultChan:
if callErr != nil {
log.Printf("[AggData-Publishing][sendAggData]4.RPC调用结束,错误: %+v,%s", callErr, dataLog)
return callErr
}
the.nodeMap.Delete(nodeArgs.Addr)
//log.Printf("[sendAggData]4.RPC调用成功")
case <-time.After(5 * time.Minute): // set a timeout
log.Printf("[AggData-Publishing][sendAggData]请求超过5分钟。%s", dataLog)
return errors.New("请求超时5m")
}

log.Printf("node服务[%v] 注销成功", nodeArgs)
*reply = true
return nil
}

func (the *EtMaster) WaitNodeRegister() {
log.Println("等待 node进行注册")
// monitorSendStatus monitors the send status
func (mm *ETMaster) monitorSendStatus(status *SendStatus, statusName string) {
for {
if the.nodeMapCount() > 0 {
break
inProgressCount := atomic.LoadInt32(&status.inProgressCount)
if inProgressCount > status.limitThreshold {
atomic.StoreInt32(&status.receiving, 0)
log.Printf("[%s] 未完成消息数量超过阈值,暂停接收新的消息。%+v\n", statusName, status)
} else {
atomic.StoreInt32(&status.receiving, 1)
}
time.Sleep(time.Second * 10)
time.Sleep(500 * time.Millisecond)
}
}

func (the *EtMaster) ConnectNode() {
the.nodeMap.Range(func(key, value interface{}) bool {
node := value.(*NodeRpc)
nodeAddr := key.(string)
// MonitorShutdown watches for shutdown signals
func (mm *ETMaster) MonitorShutdown() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

if node.client == nil {
client, err := rpc.Dial("tcp", nodeAddr)
if err != nil {
log.Printf("链接node失败-> node[%v]", nodeAddr)
return true
}
sig := <-sigChan
log.Printf("************ 接收到信号: %s,正在关闭服务器...", sig)

node.client = client
the.nodeMap.Store(nodeAddr, node)
}
mm.closeDataHandlers(&mm.rawDataHandlers, "DLP_DATA_RAW")
mm.closeDataHandlers(&mm.aggDataHandlers, "DLP_DATA_AGG")

return true
})
mm.errMessagesKafkaProducer.Close()
mm.grpcServer.GracefulStop()
log.Println("************ 服务器已成功关闭")
}

func (the *EtMaster) addOrUpdate(key string, newNode *NodeRpc) {
if val, ok := the.nodeMap.Load(key); ok {
hisNode := val.(*NodeRpc)
hisNode.client = newNode.client
the.nodeMap.Store(key, hisNode)
} else {
the.nodeMap.Store(key, newNode)
}
}
func (the *EtMaster) nodeMapCount() int {
count := 0
the.nodeMap.Range(func(key, value interface{}) bool {
count++
return true
})
return count
}
func (the *EtMaster) clientIsValid(address string) bool {
val, ok := the.nodeMap.Load(address)
if !ok {
return false
}
// closeDataHandlers closes the data handlers
func (mm *ETMaster) closeDataHandlers(handlers *sync.Map, dlpTopic string) {
handlers.Range(func(key, value any) bool {
handler := value.(data_source.IMessageHandler)
ch := handler.GetDataChannel()
close(ch)

if val.(*NodeRpc).client == nil {
return false
}
return true
}

// get the node with the fewest things
func (the *EtMaster) getNodeWithMinThings() *NodeRpc {
var minNode *NodeRpc
minThings := math.MaxInt64 // initialize to the maximum value

the.nodeMap.Range(func(key, value interface{}) bool {
node := value.(*NodeRpc)
if len(node.args.ThingIds) < minThings {
minThings = len(node.args.ThingIds)
minNode = node
for data := range ch {
mm.errMessagesKafkaProducer.SendStringArrayMessage(dlpTopic, data.Id, data.Messages)
}

return true
})

return minNode
}
func (the *EtMaster) printNodes() {
count := 0
info := ""
the.nodeMap.Range(func(key, value interface{}) bool {
count++
node := value.(*NodeRpc)
info += fmt.Sprintf("%s,%s\n", node.args.ID, node.args.Addr)
return true
})
countInfo := fmt.Sprintf("共[%d]个节点:\n ", count)
log.Printf("%s %s\n", countInfo, info)
}
func (the *EtMaster) errorHandle(errCode int, address string, dataDesc string) {
val, ok := the.nodeMap.Load(address)
if !ok {
log.Printf("【tidyNodes】Error:不存在的node[%s]\n", address)
return
}
node := val.(*NodeRpc)

// send the stop signal
the.sleepCH <- true
log.Println("=============================================")

// 100 failure: internal program error
// 200 failure: network communication problem
// 300 failure: processing timeout
if errCode == 200 {
log.Printf("node[%v]连接已中断,休眠5秒后,将删除该节点。消息:%s", node.args.Addr, dataDesc)
time.Sleep(time.Second * 5)
the.nodeMap.Delete(address)
} else if errCode == 300 {
log.Printf("node[%s]处理超时,将休眠5秒后,将删除该节点。消息:%s", address, dataDesc)
time.Sleep(time.Second * 5)
the.nodeMap.Delete(address)
}

the.printNodes()
}

func contains(arr []string, target string) bool {
for _, value := range arr {
if value == target {
return true
}
}
return false
}
func timeCost(nodeId, deviceId string, start time.Time) {
tc := time.Since(start)
log.Printf("master调用node[%s],处理[%s]耗时%v", nodeId, deviceId, tc)
}
func boolToInt(b bool) int {
if b {
return 1
}
return 0
}

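The SendStatus fields introduced above (inProgressCount, limitThreshold, receiving) form a simple atomic backpressure switch: monitorSendStatus pauses intake once the number of in-flight messages exceeds the threshold, and the publisher goroutines spin-wait while the flag is down. Below is a minimal standalone sketch of that pattern; the type and function names, the tiny threshold, and the fake workload are invented for illustration and are not part of this PR.

package main

import (
	"log"
	"sync/atomic"
	"time"
)

// sendStatus mirrors the three fields of the PR's SendStatus.
type sendStatus struct {
	inProgressCount int32 // messages currently being processed
	limitThreshold  int32 // pause intake above this many in-flight messages
	receiving       int32 // 1 = receiving, 0 = paused
}

// monitor flips the receiving flag based on the in-flight count,
// the same check monitorSendStatus performs every 500ms.
func monitor(s *sendStatus) {
	for {
		if atomic.LoadInt32(&s.inProgressCount) > s.limitThreshold {
			atomic.StoreInt32(&s.receiving, 0)
		} else {
			atomic.StoreInt32(&s.receiving, 1)
		}
		time.Sleep(500 * time.Millisecond)
	}
}

func main() {
	s := &sendStatus{limitThreshold: 2, receiving: 1}
	go monitor(s)

	for i := 0; i < 6; i++ {
		// Producers check the flag before taking new work, as startDataPublishing does.
		for atomic.LoadInt32(&s.receiving) == 0 {
			time.Sleep(100 * time.Millisecond)
		}
		atomic.AddInt32(&s.inProgressCount, 1)
		go func(n int) {
			defer atomic.AddInt32(&s.inProgressCount, -1)
			time.Sleep(time.Second) // placeholder for the real send
			log.Printf("sent message %d", n)
		}(i)
	}
	time.Sleep(3 * time.Second)
}
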
@@ -0,0 +1,137 @@
package app

import (
	"context"
	"et_rpc"
	"et_rpc/pb"
	"fmt"
	"log"
	"master/node_manager"
)

// MasterRPCService implements the gRPC service interface
type MasterRPCService struct {
	pb.UnimplementedMasterServiceServer
	nodeManager *node_manager.NodeManager // stores node information
}

func NewMasterRPCService(nodeManager *node_manager.NodeManager) *MasterRPCService {
	return &MasterRPCService{
		nodeManager: nodeManager,
	}
}

// implements the RegisterNode method
func (s *MasterRPCService) RegisterNode(ctx context.Context, req *pb.NodeRequest) (*pb.RpcResponse, error) {
	// create the response object
	response := &pb.RpcResponse{
		Status:       pb.RpcResponse_SUCCESS,
		ErrorMessage: "",
	}

	// validate the request
	if req == nil {
		return s.createErrorResponse(pb.RpcResponse_INVALID_ARGUMENT, "节点注册失败:req 为 nil")
	}

	// add the node
	nodeArgs := &et_rpc.NodeArgs{
		ID:           req.Id,
		Addr:         req.Address,
		Load:         0,
		ResourceJson: "",
		Weight:       0,
		Status:       0,
		ErrCode:      0,
		ErrMessage:   "",
	}

	if err := s.nodeManager.AddNode(nodeArgs); err != nil {
		msg := fmt.Sprintf("[%s]节点注册失败:%s", nodeArgs.Addr, err.Error())
		return s.createErrorResponse(pb.RpcResponse_NOT_FOUND, msg)
	}

	response.Status = pb.RpcResponse_SUCCESS
	response.ErrorMessage = fmt.Sprintf("[%s]节点注册成功!!", req.Address)
	log.Printf(response.ErrorMessage)
	return response, nil
}

// implements the HeartbeatNode method
func (s *MasterRPCService) HeartbeatNode(ctx context.Context, req *pb.NodeRequest) (*pb.RpcResponse, error) {
	// create the response object
	response := &pb.RpcResponse{
		Status:       pb.RpcResponse_SUCCESS,
		ErrorMessage: "",
	}

	// validate the request
	if req == nil {
		return s.createErrorResponse(pb.RpcResponse_INVALID_ARGUMENT, "请求无效: req 为 nil")
	}

	// try to update the node status
	if !s.nodeManager.NodeExists(req.Address) {
		msg := fmt.Sprintf("未注册的节点: %s", req.Address)
		return s.createErrorResponse(pb.RpcResponse_NOT_FOUND, msg)
	}

	log.Printf("收到 Node[%s] 心跳", req.Address)
	response.Status = pb.RpcResponse_SUCCESS
	return response, nil
}

// implements the UnregisterNode method
func (s *MasterRPCService) UnregisterNode(ctx context.Context, req *pb.NodeRequest) (*pb.RpcResponse, error) {
	// create the response object
	response := &pb.RpcResponse{
		Status:       pb.RpcResponse_SUCCESS,
		ErrorMessage: "",
	}

	// validate the request
	if req == nil {
		return s.createErrorResponse(pb.RpcResponse_INVALID_ARGUMENT, "请求无效: req 为 nil")
	}

	// try to remove the node
	if !s.nodeManager.RemoveNode(req.Address) {
		log.Printf("删除节点Node[%s],节点不存在", req.Address)
		//return s.createErrorResponse(pb.RpcResponse_NOT_FOUND, msg)
	}

	log.Printf("节点Node[%s]已删除", req.Address)
	response.Status = pb.RpcResponse_SUCCESS
	return response, nil
}

// implements the CheckMasterStatus method
func (s *MasterRPCService) CheckMasterStatus(ctx context.Context, req *pb.NodeRequest) (*pb.RpcResponse, error) {
	// create the response object
	response := &pb.RpcResponse{
		Status:       pb.RpcResponse_SUCCESS,
		ErrorMessage: "",
	}

	// validate the request
	if req == nil {
		return s.createErrorResponse(pb.RpcResponse_INVALID_ARGUMENT, "请求无效: req 为 nil")
	}

	// log the master status check
	log.Printf("主节点状态被节点检查: ID=%s", req.Address)
	return response, nil
}

// mustEmbedUnimplementedMasterServiceServer is a placeholder method
func (s *MasterRPCService) mustEmbedUnimplementedMasterServiceServer() {}

// createErrorResponse creates an error response
func (s *MasterRPCService) createErrorResponse(status pb.RpcResponse_Status, message string) (*pb.RpcResponse, error) {
	response := &pb.RpcResponse{
		Status:       status,
		ErrorMessage: message,
	}
	log.Printf(message) // log the error message
	return response, fmt.Errorf(message)
}
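
Taken together, the new ETMaster presumably gets wired up in an order like the one below. This is a plausible startup sequence inferred from the functions added in this diff, not the PR's actual main(); the "master/app" import path is an assumption.

package main

import "master/app" // hypothetical import path for the package shown in this diff

func main() {
	master := app.NewETMaster()  // build gRPC server, health service, node manager
	master.InitKafkaDataSource() // load Kafka config, create producer and per-partition handlers

	go master.StartRPCServer() // serve MasterService (register/heartbeat/unregister)
	master.WaitNodeRegister()  // block until at least one node has registered

	go master.RawDataPublishing() // fan out raw data to nodes with backpressure
	go master.AggDataPublishing() // fan out aggregated data to nodes

	master.MonitorShutdown() // on SIGINT/SIGTERM: drain handlers, close producer, stop gRPC
}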