From d00f169f8fa475c6b548f4ee8f2b9a86f5c101f5 Mon Sep 17 00:00:00 2001 From: yfh Date: Thu, 3 Oct 2024 20:36:31 +0800 Subject: [PATCH] =?UTF-8?q?calcTasks=E6=94=B9=E4=B8=BAsync.Map?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- et_calc/group/groupCalc.go | 123 +++++++++++++++++++++---------------- 1 file changed, 71 insertions(+), 52 deletions(-) diff --git a/et_calc/group/groupCalc.go b/et_calc/group/groupCalc.go index acca284..420d8c9 100644 --- a/et_calc/group/groupCalc.go +++ b/et_calc/group/groupCalc.go @@ -16,7 +16,7 @@ import ( var ( configHelperInstance *common_utils.ConfigHelper once sync.Once - mu sync.Mutex + calcTasks sync.Map ) func GetConfigHelper() *common_utils.ConfigHelper { @@ -57,7 +57,6 @@ type GroupCalc struct { stage *stages.Stage configHelper *common_utils.ConfigHelper signCalc chan bool - calcTasks map[GroupCalcTaskKey]CalcTask esHandler *et_sink.SinkHandler } @@ -66,11 +65,10 @@ func NewGroupCalc() *GroupCalc { stage: stages.NewStage("测点分组计算"), configHelper: GetConfigHelper(), signCalc: make(chan bool), - calcTasks: map[GroupCalcTaskKey]CalcTask{}, esHandler: et_sink.NewSinkGroupHandler(), } - // 添加到 etNode 处理环节,实现数据加工 (缓存group各分项的主题数据 -> 分组计算 -> 分组数据) + // 添加到 etNode 处理环节,实现数据加工 (缓存group各分项的主题数据 -> 分组计算、分组数据存储ES) calcTaskManager.stage.AddProcess(calcTaskManager.processData) return calcTaskManager } @@ -81,47 +79,77 @@ func (gc *GroupCalc) GetStage() stages.Stage { // processData 的 stations 被改变了 func (gc *GroupCalc) processData(inData *common_models.ProcessData) *common_models.ProcessData { + log.Printf("************* 82行 分组一次处理开始 len(inData.Stations)=%d *************", len(inData.Stations)) resultStations := make([]common_models.Station, 0) - // 分组超时任务 - for key, task := range gc.calcTasks { - if task.IsTimeout() { - calcedStations := task.Calc() - resultStations = append(resultStations, calcedStations...) - // ES存储分组主题数据 - //dumpStr, groupTheme := task.ToDump(calcedStations) - //gc.esHandler.SinkGroupDataToES(groupTheme) - //log.Printf("[groupCalc.processData]group timeout calc。 %s \n", dumpStr) - gc.dumpGroupTheme(&task, calcedStations) - delete(gc.calcTasks, key) - } - } + resultStations = append(resultStations, gc.processTimeoutTasks()...) + // 常规分组计算 for _, station := range inData.Stations { if station.Info.Group.Id == 0 || station.Info.Group.GroupType != groupType.Settlement { //log.Printf("非【液压传感器测量沉降】分组。") resultStations = append(resultStations, station) } else { - calcedStations, err := gc.cacheAndCalc(&station, inData.DeviceData.DimensionId, inData.DeviceData.TaskId, inData.DeviceData.AcqTime) - if err != nil { - resultStations = append(resultStations, station) - } else { - resultStations = append(resultStations, calcedStations...) + calcRet, err := gc.cacheAndCalc(&station, inData.DeviceData.DimensionId, inData.DeviceData.TaskId, inData.DeviceData.AcqTime) + if err == nil { + log.Printf("************* 95行 沉降分组计算成功,返回记录数 len(calcRet)={%d} *************", len(calcRet)) + resultStations = append(resultStations, calcRet...) } + log.Printf("************* 98行 分组一次处理,缓存记录数 len(resultStations)={%d} *************", len(resultStations)) } } - // 返回处理后的数据,计算后的resultStations可能为空 - inData.Stations = resultStations + // 计算失败的测点不返回 + result := make([]common_models.Station, 0) + for _, station := range resultStations { + if len(station.Data.ThemeData) > 0 { + ////TODO #TEST BEGIN | filterSensorIds 需要排查的测点ID,主要排查分组测点主题数据未入ES的问题。 + //filterSensorIds := []int{42, 15, 32, 43, 13, 33, 17, 14} + //for _, number := range filterSensorIds { + // if number == station.Info.Id { + // log.Printf("*************** 116行 沉降分组测点有返回 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data) + // break + // } + //} + //// #TEST END + // 需要返回的测点 + result = append(result, station) + + } else { + log.Printf("*************** 119行 分组计算异常的测点数据 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data) + } + } + + // 计算后的 result 可能为空 + log.Printf("************* 124行 分组一次处理结束 len(inData.Stations)=%d *************", len(result)) + inData.Stations = result return inData } +func (gc *GroupCalc) processTimeoutTasks() []common_models.Station { + resultStations := make([]common_models.Station, 0) + calcTasks.Range(func(key, value interface{}) bool { + //log.Println("Key:", key, "Value:", value) + task := value.(CalcTask) + if task.IsTimeout() { + calcRet := task.Calc() + resultStations = append(resultStations, calcRet...) + // ES存储分组主题数据 + gc.dumpGroupTheme(&task, calcRet) + calcTasks.Delete(key) + } + return true // 返回 true 继续遍历,返回 false 停止遍历 + }) + + return resultStations +} + // ES存储分组主题 func (gc *GroupCalc) dumpGroupTheme(task *CalcTask, calcedStations []common_models.Station) { - dumpStr, groupTheme := task.ToDump(calcedStations) + dumpStr, esDoc := task.ToDump(calcedStations) log.Println(dumpStr) - if len(groupTheme.Data) > 0 { - gc.esHandler.SinkGroupDataToES(groupTheme) + if len(esDoc.Data) > 0 { + gc.esHandler.SinkGroupDataToES(esDoc) } } @@ -140,48 +168,39 @@ func (gc *GroupCalc) cacheAndCalc(station *common_models.Station, dimensionId st } key := GroupCalcTaskKey{GroupId: sGroup.Id, TaskId: taskId} - if calcTask, ok := gc.calcTasks[key]; ok { + if value, ok := calcTasks.Load(key); ok { + calcTask := value.(CalcTask) calcTask.AddStationData(*station) - log.Println(calcTask.R()) // 分组计算 if calcTask.CheckIntegrity() { secs := calcTask.ElapsedSecs() if secs > 10 { - log.Printf("[groupCalc.processData] group calc wait %f秒, %s\n", secs, key.R()) + log.Printf("[groupCalc.processData] wait %f秒, %s\n", secs, key.R()) } - - calcedStations := calcTask.Calc() - // 缓存已计算数据 - if calcedStations != nil && len(calcedStations) > 0 { - resultStations = append(resultStations, calcedStations...) + calcRet := calcTask.Calc() + if calcRet != nil && len(calcRet) > 0 { + resultStations = append(resultStations, calcRet...) } - // ES存储分组主题数据 - //dumpStr, groupTheme := calcTask.ToDump(calcedStations) - //gc.esHandler.SinkGroupDataToES(groupTheme) - //log.Println(dumpStr) - gc.dumpGroupTheme(&calcTask, calcedStations) - delete(gc.calcTasks, key) + // ES存储分组主题数据 + gc.dumpGroupTheme(&calcTask, calcRet) + calcTasks.Delete(key) } } else { - // 不存在的计算任务:要取到首个元素的设备的采集策略(维度),以便后面获得过期时长 + // 新计算任务:取到第一个的采集策略(维度),以便后面获得过期时长 task := NewGroupCalcTask(&sGroup, dimensionId, taskId, acqTime) task.AddStationData(*station) - log.Println(task.R()) - if task.CheckIntegrity() { - calcedStations := task.Calc() - // 缓存已计算数据 - if calcedStations != nil && len(calcedStations) > 0 { - resultStations = append(resultStations, calcedStations...) + calcRet := task.Calc() + if calcRet != nil && len(calcRet) > 0 { + resultStations = append(resultStations, calcRet...) } - // ES存储分组主题数据 - gc.dumpGroupTheme(task, calcedStations) - + gc.dumpGroupTheme(task, calcRet) } else { task.SetTimeout() - gc.calcTasks[key] = *task + log.Println("****沉降分组计算任务超期策略dimensionId,taskId,acqTime,injectTime,deadlineTime****", task.stationGroup.Id, task.stationGroup.Name, task.dimensionId, task.taskId, task.acqTime, task.InjectTime, task.DeadlineTime) + calcTasks.Store(key, *task) } } }