You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
356 lines
12 KiB
356 lines
12 KiB
package group
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"gitea.anxinyun.cn/container/common_models"
|
|
"gitea.anxinyun.cn/container/common_models/constant/groupType"
|
|
"gitea.anxinyun.cn/container/common_models/constant/settlementParam"
|
|
"gitea.anxinyun.cn/container/common_utils"
|
|
"log"
|
|
"time"
|
|
)
|
|
|
|
//沉降分组业务处理说明:
|
|
//dimensionId 对应WEB端配置:组网配置/采集策略
|
|
//taskId是维度下的schema每次调用的时候,生成的一个编码
|
|
//分组配置要求:分组计算中的测点的【采集策略】同一个周期采集
|
|
//特殊场景:上报类测点,要进行分组计算,需要在协议里处理,输出_acq_number确保一致
|
|
|
|
type CalcTask struct {
|
|
*BaseDueTask
|
|
stationGroup *common_models.StationGroup
|
|
dimensionId string // 第一个测点归属的采集维度
|
|
taskId string
|
|
acqTime time.Time
|
|
stationMap map[int]common_models.Station
|
|
configHelper *common_utils.ConfigHelper
|
|
}
|
|
|
|
func NewGroupCalcTask(group *common_models.StationGroup, dimensionId string, taskId string, acqTime time.Time) *CalcTask {
|
|
return &CalcTask{
|
|
BaseDueTask: NewBaseDueTask(),
|
|
stationGroup: group,
|
|
dimensionId: dimensionId,
|
|
taskId: taskId,
|
|
acqTime: acqTime,
|
|
stationMap: make(map[int]common_models.Station),
|
|
configHelper: GetConfigHelper(),
|
|
}
|
|
}
|
|
|
|
func (t *CalcTask) AddStationData(data common_models.Station) {
|
|
if data.Data.ThemeData == nil || len(data.Data.ThemeData) == 0 {
|
|
return
|
|
}
|
|
t.stationMap[data.Info.Id] = data
|
|
log.Println(t.R())
|
|
}
|
|
|
|
// CheckIntegrity 检查计算项是否完整
|
|
func (t *CalcTask) CheckIntegrity() bool {
|
|
for _, item := range t.stationGroup.AllCorrItems() {
|
|
if _, ok := t.stationMap[item.StationId]; !ok {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// SetTimeout 根据第一个测点归属的采集维度类型,设置任务超时时长
|
|
func (t *CalcTask) SetTimeout() int {
|
|
var expireSeconds int
|
|
|
|
if t.stationMap == nil || len(t.stationMap) == 0 {
|
|
expireSeconds = DefaultExpire()
|
|
}
|
|
|
|
if t.dimensionId == "" {
|
|
expireSeconds = DefaultExpire()
|
|
} else {
|
|
expireSeconds = FromDimension(t.dimensionId)
|
|
}
|
|
|
|
// 小于3分钟,返回180秒
|
|
if expireSeconds < 60*3 {
|
|
expireSeconds = 60 * 3
|
|
}
|
|
t.SetDeadLineTime(expireSeconds)
|
|
return expireSeconds
|
|
}
|
|
|
|
func (t *CalcTask) R() string {
|
|
cachedCount := len(t.stationMap)
|
|
corrItemCount := len(t.stationGroup.AllCorrItems())
|
|
groupInfo := fmt.Sprintf("%s分组:%d-%s taskId:%s acqTime:%s", t.stationGroup.GroupType, t.stationGroup.Id, t.stationGroup.Name, t.taskId, t.acqTime)
|
|
msg := fmt.Sprintf("[%s]缓存[%d/%d]个元素", groupInfo, cachedCount, corrItemCount)
|
|
return msg
|
|
}
|
|
|
|
// 输出分组计算信息
|
|
func (t *CalcTask) ToDump(calcedStations []common_models.Station) (string, common_models.EsGroupTheme) {
|
|
// 分组基本信息
|
|
var structId = 0
|
|
for _, station := range t.stationMap {
|
|
structId = station.Info.StructureId
|
|
break
|
|
}
|
|
groupInfo := map[string]any{
|
|
"structId": structId,
|
|
"groupId": t.stationGroup.Id,
|
|
"groupName": t.stationGroup.Name,
|
|
"groupType": t.stationGroup.GroupType,
|
|
"taskId": t.taskId,
|
|
"acqTime": t.acqTime,
|
|
}
|
|
groupInfoBytes, err := json.Marshal(groupInfo)
|
|
if err != nil {
|
|
log.Printf("[et_calc.group.CalcTask.ToDump()] [%d-%s] groupInfo marshal error:%v \n", t.stationGroup.Id, t.stationGroup.Name, err)
|
|
}
|
|
|
|
// groupCorrItems 分组项信息,对应 Redis 中的 sg:stationId
|
|
groupCorrItems := make([]common_models.StationGroupInfo, 0)
|
|
for _, item := range t.stationGroup.AllCorrItems() {
|
|
sg, err := t.configHelper.GetStationGroupInfo(item.StationId)
|
|
if err == nil {
|
|
groupCorrItems = append(groupCorrItems, sg)
|
|
}
|
|
}
|
|
itemsBytes, err := json.Marshal(groupCorrItems)
|
|
if err != nil {
|
|
log.Printf("[et_calc.group.CalcTask.ToDump()] [%d-%s] groupCorrItems marshal error:%v \n", t.stationGroup.Id, t.stationGroup.Name, err)
|
|
}
|
|
|
|
// 分组计算结果
|
|
data := make([]common_models.CorrItemData, 0)
|
|
for _, s := range calcedStations {
|
|
var isBasePtr *bool
|
|
for _, item := range groupCorrItems {
|
|
if item.StationId == s.Info.Id {
|
|
if baseVal, ok := item.Params[settlementParam.Base]; ok {
|
|
isBase := baseVal.(bool)
|
|
isBasePtr = &isBase
|
|
}
|
|
break
|
|
}
|
|
}
|
|
|
|
sd := common_models.CorrItemData{
|
|
StationId: s.Info.Id,
|
|
StationName: s.Info.Name,
|
|
IsBase: isBasePtr,
|
|
PhyData: s.Data.PhyData,
|
|
ThemeData: s.Data.ThemeData,
|
|
}
|
|
data = append(data, sd)
|
|
}
|
|
dataBytes, err := json.Marshal(data)
|
|
if err != nil {
|
|
log.Printf("[et_calc.group.CalcTask.ToDump()] [%d-%s] calcedResult marshal error:%v \n", t.stationGroup.Id, t.stationGroup.Name, err)
|
|
}
|
|
|
|
groupTheme := common_models.EsGroupTheme{
|
|
StructId: structId,
|
|
GroupId: t.stationGroup.Id,
|
|
GroupName: t.stationGroup.Name,
|
|
GroupType: t.stationGroup.GroupType,
|
|
TaskId: t.taskId,
|
|
CorrItems: groupCorrItems,
|
|
Data: data,
|
|
CollectTime: t.acqTime.Truncate(time.Millisecond),
|
|
CreateTime: time.Now().Truncate(time.Millisecond),
|
|
}
|
|
|
|
logStr := fmt.Sprintf("[%s]分组计算完毕。\n groupInfo: %s,\n corrItems(分组项+级联项): %s,\n calcedResult(计算后分组项): %s", t.stationGroup.Name, string(groupInfoBytes), string(itemsBytes), string(dataBytes))
|
|
return logStr, groupTheme
|
|
}
|
|
|
|
func (t *CalcTask) Calc() []common_models.Station {
|
|
switch t.stationGroup.GroupType {
|
|
case groupType.Settlement:
|
|
return t.calcSettlement()
|
|
case groupType.DeepDisplace:
|
|
return t.calcDeepDisplace()
|
|
case groupType.AngleToSettlement:
|
|
return t.calcAngleToSettlement()
|
|
default:
|
|
log.Printf(fmt.Sprintf("[%s]错误的分组类型。", t.stationGroup.GroupType))
|
|
return make([]common_models.Station, 0)
|
|
}
|
|
}
|
|
|
|
// SettlementCalc 沉降分组计算
|
|
func (t *CalcTask) calcSettlement() []common_models.Station {
|
|
// 基点信息异常时数据处理:将 themeData 设置为 map[string]any{};
|
|
funcResetThemeData := func() []common_models.Station {
|
|
result := make([]common_models.Station, 0)
|
|
for _, groupItem := range t.stationGroup.Items {
|
|
objStation, ok := t.stationMap[groupItem.StationId]
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
objStation.Data.ThemeData = map[string]any{}
|
|
result = append(result, objStation)
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
resultStations := make([]common_models.Station, 0)
|
|
baseItem := t.stationGroup.GetSettlementBaseItem()
|
|
if baseItem == nil || t.stationMap[baseItem.StationId].Info.Id == 0 {
|
|
// 未设置基点,重置主题数据为 map[string]any{}
|
|
return funcResetThemeData()
|
|
} else {
|
|
// 基点主题数据无效
|
|
baseStation := t.stationMap[baseItem.StationId]
|
|
_, ok := baseStation.Data.GetValidThemeData()
|
|
if !ok {
|
|
log.Printf("基点数据计算失败,无效的主题数据。%s\n", baseStation.LogMsg())
|
|
return funcResetThemeData()
|
|
}
|
|
|
|
// baseValidFields 有效的监测项,主题数据的 field 必须在监测原型中存在
|
|
base_themeData_fields := baseStation.Data.GetThemeFields()
|
|
base_proto_fields := baseStation.GetProtoFields()
|
|
baseValidFields, ok := t.filterFields(base_themeData_fields, base_proto_fields)
|
|
if !ok {
|
|
log.Printf("基点数据计算失败,无效的监测项(数据 field 必须在监测原型中存在)。%s\n", baseStation.LogMsg())
|
|
return funcResetThemeData()
|
|
}
|
|
|
|
// 包含虚拟基点的计算
|
|
virtualBase := t.calcVirtualSettlementBase(*baseItem, baseValidFields)
|
|
if virtualBase == nil || len(virtualBase) == 0 {
|
|
log.Printf("虚拟基点数据计算失败。%s\n", baseStation.LogMsg())
|
|
return funcResetThemeData()
|
|
}
|
|
|
|
// 减基点计算
|
|
for _, groupItem := range t.stationGroup.Items {
|
|
station, ok := t.stationMap[groupItem.StationId]
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
if groupItem.ParamsValue[settlementParam.Base] == true {
|
|
// 基点主题数据处理: 基点主题值 = 虚拟基点值
|
|
station.Data.ThemeData = map[string]any{}
|
|
for key, val := range virtualBase {
|
|
station.Data.ThemeData[key] = val
|
|
}
|
|
resultStations = append(resultStations, station)
|
|
} else {
|
|
// 测点数据处理
|
|
themeData, ok := station.Data.GetValidThemeData()
|
|
if !ok {
|
|
log.Printf("减基点计算失败,无有效的主题数据。%s\n", station.LogMsg())
|
|
station.Data.ThemeData = map[string]any{}
|
|
resultStations = append(resultStations, station)
|
|
continue
|
|
}
|
|
|
|
// validFields 有效的监测项,主题数据的 field 必须在 baseValidFields 中存在
|
|
station_themeData_fields := station.Data.GetThemeFields()
|
|
validFields, ok := t.filterFields(station_themeData_fields, baseValidFields)
|
|
if !ok {
|
|
log.Printf("基点数据计算失败,主题数据的 field 必须在 baseValidFields 中存在;"+
|
|
"baseValidFields:%v, station_themeData_fields:%v 。 %s\n",
|
|
baseValidFields, station_themeData_fields, baseStation.LogMsg())
|
|
|
|
station.Data.ThemeData = map[string]any{}
|
|
resultStations = append(resultStations, station)
|
|
continue
|
|
}
|
|
|
|
// 减基点
|
|
resultData := make(map[string]interface{})
|
|
for _, field := range validFields {
|
|
resultData[field] = virtualBase[field] - themeData[field]
|
|
}
|
|
station.Data.ThemeData = resultData
|
|
resultStations = append(resultStations, station)
|
|
}
|
|
} // for t.stationGroup.Items
|
|
}
|
|
|
|
return resultStations
|
|
}
|
|
|
|
// calcVirtualSettlementBase 计算虚拟基点
|
|
// Result = Base + (RefB - RefS)
|
|
// Base - 测量系统基点
|
|
// RefS - 远端参考系统测点
|
|
// RefB - 远端参考系统基点
|
|
func (t *CalcTask) calcVirtualSettlementBase(baseItem common_models.GroupItem, fields []string) map[string]float64 {
|
|
baseStation := t.stationMap[baseItem.StationId]
|
|
data, ok := baseStation.Data.GetValidThemeData()
|
|
if !ok {
|
|
log.Printf("基点无数据。%s\n", baseStation.LogMsg())
|
|
return make(map[string]float64)
|
|
}
|
|
|
|
// SubItem 为 nil,表示未参照任何分组,虚拟基点值等于自己
|
|
if baseItem.SubItems == nil {
|
|
result := make(map[string]float64)
|
|
for _, f := range fields {
|
|
result[f] = data[f]
|
|
}
|
|
return result
|
|
}
|
|
|
|
logMsg, _ := json.Marshal(baseItem)
|
|
log.Printf("级联分组:%s\n", string(logMsg))
|
|
refS := t.calcVirtualSettlementBase(baseItem.SubItems[settlementParam.Ref_point], fields)
|
|
refB := t.calcVirtualSettlementBase(baseItem.SubItems[settlementParam.Ref_base], fields)
|
|
if len(refS) == 0 || len(refB) == 0 {
|
|
return make(map[string]float64)
|
|
}
|
|
|
|
result := make(map[string]float64)
|
|
for _, f := range fields {
|
|
// 虚拟基点 = 当前分组的基点测量值 + 参考点沉降
|
|
result[f] = data[f] + (refB[f] - refS[f])
|
|
}
|
|
return result
|
|
}
|
|
|
|
// filterFields 过滤有效的数据项(交集操作)
|
|
func (t *CalcTask) filterFields(arrA []string, arrB []string) ([]string, bool) {
|
|
setA := make(map[string]bool)
|
|
for _, a := range arrA {
|
|
setA[a] = true
|
|
}
|
|
|
|
var result []string
|
|
for _, b := range arrB {
|
|
if setA[b] {
|
|
result = append(result, b)
|
|
}
|
|
}
|
|
|
|
return result, len(result) > 0
|
|
}
|
|
|
|
func (t *CalcTask) calcDeepDisplace() []common_models.Station {
|
|
groupInfo := fmt.Sprintf("%s分组:%d-%s taskId:%s acqTime:%s", t.stationGroup.GroupType, t.stationGroup.Id, t.stationGroup.Name, t.taskId, t.acqTime)
|
|
fmt.Printf("深部位移分组计算未开发。[%s]\n", groupInfo)
|
|
//TODO 深部位移分组计算
|
|
stations := make([]common_models.Station, 0)
|
|
for _, s := range t.stationMap {
|
|
stations = append(stations, s)
|
|
}
|
|
return stations
|
|
}
|
|
|
|
func (t *CalcTask) calcAngleToSettlement() []common_models.Station {
|
|
groupInfo := fmt.Sprintf("%s分组:%d-%s taskId:%s acqTime:%s", t.stationGroup.GroupType, t.stationGroup.Id, t.stationGroup.Name, t.taskId, t.acqTime)
|
|
fmt.Printf("测斜测沉降或挠度 分组计算未开发。[%s]\n", groupInfo)
|
|
//TODO 测斜测沉降或挠度分组计算
|
|
stations := make([]common_models.Station, 0)
|
|
for _, s := range t.stationMap {
|
|
stations = append(stations, s)
|
|
}
|
|
return stations
|
|
}
|
|
|