package group

import (
	"errors"
	"log"
	"sync"
	"time"

	"et_sink"
	"gitea.anxinyun.cn/container/common_models"
	"gitea.anxinyun.cn/container/common_models/constant/groupType"
	"gitea.anxinyun.cn/container/common_utils"
	"gitea.anxinyun.cn/container/common_utils/configLoad"
	"node/stages"
)

// Package-level state:
//   - configHelperInstance is the lazily created ConfigHelper shared by the package;
//   - once guards its one-time construction in GetConfigHelper;
//   - calcTasks caches pending group calculation tasks, keyed by GroupCalcTaskKey
//     and stored as CalcTask values (see cacheAndCalc).
var (
	configHelperInstance *common_utils.ConfigHelper
	once                 sync.Once
	calcTasks            sync.Map
)

// GetConfigHelper returns the package-wide ConfigHelper. On the first call it
// loads the configuration and builds the helper from the "redis.address"
// setting; later calls return the same instance.
func GetConfigHelper() *common_utils.ConfigHelper {
	once.Do(func() {
		configYaml := configLoad.LoadConfig()
		redisAdd := configYaml.GetString("redis.address")
		configHelperInstance = common_utils.NewConfigHelper(redisAdd)
	})
	return configHelperInstance
}

// GroupStation pairs a station with one of the groups it belongs to.
type GroupStation struct {
	Group   common_models.StationGroup
	Station common_models.Station
}

// GetGroupInfo returns one GroupStation per group of the given station: the
// primary group (Info.Group) first, followed by every entry of Info.CorrGroups.
// The primary group is included unconditionally, so the result is never nil
// and always holds at least one element.
func GetGroupInfo(station common_models.Station) []GroupStation {
	groups := make([]common_models.StationGroup, 0, 1+len(station.Info.CorrGroups))
	groups = append(groups, station.Info.Group)
	groups = append(groups, station.Info.CorrGroups...)

	result := make([]GroupStation, 0, len(groups))
	for _, g := range groups {
		result = append(result, GroupStation{
			Group:   g,
			Station: station,
		})
	}
	return result
}

// GroupCalc is the processing stage that performs station group calculation.
//
// stage is the pipeline stage processData is registered on, configHelper is
// the shared ConfigHelper, and esHandler is the sink used by dumpGroupTheme.
// signCalc is created in NewGroupCalc but is not referenced anywhere in the
// code visible here.
type GroupCalc struct {
	stage        *stages.Stage
	configHelper *common_utils.ConfigHelper
	signCalc     chan bool
	esHandler    *et_sink.SinkHandler
}

// NewGroupCalc builds a GroupCalc and registers processData as a process of
// its stage.
func NewGroupCalc() *GroupCalc {
	calcTaskManager := &GroupCalc{
		stage:        stages.NewStage("测点分组计算"),
		configHelper: GetConfigHelper(),
		signCalc:     make(chan bool),
		esHandler:    et_sink.NewSinkGroupHandler(),
	}
	// Hook into the etNode processing chain to transform the data: cache the
	// theme data of every group member -> group calculation -> store the
	// group data in ES.
	calcTaskManager.stage.AddProcess(calcTaskManager.processData)
	return calcTaskManager
}

// GetStage returns the stage by value, i.e. a copy of the struct that
// gc.stage points to.
func (gc *GroupCalc) GetStage() stages.Stage {
	return *gc.stage
}

