package app import ( "et_rpc" "et_rpc/pb" "fmt" "gitea.anxinyun.cn/container/common_models" "gitea.anxinyun.cn/container/common_utils" "gitea.anxinyun.cn/container/common_utils/configLoad" "google.golang.org/grpc" "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" "log" "net" "os" "os/signal" "strings" "sync" "syscall" "time" ) type ETNode struct { // node rpc server 相关信息 grpcServer *grpc.Server nodeServer *NodeServiceServer grpcServerStarted chan struct{} // 通知主程序RPC已经启动 processChannels []chan []*common_models.ProcessData groupDataChan chan []*common_models.ProcessData // 分组数据 // node 信息 nodeInfo *et_rpc.NodeArgs Addr string // master 信息 masterAddr string masterConn *MasterConnection } func NewETNode() *ETNode { const processChannelsCount = 1 processBufSize := configLoad.LoadConfig().GetInt("performance.node.processBufSize") processChannels := make([]chan []*common_models.ProcessData, processChannelsCount) for i := 0; i < processChannelsCount; i++ { processChannels[i] = make(chan []*common_models.ProcessData, processBufSize) } nodeServer := NewNodeServer(processChannels) grpcServer := grpc.NewServer() pb.RegisterNodeServiceServer(grpcServer, nodeServer) // 创建grpc健康检查服务 healthServer := health.NewServer() grpc_health_v1.RegisterHealthServer(grpcServer, healthServer) // 设置初始健康状态 healthServer.SetServingStatus("NodeService", grpc_health_v1.HealthCheckResponse_SERVING) m := &ETNode{ grpcServer: grpcServer, nodeServer: nodeServer, grpcServerStarted: make(chan struct{}), processChannels: processChannels, //groupDataChan: make(chan []*common_models.ProcessData, 500), } // 初始化 Node 信息 m.nodeInfo = m.initNodeInfo() return m } func (n *ETNode) startRPCServer() { port := configLoad.LoadConfig().GetUint16("node.port") listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { log.Panicf("启动 Node RPC 服务失败:%v", err) } defer listener.Close() log.Printf("启动 Node RPC 服务成功,服务端口:%d", port) time.Sleep(100 * time.Millisecond) close(n.grpcServerStarted) // 启动 gRPC 服务器 if err := n.grpcServer.Serve(listener); err != nil { log.Panicf("gRPC 服务器服务失败:%v", err) } } func (n *ETNode) initNodeInfo() *et_rpc.NodeArgs { // 获取主机的 IP 地址前缀 ipPrefix := configLoad.LoadConfig().GetString("node.hostIpPrefix") ip4 := common_utils.ReadIP4WithPrefixFirst(ipPrefix) // 获取主机名 hostName, err := os.Hostname() if err != nil { log.Fatalf("获取主机名失败: %v", err) } // 获取配置的端口号 log.Printf("node [%s] ip=%s\n", hostName, ip4) port := configLoad.LoadConfig().GetUint16("node.port") if port == 0 { log.Fatalf("未配置有效的端口号") } // 构造 Node 的地址 nodeAddr := fmt.Sprintf("%s:%d", ip4, port) log.Printf("node 的地址为 %s", nodeAddr) n.Addr = nodeAddr // 初始化 Node 信息 return &et_rpc.NodeArgs{ ID: hostName + time.Now().Format("_20060102_150405"), Status: et_rpc.NodeState_Healthy, ErrCode: et_rpc.RPCReply_Success, ResourceJson: "", Addr: nodeAddr, } } func (n *ETNode) connectAndRegisterToMaster() { // 获取master配置 masterHost := configLoad.LoadConfig().GetString("node.remoteMasterHost") masterPort := configLoad.LoadConfig().GetUint16("master.port") if masterHost == "" { masterHost = "127.0.0.1" } if masterPort == 0 { masterPort = 50000 } masterAddr := fmt.Sprintf("%s:%d", masterHost, masterPort) // node 建立与 master 的连接 masterConn, err := NewMasterConnection(masterAddr) if err != nil { log.Printf("ERROR: 建立与 Master[%s] 的连接失败!!\n", masterAddr) } else { n.masterConn = masterConn log.Printf("建立与 Master[%s] 的连接成功!\n", masterAddr) } n.masterAddr = masterAddr n.masterConn = masterConn time.Sleep(500 * time.Millisecond) // node 向 master 发送注册请求(node 调用 master 的注册服务) err = n.register() //尝试3次 if err != nil { // 3次尝试失败后退出程序 log.Fatalf("node[%s]->master[%s] 注册失败,Error: %v", n.nodeInfo.Addr, masterAddr, err) //log.Println(err) } } func (n *ETNode) register() error { if n.masterConn == nil { return fmt.Errorf("n.masterConn is nil") } //n.nodeInfo.Load = len(n.chIotaData) const maxRetries = 3 retries := 0 for { err := n.masterConn.CallRegister(n.nodeInfo) if err == nil { log.Println("注册成功") return nil // 注册成功,返回 nil } else { log.Printf("注册失败: %v", err) retries++ } if retries >= maxRetries { log.Println("达到最大重试次数,停止注册尝试") return fmt.Errorf("注册失败,达到最大重试次数: %v", err) // 返回错误 } // 每5秒发送一次注册消息 time.Sleep(5 * time.Second) } } func (n *ETNode) unregister() { reply := new(et_rpc.NodeArgs) err := n.masterConn.CallUnregister() if err != nil { log.Printf("node[%s] 从master注销异常,err=%v", n.nodeInfo.Addr, err.Error()) } else { log.Printf("node[%s] 从master注销成功。reply=%+v", n.nodeInfo.Addr, reply) } } func (n *ETNode) heartbeat(interval time.Duration) { // 心跳3次失败后,发送注册消息 const maxRetries = 3 retries := 0 var err error for { if n.masterConn == nil { log.Println("ERROR: masterConn is nil") time.Sleep(1 * time.Second) continue } err = n.masterConn.CallHeartbeatNode(n.nodeInfo.Addr) if err == nil { log.Println("心跳成功!!") retries = 0 } else { log.Printf("心跳消息发送失败: %v", err) if strings.Contains(err.Error(), "未注册的节点") { retries = 3 } else { retries++ } } if retries >= maxRetries { // 发送注注册、重置重试计数 err = n.register() if err == nil { log.Println("重新注册成功", n.Addr) } else { log.Println("重新注册失败", n.Addr) } retries = 0 } // 每60秒发送一次心跳 time.Sleep(interval) } } func (n *ETNode) startMonitorExit() { c := make(chan os.Signal, 1) // 通过signal.Notify函数将信号通道c注册到系统相关的退出信号上 // 这里使用了两个退出信号:syscall.SIGINT(Ctrl+C)和syscall.SIGTERM(系统发送的退出信号) signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGKILL) // 阻塞等待接收信号 s := <-c log.Printf("接收到退出信号:%v,进行清理工作", s) // 注销 n.unregister() // 等待所有数据处理完毕 n.waitForDataProcessing() log.Printf("退出前,通道中数据已经处理完毕!!") os.Exit(0) } func (n *ETNode) waitForDataProcessing() { const waitInterval = 10 * time.Second var wg sync.WaitGroup // 处理 IotaData 通道 wg.Add(1) go func() { defer wg.Done() for _, ch := range n.processChannels { for len(ch) > 0 { time.Sleep(waitInterval) } } }() // 处理 groupDataChan 通道 wg.Add(1) go func() { defer wg.Done() for len(n.groupDataChan) > 0 { time.Sleep(waitInterval) } }() // 等待所有 goroutine 完成 wg.Wait() } //func LogProcessDataTimeCost(nodeId, deviceId string, start time.Time) { // tc := time.Since(start) // log.Printf("******** [%s][%s]装载设备信息耗时: %v", nodeId, deviceId, tc) //} // 是沉降测试数据 //func isSettleData(data map[string]interface{}) bool { // // {"pressure":23.09,"temperature":24.93,"ssagee":16.44} // validKeys := map[string]bool{ // "pressure": true, // "temperature": true, // "ssagee": true, // } // // if len(data) != 3 { // return false // } // // for key := range data { // if !validKeys[key] { // return false // } // } // return true //}