diff --git a/node/agg_worker/agg_node.go b/node/agg_worker/agg_node.go deleted file mode 100644 index 02e0604..0000000 --- a/node/agg_worker/agg_node.go +++ /dev/null @@ -1,85 +0,0 @@ -package agg_worker - -import ( - "et_analyze" - "gitea.anxinyun.cn/container/common_models" - "github.com/google/uuid" - "log" - "net/rpc" - "node/et_worker/et_recv" - "os" - "time" -) - -type AggNode struct { - recvDataHandler *et_recv.RecvDataHanler -} - -func NewAggWorker() *AggNode { - return &AggNode{ - recvDataHandler: et_recv.NewRecvDataHanler(), - } -} - -// Handler 是 RPC 接口,由 master 远程调用 -func (the *AggNode) Handler(aggData common_models.AggData, replay *bool) error { - *replay = true - err := the.ConsumerProcess(&aggData) - if err != nil { - return err - } - return nil -} - -// ConsumerProcess 处理阈值判断业务 -func (the *AggNode) ConsumerProcess(aggData *common_models.AggData) error { - aggHandler := et_analyze.NewAggThresholdHandler() - aggHandler.ProcessData(aggData) - log.Printf("rpc聚集阈值分析[%d]-time[%s]-[%v]", aggData.SensorId, aggData.Date, aggData.Agg) - return nil -} - -// RegisterToMaster 调用 master 发布的RPC服务方法 master.NodeRegister -func (the *AggNode) RegisterToMaster() { - connectCount := 0 - for { - connectCount++ - if connectCount > 3 { - log.Printf("RegisterToMaster 失败 超过%d次,准备退出", connectCount-1) - time.Sleep(time.Second * 10) - os.Exit(1) - } - masterAddr := os.Getenv("masterAddr") - if masterAddr == "" { - masterAddr = "127.0.0.1:50000" - } - - time.Sleep(time.Second * 1) - master, err := rpc.Dial("tcp", masterAddr) - if err != nil { - log.Printf("链接失败-> node[%s]", masterAddr) - continue - } - - //todo 获取node自己地址 - nodeAddr := "127.0.0.1:40001" - status := `{"health_status":"healthy","load_average":{"1_min":0.75,"5_min":1.2,"15_min":0.9},"availability":"available","last_check_time":"2022-01-01T12:00:00Z"}` - resources := 
`{"cpu":{"cores":4,"usage":"50%","temperature":"60°C"},"memory":{"total":"8GB","used":"4GB","available":"4GB"},"storage":{"total":"256GB","used":"100GB","available":"156GB"}}` - nodeArgs := &common_models.NodeArgs{ - ID: uuid.New().String(), - NodeType: "aggNode", - Status: status, - Resources: resources, - Addr: nodeAddr, - ThingIds: []string{}, - } - - var result bool - err = master.Call("master.NodeRegister", &nodeArgs, &result) - if err != nil { - log.Printf("node[%s]注册到master[%s]异常:%v", masterAddr, nodeAddr, result) - continue - } - break - } -} diff --git a/node/app/app.go b/node/app/app.go index b02c884..bfe2fb6 100644 --- a/node/app/app.go +++ b/node/app/app.go @@ -1,23 +1,9 @@ package app import ( - "encoding/gob" - "et_Info" - "et_analyze" - "et_cache" - "et_calc" - "et_calc/group" - "et_print" - "et_push" - "et_sink" - "fmt" - "gitea.anxinyun.cn/container/common_utils/configLoad" "gopkg.in/natefinch/lumberjack.v2" "io" "log" - "net" - "net/rpc" - "node/stages" "os" "time" ) @@ -25,7 +11,7 @@ import ( func init() { multiWriter := io.MultiWriter(os.Stdout, &lumberjack.Logger{ Filename: "./logs/logInfo.log", - MaxSize: 10, // megabytes + MaxSize: 30, // megabytes MaxBackups: 20, MaxAge: 30, //days //Compress: true, @@ -36,85 +22,25 @@ func init() { } func Start() { - // etNode 注册 - nodeWorker := NewEtWorker() - // etNode 数据后处理环节 - nodeStageManage := stages.NewStageManager() - nodeStageManage.AddSource(nodeWorker.ch) - //add 业务环节 - nodeStageManage = addWorkStages(nodeStageManage) + // 启动 Node RPC 服务 + nodeManager := NewETNode() + go nodeManager.startRPCServer() + <-nodeManager.grpcServerStarted - //add 测试环节 - //nodeStageManage = addTestPrintStages(nodeStageManage) + // 初始化与 master 的连接 + nodeManager.connectAndRegisterToMaster() - // 启动 etNode 处理 - nodeStageManage.Run() + // 每 60 秒向 master 发送一次心跳 + go nodeManager.heartbeat(60 * time.Second) - gob.Register([]interface{}{}) - err := rpc.RegisterName("etNode", nodeWorker) - if err != nil { - log.Panicf("注册 etNode 
rpc 异常") - } - go nodeSerRpcListen() + // 启动退出监听的协程 + nodeManager.startMonitorExit() - //后移注册流程,避免node启动异常的无效注册 - nodeWorker.RegisterToMaster() + //go func() { + // for g := range chGroupData { + // log.Printf("groupItem: %v", g.Stations[0].Info.Name) + // log.Printf("chGroupData=%p,通道数据:%d/%d", chGroupData, len(chGroupData), cap(chGroupData)) + // } + //}() - for { - time.Sleep(time.Hour) - } -} -func nodeSerRpcListen() { - port := configLoad.LoadConfig().GetUint16("node.port") - listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) - if err != nil { - log.Panicf("服务启动rpc 异常=%s", err.Error()) - } - log.Printf("服务监听=> :%d", port) - for { - conn, err := listener.Accept() - if err != nil { - log.Println("rpc Accept异常") - } - log.Printf("node 建立链接 from master[%s]", conn.RemoteAddr()) - go rpc.ServeConn(conn) - } -} - -func addWorkStages(nodeStageManage *stages.StageManager) *stages.StageManager { - // raws 数据存储 - sinkRawHandler := et_sink.NewSinkRawHandler() - nodeStageManage.AddStages(sinkRawHandler.GetStage()) - // 测点信息获取 - infoHandler := et_Info.NewInfoHandler() - nodeStageManage.AddStages(infoHandler.GetStage()) - // 单测点计算 - calcHandler := et_calc.NewCalcHandler() - nodeStageManage.AddStages(calcHandler.GetStage()) - // 滑窗过滤 - cacheHandler := et_cache.NewCacheHandler() - nodeStageManage.AddStages(cacheHandler.GetStage()) - // 测点分组计算 - groupCalcHandler := group.NewGroupCalc() - nodeStageManage.AddStages(groupCalcHandler.GetStage()) - - // Theme 数据存储 - sinkThemeHandler := et_sink.NewSinkThemeHandler() - nodeStageManage.AddStages(sinkThemeHandler.GetStage()) - - // 测点阈值分析 - stationAnalyzeHandler := et_analyze.NewThresholdHandler() - nodeStageManage.AddStages(stationAnalyzeHandler.GetStage()) - - // 数据推送 - publishHandler := et_push.NewPushHandler() - nodeStageManage.AddStages(publishHandler.GetStage()) - return nodeStageManage -} - -func addTestPrintStages(nodeStageManage *stages.StageManager) *stages.StageManager { - printHandler := et_print.NewPrintHandler() 
- nodeStageManage.AddStages(printHandler.GetStage()) - - return nodeStageManage } diff --git a/node/app/et_node.go b/node/app/et_node.go index 4727f10..427e934 100644 --- a/node/app/et_node.go +++ b/node/app/et_node.go @@ -1,273 +1,306 @@ package app import ( - "context" - "et_analyze" + "et_rpc" + "et_rpc/pb" "fmt" "gitea.anxinyun.cn/container/common_models" "gitea.anxinyun.cn/container/common_utils" "gitea.anxinyun.cn/container/common_utils/configLoad" + "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" "log" - "net/rpc" - "node/et_worker/et_recv" + "net" "os" "os/signal" + "strings" + "sync" "syscall" "time" ) -type EtNode struct { - nodeInfo *common_models.NodeArgs - master *rpcMaster - ch chan *common_models.ProcessData - recvDataHandler *et_recv.RecvDataHanler - aggAnalyzeHandler *et_analyze.AggThresholdHandler +type ETNode struct { + // node rpc server 相关信息 + grpcServer *grpc.Server + nodeServer *NodeServiceServer + grpcServerStarted chan struct{} // 通知主程序RPC已经启动 + processChannels []chan []*common_models.ProcessData + groupDataChan chan []*common_models.ProcessData // 分组数据 + // node 信息 + nodeInfo *et_rpc.NodeArgs + Addr string + // master 信息 + masterAddr string + masterConn *MasterConnection } -type rpcMaster struct { - conn *rpc.Client - addr string -} +func NewETNode() *ETNode { + const processChannelsCount = 1 + processBufSize := configLoad.LoadConfig().GetInt("performance.node.processBufSize") + processChannels := make([]chan []*common_models.ProcessData, processChannelsCount) + for i := 0; i < processChannelsCount; i++ { + processChannels[i] = make(chan []*common_models.ProcessData, processBufSize) + } + nodeServer := NewNodeServer(processChannels) + + grpcServer := grpc.NewServer() + pb.RegisterNodeServiceServer(grpcServer, nodeServer) -const chSize = 1 + // 创建grpc健康检查服务 + healthServer := health.NewServer() + grpc_health_v1.RegisterHealthServer(grpcServer, healthServer) + // 设置初始健康状态 + 
healthServer.SetServingStatus("NodeService", grpc_health_v1.HealthCheckResponse_SERVING) -func NewEtWorker() *EtNode { - node := &EtNode{ - ch: make(chan *common_models.ProcessData, chSize), - recvDataHandler: et_recv.NewRecvDataHanler(), - aggAnalyzeHandler: et_analyze.NewAggThresholdHandler(), + m := &ETNode{ + grpcServer: grpcServer, + nodeServer: nodeServer, + grpcServerStarted: make(chan struct{}), + processChannels: processChannels, + //groupDataChan: make(chan []*common_models.ProcessData, 500), } - node.exitMonitor() - node.heartMonitor() - return node + + // 初始化 Node 信息 + m.nodeInfo = m.initNodeInfo() + + return m } -// IotaDataHandler 是 RPC 服务方法,由 master 远程调用 -func (the *EtNode) IotaDataHandler(iotaData common_models.IotaData, reply *bool) error { - *reply = true - err := the.ConsumerProcess(&iotaData) +func (n *ETNode) startRPCServer() { + port := configLoad.LoadConfig().GetUint16("node.port") + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { - *reply = false + log.Panicf("启动 Node RPC 服务失败:%v", err) } - return err -} + defer listener.Close() + log.Printf("启动 Node RPC 服务成功,服务端口:%d", port) -// 是沉降测试数据 -func isSettleData(data map[string]interface{}) bool { - // {"pressure":23.09,"temperature":24.93,"ssagee":16.44} - validKeys := map[string]bool{ - "pressure": true, - "temperature": true, - "ssagee": true, + time.Sleep(100 * time.Millisecond) + close(n.grpcServerStarted) + + // 启动 gRPC 服务器 + if err := n.grpcServer.Serve(listener); err != nil { + log.Panicf("gRPC 服务器服务失败:%v", err) } +} - if len(data) != 3 { - return false +func (n *ETNode) initNodeInfo() *et_rpc.NodeArgs { + // 获取主机的 IP 地址前缀 + ipPrefix := configLoad.LoadConfig().GetString("node.hostIpPrefix") + ip4 := common_utils.ReadIP4WithPrefixFirst(ipPrefix) + // 获取主机名 + hostName, err := os.Hostname() + if err != nil { + log.Fatalf("获取主机名失败: %v", err) } + // 获取配置的端口号 + log.Printf("node [%s] ip=%s\n", hostName, ip4) + port := configLoad.LoadConfig().GetUint16("node.port") + if 
port == 0 { + log.Fatalf("未配置有效的端口号") + } + // 构造 Node 的地址 + nodeAddr := fmt.Sprintf("%s:%d", ip4, port) + log.Printf("node 的地址为 %s", nodeAddr) + n.Addr = nodeAddr - for key := range data { - if !validKeys[key] { - return false - } + // 初始化 Node 信息 + return &et_rpc.NodeArgs{ + ID: hostName + time.Now().Format("_20060102_150405"), + Status: et_rpc.NodeState_Healthy, + ErrCode: et_rpc.RPCReply_Success, + ResourceJson: "", + Addr: nodeAddr, } - return true } -// ConsumerProcess 将 IotaData -> ProcessData -func (the *EtNode) ConsumerProcess(iotaData *common_models.IotaData) error { - // 记录方法开始时间 - startTime := time.Now() - - //TODO #TEST BEGIN 测试静力水准仪 (现在有计算公式的单测点计算有问题,为了能跑通 沉降分组计算 测试) - //if !isSettleData(iotaData.Data.Data) { - // return nil - //} - // #TEST END +func (n *ETNode) connectAndRegisterToMaster() { + // 获取master配置 + masterHost := configLoad.LoadConfig().GetString("node.remoteMasterHost") + masterPort := configLoad.LoadConfig().GetUint16("master.port") + if masterHost == "" { + masterHost = "127.0.0.1" + } + if masterPort == 0 { + masterPort = 50000 + } + masterAddr := fmt.Sprintf("%s:%d", masterHost, masterPort) - deviceData, err := the.recvDataHandler.OnDataHandler(*iotaData) + // node 建立与 master 的连接 + masterConn, err := NewMasterConnection(masterAddr) if err != nil { - return err + log.Printf("ERROR: 建立与 Master[%s] 的连接失败!!\n", masterAddr) + } else { + n.masterConn = masterConn + log.Printf("建立与 Master[%s] 的连接成功!\n", masterAddr) } - if deviceData == nil { - return nil - } + n.masterAddr = masterAddr + n.masterConn = masterConn - log.Printf("rpc处理设备数据[%s]-time[%v]-[%v]", deviceData.DeviceId, deviceData.AcqTime, deviceData.Raw) + time.Sleep(500 * time.Millisecond) - the.ch <- &common_models.ProcessData{ - DeviceData: *deviceData, - Stations: []common_models.Station{}, + // node 向 master 发送注册请求(node 调用 master 的注册服务) + err = n.register() //尝试3次 + if err != nil { + // 3次尝试失败后退出程序 + log.Fatalf("node[%s]->master[%s] 注册失败,Error: %v", n.nodeInfo.Addr, masterAddr, 
err) + //log.Println(err) } - - defer func() { - duration := time.Since(startTime) - log.Printf("ConsumerProcess(iotaData *common_models.IotaData)执行时长: %v", duration) - }() - return nil } -// AggDataHandler 聚集阈值处理者,被 master 远程调用 -func (the *EtNode) AggDataHandler(aggData common_models.AggData, reply *bool) error { - *reply = true - err := the.aggAnalyzeHandler.ProcessData(&aggData) - if err != nil { - errmsg := fmt.Sprintf("[etNode.AggDataHandler]变化速率阈值分析%s[aggTypeId:%d]ERROR: %v", aggData.R(), aggData.AggTypeId, err) - log.Println(errmsg) - return err +func (n *ETNode) register() error { + if n.masterConn == nil { + return fmt.Errorf("n.masterConn is nil") } - log.Printf("[etNode.AggDataHandler]变化速率阈值分析SUCCESS。%s[aggTypeId:%d]changed[%v]", aggData.R(), aggData.AggTypeId, aggData.Changed) - return nil -} + //n.nodeInfo.Load = len(n.chIotaData) + const maxRetries = 3 + retries := 0 -// 实现源接口 -func (the *EtNode) Process(ctx context.Context) (<-chan any, error) { - source := make(chan any, chSize) - go func() { - defer close(source) - for { - select { - case a := <-the.ch: - source <- a - log.Printf("存储数据=>source out,len=%d,%d", len(source), len(the.ch)) - case <-ctx.Done(): - log.Println("退出[source] EtNode.Process") - return - } + for { + err := n.masterConn.CallRegister(n.nodeInfo) + if err == nil { + log.Println("注册成功") + return nil // 注册成功,返回 nil + } else { + log.Printf("注册失败: %v", err) + retries++ + } + if retries >= maxRetries { + log.Println("达到最大重试次数,停止注册尝试") + return fmt.Errorf("注册失败,达到最大重试次数: %v", err) // 返回错误 } - }() - return source, nil + + // 每5秒发送一次注册消息 + time.Sleep(5 * time.Second) + } +} + +func (n *ETNode) unregister() { + reply := new(et_rpc.NodeArgs) + err := n.masterConn.CallUnregister() + if err != nil { + log.Printf("node[%s] 从master注销异常,err=%v", n.nodeInfo.Addr, err.Error()) + } else { + log.Printf("node[%s] 从master注销成功。reply=%+v", n.nodeInfo.Addr, reply) + } } -// RegisterToMaster 调用 master 发布的RPC服务方法 master.NodeRegister -func (the *EtNode) 
RegisterToMaster() { - maxCount := 3 - connectCount := 0 +func (n *ETNode) heartbeat(interval time.Duration) { + // 心跳3次失败后,发送注册消息 + const maxRetries = 3 + retries := 0 + var err error + for { - connectCount++ - if connectCount > maxCount { - log.Printf("RegisterToMaster 失败 超过%d次,准备退出", maxCount) - time.Sleep(time.Second * 10) - os.Exit(1) - } - masterAddr := loadMasterAddr() - masterConn, err := rpc.Dial("tcp", masterAddr) - if err != nil { - log.Printf("链接失败-> node[%s]", masterAddr) - time.Sleep(time.Second * 5) + if n.masterConn == nil { + log.Println("ERROR: masterConn is nil") + time.Sleep(1 * time.Second) continue } - the.master = &rpcMaster{ - conn: masterConn, - addr: masterAddr, - } - time.Sleep(time.Millisecond * 200) - //获取node自己地址 - ipPrefix := configLoad.LoadConfig().GetString("node.hostIpPrefix") - ip4 := common_utils.ReadIP4WithPrefixFirst(ipPrefix) - hostName, err := os.Hostname() - log.Printf("node [%s] ip=%s\n", hostName, ip4) - port := configLoad.LoadConfig().GetUint16("node.port") - callNodeAddr := fmt.Sprintf("%s:%d", ip4, port) - - if the.nodeInfo == nil { - the.nodeInfo = &common_models.NodeArgs{ - ID: hostName + time.Now().Format("_20060102_150405"), - NodeType: "etNode", - Status: "", - Resources: "", - Addr: callNodeAddr, - ThingIds: []string{}, + + err = n.masterConn.CallHeartbeatNode(n.nodeInfo.Addr) + if err == nil { + log.Println("心跳成功!!") + retries = 0 + } else { + log.Printf("心跳消息发送失败: %v", err) + if strings.Contains(err.Error(), "未注册的节点") { + retries = 3 + } else { + retries++ } } - var result bool - err = the.master.conn.Call("master.NodeRegister", the.nodeInfo, &result) - if err != nil { - log.Printf("node[%s] 注册到 master[%s]异常:%v", the.nodeInfo.Addr, the.master.addr, result) - continue - } - break - } -} -func (the *EtNode) heartToMaster() { - maxCount := 3 - connectCount := 0 - reRegister := false - for { - connectCount++ - if connectCount > maxCount { - log.Printf("heartToMaster 失败 超过%d次", maxCount) - reRegister = true - break - 
} - var result bool - err := the.master.conn.Call("master.NodeHeart", the.nodeInfo, &result) - if err != nil { - log.Printf("node[%s] 心跳到 master[%s]异常:%v", the.nodeInfo.Addr, the.master.addr, result) - time.Sleep(time.Second * 5) - continue + if retries >= maxRetries { + // 发送注注册、重置重试计数 + err = n.register() + if err == nil { + log.Println("重新注册成功", n.Addr) + } else { + log.Println("重新注册失败", n.Addr) + } + + retries = 0 } - break - } - if reRegister { //触发重新注册 - log.Printf("node[%s] 心跳失败触发-重新注册到 master[%s]", the.nodeInfo.Addr, the.master.addr) - the.RegisterToMaster() - } -} -func (the *EtNode) UnRegisterToMaster() { - var result bool - if err := the.master.conn.Call("master.NodeUnRegister", the.nodeInfo, &result); err != nil { - log.Printf("node[%s] 从master注销,异常:%v", the.nodeInfo.Addr, err.Error()) - } else { - log.Printf("node[%s] 从master注销,结果:%v", the.nodeInfo.Addr, result) + // 每60秒发送一次心跳 + time.Sleep(interval) } } -func (the *EtNode) exitMonitor() { - go func() { - c := make(chan os.Signal, 1) - // 通过signal.Notify函数将信号通道c注册到系统相关的退出信号上 - // 这里使用了两个退出信号:syscall.SIGINT(Ctrl+C)和syscall.SIGTERM(系统发送的退出信号) - signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGKILL) - // 阻塞等待接收信号 - s := <-c - log.Printf("接收到退出信号:%v,进行清理工作", s) - the.UnRegisterToMaster() - time.Sleep(3 * time.Second) - os.Exit(0) - }() + +func (n *ETNode) startMonitorExit() { + c := make(chan os.Signal, 1) + // 通过signal.Notify函数将信号通道c注册到系统相关的退出信号上 + // 这里使用了两个退出信号:syscall.SIGINT(Ctrl+C)和syscall.SIGTERM(系统发送的退出信号) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGKILL) + // 阻塞等待接收信号 + s := <-c + log.Printf("接收到退出信号:%v,进行清理工作", s) + + // 注销 + n.unregister() + + // 等待所有数据处理完毕 + n.waitForDataProcessing() + log.Printf("退出前,通道中数据已经处理完毕!!") + + os.Exit(0) } -func (the *EtNode) heartMonitor() { + +func (n *ETNode) waitForDataProcessing() { + const waitInterval = 10 * time.Second + var wg sync.WaitGroup + + // 处理 IotaData 通道 
+ wg.Add(1) go func() { - ticker := time.NewTicker(time.Minute) - defer ticker.Stop() - for range ticker.C { - if the.master != nil { - log.Printf("node[%s] 心跳触发-> master[%s]", the.nodeInfo.Addr, the.master.addr) - the.heartToMaster() + defer wg.Done() + for _, ch := range n.processChannels { + for len(ch) > 0 { + time.Sleep(waitInterval) } } + }() + // 处理 groupDataChan 通道 + wg.Add(1) + go func() { + defer wg.Done() + for len(n.groupDataChan) > 0 { + time.Sleep(waitInterval) + } }() + // 等待所有 goroutine 完成 + wg.Wait() } -// LoadCh test用 -func (the *EtNode) LoadCh() chan *common_models.ProcessData { - return the.ch -} +//func LogProcessDataTimeCost(nodeId, deviceId string, start time.Time) { +// tc := time.Since(start) +// log.Printf("******** [%s][%s]装载设备信息耗时: %v", nodeId, deviceId, tc) +//} -func loadMasterAddr() string { - masterHost := configLoad.LoadConfig().GetString("node.remoteMasterHost") - masterPort := configLoad.LoadConfig().GetUint16("master.port") - if masterHost == "" { - masterHost = "127.0.0.1" - } - if masterPort == 0 { - masterPort = 50000 - } - return fmt.Sprintf("%s:%d", masterHost, masterPort) -} +// 是沉降测试数据 +//func isSettleData(data map[string]interface{}) bool { +// // {"pressure":23.09,"temperature":24.93,"ssagee":16.44} +// validKeys := map[string]bool{ +// "pressure": true, +// "temperature": true, +// "ssagee": true, +// } +// +// if len(data) != 3 { +// return false +// } +// +// for key := range data { +// if !validKeys[key] { +// return false +// } +// } +// return true +//} diff --git a/node/app/master_connection_grpc.go b/node/app/master_connection_grpc.go new file mode 100644 index 0000000..5d2e590 --- /dev/null +++ b/node/app/master_connection_grpc.go @@ -0,0 +1,242 @@ +package app + +import ( + "context" + "et_rpc" + "et_rpc/pb" + "fmt" + pool "github.com/jolestar/go-commons-pool" + "google.golang.org/grpc/health/grpc_health_v1" + "log" + "sync" + "time" +) + +type MasterConnection struct { + MasterAddr string + Addr string + NArgs 
*et_rpc.NodeArgs + rpcPool *pool.ObjectPool + lastUpdate time.Time // 节点信息更新时间 + ctx context.Context + mu sync.Mutex + factory *MasterGRPCClientFactory +} + +// TODO NewMasterConnection 从配置文件中获取 pool 参数 +func NewMasterConnection(masterAddr string) (*MasterConnection, error) { + ctx := context.Background() + factory := NewMasterGRPCClientFactory(masterAddr) + p := pool.NewObjectPoolWithDefaultConfig(ctx, factory) + p.Config.MaxTotal = 10 // 最大连接数 + p.Config.MaxIdle = 5 // 最大空闲连接数据 + p.Config.MinIdle = 1 // 最小空闲连接数据 + p.Config.TestOnBorrow = true + p.Config.TestOnReturn = false + p.Config.TestWhileIdle = true // 是否在空闲时检查连接有效性 + p.Config.MinEvictableIdleTime = 30 * time.Minute //空闲连接最小可驱逐时间 + //p.Config.SoftMinEvictableIdleTime = 15 * time.Minute //空闲连接软最小可驱逐时间 + + conn := &MasterConnection{ + ctx: ctx, + MasterAddr: masterAddr, + rpcPool: p, + } + + // 获取连接进行简单的测试 + obj, err := conn.rpcPool.BorrowObject(ctx) + if err != nil { + return nil, fmt.Errorf("建立RPC连接失败:%w", err) + } + defer conn.rpcPool.ReturnObject(ctx, obj) + + grpcPoolObj, ok := obj.(*MasterGRPCPoolObject) + if !ok { + log.Fatalf("类型断言失败,obj 不是 *MasterGRPCPoolObject 类型") + } + + // 健康检查 + healthClient := grpc_health_v1.NewHealthClient(grpcPoolObj.Conn) + resp, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{ + Service: "MasterService", + }) + + if err != nil || resp.Status != grpc_health_v1.HealthCheckResponse_SERVING { + return nil, fmt.Errorf("健康检查失败: %v, 状态: %v", err, resp.Status) + } + + conn.factory = factory + return conn, nil +} + +func (n *MasterConnection) BorrowValidConnection(ctx context.Context) (*pool.PooledObject, error) { + var obj1 interface{} + var err error + + // 尝试借用连接,最多重试 3 次 + for attempts := 0; attempts < 3; attempts++ { + obj1, err = n.rpcPool.BorrowObject(ctx) + if err == nil { + break + } + log.Printf("Attempt %d: Failed to borrow object from pool: %v", attempts+1, err) + time.Sleep(1 * time.Second) + } + + if err != nil { + return nil, fmt.Errorf("borrow 
object error after 3 attempts: %w", err) + } + + pooledObject, ok := obj1.(*pool.PooledObject) + if !ok { + return nil, log.Output(2, "Invalid object type from pool") // 类型不匹配,返回错误 + } + + if !n.factory.ValidateObject(ctx, pooledObject) { + err = n.factory.DestroyObject(ctx, pooledObject) + if err != nil { + return nil, err + } + + obj1, err = n.factory.MakeObject(ctx) + if err != nil { + return nil, err + } + + pooledObject, ok = obj1.(*pool.PooledObject) + if !ok { + return nil, log.Output(2, "Invalid object type from pool after recreation") // 类型不匹配,返回错误 + } + } + + return pooledObject, nil +} + +func (n *MasterConnection) CallRegister(nodeInfo *et_rpc.NodeArgs) error { + n.NArgs = nodeInfo + + // 创建新的上下文并设置超时 + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + // 从连接池中借用一个连接 + obj1, err := n.rpcPool.BorrowObject(ctx) + if err != nil { + return fmt.Errorf("gRPC[CallRegister] 借用对象错误: %w", err) + } + + // 使用连接相关处理 + rpcPoolObj := obj1.(*MasterGRPCPoolObject) + + defer func() { + if err := n.rpcPool.ReturnObject(ctx, obj1); err != nil { + log.Printf("gRPC[CallRegister] 归还对象到连接池失败: %v", err) + } + }() + + // 进行 RPC 调用 + request := &pb.NodeRequest{ + Id: fmt.Sprintf("master-%s", n.MasterAddr), + Address: nodeInfo.Addr, + ThingIds: make([]string, 0), + } + resp, err := rpcPoolObj.Client.RegisterNode(ctx, request) + if err != nil { + return fmt.Errorf("调用 gRPC[CallRegister] 错误: %w", err) + } + log.Printf("调用 gRPC[CallRegister] resp=%+v, err=%+v\n", resp, err) + + // 归还连接 + //err = n.rpcPool.ReturnObject(ctx, obj1) + //if err != nil { + // log.Printf("归还对象到连接池失败: %v", err) + // return fmt.Errorf("归还对象错误: %w", err) + //} + + return nil +} + +func (n *MasterConnection) CallUnregister() error { + // 创建新的上下文并设置超时 + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + // 从连接池中借用一个连接 + obj1, err := n.rpcPool.BorrowObject(ctx) + if err != nil { + return fmt.Errorf("gRPC[CallUnregister] 借用对象错误: 
%w", err) + } + + // 使用连接相关处理 + rpcPoolObj := obj1.(*MasterGRPCPoolObject) + + defer func() { + if err := n.rpcPool.ReturnObject(ctx, obj1); err != nil { + log.Printf("gRPC[CallUnregister] 归还对象到连接池失败: %v", err) + } + }() + + // 进行 RPC 调用 + request := &pb.NodeRequest{ + Id: "", + Address: n.Addr, + ThingIds: make([]string, 0), + } + resp, err := rpcPoolObj.Client.UnregisterNode(ctx, request) + if err != nil { + return fmt.Errorf("调用 gRPC[CallUnregister] 错误: %w", err) + } + log.Printf("调用 gRPC[CallUnregister] resp=%+v, err=%+v\n", resp, err) + + // 归还连接 + err = n.rpcPool.ReturnObject(ctx, obj1) + if err != nil { + log.Printf("归还对象到连接池失败: %v", err) + return fmt.Errorf("归还对象错误: %w", err) + } + + return nil +} + +func (n *MasterConnection) CallHeartbeatNode(nodeAddr string) error { + // 创建新的上下文并设置超时 + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + // 从连接池中借用一个连接 + obj1, err := n.rpcPool.BorrowObject(ctx) + if err != nil { + return fmt.Errorf("gRPC[CallHeartbeatNode] 借用对象错误: %w", err) + } + + // 使用连接相关处理 + rpcPoolObj := obj1.(*MasterGRPCPoolObject) + + defer func() { + if err := n.rpcPool.ReturnObject(ctx, obj1); err != nil { + log.Printf("gRPC[CallHeartbeatNode] 归还对象到连接池失败: %v", err) + } + log.Printf("gRPC[CallHeartbeatNode] 已归还对象 obj1 。") + }() + + // 进行 RPC 调用 + request := &pb.NodeRequest{ + Id: "", + Address: nodeAddr, + ThingIds: make([]string, 0), + } + resp, err := rpcPoolObj.Client.HeartbeatNode(ctx, request) + if err != nil { + return fmt.Errorf("调用 gRPC[CallHeartbeatNode] 错误: %w", err) + } + log.Printf("调用 gRPC[CallHeartbeatNode] resp=%+v, err=%+v\n", resp, err) + + // 归还连接 + //err = n.rpcPool.ReturnObject(ctx, obj1) + //if err != nil { + // log.Printf("归还对象到连接池失败: %v", err) + // return fmt.Errorf("归还对象错误: %w", err) + //} + + return nil +} diff --git a/node/app/master_connection_pool_grpc.go b/node/app/master_connection_pool_grpc.go new file mode 100644 index 0000000..882f141 --- /dev/null +++ 
b/node/app/master_connection_pool_grpc.go @@ -0,0 +1,112 @@ +package app + +import ( + "context" + "et_rpc/pb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/health/grpc_health_v1" + "log" + "time" + + pool "github.com/jolestar/go-commons-pool" +) + +type MasterGRPCPoolObject struct { + Conn *grpc.ClientConn // 保存 gRPC 连接 + Client pb.MasterServiceClient // gRPC 客户端 +} + +type MasterGRPCClientFactory struct { + address string +} + +// NewGRPCClientFactory 创建新的 gRPC 连接工厂 +func NewMasterGRPCClientFactory(address string) *MasterGRPCClientFactory { + return &MasterGRPCClientFactory{ + address: address, + } +} + +func (f *MasterGRPCClientFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // 定义重试策略 + serviceConfig := `{ + "methodConfig": [{ + "name": [{"service": "MasterService", "method": "*"}], + "retryPolicy": { + "maxAttempts": 2, + "initialBackoff": "1s", + "maxBackoff": "10s", + "backoffMultiplier": 2, + "retryableStatusCodes": ["UNAVAILABLE", "DEADLINE_EXCEEDED"] + } + }] + }` + + conn, err := grpc.NewClient( + f.address, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(serviceConfig), + ) + + if err != nil { + return nil, err // 如果3次都失败,返回错误 + } + + client := pb.NewMasterServiceClient(conn) + return pool.NewPooledObject( + &MasterGRPCPoolObject{ + Conn: conn, + Client: client, + }, + ), nil +} + +// 销毁 gRPC 连接 +func (f *MasterGRPCClientFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error { + grpcPoolObj := object.Object.(*MasterGRPCPoolObject) + if grpcPoolObj.Client != nil { + // 关闭连接 + grpcPoolObj.Conn.Close() // gRPC 客户端连接关闭 + } + return nil +} + +// 验证 gRPC 连接的有效性 +func (f *MasterGRPCClientFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool { + grpcPoolObj := object.Object.(*MasterGRPCPoolObject) + + 
select { + case <-ctx.Done(): + return false // 如果上下文已经取消,返回无效 + default: + // 继续进行有效性检查 + } + + healthClient := grpc_health_v1.NewHealthClient(grpcPoolObj.Conn) + resp, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{ + Service: "MasterService", + }) + + if err != nil || resp.Status != grpc_health_v1.HealthCheckResponse_SERVING { + log.Println("ValidateObject failed:", err) + return false + } + + return true +} + +// 激活 gRPC 连接 +func (f *MasterGRPCClientFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error { + // 可以在这里发送心跳请求以确保连接有效 + return nil +} + +// 非激活 gRPC 连接 +func (f *MasterGRPCClientFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error { + // 可以在这里进行连接的重置,例如清除状态或缓存 + return nil +} diff --git a/node/app/node_process_stage.go b/node/app/node_process_stage.go new file mode 100644 index 0000000..9375814 --- /dev/null +++ b/node/app/node_process_stage.go @@ -0,0 +1,68 @@ +package app + +import ( + "et_Info" + "et_analyze" + "et_cache" + "et_cache/cacheSer" + "et_calc" + "et_calc/group" + "et_push" + "et_sink" + "gitea.anxinyun.cn/container/common_models" + "gitea.anxinyun.cn/container/common_utils" + "gitea.anxinyun.cn/container/common_utils/configLoad" + "gitea.anxinyun.cn/container/common_utils/dbHelper" + "gitea.anxinyun.cn/container/common_utils/storage/storageDBs" + "node/stages" +) + +func CreateStages(inChan chan []*common_models.ProcessData, outChan chan []*common_models.ProcessData) *stages.StageManager { + redisAddr := configLoad.LoadConfig().GetString("redis.address") + configHelper := common_utils.NewConfigHelper(redisAddr) + cacheServer := cacheSer.NewCacheServer(configHelper) + esAddresses := configLoad.LoadConfig().GetStringSlice("es.addresses") + esESHelper := dbHelper.NewESHelper(esAddresses, "", "") + storageConsumers := storageDBs.LoadIStorageConsumer() + + // etNode 数据后处理环节 + nodeStageManage := stages.NewStageManager(outChan) + nodeStageManage.AddSource(inChan) + + // raws 
数据存储 + sinkRawHandler := et_sink.NewSinkRawHandler(storageConsumers) + nodeStageManage.AddStages(sinkRawHandler.GetStage()) + + // 测点信息获取 + infoHandler := et_Info.NewInfoHandler(configHelper) + nodeStageManage.AddStages(infoHandler.GetStage()) + + // 单测点计算 + calcHandler := et_calc.NewCalcHandler(configHelper, cacheServer, esESHelper) + nodeStageManage.AddStages(calcHandler.GetStage()) + + // 测点数据缓存(滑窗过滤) + cacheHandler := et_cache.NewCacheHandler(cacheServer) + nodeStageManage.AddStages(cacheHandler.GetStage()) + + // 测点分组计算 + groupCalcHandler := group.NewGroupCalc(configHelper) + nodeStageManage.AddStages(groupCalcHandler.GetStage()) + + // Theme 数据存储 + sinkThemeHandler := et_sink.NewSinkThemeHandler(storageConsumers) + nodeStageManage.AddStages(sinkThemeHandler.GetStage()) + + // 测点阈值分析 + stationAnalyzeHandler := et_analyze.NewThresholdHandler() + nodeStageManage.AddStages(stationAnalyzeHandler.GetStage()) + + //数据推送 + pushEnable := configLoad.LoadConfig().GetBool("push.enable") + if pushEnable { + publishHandler := et_push.NewPushHandler() + nodeStageManage.AddStages(publishHandler.GetStage()) + } + + return nodeStageManage +} diff --git a/node/app/node_server.go b/node/app/node_server.go new file mode 100644 index 0000000..98316a5 --- /dev/null +++ b/node/app/node_server.go @@ -0,0 +1,459 @@ +package app + +import ( + "context" + "encoding/json" + "et_analyze" + "et_rpc/pb" + "fmt" + "gitea.anxinyun.cn/container/common_models" + "gitea.anxinyun.cn/container/common_utils/configLoad" + "golang.org/x/time/rate" + "log" + "math" + "node/et_worker/et_recv" + "strings" + "sync" + "time" +) + +type UTCTime time.Time + +// UnmarshalJSON 自定义日期解析 +func (ut *UTCTime) UnmarshalJSON(data []byte) error { + // 去掉 JSON 字符串的引号 + str := string(data) + if len(str) < 2 || str[0] != '"' || str[len(str)-1] != '"' { + return fmt.Errorf("invalid time format: %s", str) + } + str = str[1 : len(str)-1] + + // 解析自定义日期格式 + t, err := time.Parse("2006-01-02T15:04:05.999-0700", str) + if err 
!= nil {
		return fmt.Errorf("failed to parse time: %v", err)
	}

	// Store the parsed instant.
	*ut = UTCTime(t)
	return nil
}

// AggDataJson is the wire shape of an aggregation message, e.g.:
// {"date":"2024-09-19T09:39:59.999+0000","sensorId":106,"structId":1,"factorId":11,
//  "aggTypeId":2006,"aggMethodId":3004,"agg":{"strain":-19.39},"changed":{"strain":-3}}
type AggDataJson struct {
	Date        UTCTime
	SensorId    int
	StructId    int
	FactorId    int
	AggTypeId   int                // aggregation window: 10min/30min/3h/6h/12h/hourly/daily/weekly/monthly
	AggMethodId int                // aggregation method: mean/max/min
	Agg         map[string]float64 // aggregated values, keyed by metric name
	Changed     map[string]float64 // deltas since the previous aggregation
}

// NodeServiceServer implements the pb.NodeServiceServer gRPC interface.
type NodeServiceServer struct {
	pb.UnimplementedNodeServiceServer // embedded for forward compatibility

	Addr string // this node's advertised address, echoed in every response
	Load int32  // this node's load figure, echoed in every response

	iotaDataHandler *et_recv.RecvDataHandler
	aggDataHandler  *et_analyze.AggThresholdHandler

	iotaChannels      []chan []common_models.IotaData     // inbound raw IotaData fan-out
	processChannels   []chan []*common_models.ProcessData // converted data, one channel per stage pipeline
	outProcessChannel chan []*common_models.ProcessData   // merged output of all stage pipelines

	nextProcessChannel int        // index of the next process channel to try (currently unused by the visible code)
	nextChannel        int        // index of the next iota channel to try (currently unused by the visible code)
	mu                 sync.Mutex // guards the next* indices
}

// NewNodeServer builds a NodeServiceServer wired to the given process
// channels, starts the stage pipelines and the IotaData conversion workers,
// and kicks off the periodic DeviceInfo cache refresh.
func NewNodeServer(processChannels []chan []*common_models.ProcessData) *NodeServiceServer {
	stageResultBufSize := configLoad.LoadConfig().GetInt("performance.node.stageResultBufSize")
	iotaBufSize := configLoad.LoadConfig().GetInt("performance.node.iotaBufSize")

	s := &NodeServiceServer{
		iotaDataHandler:   et_recv.NewRecvDataHandler(),
		aggDataHandler:    et_analyze.NewAggThresholdHandler(),
		processChannels:   processChannels,
		outProcessChannel: make(chan []*common_models.ProcessData, stageResultBufSize),
	}
	s.iotaChannels = s.NewIotaChannels(len(processChannels), iotaBufSize)

	// Refresh the DeviceInfo cache every 10 minutes.
	s.iotaDataHandler.RecvConfigHelper.StartUpdateDeviceInfo(10*time.Minute, 30)

	// Start the stage pipelines that consume process data, and give the
	// stage goroutines a brief head start before feeding them.
	s.RunStageManager()
	time.Sleep(500 * time.Millisecond)

	// Convert IotaData to DeviceData and forward it to s.processChannels.
	go s.HandleIotaChannels()

	return s
}

// NewIotaChannels allocates count buffered IotaData channels of the given size.
func (s *NodeServiceServer) NewIotaChannels(count, bufferSize int) []chan []common_models.IotaData {
	channels := make([]chan []common_models.IotaData, count)
	for i := range channels {
		channels[i] = make(chan []common_models.IotaData, bufferSize)
	}
	return channels
}

// HandleIotaChannels spawns iotaWorkerCount consumer goroutines per iota channel.
func (s *NodeServiceServer) HandleIotaChannels() {
	iotaWorkerCount := configLoad.LoadConfig().GetInt("performance.node.iotaWorkerCount")

	for index, ch := range s.iotaChannels {
		for w := 0; w < iotaWorkerCount; w++ {
			// Pass both ch and index as arguments: the original captured the
			// loop variable `index` directly, so every worker could observe
			// the final iteration's value (pre-Go 1.22 loop-var semantics).
			go func(c chan []common_models.IotaData, i int) {
				s.HandleIotaChan(c, i)
			}(ch, index)
		}
	}
}

// RunStageManager starts one stage pipeline per process channel, each feeding
// the shared outProcessChannel.
func (s *NodeServiceServer) RunStageManager() {
	for _, ch := range s.processChannels {
		go func(c chan []*common_models.ProcessData) {
			stageMgr := CreateStages(c, s.outProcessChannel)
			stageMgr.RunStages()
		}(ch)
	}
}

// sendToIotaChannels pushes data onto the least-loaded iota channel.
// It returns the channel used and true on success, or nil and false when the
// send did not complete within 100ms.
func (s *NodeServiceServer) sendToIotaChannels(data []common_models.IotaData) (chan []common_models.IotaData, bool) {
	startTime := time.Now()
	defer func() {
		elapsedTime := time.Since(startTime)
		log.Printf("sendToIotaChannels elapsedTime= %s\n", elapsedTime)
	}()

	// Pick the channel with the fewest queued batches. len(ch) is a racy
	// snapshot, but a stale reading only affects load balancing, not safety.
	var selectedChannel chan []common_models.IotaData
	minLength := math.MaxInt32
	for _, ch := range s.iotaChannels {
		if len(ch) < minLength {
			minLength = len(ch)
			selectedChannel = ch
		}
	}

	// Try to send with a bounded wait. (The original had an unreachable
	// `return nil, false` after this select; both cases already return.)
	select {
	case selectedChannel <- data:
		return selectedChannel, true
	case <-time.After(100 * time.Millisecond):
		log.Println("Timeout while trying to send iotaData.")
		return nil, false
	}
}

// HandleIotaChan consumes batches from ch, converts each IotaData to a
// ProcessData via the iota data handler, and forwards the batch to the
// process channel at the matching index.
func (s *NodeServiceServer) HandleIotaChan(ch chan []common_models.IotaData, index int) {
	// Per-worker rate limiter: 10 events/sec with a burst of 1.
	limiter := rate.NewLimiter(10, 1)

	for data := range ch {
		// Block until the limiter grants the next slot.
		if err := limiter.Wait(context.Background()); err != nil {
			log.Printf("处理速率限制错误: %v", err)
			continue
		}

		go func(iotaDataArray []common_models.IotaData) {
			dataHandleTime := time.Now() // start of this batch's processing
			formattedTime := dataHandleTime.Format("2006-01-02 15:04:05.999999999")
			defer func() {
				if r := recover(); r != nil {
					log.Printf("Recovered from panic: %v", r)
				}
				log.Printf("4.iotaDataArray[%v] 处理耗时:%v", formattedTime, time.Since(dataHandleTime))
			}()

			log.Printf("1.iotaDataArray[%v] 准备处理。processChannel[%p]数据量:%d/%d", formattedTime, ch, len(ch), cap(ch))
			processDataArray := make([]*common_models.ProcessData, 0, len(iotaDataArray))
			for _, r := range iotaDataArray {
				ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)

				iotaHandleTime := time.Now()
				deviceData, err := s.iotaDataHandler.OnDataHandler(ctx, r)
				// Release the timeout's resources immediately instead of
				// deferring inside the loop (deferred cancels would pile up
				// until the whole goroutine returned).
				cancel()
				iotaHandleElapse := time.Since(iotaHandleTime)

				if err != nil {
					log.Printf("IotaData->DeviceData[%s] 转换错误:%s. 耗时:%v。", r.DeviceId, err.Error(), iotaHandleElapse)
					return
				}
				if deviceData == nil {
					log.Printf("IotaData->DeviceData[%s] 转换错误:deviceData is nil. 耗时:%v。", r.DeviceId, iotaHandleElapse)
					return
				}

				// Queue the converted record for the stage pipeline.
				pd := &common_models.ProcessData{
					DeviceData: *deviceData,
					Stations:   []common_models.Station{},
				}
				processDataArray = append(processDataArray, pd)
			}

			log.Printf("2.iotaDataArray[%v] 已处理完 IotaData->DeviceData ", formattedTime)

			sendTime := time.Now()
			// Blocks (retrying) until the target channel has room.
			processChannel, ok := s.sendToProcessChannels(processDataArray, index)
			if !ok {
				// NOTE(review): sendToProcessChannels currently never returns
				// false; decide what to do if a timeout is reintroduced.
				log.Printf("3.iotaDataArray[%v] s.processChannels %d个通道都已满,被阻塞。", formattedTime, len(s.processChannels))
			} else {
				log.Printf("3.iotaDataArray[%v] 已发送至s.processChannels。processChannel[%p]数据量:%d/%d, \n发送耗时:%v ,iotaDataArray处理耗时:%v",
					formattedTime, processChannel, len(processChannel), cap(processChannel), time.Since(sendTime), time.Since(dataHandleTime))
			}
		}(data)
	}
}

// sendToProcessChannels delivers data to s.processChannels[index], retrying
// until the channel accepts it. It always returns (channel, true); the bool
// exists for a future timeout path.
func (s *NodeServiceServer) sendToProcessChannels(data []*common_models.ProcessData, index int) (chan []*common_models.ProcessData, bool) {
	startTime := time.Now()
	defer func() {
		elapsedTime := time.Since(startTime)
		log.Printf("[sendToProcessChannels] elapsedTime= %s\n", elapsedTime)
	}()

	selectedChannel := s.processChannels[index]
	log.Printf("[sendToProcessChannels] 尝试发送。 channel[%p]: %d/%d\n", selectedChannel, len(selectedChannel), cap(selectedChannel))

	// NOTE(review): this is a poll-and-sleep loop with no upper bound; a
	// plain blocking send (or a context deadline) would avoid the busy-wait.
	for {
		select {
		case selectedChannel <- data:
			log.Printf("[sendToProcessChannels] 发送成功。channel[%p]: %d/%d\n", selectedChannel, len(selectedChannel), cap(selectedChannel))
			time.Sleep(50 * time.Millisecond)
			return selectedChannel, true
		default:
			time.Sleep(200 * time.Millisecond) // channel full; back off and retry
		}
	}
}

// limiter throttles the public RPC entry points to 500 requests/sec
// (burst 1), shared across HandleIotaData and HandleAggData.
var limiter = rate.NewLimiter(rate.Limit(500), 1)

// HandleIotaData receives a batch of raw JSON messages, decodes them into
// IotaData, and hands them to the least-loaded iota channel.
func (s *NodeServiceServer) HandleIotaData(ctx context.Context, req *pb.HandleDataRequest) (*pb.HandleDataResponse, error) {
	if err := limiter.Wait(ctx); err != nil {
		return nil, fmt.Errorf("请求速率过高,请稍后重试")
	}

	startTime := time.Now()

	// 1. Decode the messages.
	conversionStart := time.Now()
	iotaDataList := s.convertMessages2IotaData(req.Messages)
	conversionDuration := time.Since(conversionStart)
	log.Printf("[INFO][HandleIotaData] 1.数据转换耗时: %v, 共 %d 条数据。", conversionDuration, len(req.Messages))

	// 2. Forward to an iota channel.
	sendStart := time.Now()
	_, ok := s.sendToIotaChannels(iotaDataList)
	sendDuration := time.Since(sendStart)
	log.Printf("[INFO] [HandleIotaData] 2.sendToIotaChannels耗时: %v。", sendDuration)

	if !ok {
		log.Printf("[WARN] [HandleIotaData] 2.所有 Iota 通道已满,无法发送数据,通道数量: %d", len(s.iotaChannels))
		// NOTE(review): the batch is dropped here yet the status is SUCCESS
		// with only ErrorMessage set — confirm callers rely on this before
		// changing it to a failure status.
		return &pb.HandleDataResponse{
			Addr:         s.Addr,
			Load:         s.Load,
			Status:       pb.HandleDataResponse_SUCCESS,
			ErrorMessage: "s.iotaChannels 通道已满",
		}, nil
	}

	// 3. Report total handling time.
	totalDuration := time.Since(startTime)
	log.Printf("[INFO] [HandleIotaData] 3.总处理时长: %v", totalDuration)

	return &pb.HandleDataResponse{
		Addr:         s.Addr,
		Load:         s.Load,
		Status:       pb.HandleDataResponse_SUCCESS,
		ErrorMessage: "",
	}, nil
}

// HandleAggData receives a batch of aggregation messages, decodes them, and
// runs the change-rate threshold analysis on each record.
func (s *NodeServiceServer) HandleAggData(ctx context.Context, req *pb.HandleDataRequest) (*pb.HandleDataResponse, error) {
	if err := limiter.Wait(ctx); err != nil {
		return nil, fmt.Errorf("请求速率过高,请稍后重试")
	}

	startTime := time.Now()

	// 1. Decode the messages.
	conversionStart := time.Now()
	aggDataList := s.convertMessages2AggData(req.Messages)
	conversionDuration := time.Since(conversionStart)
	log.Printf("[INFO][HandleAggData] 1.数据转换耗时: %v, 共 %d 条数据。", conversionDuration, len(req.Messages))

	// 2. Threshold analysis, record by record.
	analyzeStart := time.Now()
	for _, aggData := range aggDataList {
		// Per-iteration copy: ProcessData takes a pointer, and pre-Go-1.22
		// &aggData would alias the single range variable across iterations.
		aggData := aggData
		if err := s.aggDataHandler.ProcessData(&aggData); err != nil {
			errmsg := fmt.Sprintf("[etNode.AggDataHandler] 2.变化速率阈值分析%s[aggTypeId:%d]ERROR: %v", aggData.R(), aggData.AggTypeId, err)
			log.Println(errmsg)
		}
	}
	analyzeDuration := time.Since(analyzeStart)
	log.Printf("[INFO][HandleAggData] 2. 变化速率阈值分析耗时: %v。", analyzeDuration)

	// 3. Report total handling time.
	totalDuration := time.Since(startTime)
	log.Printf("[INFO][HandleAggData] 3.总处理时长: %v", totalDuration)

	return &pb.HandleDataResponse{
		Addr:         s.Addr,
		Load:         s.Load,
		Status:       pb.HandleDataResponse_SUCCESS,
		ErrorMessage: "",
	}, nil
}

// mustEmbedUnimplementedNodeServiceServer satisfies the generated interface.
func (s *NodeServiceServer) mustEmbedUnimplementedNodeServiceServer() {}

// createErrorResponse logs message, and returns a response carrying the given
// status together with an error built from the same message.
func (s *NodeServiceServer) createErrorResponse(status pb.HandleDataResponse_Status, message string) (*pb.HandleDataResponse, error) {
	response := &pb.HandleDataResponse{
		Addr:         s.Addr,
		Load:         s.Load,
		Status:       status,
		ErrorMessage: message,
	}
	// Use message as a value, not a format string: the original
	// log.Printf(message)/fmt.Errorf(message) would misinterpret any '%'
	// verbs contained in it.
	log.Printf("%s", message)
	return response, fmt.Errorf("%s", message)
}

// convertMessages2IotaData decodes messages into IotaData. It first tries a
// single batch unmarshal of the joined JSON array; on failure it falls back
// to per-message decoding, skipping records that fail.
func (s *NodeServiceServer) convertMessages2IotaData(messages []string) []common_models.IotaData {
	st := time.Now()
	dataArray := make([]common_models.IotaData, 0, len(messages))

	jsonArray := fmt.Sprintf("[%s]", strings.Join(messages, ","))
	if err := json.Unmarshal([]byte(jsonArray), &dataArray); err != nil {
		// Batch decode failed; decode one message at a time.
		for _, msg := range messages {
			var data common_models.IotaData
			if err := json.Unmarshal([]byte(msg), &data); err != nil {
				log.Printf("逐个 JSON 反序列化失败:%v", err)
				continue
			}
			dataArray = append(dataArray, data)
		}
	}

	log.Printf("[convertMessages2IotaData] 序列化耗时:%v ,共解析出 %d 个 IotaData。", time.Since(st), len(dataArray))
	return dataArray
}

// toAggData maps a decoded AggDataJson onto the domain AggData type.
// ThingId is not carried by the wire format and is left empty.
func toAggData(data AggDataJson) common_models.AggData {
	return common_models.AggData{
		Date:        time.Time(data.Date),
		SensorId:    data.SensorId,
		StructId:    data.StructId,
		FactorId:    data.FactorId,
		AggTypeId:   data.AggTypeId,
		AggMethodId: data.AggMethodId,
		Agg:         data.Agg,
		Changed:     data.Changed,
		ThingId:     "",
	}
}

// convertMessages2AggData decodes messages into AggData. It first tries a
// single batch unmarshal of the joined JSON array; on failure it falls back
// to per-message decoding, skipping records that fail.
func (s *NodeServiceServer) convertMessages2AggData(messages []string) []common_models.AggData {
	st := time.Now()

	aggDatas := make([]common_models.AggData, 0, len(messages))

	jsonArray := fmt.Sprintf("[%s]", strings.Join(messages, ","))
	var tmpDatas []AggDataJson
	if err := json.Unmarshal([]byte(jsonArray), &tmpDatas); err != nil {
		log.Printf("JSON 数组反序列化失败,尝试逐个解析:%v", err)

		// Sequential fallback: the original spawned one goroutine per
		// message guarded by a mutex, which added scheduling/lock overhead
		// and produced a nondeterministic result order for pure CPU-bound
		// decoding. A plain loop preserves input order.
		for _, msg := range messages {
			var data AggDataJson
			if err := json.Unmarshal([]byte(msg), &data); err != nil {
				log.Printf("逐个 JSON 反序列化失败:%v", err)
				continue
			}
			aggDatas = append(aggDatas, toAggData(data))
		}
	} else {
		for _, data := range tmpDatas {
			aggDatas = append(aggDatas, toAggData(data))
		}
	}

	log.Printf("[convertMessages2AggData] 序列化耗时:%v ,共解析出 %d 个 AggData。", time.Since(st), len(aggDatas))
	return aggDatas
}