package app import ( "context" "encoding/json" "et_analyze" "et_rpc/pb" "fmt" "gitea.anxinyun.cn/container/common_models" "gitea.anxinyun.cn/container/common_utils/configLoad" "golang.org/x/time/rate" "log" "math" "node/et_worker/et_recv" "strings" "sync" "time" ) type UTCTime time.Time // UnmarshalJSON 自定义日期解析 func (ut *UTCTime) UnmarshalJSON(data []byte) error { // 去掉 JSON 字符串的引号 str := string(data) if len(str) < 2 || str[0] != '"' || str[len(str)-1] != '"' { return fmt.Errorf("invalid time format: %s", str) } str = str[1 : len(str)-1] // 解析自定义日期格式 t, err := time.Parse("2006-01-02T15:04:05.999-0700", str) if err != nil { return fmt.Errorf("failed to parse time: %v", err) } // 赋值 *ut = UTCTime(t) return nil } // aggDataMsg: {"date":"2024-09-19T09:39:59.999+0000","sensorId":106,"structId":1,"factorId":11,"aggTypeId":2006,"aggMethodId":3004,"agg":{"strain":-19.399999618530273},"changed":{"strain":-3}} type AggDataJson struct { Date UTCTime SensorId int StructId int FactorId int AggTypeId int // 聚集类型 : 10分钟/30分钟/3小时/6小时/12小时/时/日/周/月聚集 AggMethodId int // 聚集方法 : 平均值/最大值/最小值 Agg map[string]float64 // 聚集数据 Changed map[string]float64 // 变化量 } // NodeServiceServer 实现了 NodeServiceServer 接口 type NodeServiceServer struct { pb.UnimplementedNodeServiceServer // 嵌入未实现的接口,以便将来兼容性 Addr string Load int32 iotaDataHandler *et_recv.RecvDataHandler aggDataHandler *et_analyze.AggThresholdHandler iotaChannels []chan []common_models.IotaData processChannels []chan []*common_models.ProcessData outProcessChannel chan []*common_models.ProcessData nextProcessChannel int // 记录下一个要尝试的通道索引 nextChannel int // 记录下一个要尝试的通道索引 mu sync.Mutex // 保护 nextChannel 的并发访问 } func NewNodeServer(processChannels []chan []*common_models.ProcessData) *NodeServiceServer { stageResultBufSize := configLoad.LoadConfig().GetInt("performance.node.stageResultBufSize") iotaBufSize := configLoad.LoadConfig().GetInt("performance.node.iotaBufSize") s := &NodeServiceServer{ iotaDataHandler: et_recv.NewRecvDataHandler(), aggDataHandler: et_analyze.NewAggThresholdHandler(), processChannels: processChannels, outProcessChannel: make(chan []*common_models.ProcessData, stageResultBufSize), } s.iotaChannels = s.NewIotaChannels(len(processChannels), iotaBufSize) // 启动 DeviceInfo 缓存更新协程,设置为每 10 分钟更新一次 s.iotaDataHandler.RecvConfigHelper.StartUpdateDeviceInfo(10*time.Minute, 30) // 处理 process data s.RunStageManager() time.Sleep(500 * time.Millisecond) // 将 IotaData 转换为 DeviceData, 转换后数据发送到 s.processChannels go s.HandleIotaChannels() return s } func (s *NodeServiceServer) NewIotaChannels(count, bufferSize int) []chan []common_models.IotaData { channels := make([]chan []common_models.IotaData, count) for i := 0; i < count; i++ { channels[i] = make(chan []common_models.IotaData, bufferSize) } return channels } func (s *NodeServiceServer) HandleIotaChannels() { iotaWorkerCount := configLoad.LoadConfig().GetInt("performance.node.iotaWorkerCount") for index, ch := range s.iotaChannels { for w := 0; w < iotaWorkerCount; w++ { go func(c chan []common_models.IotaData) { s.HandleIotaChan(c, index) }(ch) } } } // func (s *NodeServiceServer) HandleProcessChannels() { func (s *NodeServiceServer) RunStageManager() { for _, ch := range s.processChannels { go func(c chan []*common_models.ProcessData) { stageMgr := CreateStages(c, s.outProcessChannel) stageMgr.RunStages() }(ch) } } func (s *NodeServiceServer) sendToIotaChannels(data []common_models.IotaData) (chan []common_models.IotaData, bool) { startTime := time.Now() defer func() { elapsedTime := time.Since(startTime) log.Printf("sendToIotaChannels elapsedTime= %s\n", elapsedTime) //log.Printf("Final iotaData channel states: ") //for _, ch := range s.iotaChannels { // log.Printf("iotaChan[%p]: %d/%d\n", ch, len(ch), cap(ch)) //} }() var selectedChannel chan []common_models.IotaData minLength := math.MaxInt32 // 选择最空闲的通道 for _, ch := range s.iotaChannels { if len(ch) < minLength { minLength = len(ch) selectedChannel = ch } } // 尝试发送数据 select { case selectedChannel <- data: return selectedChannel, true case <-time.After(100 * time.Millisecond): // 设置超时时间 log.Println("Timeout while trying to send iotaData.") return nil, false } } // 将 IotaData 转换为 DeviceData, 转换后数据发送到 s.processChannels func (s *NodeServiceServer) HandleIotaChan(ch chan []common_models.IotaData, index int) { // 创建一个速率限制器,每秒允许处理 100 条数据 limiter := rate.NewLimiter(10, 1) // 100 次/秒,突发容量为 1 for data := range ch { // 等待直到可以处理下一条数据 if err := limiter.Wait(context.Background()); err != nil { log.Printf("处理速率限制错误: %v", err) continue } go func(iotaDataArray []common_models.IotaData) { dataHandleTime := time.Now() // 记录 data 的处理开始时间 formattedTime := dataHandleTime.Format("2006-01-02 15:04:05.999999999") defer func() { if r := recover(); r != nil { log.Printf("Recovered from panic: %v", r) } log.Printf("4.iotaDataArray[%v] 处理耗时:%v", formattedTime, time.Since(dataHandleTime)) }() log.Printf("1.iotaDataArray[%v] 准备处理。processChannel[%p]数据量:%d/%d", formattedTime, ch, len(ch), cap(ch)) processDataArray := make([]*common_models.ProcessData, 0, len(iotaDataArray)) for _, r := range iotaDataArray { ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() iotaHandleTime := time.Now() // 记录 iota 数据处理开始时间 deviceData, err := s.iotaDataHandler.OnDataHandler(ctx, r) iotaHandleElapse := time.Since(iotaHandleTime) // 计算数据处理耗时 if err != nil { log.Printf("IotaData->DeviceData[%s] 转换错误:%s. 耗时:%v。", r.DeviceId, err.Error(), iotaHandleElapse) return } if deviceData == nil { log.Printf("IotaData->DeviceData[%s] 转换错误:deviceData is nil. 耗时:%v。", r.DeviceId, iotaHandleElapse) return } // 将数据写入 processDataChannel pd := &common_models.ProcessData{ DeviceData: *deviceData, Stations: []common_models.Station{}, } processDataArray = append(processDataArray, pd) } log.Printf("2.iotaDataArray[%v] 已处理完 IotaData->DeviceData ", formattedTime) sendTime := time.Now() processChannel, ok := s.sendToProcessChannels(processDataArray, index) // 这里会一直等到有资源 if !ok { log.Printf("3.iotaDataArray[%v] s.processChannels %d个通道都已满,被阻塞。", formattedTime, len(s.processChannels)) } else { log.Printf("3.iotaDataArray[%v] 已发送至s.processChannels。processChannel[%p]数据量:%d/%d, \n发送耗时:%v ,iotaDataArray处理耗时:%v", formattedTime, processChannel, len(processChannel), cap(processChannel), time.Since(sendTime), time.Since(dataHandleTime)) } }(data) } } func (s *NodeServiceServer) sendToProcessChannels(data []*common_models.ProcessData, index int) (chan []*common_models.ProcessData, bool) { startTime := time.Now() //timeoutDuration := configLoad.LoadConfig().GetUint16("performance.node.processTimeout") // 设置超时时间为 60 秒 defer func() { elapsedTime := time.Since(startTime) log.Printf("[sendToProcessChannels] elapsedTime= %s\n", elapsedTime) //log.Printf("[sendToProcessChannels] Final processData channel states:") //for _, ch := range s.processChannels { // log.Printf("[sendToProcessChannels] processChan[%p]: %d/%d\n", ch, len(ch), cap(ch)) //} }() selectedChannel := s.processChannels[index] log.Printf("[sendToProcessChannels] 尝试发送。 channel[%p]: %d/%d\n", selectedChannel, len(selectedChannel), cap(selectedChannel)) // 超时限制 //timeout := time.After(time.Duration(timeoutDuration)) for { select { case selectedChannel <- data: // 发送成功 log.Printf("[sendToProcessChannels] 发送成功。channel[%p]: %d/%d\n", selectedChannel, len(selectedChannel), cap(selectedChannel)) time.Sleep(50 * time.Millisecond) return selectedChannel, true default: //log.Printf("[sendToProcessChannels] channel[%p] 已满 cap=%d,继续尝试发送。\n", selectedChannel, cap(selectedChannel)) time.Sleep(200 * time.Millisecond) // 等待一段时间后再尝试 //case <-timeout: // log.Printf("[sendToProcessChannels] 发送超时超过1分钟,将停止尝试。\n") // return nil, false // 超时返回 nil 和 false //case <-time.After(500 * time.Millisecond): // log.Printf("[sendToProcessChannels] 发送超时500ms,将继续尝试channel[%p]。\n", selectedChannel) // continue } } } var limiter = rate.NewLimiter(rate.Limit(500), 1) // 每秒最多处理 1000 条数据 func (s *NodeServiceServer) HandleIotaData(ctx context.Context, req *pb.HandleDataRequest) (*pb.HandleDataResponse, error) { if err := limiter.Wait(ctx); err != nil { return nil, fmt.Errorf("请求速率过高,请稍后重试") } startTime := time.Now() // 1. 数据转换 conversionStart := time.Now() iotaDataList := s.convertMessages2IotaData(req.Messages) conversionDuration := time.Since(conversionStart) log.Printf("[INFO][HandleIotaData] 1.数据转换耗时: %v, 共 %d 条数据。", conversionDuration, len(req.Messages)) // 2. 发送到 Iota 通道 sendStart := time.Now() _, ok := s.sendToIotaChannels(iotaDataList) sendDuration := time.Since(sendStart) log.Printf("[INFO] [HandleIotaData] 2.sendToIotaChannels耗时: %v。", sendDuration) //log.Printf("[INFO] [HandleIotaData] 2.sendToIotaChannels耗时: %v。通道状态: %v, 通道指针: %p, 当前长度: %d/%d", // sendDuration, ok, ch, len(ch), cap(ch)) if !ok { log.Printf("[WARN] [HandleIotaData] 2.所有 Iota 通道已满,无法发送数据,通道数量: %d", len(s.iotaChannels)) return &pb.HandleDataResponse{ Addr: s.Addr, Load: s.Load, Status: pb.HandleDataResponse_SUCCESS, ErrorMessage: "s.iotaChannels 通道已满", }, nil } // 3. 计算总处理时长 totalDuration := time.Since(startTime) log.Printf("[INFO] [HandleIotaData] 3.总处理时长: %v", totalDuration) return &pb.HandleDataResponse{ Addr: s.Addr, Load: s.Load, Status: pb.HandleDataResponse_SUCCESS, ErrorMessage: "", }, nil } // HandleAggData 处理聚集数据并返回节点响应 func (s *NodeServiceServer) HandleAggData(ctx context.Context, req *pb.HandleDataRequest) (*pb.HandleDataResponse, error) { if err := limiter.Wait(ctx); err != nil { return nil, fmt.Errorf("请求速率过高,请稍后重试") } startTime := time.Now() // 1. 数据转换 conversionStart := time.Now() aggDataList := s.convertMessages2AggData(req.Messages) conversionDuration := time.Since(conversionStart) log.Printf("[INFO][HandleAggData] 1.数据转换耗时: %v, 共 %d 条数据。", conversionDuration, len(req.Messages)) // 2. 发送到 Iota 通道 analyzeStart := time.Now() for _, aggData := range aggDataList { err := s.aggDataHandler.ProcessData(&aggData) if err != nil { errmsg := fmt.Sprintf("[etNode.AggDataHandler] 2.变化速率阈值分析%s[aggTypeId:%d]ERROR: %v", aggData.R(), aggData.AggTypeId, err) log.Println(errmsg) } //} else { // log.Printf("[etNode.AggDataHandler]变化速率阈值分析SUCCESS。%s[aggTypeId:%d]changed[%v]", aggData.R(), aggData.AggTypeId, aggData.Changed) //} } analyzeDuration := time.Since(analyzeStart) log.Printf("[INFO][HandleAggData] 2. 变化速率阈值分析耗时: %v。", analyzeDuration) // 3. 计算总处理时长 totalDuration := time.Since(startTime) log.Printf("[INFO][HandleAggData] 3.总处理时长: %v", totalDuration) // 返回响应 return &pb.HandleDataResponse{ Addr: s.Addr, Load: s.Load, Status: pb.HandleDataResponse_SUCCESS, ErrorMessage: "", }, nil } // mustEmbedUnimplementedNodeServiceServer 确保实现了接口 func (s *NodeServiceServer) mustEmbedUnimplementedNodeServiceServer() {} // createErrorResponse 用于创建错误响应 func (s *NodeServiceServer) createErrorResponse(status pb.HandleDataResponse_Status, message string) (*pb.HandleDataResponse, error) { response := &pb.HandleDataResponse{ Addr: s.Addr, Load: s.Load, Status: status, ErrorMessage: message, } log.Printf(message) // 记录错误信息 return response, fmt.Errorf(message) } func (s *NodeServiceServer) convertMessages2IotaData(messages []string) []common_models.IotaData { st := time.Now() dataArray := make([]common_models.IotaData, 0, len(messages)) // 尝试批量解析 jsonArray := fmt.Sprintf("[%s]", strings.Join(messages, ",")) if err := json.Unmarshal([]byte(jsonArray), &dataArray); err != nil { // 批量解析失败,逐个解析 for _, msg := range messages { var data common_models.IotaData if err := json.Unmarshal([]byte(msg), &data); err != nil { log.Printf("逐个 JSON 反序列化失败:%v", err) continue } dataArray = append(dataArray, data) } } log.Printf("[convertMessages2IotaData] 序列化耗时:%v ,共解析出 %d 个 IotaData。", time.Since(st), len(dataArray)) return dataArray } func (s *NodeServiceServer) convertMessages2AggData(messages []string) []common_models.AggData { //log.Printf("[convertMessages2AggData] len(messages)=%d ,start ...", len(messages)) st := time.Now() // 预分配 aggDatas 的容量 aggDatas := make([]common_models.AggData, 0, len(messages)) // 尝试批量解析 JSON 数组 jsonArray := fmt.Sprintf("[%s]", strings.Join(messages, ",")) var tmpDatas []AggDataJson err := json.Unmarshal([]byte(jsonArray), &tmpDatas) if err != nil { log.Printf("JSON 数组反序列化失败,尝试逐个解析:%v", err) // 如果批量解析失败,逐个解析 JSON 字符串 var wg sync.WaitGroup var mu sync.Mutex for _, val := range messages { wg.Add(1) go func(msg string) { defer wg.Done() var data AggDataJson if err := json.Unmarshal([]byte(msg), &data); err != nil { log.Printf("逐个 JSON 反序列化失败:%v", err) return } // 加锁保护 aggDatas mu.Lock() aggDatas = append(aggDatas, common_models.AggData{ Date: time.Time(data.Date), SensorId: data.SensorId, StructId: data.StructId, FactorId: data.FactorId, AggTypeId: data.AggTypeId, AggMethodId: data.AggMethodId, Agg: data.Agg, Changed: data.Changed, ThingId: "", }) mu.Unlock() }(val) } // 等待所有 Goroutine 完成 wg.Wait() } else { // 批量解析成功,直接转换 aggDatas = make([]common_models.AggData, len(tmpDatas)) for i, data := range tmpDatas { aggDatas[i] = common_models.AggData{ Date: time.Time(data.Date), SensorId: data.SensorId, StructId: data.StructId, FactorId: data.FactorId, AggTypeId: data.AggTypeId, AggMethodId: data.AggMethodId, Agg: data.Agg, Changed: data.Changed, ThingId: "", } } } log.Printf("[convertMessages2AggData] 序列化耗时:%v ,共解析出 %d 个 AggData。", time.Since(st), len(aggDatas)) return aggDatas }