package group

import (
	"log"
	"sync"
	"time"

	"gitea.anxinyun.cn/container/common_models"
	"gitea.anxinyun.cn/container/common_utils"
	"gitea.anxinyun.cn/container/common_utils/configLoad"
	"node/stages"
)

var (
	// configHelperInstance is the lazily created, package-wide config helper.
	configHelperInstance *common_utils.ConfigHelper
	// once guards the one-time construction of configHelperInstance.
	once sync.Once
	// mu is not referenced anywhere in this file.
	// NOTE(review): confirm whether other files in the package use it.
	mu sync.Mutex
)

// GetConfigHelper returns the shared ConfigHelper, creating it on first use
// from the "redis.address" entry of the loaded configuration.
func GetConfigHelper() *common_utils.ConfigHelper {
	once.Do(func() {
		configYaml := configLoad.LoadConfig()
		redisAdd := configYaml.GetString("redis.address")
		configHelperInstance = common_utils.NewConfigHelper(redisAdd)
	})
	return configHelperInstance
}

// GroupCalc buffers the data of grouped stations per (group, task) and runs
// the group calculation once a task reports that its data is complete.
type GroupCalc struct {
	// stage is the processing stage that processData is registered on.
	stage *stages.Stage
	// configHelper is the shared helper obtained from GetConfigHelper.
	configHelper *common_utils.ConfigHelper
	// signCalc is only referenced by the disabled sweeper loop at the bottom
	// of this file.
	signCalc chan bool
	// calcTasks holds the pending (incomplete) calculation tasks.
	// NOTE(review): access is not synchronized here — confirm the stage never
	// invokes processData concurrently.
	calcTasks map[GroupCalcTaskKey]CalcTask
}

// NewGroupCalc builds a GroupCalc and registers its processData step on a
// newly created stage.
func NewGroupCalc() *GroupCalc {
	calcTaskManager := &GroupCalc{
		stage:        stages.NewStage("测点分组计算"),
		configHelper: GetConfigHelper(),
		signCalc:     make(chan bool),
		calcTasks:    map[GroupCalcTaskKey]CalcTask{},
	}

	// Handling of overdue tasks is currently disabled.
	//go calcTaskManager.onClearDueTask()

	// Register with the etNode processing environment to implement the data
	// pipeline: cache each group member's theme data -> group calculation ->
	// group data.
	calcTaskManager.stage.AddProcess(calcTaskManager.processData)
	return calcTaskManager
}

// GetStage returns a copy of the stage owned by this GroupCalc.
func (gc *GroupCalc) GetStage() stages.Stage {
	return *gc.stage
}

// processData runs every station of inData through cacheAndCalc and replaces
// inData.Stations with the concatenated results, so the caller's station list
// is changed. The same inData pointer is returned.
func (gc *GroupCalc) processData(inData *common_models.ProcessData) *common_models.ProcessData {
	var resultStations []common_models.Station
	// Index into the slice rather than taking the address of a range variable.
	for i := range inData.Stations {
		calcedStations := gc.cacheAndCalc(&inData.Stations[i], inData.DeviceData.DimensionId, inData.DeviceData.TaskId, inData.DeviceData.AcqTime)
		log.Printf("ProcessData中的测点个数:%d 计算返回的测点个数: %d", len(inData.Stations), len(calcedStations))
		resultStations = append(resultStations, calcedStations...)
	}

	// Hand back the processed data.
	inData.Stations = resultStations
	return inData
}

// cacheAndCalc caches the station's data in its group task and calculates the
// group once the task reports that it is complete.
//
//	station     the station (measuring point) being processed
//	dimensionId acquisition strategy (dimension)
//	taskId      id of one periodic acquisition task
//	acqTime     acquisition time
//
// A station with no group (group id 0 or no correlated groups) is returned
// unchanged as a single-element slice. For a grouped station the result is
// whatever the task's Calc returns when the group is complete, and nil while
// the group is still waiting for other members.
func (gc *GroupCalc) cacheAndCalc(station *common_models.Station, dimensionId string, taskId string, acqTime time.Time) []common_models.Station {
	sGroup := station.Info.Group
	if sGroup.Id == 0 || len(station.Info.CorrGroups) == 0 {
		// Not a grouped station.
		return []common_models.Station{*station}
	}

	var resultStations []common_models.Station
	key := GroupCalcTaskKey{
		GroupId: sGroup.Id,
		TaskId:  taskId,
	}

	if calcTask, ok := gc.calcTasks[key]; ok {
		// Add this station's data to the pending task.
		calcTask.AddStationData(*station)

		if !calcTask.CheckIntegrity() {
			// calcTask is a copy of the map value; store it back so that any
			// value-typed state changed by AddStationData is not lost.
			gc.calcTasks[key] = calcTask
			return resultStations
		}

		// The group is complete: calculate and drop the task.
		if secs := calcTask.ElapsedSecs(); secs > 10 {
			log.Printf("[dataHandler] group calc wait %f秒, %s\n", secs, key.R())
		}
		// Appending a nil result is a no-op, so no nil guard is needed.
		resultStations = append(resultStations, calcTask.Calc()...)
		delete(gc.calcTasks, key)
		return resultStations
	}

	// No task exists yet: it is created with the acquisition strategy
	// (dimension) of the first element's device so that the expiry duration
	// can be derived later.
	task := NewGroupCalcTask(&sGroup, dimensionId, taskId, acqTime)
	task.AddStationData(*station)
	if task.CheckIntegrity() {
		return append(resultStations, task.Calc()...)
	}

	// Still incomplete: start the timeout and park the task until the
	// remaining members arrive. With the sweeper below disabled, nothing in
	// this file removes a task that never completes.
	task.SetTimeout()
	gc.calcTasks[key] = *task
	return resultStations
}

//func (gc *GroupCalc) clearDueTask() {
//	for key, task := range gc.calcTasks {
//		if task.IsTimeout() {
//			result := task.Calc()
//			if result != nil {
//				// TODO the processed data must be passed on
//				station.Data.GroupData = result
//			}
//			// Expired tasks are removed whether or not the calculation succeeded.
//			delete(gc.calcTasks, key)
//			log.Printf("[dataHandler] group timeout calcTask:%s\n", key.R())
//		}
//	}
//}

//func (gc *GroupCalc) injectGroupData(calcTaskKey string, result map[string]any) {
//
//
//}

//// onClearDueTask handles overdue tasks.
//func (gc *GroupCalc) onClearDueTask() {
//	for {
//		select {
//		case <-gc.signCalc:
//		case <-time.After(time.Second):
//		}
//
//		// Handle expired tasks.
//		gc.clearDueTask()
//	}
//}