diff --git a/et_calc/group/calcTask.go b/et_calc/group/calcTask.go index 513be54..4d6c2f1 100644 --- a/et_calc/group/calcTask.go +++ b/et_calc/group/calcTask.go @@ -4,7 +4,9 @@ import ( "encoding/json" "fmt" "gitea.anxinyun.cn/container/common_models" + "gitea.anxinyun.cn/container/common_models/constant/groupType" "gitea.anxinyun.cn/container/common_models/constant/settlementParam" + "gitea.anxinyun.cn/container/common_utils" "log" "time" ) @@ -22,6 +24,7 @@ type CalcTask struct { taskId string acqTime time.Time stationMap map[int]common_models.Station + configHelper *common_utils.ConfigHelper } func NewGroupCalcTask(group *common_models.StationGroup, dimensionId string, taskId string, acqTime time.Time) *CalcTask { @@ -32,6 +35,7 @@ func NewGroupCalcTask(group *common_models.StationGroup, dimensionId string, tas taskId: taskId, acqTime: acqTime, stationMap: make(map[int]common_models.Station), + configHelper: GetConfigHelper(), } } @@ -39,17 +43,9 @@ func (t *CalcTask) AddStationData(data common_models.Station) { if data.Data.ThemeData == nil || len(data.Data.ThemeData) == 0 { return } - t.stationMap[data.Info.Id] = data } -func (t *CalcTask) GetStationData(stationId int) *common_models.Station { - if station, ok := t.stationMap[stationId]; ok { - return &station - } - return nil -} - // CheckIntegrity 检查计算项是否完整 func (t *CalcTask) CheckIntegrity() bool { for _, item := range t.stationGroup.AllCorrItems() { @@ -78,39 +74,136 @@ func (t *CalcTask) SetTimeout() int { return expireSeconds } -// ToDump 输出分组计算信息 -func (t *CalcTask) ToDump() string { - corrItemsBytes, err := json.Marshal(t.stationGroup.AllCorrItems()) +func (t *CalcTask) R() string { + cachedCount := len(t.stationMap) + corrItemCount := len(t.stationGroup.AllCorrItems()) + groupInfo := fmt.Sprintf("%s分组:%d-%s taskId:%s acqTime:%s", t.stationGroup.GroupType, t.stationGroup.Id, t.stationGroup.Name, t.taskId, t.acqTime) + msg := fmt.Sprintf("[%s]缓存[%d/%d]个元素", groupInfo, cachedCount, corrItemCount) + return msg +} + +// 输出分组计算信息 +func (t *CalcTask) ToDump(calcedStations []common_models.Station) (string, common_models.EsGroupTheme) { + // 分组基本信息 + var structId = 0 + for _, station := range t.stationMap { + structId = station.Info.StructureId + break + } + groupInfo := map[string]any{ + "structId": structId, + "groupId": t.stationGroup.Id, + "groupName": t.stationGroup.Name, + "groupType": t.stationGroup.GroupType, + "taskId": t.taskId, + "acqTime": t.acqTime, + } + groupInfoBytes, err := json.Marshal(groupInfo) + if err != nil { + log.Printf("[et_calc.group.CalcTask.ToDump()] [%d-%s] groupInfo marshal error:%v \n", t.stationGroup.Id, t.stationGroup.Name, err) + } + + // groupCorrItems 分组项信息,对应 Redis 中的 sg:stationId + groupCorrItems := make([]common_models.StationGroupInfo, 0) + for _, item := range t.stationGroup.AllCorrItems() { + sg, err := t.configHelper.GetStationGroupInfo(item.StationId) + if err == nil { + groupCorrItems = append(groupCorrItems, sg) + } + } + itemsBytes, err := json.Marshal(groupCorrItems) + if err != nil { + log.Printf("[et_calc.group.CalcTask.ToDump()] [%d-%s] groupCorrItems marshal error:%v \n", t.stationGroup.Id, t.stationGroup.Name, err) + } + + // 分组计算结果 + data := make([]common_models.CorrItemData, 0) + for _, s := range calcedStations { + var isBasePtr *bool + for _, item := range groupCorrItems { + if item.StationId == s.Info.Id { + if baseVal, ok := item.Params[settlementParam.Base]; ok { + isBase := baseVal.(bool) + isBasePtr = &isBase + } + break + } + } + + sd := common_models.CorrItemData{ + StationId: s.Info.Id, + StationName: s.Info.Name, + IsBase: isBasePtr, + PhyData: s.Data.PhyData, + ThemeData: s.Data.ThemeData, + } + data = append(data, sd) + } + dataBytes, err := json.Marshal(data) if err != nil { - fmt.Println("[et_calc.group.CalcTask.ToDump()] Error marshalling JSON:", err) + log.Printf("[et_calc.group.CalcTask.ToDump()] [%d-%s] calcedResult marshal error:%v \n", t.stationGroup.Id, t.stationGroup.Name, err) + } + + groupTheme := common_models.EsGroupTheme{ + StructId: structId, + GroupId: t.stationGroup.Id, + GroupName: t.stationGroup.Name, + GroupType: t.stationGroup.GroupType, + TaskId: t.taskId, + CorrItems: groupCorrItems, + Data: data, + CollectTime: t.acqTime.Truncate(time.Millisecond), + CreateTime: time.Now().Truncate(time.Millisecond), } - return fmt.Sprintf("【groupId:%d taskId:%s acqTime:%s】 %d/%v", t.stationGroup.Id, t.taskId, t.acqTime, len(t.stationMap), string(corrItemsBytes)) + + logStr := fmt.Sprintf("[%s]分组计算完毕。\n groupInfo: %s,\n corrItems(分组项+级联项): %s,\n calcedResult(计算后分组项): %s", t.stationGroup.Name, string(groupInfoBytes), string(itemsBytes), string(dataBytes)) + return logStr, groupTheme } -// Calc 沉降分组计算 TODO 计算完成后返回分组的数据(要重新定义结构体) func (t *CalcTask) Calc() []common_models.Station { - // 基点信息异常时数据处理:将 themeData 转储到 phyData, 再将 themeData 设置为 map[string]any{}; - funcSetErrorData := func() []common_models.Station { - var result []common_models.Station - for _, objStation := range t.stationMap { - objStation.Data.PyhData = objStation.Data.ThemeData + switch t.stationGroup.GroupType { + case groupType.Settlement: + return t.calcSettlement() + case groupType.DeepDisplace: + return t.calcDeepDisplace() + case groupType.AngleToSettlement: + return t.calcAngleToSettlement() + default: + log.Printf(fmt.Sprintf("[%s]错误的分组类型。", t.stationGroup.GroupType)) + return make([]common_models.Station, 0) + } +} + +// SettlementCalc 沉降分组计算 +func (t *CalcTask) calcSettlement() []common_models.Station { + // 基点信息异常时数据处理:将 themeData 设置为 map[string]any{}; + funcResetThemeData := func() []common_models.Station { + result := make([]common_models.Station, 0) + for _, groupItem := range t.stationGroup.Items { + objStation, ok := t.stationMap[groupItem.StationId] + if !ok { + continue + } + objStation.Data.ThemeData = map[string]any{} result = append(result, objStation) } + return result } - var resultStations []common_models.Station + resultStations := make([]common_models.Station, 0) baseItem := t.stationGroup.GetSettlementBaseItem() if baseItem == nil || t.stationMap[baseItem.StationId].Info.Id == 0 { - // 无基点的分组数据处理 - return funcSetErrorData() - } else { // 有基点,进行减基点计算 + // 未设置基点,重置主题数据为 map[string]any{} + return funcResetThemeData() + } else { + // 基点主题数据无效 baseStation := t.stationMap[baseItem.StationId] _, ok := baseStation.Data.GetValidThemeData() if !ok { log.Printf("基点数据计算失败,无效的主题数据。%s\n", baseStation.LogMsg()) - return funcSetErrorData() + return funcResetThemeData() } // baseValidFields 有效的监测项,主题数据的 field 必须在监测原型中存在 @@ -119,57 +212,60 @@ func (t *CalcTask) Calc() []common_models.Station { baseValidFields, ok := t.filterFields(base_themeData_fields, base_proto_fields) if !ok { log.Printf("基点数据计算失败,无效的监测项(数据 field 必须在监测原型中存在)。%s\n", baseStation.LogMsg()) - return funcSetErrorData() + return funcResetThemeData() } - // 基点数据(包含虚拟基点的计算) + // 包含虚拟基点的计算 virtualBase := t.calcVirtualSettlementBase(*baseItem, baseValidFields) if virtualBase == nil || len(virtualBase) == 0 { log.Printf("虚拟基点数据计算失败。%s\n", baseStation.LogMsg()) - return funcSetErrorData() + return funcResetThemeData() } // 减基点计算 for _, groupItem := range t.stationGroup.Items { - objStation, ok := t.stationMap[groupItem.StationId] + station, ok := t.stationMap[groupItem.StationId] if !ok { continue } - if groupItem.ParamsValue[settlementParam.Base] == true { // 基点主题数据处理 - for key, _ := range virtualBase { - objStation.Data.PyhData[key] = objStation.Data.ThemeData[key] - objStation.Data.ThemeData[key] = 0.0 + if groupItem.ParamsValue[settlementParam.Base] == true { + // 基点主题数据处理: 基点主题值 = 虚拟基点值 + station.Data.ThemeData = map[string]any{} + for key, val := range virtualBase { + station.Data.ThemeData[key] = val } - } else { // 测点数据处理 - themeData, ok := objStation.Data.GetValidThemeData() + resultStations = append(resultStations, station) + } else { + // 测点数据处理 + themeData, ok := station.Data.GetValidThemeData() if !ok { - log.Printf("减基点计算失败,无有效的主题数据。%s\n", objStation.LogMsg()) - objStation.Data.PyhData = objStation.Data.ThemeData - objStation.Data.ThemeData = map[string]any{} + log.Printf("减基点计算失败,无有效的主题数据。%s\n", station.LogMsg()) + station.Data.ThemeData = map[string]any{} + resultStations = append(resultStations, station) continue } // validFields 有效的监测项,主题数据的 field 必须在 baseValidFields 中存在 - station_themeData_fields := objStation.Data.GetThemeFields() - validFields, ok := t.filterFields(baseValidFields, station_themeData_fields) + station_themeData_fields := station.Data.GetThemeFields() + validFields, ok := t.filterFields(station_themeData_fields, baseValidFields) if !ok { log.Printf("基点数据计算失败,主题数据的 field 必须在 baseValidFields 中存在;"+ "baseValidFields:%v, station_themeData_fields:%v 。 %s\n", baseValidFields, station_themeData_fields, baseStation.LogMsg()) - objStation.Data.PyhData = objStation.Data.ThemeData - objStation.Data.ThemeData = map[string]any{} + station.Data.ThemeData = map[string]any{} + resultStations = append(resultStations, station) continue } // 减基点 - var resultData map[string]any + resultData := make(map[string]interface{}) for _, field := range validFields { resultData[field] = virtualBase[field] - themeData[field] } - objStation.Data.PyhData = objStation.Data.ThemeData - objStation.Data.ThemeData = resultData + station.Data.ThemeData = resultData + resultStations = append(resultStations, station) } } // for t.stationGroup.Items } @@ -182,8 +278,8 @@ func (t *CalcTask) Calc() []common_models.Station { // Base - 测量系统基点 // RefS - 远端参考系统测点 // RefB - 远端参考系统基点 -func (t *CalcTask) calcVirtualSettlementBase(item common_models.GroupItem, fields []string) map[string]float64 { - baseStation := t.stationMap[item.StationId] +func (t *CalcTask) calcVirtualSettlementBase(baseItem common_models.GroupItem, fields []string) map[string]float64 { + baseStation := t.stationMap[baseItem.StationId] data, ok := baseStation.Data.GetValidThemeData() if !ok { log.Printf("基点无数据。%s\n", baseStation.LogMsg()) @@ -191,7 +287,7 @@ func (t *CalcTask) calcVirtualSettlementBase(item common_models.GroupItem, field } // SubItem 为 nil,表示未参照任何分组,虚拟基点值等于自己 - if item.SubItems == nil { + if baseItem.SubItems == nil { result := make(map[string]float64) for _, f := range fields { result[f] = data[f] @@ -199,10 +295,10 @@ func (t *CalcTask) calcVirtualSettlementBase(item common_models.GroupItem, field return result } - logMsg, _ := json.Marshal(item) + logMsg, _ := json.Marshal(baseItem) log.Printf("级联分组:%s\n", string(logMsg)) - refS := t.calcVirtualSettlementBase(item.SubItems[settlementParam.Ref_point], fields) - refB := t.calcVirtualSettlementBase(item.SubItems[settlementParam.Ref_base], fields) + refS := t.calcVirtualSettlementBase(baseItem.SubItems[settlementParam.Ref_point], fields) + refB := t.calcVirtualSettlementBase(baseItem.SubItems[settlementParam.Ref_base], fields) if len(refS) == 0 || len(refB) == 0 { return make(map[string]float64) } @@ -231,3 +327,25 @@ func (t *CalcTask) filterFields(arrA []string, arrB []string) ([]string, bool) { return result, len(result) > 0 } + +func (t *CalcTask) calcDeepDisplace() []common_models.Station { + groupInfo := fmt.Sprintf("%s分组:%d-%s taskId:%s acqTime:%s", t.stationGroup.GroupType, t.stationGroup.Id, t.stationGroup.Name, t.taskId, t.acqTime) + fmt.Printf("深部位移分组计算未开发。[%s]\n", groupInfo) + //TODO 深部位移分组计算 + stations := make([]common_models.Station, 0) + for _, s := range t.stationMap { + stations = append(stations, s) + } + return stations +} + +func (t *CalcTask) calcAngleToSettlement() []common_models.Station { + groupInfo := fmt.Sprintf("%s分组:%d-%s taskId:%s acqTime:%s", t.stationGroup.GroupType, t.stationGroup.Id, t.stationGroup.Name, t.taskId, t.acqTime) + fmt.Printf("测斜测沉降或挠度 分组计算未开发。[%s]\n", groupInfo) + //TODO 测斜测沉降或挠度分组计算 + stations := make([]common_models.Station, 0) + for _, s := range t.stationMap { + stations = append(stations, s) + } + return stations +} diff --git a/et_calc/group/calcTask_test.go b/et_calc/group/calcTask_test.go index 39eb6a6..16173f2 100644 --- a/et_calc/group/calcTask_test.go +++ b/et_calc/group/calcTask_test.go @@ -2,6 +2,7 @@ package group import ( "encoding/json" + "fmt" "gitea.anxinyun.cn/container/common_models" "gitea.anxinyun.cn/container/common_utils" "github.com/stretchr/testify/assert" @@ -27,5 +28,7 @@ func Test_CalcTask_CheckIntegrity(t *testing.T) { func Test_CalcTask_ToDump(t *testing.T) { task := NewGroupCalcTask(&group, "5分钟周期", "1", time.Now()) - println(task.ToDump()) + logStr, groupTheme := task.ToDump(make([]common_models.Station, 0)) + println(logStr) + fmt.Printf("%v", groupTheme) } diff --git a/et_calc/group/groupCalc.go b/et_calc/group/groupCalc.go index 886546a..acca284 100644 --- a/et_calc/group/groupCalc.go +++ b/et_calc/group/groupCalc.go @@ -1,7 +1,10 @@ package group import ( + "errors" + "et_sink" "gitea.anxinyun.cn/container/common_models" + "gitea.anxinyun.cn/container/common_models/constant/groupType" "gitea.anxinyun.cn/container/common_utils" "gitea.anxinyun.cn/container/common_utils/configLoad" "log" @@ -25,26 +28,50 @@ func GetConfigHelper() *common_utils.ConfigHelper { return configHelperInstance } +// 分组测点 +type GroupStation struct { + Group common_models.StationGroup + Station common_models.Station +} + +// 获取测点分组信息 +func GetGroupInfo(station common_models.Station) []GroupStation { + var gps []common_models.StationGroup + gps = append(gps, station.Info.Group) + gps = append(gps, station.Info.CorrGroups...) + + result := make([]GroupStation, 0) + if gps != nil { + for i := 0; i < len(gps); i++ { + result = append(result, GroupStation{ + Group: gps[i], + Station: station, + }) + } + } + + return result +} + type GroupCalc struct { stage *stages.Stage configHelper *common_utils.ConfigHelper signCalc chan bool calcTasks map[GroupCalcTaskKey]CalcTask + esHandler *et_sink.SinkHandler } func NewGroupCalc() *GroupCalc { calcTaskManager := &GroupCalc{ stage: stages.NewStage("测点分组计算"), configHelper: GetConfigHelper(), - signCalc: make(chan bool, 1), + signCalc: make(chan bool), calcTasks: map[GroupCalcTaskKey]CalcTask{}, + esHandler: et_sink.NewSinkGroupHandler(), } - // 处理超期任务 - //go calcTaskManager.onClearDueTask() - // 添加到 etNode 处理环境,实现数据加工 (缓存group各分项的主题数据 -> 分组计算 -> 分组数据) + // 添加到 etNode 处理环节,实现数据加工 (缓存group各分项的主题数据 -> 分组计算 -> 分组数据) calcTaskManager.stage.AddProcess(calcTaskManager.processData) - return calcTaskManager } @@ -54,102 +81,110 @@ func (gc *GroupCalc) GetStage() stages.Stage { // processData 的 stations 被改变了 func (gc *GroupCalc) processData(inData *common_models.ProcessData) *common_models.ProcessData { - var resultStations []common_models.Station + resultStations := make([]common_models.Station, 0) + + // 分组超时任务 + for key, task := range gc.calcTasks { + if task.IsTimeout() { + calcedStations := task.Calc() + resultStations = append(resultStations, calcedStations...) + // ES存储分组主题数据 + //dumpStr, groupTheme := task.ToDump(calcedStations) + //gc.esHandler.SinkGroupDataToES(groupTheme) + //log.Printf("[groupCalc.processData]group timeout calc。 %s \n", dumpStr) + gc.dumpGroupTheme(&task, calcedStations) + delete(gc.calcTasks, key) + } + } + for _, station := range inData.Stations { - calcedStations := gc.cacheAndCalc(&station, inData.DeviceData.DimensionId, inData.DeviceData.TaskId, inData.DeviceData.AcqTime) - log.Printf("ProcessData中的测点个数:%d 计算返回的测点个数: %d", len(inData.Stations), len(calcedStations)) - resultStations = append(resultStations, calcedStations...) + if station.Info.Group.Id == 0 || station.Info.Group.GroupType != groupType.Settlement { + //log.Printf("非【液压传感器测量沉降】分组。") + resultStations = append(resultStations, station) + } else { + calcedStations, err := gc.cacheAndCalc(&station, inData.DeviceData.DimensionId, inData.DeviceData.TaskId, inData.DeviceData.AcqTime) + if err != nil { + resultStations = append(resultStations, station) + } else { + resultStations = append(resultStations, calcedStations...) + } + } } - // 返回处理后的数据 + // 返回处理后的数据,计算后的resultStations可能为空 inData.Stations = resultStations return inData } +// ES存储分组主题 +func (gc *GroupCalc) dumpGroupTheme(task *CalcTask, calcedStations []common_models.Station) { + dumpStr, groupTheme := task.ToDump(calcedStations) + log.Println(dumpStr) + if len(groupTheme.Data) > 0 { + gc.esHandler.SinkGroupDataToES(groupTheme) + } +} + // cacheAndCalc 缓存和计算 // station 测点 // dimensionId 采集策略 // taskId 一次周期采集任务 // acqTime 采集时间 -func (gc *GroupCalc) cacheAndCalc(station *common_models.Station, dimensionId string, taskId string, acqTime time.Time) []common_models.Station { - sGroup := station.Info.Group - corrGroups := station.Info.CorrGroups - if sGroup.Id == 0 || corrGroups == nil || len(corrGroups) == 0 { - // 非分组测点 - return []common_models.Station{*station} - } - +func (gc *GroupCalc) cacheAndCalc(station *common_models.Station, dimensionId string, taskId string, acqTime time.Time) ([]common_models.Station, error) { + scg := GetGroupInfo(*station) var resultStations []common_models.Station - key := GroupCalcTaskKey{ - GroupId: sGroup.Id, - TaskId: taskId, - } - if calcTask, ok := gc.calcTasks[key]; ok { - // 添加元素 - calcTask.AddStationData(*station) - - // 分组计算 - if calcTask.CheckIntegrity() { - secs := calcTask.ElapsedSecs() - if secs > 10 { - log.Printf("[dataHandler] group calc wait %f秒, %s\n", secs, key.R()) - } - - calcedStations := calcTask.Calc() - if calcedStations != nil { - resultStations = append(resultStations, calcedStations...) - } - - delete(gc.calcTasks, key) + for _, groupStation := range scg { + sGroup := groupStation.Group + if sGroup.Id == 0 || sGroup.GroupType != groupType.Settlement { + return nil, errors.New("非【液压传感器测量沉降】分组") } - } else { - // 不存在的计算任务:要取到首个元素的设备的采集策略(维度),以便后面获得过期时长 - task := NewGroupCalcTask(&sGroup, dimensionId, taskId, acqTime) - task.AddStationData(*station) - - if task.CheckIntegrity() { - calcedStations := task.Calc() - if calcedStations != nil { - resultStations = append(resultStations, calcedStations...) + key := GroupCalcTaskKey{GroupId: sGroup.Id, TaskId: taskId} + if calcTask, ok := gc.calcTasks[key]; ok { + calcTask.AddStationData(*station) + log.Println(calcTask.R()) + // 分组计算 + if calcTask.CheckIntegrity() { + secs := calcTask.ElapsedSecs() + if secs > 10 { + log.Printf("[groupCalc.processData] group calc wait %f秒, %s\n", secs, key.R()) + } + + calcedStations := calcTask.Calc() + // 缓存已计算数据 + if calcedStations != nil && len(calcedStations) > 0 { + resultStations = append(resultStations, calcedStations...) + } + // ES存储分组主题数据 + //dumpStr, groupTheme := calcTask.ToDump(calcedStations) + //gc.esHandler.SinkGroupDataToES(groupTheme) + //log.Println(dumpStr) + gc.dumpGroupTheme(&calcTask, calcedStations) + + delete(gc.calcTasks, key) } } else { - task.SetTimeout() - gc.calcTasks[key] = *task + // 不存在的计算任务:要取到首个元素的设备的采集策略(维度),以便后面获得过期时长 + task := NewGroupCalcTask(&sGroup, dimensionId, taskId, acqTime) + task.AddStationData(*station) + log.Println(task.R()) + + if task.CheckIntegrity() { + calcedStations := task.Calc() + // 缓存已计算数据 + if calcedStations != nil && len(calcedStations) > 0 { + resultStations = append(resultStations, calcedStations...) + } + + // ES存储分组主题数据 + gc.dumpGroupTheme(task, calcedStations) + + } else { + task.SetTimeout() + gc.calcTasks[key] = *task + } } } - return resultStations + return resultStations, nil } - -//func (gc *GroupCalc) clearDueTask() { -// for key, task := range gc.calcTasks { -// if task.IsTimeout() { -// result := task.Calc() -// if result != nil { -// // TODO 处理完的数据要传递出去 -// station.Data.GroupData = result -// } -// // 不管计算是否成功,到期的任务都要清除 -// delete(gc.calcTasks, key) -// log.Printf("[dataHandler] group timeout calcTask:%s\n", key.R()) -// } -// } -//} - -//func (gc *GroupCalc) injectGroupData(calcTaskKey string, result map[string]any) { -// // -//} - -//// onClearDueTask 处理超期任务 -//func (gc *GroupCalc) onClearDueTask() { -// for { -// select { -// case <-gc.signCalc: -// case <-time.After(time.Second): -// } -// -// // 过期任务处理 -// gc.clearDueTask() -// } -//} diff --git a/et_calc/group/groupDueTask.go b/et_calc/group/groupDueTask.go deleted file mode 100644 index 654521e..0000000 --- a/et_calc/group/groupDueTask.go +++ /dev/null @@ -1 +0,0 @@ -package group diff --git a/node/et_worker/processors/groupProcess.go b/node/et_worker/processors/groupProcess.go index 7cc7351..70ef018 100644 --- a/node/et_worker/processors/groupProcess.go +++ b/node/et_worker/processors/groupProcess.go @@ -15,11 +15,11 @@ func NewGroupProcess(name string) *Process { return &Process{name: name} } -// Process 分组处理者 , ProcessData -> PyhData +// Process 分组处理者 , ProcessData -> PhyData func (p *GroupProcess) Process(ctx context.Context, params any) (any, error) { log.Printf("[%s]开始执行", p.name) if ProcessData, ok := params.(*common_models.ProcessData); !ok { - return nil, errors.New("不是[PyhData]类型数据") + return nil, errors.New("不是[PhyData]类型数据") } else { log.Printf("[%s]-[%s]处理...", ProcessData.DeviceData.DeviceId, ProcessData.DeviceData.Name) return ProcessData, nil