// processData runs one round of group processing and replaces inData.Stations
// with the outcome.
//
// The outcome is assembled from, in order:
//  1. the results of cached tasks that have timed out;
//  2. every incoming station that has no group or whose primary group is not
//     a Settlement group, passed through unchanged;
//  3. the stations returned by cacheAndCalc for Settlement-group stations.
//
// Stations whose Data.ThemeData is empty are then dropped (and logged), so the
// resulting slice may be empty. The same inData pointer is returned.
func (gc *GroupCalc) processData(inData *common_models.ProcessData) *common_models.ProcessData {
	log.Printf("************* 82行 分组一次处理开始 len(inData.Stations)=%d *************", len(inData.Stations))

	resultStations := make([]common_models.Station, 0)
	// Tasks whose group timed out.
	resultStations = append(resultStations, gc.processTimeoutTasks()...)

	// Regular group calculation.
	for _, station := range inData.Stations {
		if station.Info.Group.Id == 0 || station.Info.Group.GroupType != groupType.Settlement {
			// Not a settlement group: pass the station through untouched.
			resultStations = append(resultStations, station)
			continue
		}

		calcRet, err := gc.cacheAndCalc(&station, inData.DeviceData.DimensionId, inData.DeviceData.TaskId, inData.DeviceData.AcqTime)
		if err != nil {
			// The station is left out of the result, as before; the error is
			// reported here instead of being dropped silently.
			log.Printf("[groupCalc.processData] cacheAndCalc failed for station %v: %v", station.Info.Id, err)
		} else {
			log.Printf("************* 95行 沉降分组计算成功,返回记录数 len(calcRet)={%d} *************", len(calcRet))
			resultStations = append(resultStations, calcRet...)
		}
		log.Printf("************* 98行 分组一次处理,缓存记录数 len(resultStations)={%d} *************", len(resultStations))
	}

	// Stations whose calculation failed (no theme data) are not returned.
	result := make([]common_models.Station, 0, len(resultStations))
	for _, station := range resultStations {
		if len(station.Data.ThemeData) == 0 {
			log.Printf("*************** 119行 分组计算异常的测点数据 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data)
			continue
		}

		//// TODO #TEST BEGIN | filterSensorIds: IDs of the stations to investigate, mainly to
		//// track down group-station theme data that never reaches ES.
		//filterSensorIds := []int{42, 15, 32, 43, 13, 33, 17, 14}
		//for _, number := range filterSensorIds {
		//	if number == station.Info.Id {
		//		log.Printf("*************** 116行 沉降分组测点有返回 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data)
		//		break
		//	}
		//}
		//// #TEST END

		// Station to be returned.
		result = append(result, station)
	}

	// After the calculation, result may be empty.
	log.Printf("************* 124行 分组一次处理结束 len(inData.Stations)=%d *************", len(result))
	inData.Stations = result
	return inData
}

// processTimeoutTasks walks every cached task and, for each one reporting
// IsTimeout, runs its calculation, stores the group theme data in ES via
// dumpGroupTheme and removes the task from the cache. It returns the
// concatenated calculation results (an empty, non-nil slice when nothing
// timed out).
func (gc *GroupCalc) processTimeoutTasks() []common_models.Station {
	resultStations := make([]common_models.Station, 0)
	calcTasks.Range(func(key, value interface{}) bool {
		task := value.(CalcTask)
		if !task.IsTimeout() {
			return true // keep iterating
		}

		calcRet := task.Calc()
		resultStations = append(resultStations, calcRet...)
		// Store the group theme data in ES.
		gc.dumpGroupTheme(&task, calcRet)
		calcTasks.Delete(key)
		return true // true continues the iteration, false would stop it
	})
	return resultStations
}

// dumpGroupTheme logs the dump string produced by task.ToDump and, when the
// resulting document carries data, sends it to ES through the sink handler.
func (gc *GroupCalc) dumpGroupTheme(task *CalcTask, calcedStations []common_models.Station) {
	dumpStr, esDoc := task.ToDump(calcedStations)
	log.Println(dumpStr)
	if len(esDoc.Data) > 0 {
		gc.esHandler.SinkGroupDataToES(esDoc)
	}
}

// cacheAndCalc adds the station's data to the calculation task of each group
// the station belongs to and runs the calculation for every task that has
// become complete.
//
//   - station:     the station whose data is being cached
//   - dimensionId: acquisition strategy (dimension)
//   - taskId:      identifier of one periodic acquisition task
//   - acqTime:     acquisition time
//
// For each group (see GetGroupInfo) the task is looked up in calcTasks by
// (group id, taskId). An existing task receives the station and, once
// CheckIntegrity reports true, is calculated, dumped to ES and removed from
// the cache. A missing task is created; if it is already complete it is
// calculated and dumped right away, otherwise its timeout is set and it is
// stored in the cache.
//
// It returns the stations produced by all calculations run during this call
// (nil when none ran). If any group has Id 0 or is not a Settlement group, it
// returns nil and an error immediately.
func (gc *GroupCalc) cacheAndCalc(station *common_models.Station, dimensionId string, taskId string, acqTime time.Time) ([]common_models.Station, error) {
	var resultStations []common_models.Station

	for _, groupStation := range GetGroupInfo(*station) {
		sGroup := groupStation.Group
		if sGroup.Id == 0 || sGroup.GroupType != groupType.Settlement {
			// NOTE(review): returning here discards any results already
			// collected for earlier groups in this call, although their tasks
			// have been dumped and deleted — confirm this is intended.
			return nil, errors.New("非【液压传感器测量沉降】分组")
		}

		key := GroupCalcTaskKey{GroupId: sGroup.Id, TaskId: taskId}
		value, found := calcTasks.Load(key)
		if !found {
			// New calculation task: it takes the acquisition strategy
			// (dimension) of the first station seen, so the expiry duration
			// can be derived from it later.
			task := NewGroupCalcTask(&sGroup, dimensionId, taskId, acqTime)
			task.AddStationData(*station)
			if task.CheckIntegrity() {
				calcRet := task.Calc()
				resultStations = append(resultStations, calcRet...)
				// Store the group theme data in ES.
				gc.dumpGroupTheme(task, calcRet)
			} else {
				task.SetTimeout()
				log.Println("****沉降分组计算任务超期策略dimensionId,taskId,acqTime,injectTime,deadlineTime****",
					task.stationGroup.Id, task.stationGroup.Name, task.dimensionId, task.taskId, task.acqTime, task.InjectTime, task.DeadlineTime)
				calcTasks.Store(key, *task)
			}
			continue
		}

		// NOTE(review): tasks are stored by value, so calcTask is a copy of
		// the cached entry and is never stored back. The added station only
		// persists if CalcTask keeps its stations behind a reference (map or
		// pointer) — verify against the CalcTask definition.
		calcTask := value.(CalcTask)
		calcTask.AddStationData(*station)
		if !calcTask.CheckIntegrity() {
			continue
		}

		// Group calculation.
		if secs := calcTask.ElapsedSecs(); secs > 10 {
			log.Printf("[groupCalc.processData] wait %f秒, %s\n", secs, key.R())
		}
		calcRet := calcTask.Calc()
		resultStations = append(resultStations, calcRet...)
		// Store the group theme data in ES.
		gc.dumpGroupTheme(&calcTask, calcRet)
		calcTasks.Delete(key)
	}

	return resultStations, nil
}