package group import ( "encoding/json" "fmt" "gitea.anxinyun.cn/container/common_models" "gitea.anxinyun.cn/container/common_models/constant/groupType" "gitea.anxinyun.cn/container/common_models/constant/settlementParam" "gitea.anxinyun.cn/container/common_utils" "log" "time" ) //沉降分组业务处理说明: //dimensionId 对应WEB端配置:组网配置/采集策略 //taskId是维度下的schema每次调用的时候,生成的一个编码 //分组配置要求:分组计算中的测点的【采集策略】同一个周期采集 //特殊场景:上报类测点,要进行分组计算,需要在协议里处理,输出_acq_number确保一致 type CalcTask struct { *BaseDueTask stationGroup *common_models.StationGroup dimensionId string // 第一个测点归属的采集维度 taskId string acqTime time.Time stationMap map[int]common_models.Station configHelper *common_utils.ConfigHelper } func NewGroupCalcTask(group *common_models.StationGroup, dimensionId string, taskId string, acqTime time.Time) *CalcTask { return &CalcTask{ BaseDueTask: NewBaseDueTask(), stationGroup: group, dimensionId: dimensionId, taskId: taskId, acqTime: acqTime, stationMap: make(map[int]common_models.Station), configHelper: GetConfigHelper(), } } func (t *CalcTask) AddStationData(data common_models.Station) { if data.Data.ThemeData == nil || len(data.Data.ThemeData) == 0 { return } t.stationMap[data.Info.Id] = data log.Println(t.R()) } // CheckIntegrity 检查计算项是否完整 func (t *CalcTask) CheckIntegrity() bool { for _, item := range t.stationGroup.AllCorrItems() { if _, ok := t.stationMap[item.StationId]; !ok { return false } } return true } // SetTimeout 根据第一个测点归属的采集维度类型,设置任务超时时长 func (t *CalcTask) SetTimeout() int { var expireSeconds int if t.stationMap == nil || len(t.stationMap) == 0 { expireSeconds = DefaultExpire() } if t.dimensionId == "" { expireSeconds = DefaultExpire() } else { expireSeconds = FromDimension(t.dimensionId) } // 小于3分钟,返回180秒 if expireSeconds < 60*3 { expireSeconds = 60 * 3 } t.SetDeadLineTime(expireSeconds) return expireSeconds } func (t *CalcTask) R() string { cachedCount := len(t.stationMap) corrItemCount := len(t.stationGroup.AllCorrItems()) groupInfo := fmt.Sprintf("%s分组:%d-%s taskId:%s acqTime:%s", t.stationGroup.GroupType, t.stationGroup.Id, t.stationGroup.Name, t.taskId, t.acqTime) msg := fmt.Sprintf("[%s]缓存[%d/%d]个元素", groupInfo, cachedCount, corrItemCount) return msg } // 输出分组计算信息 func (t *CalcTask) ToDump(calcedStations []common_models.Station) (string, common_models.EsGroupTheme) { // 分组基本信息 var structId = 0 for _, station := range t.stationMap { structId = station.Info.StructureId break } groupInfo := map[string]any{ "structId": structId, "groupId": t.stationGroup.Id, "groupName": t.stationGroup.Name, "groupType": t.stationGroup.GroupType, "taskId": t.taskId, "acqTime": t.acqTime, } groupInfoBytes, err := json.Marshal(groupInfo) if err != nil { log.Printf("[et_calc.group.CalcTask.ToDump()] [%d-%s] groupInfo marshal error:%v \n", t.stationGroup.Id, t.stationGroup.Name, err) } // groupCorrItems 分组项信息,对应 Redis 中的 sg:stationId groupCorrItems := make([]common_models.StationGroupInfo, 0) for _, item := range t.stationGroup.AllCorrItems() { sg, err := t.configHelper.GetStationGroupInfo(item.StationId) if err == nil { groupCorrItems = append(groupCorrItems, sg) } } itemsBytes, err := json.Marshal(groupCorrItems) if err != nil { log.Printf("[et_calc.group.CalcTask.ToDump()] [%d-%s] groupCorrItems marshal error:%v \n", t.stationGroup.Id, t.stationGroup.Name, err) } // 分组计算结果 data := make([]common_models.CorrItemData, 0) for _, s := range calcedStations { var isBasePtr *bool for _, item := range groupCorrItems { if item.StationId == s.Info.Id { if baseVal, ok := item.Params[settlementParam.Base]; ok { isBase := baseVal.(bool) isBasePtr = &isBase } break } } sd := common_models.CorrItemData{ StationId: s.Info.Id, StationName: s.Info.Name, IsBase: isBasePtr, PhyData: s.Data.PhyData, ThemeData: s.Data.ThemeData, } data = append(data, sd) } dataBytes, err := json.Marshal(data) if err != nil { log.Printf("[et_calc.group.CalcTask.ToDump()] [%d-%s] calcedResult marshal error:%v \n", t.stationGroup.Id, t.stationGroup.Name, err) } groupTheme := common_models.EsGroupTheme{ StructId: structId, GroupId: t.stationGroup.Id, GroupName: t.stationGroup.Name, GroupType: t.stationGroup.GroupType, TaskId: t.taskId, CorrItems: groupCorrItems, Data: data, CollectTime: t.acqTime.Truncate(time.Millisecond), CreateTime: time.Now().Truncate(time.Millisecond), } logStr := fmt.Sprintf("[%s]分组计算完毕。\n groupInfo: %s,\n corrItems(分组项+级联项): %s,\n calcedResult(计算后分组项): %s", t.stationGroup.Name, string(groupInfoBytes), string(itemsBytes), string(dataBytes)) return logStr, groupTheme } func (t *CalcTask) Calc() []common_models.Station { switch t.stationGroup.GroupType { case groupType.Settlement: return t.calcSettlement() case groupType.DeepDisplace: return t.calcDeepDisplace() case groupType.AngleToSettlement: return t.calcAngleToSettlement() default: log.Printf(fmt.Sprintf("[%s]错误的分组类型。", t.stationGroup.GroupType)) return make([]common_models.Station, 0) } } // SettlementCalc 沉降分组计算 func (t *CalcTask) calcSettlement() []common_models.Station { // 基点信息异常时数据处理:将 themeData 设置为 map[string]any{}; funcResetThemeData := func() []common_models.Station { result := make([]common_models.Station, 0) for _, groupItem := range t.stationGroup.Items { objStation, ok := t.stationMap[groupItem.StationId] if !ok { continue } objStation.Data.ThemeData = map[string]any{} result = append(result, objStation) } return result } resultStations := make([]common_models.Station, 0) baseItem := t.stationGroup.GetSettlementBaseItem() if baseItem == nil || t.stationMap[baseItem.StationId].Info.Id == 0 { // 未设置基点,重置主题数据为 map[string]any{} return funcResetThemeData() } else { // 基点主题数据无效 baseStation := t.stationMap[baseItem.StationId] _, ok := baseStation.Data.GetValidThemeData() if !ok { log.Printf("基点数据计算失败,无效的主题数据。%s\n", baseStation.LogMsg()) return funcResetThemeData() } // baseValidFields 有效的监测项,主题数据的 field 必须在监测原型中存在 base_themeData_fields := baseStation.Data.GetThemeFields() base_proto_fields := baseStation.GetProtoFields() baseValidFields, ok := t.filterFields(base_themeData_fields, base_proto_fields) if !ok { log.Printf("基点数据计算失败,无效的监测项(数据 field 必须在监测原型中存在)。%s\n", baseStation.LogMsg()) return funcResetThemeData() } // 包含虚拟基点的计算 virtualBase := t.calcVirtualSettlementBase(*baseItem, baseValidFields) if virtualBase == nil || len(virtualBase) == 0 { log.Printf("虚拟基点数据计算失败。%s\n", baseStation.LogMsg()) return funcResetThemeData() } // 减基点计算 for _, groupItem := range t.stationGroup.Items { station, ok := t.stationMap[groupItem.StationId] if !ok { continue } if groupItem.ParamsValue[settlementParam.Base] == true { // 基点主题数据处理: 基点主题值 = 虚拟基点值 station.Data.ThemeData = map[string]any{} for key, val := range virtualBase { station.Data.ThemeData[key] = val } resultStations = append(resultStations, station) } else { // 测点数据处理 themeData, ok := station.Data.GetValidThemeData() if !ok { log.Printf("减基点计算失败,无有效的主题数据。%s\n", station.LogMsg()) station.Data.ThemeData = map[string]any{} resultStations = append(resultStations, station) continue } // validFields 有效的监测项,主题数据的 field 必须在 baseValidFields 中存在 station_themeData_fields := station.Data.GetThemeFields() validFields, ok := t.filterFields(station_themeData_fields, baseValidFields) if !ok { log.Printf("基点数据计算失败,主题数据的 field 必须在 baseValidFields 中存在;"+ "baseValidFields:%v, station_themeData_fields:%v 。 %s\n", baseValidFields, station_themeData_fields, baseStation.LogMsg()) station.Data.ThemeData = map[string]any{} resultStations = append(resultStations, station) continue } // 减基点 resultData := make(map[string]interface{}) for _, field := range validFields { resultData[field] = virtualBase[field] - themeData[field] } station.Data.ThemeData = resultData resultStations = append(resultStations, station) } } // for t.stationGroup.Items } return resultStations } // calcVirtualSettlementBase 计算虚拟基点 // Result = Base + (RefB - RefS) // Base - 测量系统基点 // RefS - 远端参考系统测点 // RefB - 远端参考系统基点 func (t *CalcTask) calcVirtualSettlementBase(baseItem common_models.GroupItem, fields []string) map[string]float64 { baseStation := t.stationMap[baseItem.StationId] data, ok := baseStation.Data.GetValidThemeData() if !ok { log.Printf("基点无数据。%s\n", baseStation.LogMsg()) return make(map[string]float64) } // SubItem 为 nil,表示未参照任何分组,虚拟基点值等于自己 if baseItem.SubItems == nil { result := make(map[string]float64) for _, f := range fields { result[f] = data[f] } return result } logMsg, _ := json.Marshal(baseItem) log.Printf("级联分组:%s\n", string(logMsg)) refS := t.calcVirtualSettlementBase(baseItem.SubItems[settlementParam.Ref_point], fields) refB := t.calcVirtualSettlementBase(baseItem.SubItems[settlementParam.Ref_base], fields) if len(refS) == 0 || len(refB) == 0 { return make(map[string]float64) } result := make(map[string]float64) for _, f := range fields { // 虚拟基点 = 当前分组的基点测量值 + 参考点沉降 result[f] = data[f] + (refB[f] - refS[f]) } return result } // filterFields 过滤有效的数据项(交集操作) func (t *CalcTask) filterFields(arrA []string, arrB []string) ([]string, bool) { setA := make(map[string]bool) for _, a := range arrA { setA[a] = true } var result []string for _, b := range arrB { if setA[b] { result = append(result, b) } } return result, len(result) > 0 } func (t *CalcTask) calcDeepDisplace() []common_models.Station { groupInfo := fmt.Sprintf("%s分组:%d-%s taskId:%s acqTime:%s", t.stationGroup.GroupType, t.stationGroup.Id, t.stationGroup.Name, t.taskId, t.acqTime) fmt.Printf("深部位移分组计算未开发。[%s]\n", groupInfo) //TODO 深部位移分组计算 stations := make([]common_models.Station, 0) for _, s := range t.stationMap { stations = append(stations, s) } return stations } func (t *CalcTask) calcAngleToSettlement() []common_models.Station { groupInfo := fmt.Sprintf("%s分组:%d-%s taskId:%s acqTime:%s", t.stationGroup.GroupType, t.stationGroup.Id, t.stationGroup.Name, t.taskId, t.acqTime) fmt.Printf("测斜测沉降或挠度 分组计算未开发。[%s]\n", groupInfo) //TODO 测斜测沉降或挠度分组计算 stations := make([]common_models.Station, 0) for _, s := range t.stationMap { stations = append(stations, s) } return stations }