|
@ -16,7 +16,7 @@ import ( |
|
|
var ( |
|
|
var ( |
|
|
configHelperInstance *common_utils.ConfigHelper |
|
|
configHelperInstance *common_utils.ConfigHelper |
|
|
once sync.Once |
|
|
once sync.Once |
|
|
mu sync.Mutex |
|
|
calcTasks sync.Map |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
func GetConfigHelper() *common_utils.ConfigHelper { |
|
|
func GetConfigHelper() *common_utils.ConfigHelper { |
|
@ -57,7 +57,6 @@ type GroupCalc struct { |
|
|
stage *stages.Stage |
|
|
stage *stages.Stage |
|
|
configHelper *common_utils.ConfigHelper |
|
|
configHelper *common_utils.ConfigHelper |
|
|
signCalc chan bool |
|
|
signCalc chan bool |
|
|
calcTasks map[GroupCalcTaskKey]CalcTask |
|
|
|
|
|
esHandler *et_sink.SinkHandler |
|
|
esHandler *et_sink.SinkHandler |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -66,11 +65,10 @@ func NewGroupCalc() *GroupCalc { |
|
|
stage: stages.NewStage("测点分组计算"), |
|
|
stage: stages.NewStage("测点分组计算"), |
|
|
configHelper: GetConfigHelper(), |
|
|
configHelper: GetConfigHelper(), |
|
|
signCalc: make(chan bool), |
|
|
signCalc: make(chan bool), |
|
|
calcTasks: map[GroupCalcTaskKey]CalcTask{}, |
|
|
|
|
|
esHandler: et_sink.NewSinkGroupHandler(), |
|
|
esHandler: et_sink.NewSinkGroupHandler(), |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// 添加到 etNode 处理环节,实现数据加工 (缓存group各分项的主题数据 -> 分组计算 -> 分组数据)
|
|
|
// 添加到 etNode 处理环节,实现数据加工 (缓存group各分项的主题数据 -> 分组计算、分组数据存储ES)
|
|
|
calcTaskManager.stage.AddProcess(calcTaskManager.processData) |
|
|
calcTaskManager.stage.AddProcess(calcTaskManager.processData) |
|
|
return calcTaskManager |
|
|
return calcTaskManager |
|
|
} |
|
|
} |
|
@ -81,47 +79,77 @@ func (gc *GroupCalc) GetStage() stages.Stage { |
|
|
|
|
|
|
|
|
// processData 的 stations 被改变了
|
|
|
// processData 的 stations 被改变了
|
|
|
func (gc *GroupCalc) processData(inData *common_models.ProcessData) *common_models.ProcessData { |
|
|
func (gc *GroupCalc) processData(inData *common_models.ProcessData) *common_models.ProcessData { |
|
|
|
|
|
log.Printf("************* 82行 分组一次处理开始 len(inData.Stations)=%d *************", len(inData.Stations)) |
|
|
resultStations := make([]common_models.Station, 0) |
|
|
resultStations := make([]common_models.Station, 0) |
|
|
|
|
|
|
|
|
// 分组超时任务
|
|
|
// 分组超时任务
|
|
|
for key, task := range gc.calcTasks { |
|
|
resultStations = append(resultStations, gc.processTimeoutTasks()...) |
|
|
if task.IsTimeout() { |
|
|
|
|
|
calcedStations := task.Calc() |
|
|
|
|
|
resultStations = append(resultStations, calcedStations...) |
|
|
|
|
|
// ES存储分组主题数据
|
|
|
|
|
|
//dumpStr, groupTheme := task.ToDump(calcedStations)
|
|
|
|
|
|
//gc.esHandler.SinkGroupDataToES(groupTheme)
|
|
|
|
|
|
//log.Printf("[groupCalc.processData]group timeout calc。 %s \n", dumpStr)
|
|
|
|
|
|
gc.dumpGroupTheme(&task, calcedStations) |
|
|
|
|
|
delete(gc.calcTasks, key) |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 常规分组计算
|
|
|
for _, station := range inData.Stations { |
|
|
for _, station := range inData.Stations { |
|
|
if station.Info.Group.Id == 0 || station.Info.Group.GroupType != groupType.Settlement { |
|
|
if station.Info.Group.Id == 0 || station.Info.Group.GroupType != groupType.Settlement { |
|
|
//log.Printf("非【液压传感器测量沉降】分组。")
|
|
|
//log.Printf("非【液压传感器测量沉降】分组。")
|
|
|
resultStations = append(resultStations, station) |
|
|
resultStations = append(resultStations, station) |
|
|
} else { |
|
|
} else { |
|
|
calcedStations, err := gc.cacheAndCalc(&station, inData.DeviceData.DimensionId, inData.DeviceData.TaskId, inData.DeviceData.AcqTime) |
|
|
calcRet, err := gc.cacheAndCalc(&station, inData.DeviceData.DimensionId, inData.DeviceData.TaskId, inData.DeviceData.AcqTime) |
|
|
if err != nil { |
|
|
if err == nil { |
|
|
resultStations = append(resultStations, station) |
|
|
log.Printf("************* 95行 沉降分组计算成功,返回记录数 len(calcRet)={%d} *************", len(calcRet)) |
|
|
} else { |
|
|
resultStations = append(resultStations, calcRet...) |
|
|
resultStations = append(resultStations, calcedStations...) |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
log.Printf("************* 98行 分组一次处理,缓存记录数 len(resultStations)={%d} *************", len(resultStations)) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// 返回处理后的数据,计算后的resultStations可能为空
|
|
|
// 计算失败的测点不返回
|
|
|
inData.Stations = resultStations |
|
|
result := make([]common_models.Station, 0) |
|
|
|
|
|
for _, station := range resultStations { |
|
|
|
|
|
if len(station.Data.ThemeData) > 0 { |
|
|
|
|
|
////TODO #TEST BEGIN | filterSensorIds 需要排查的测点ID,主要排查分组测点主题数据未入ES的问题。
|
|
|
|
|
|
//filterSensorIds := []int{42, 15, 32, 43, 13, 33, 17, 14}
|
|
|
|
|
|
//for _, number := range filterSensorIds {
|
|
|
|
|
|
// if number == station.Info.Id {
|
|
|
|
|
|
// log.Printf("*************** 116行 沉降分组测点有返回 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data)
|
|
|
|
|
|
// break
|
|
|
|
|
|
// }
|
|
|
|
|
|
//}
|
|
|
|
|
|
//// #TEST END
|
|
|
|
|
|
// 需要返回的测点
|
|
|
|
|
|
result = append(result, station) |
|
|
|
|
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
log.Printf("*************** 119行 分组计算异常的测点数据 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data) |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// 计算后的 result 可能为空
|
|
|
|
|
|
log.Printf("************* 124行 分组一次处理结束 len(inData.Stations)=%d *************", len(result)) |
|
|
|
|
|
inData.Stations = result |
|
|
return inData |
|
|
return inData |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (gc *GroupCalc) processTimeoutTasks() []common_models.Station { |
|
|
|
|
|
resultStations := make([]common_models.Station, 0) |
|
|
|
|
|
calcTasks.Range(func(key, value interface{}) bool { |
|
|
|
|
|
//log.Println("Key:", key, "Value:", value)
|
|
|
|
|
|
task := value.(CalcTask) |
|
|
|
|
|
if task.IsTimeout() { |
|
|
|
|
|
calcRet := task.Calc() |
|
|
|
|
|
resultStations = append(resultStations, calcRet...) |
|
|
|
|
|
// ES存储分组主题数据
|
|
|
|
|
|
gc.dumpGroupTheme(&task, calcRet) |
|
|
|
|
|
calcTasks.Delete(key) |
|
|
|
|
|
} |
|
|
|
|
|
return true // 返回 true 继续遍历,返回 false 停止遍历
|
|
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
|
|
return resultStations |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// ES存储分组主题
|
|
|
// ES存储分组主题
|
|
|
func (gc *GroupCalc) dumpGroupTheme(task *CalcTask, calcedStations []common_models.Station) { |
|
|
func (gc *GroupCalc) dumpGroupTheme(task *CalcTask, calcedStations []common_models.Station) { |
|
|
dumpStr, groupTheme := task.ToDump(calcedStations) |
|
|
dumpStr, esDoc := task.ToDump(calcedStations) |
|
|
log.Println(dumpStr) |
|
|
log.Println(dumpStr) |
|
|
if len(groupTheme.Data) > 0 { |
|
|
if len(esDoc.Data) > 0 { |
|
|
gc.esHandler.SinkGroupDataToES(groupTheme) |
|
|
gc.esHandler.SinkGroupDataToES(esDoc) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -140,48 +168,39 @@ func (gc *GroupCalc) cacheAndCalc(station *common_models.Station, dimensionId st |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
key := GroupCalcTaskKey{GroupId: sGroup.Id, TaskId: taskId} |
|
|
key := GroupCalcTaskKey{GroupId: sGroup.Id, TaskId: taskId} |
|
|
if calcTask, ok := gc.calcTasks[key]; ok { |
|
|
if value, ok := calcTasks.Load(key); ok { |
|
|
|
|
|
calcTask := value.(CalcTask) |
|
|
calcTask.AddStationData(*station) |
|
|
calcTask.AddStationData(*station) |
|
|
log.Println(calcTask.R()) |
|
|
|
|
|
// 分组计算
|
|
|
// 分组计算
|
|
|
if calcTask.CheckIntegrity() { |
|
|
if calcTask.CheckIntegrity() { |
|
|
secs := calcTask.ElapsedSecs() |
|
|
secs := calcTask.ElapsedSecs() |
|
|
if secs > 10 { |
|
|
if secs > 10 { |
|
|
log.Printf("[groupCalc.processData] group calc wait %f秒, %s\n", secs, key.R()) |
|
|
log.Printf("[groupCalc.processData] wait %f秒, %s\n", secs, key.R()) |
|
|
} |
|
|
} |
|
|
|
|
|
calcRet := calcTask.Calc() |
|
|
calcedStations := calcTask.Calc() |
|
|
if calcRet != nil && len(calcRet) > 0 { |
|
|
// 缓存已计算数据
|
|
|
resultStations = append(resultStations, calcRet...) |
|
|
if calcedStations != nil && len(calcedStations) > 0 { |
|
|
|
|
|
resultStations = append(resultStations, calcedStations...) |
|
|
|
|
|
} |
|
|
} |
|
|
// ES存储分组主题数据
|
|
|
|
|
|
//dumpStr, groupTheme := calcTask.ToDump(calcedStations)
|
|
|
|
|
|
//gc.esHandler.SinkGroupDataToES(groupTheme)
|
|
|
|
|
|
//log.Println(dumpStr)
|
|
|
|
|
|
gc.dumpGroupTheme(&calcTask, calcedStations) |
|
|
|
|
|
|
|
|
|
|
|
delete(gc.calcTasks, key) |
|
|
// ES存储分组主题数据
|
|
|
|
|
|
gc.dumpGroupTheme(&calcTask, calcRet) |
|
|
|
|
|
calcTasks.Delete(key) |
|
|
} |
|
|
} |
|
|
} else { |
|
|
} else { |
|
|
// 不存在的计算任务:要取到首个元素的设备的采集策略(维度),以便后面获得过期时长
|
|
|
// 新计算任务:取到第一个的采集策略(维度),以便后面获得过期时长
|
|
|
task := NewGroupCalcTask(&sGroup, dimensionId, taskId, acqTime) |
|
|
task := NewGroupCalcTask(&sGroup, dimensionId, taskId, acqTime) |
|
|
task.AddStationData(*station) |
|
|
task.AddStationData(*station) |
|
|
log.Println(task.R()) |
|
|
|
|
|
|
|
|
|
|
|
if task.CheckIntegrity() { |
|
|
if task.CheckIntegrity() { |
|
|
calcedStations := task.Calc() |
|
|
calcRet := task.Calc() |
|
|
// 缓存已计算数据
|
|
|
if calcRet != nil && len(calcRet) > 0 { |
|
|
if calcedStations != nil && len(calcedStations) > 0 { |
|
|
resultStations = append(resultStations, calcRet...) |
|
|
resultStations = append(resultStations, calcedStations...) |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// ES存储分组主题数据
|
|
|
// ES存储分组主题数据
|
|
|
gc.dumpGroupTheme(task, calcedStations) |
|
|
gc.dumpGroupTheme(task, calcRet) |
|
|
|
|
|
|
|
|
} else { |
|
|
} else { |
|
|
task.SetTimeout() |
|
|
task.SetTimeout() |
|
|
gc.calcTasks[key] = *task |
|
|
log.Println("****沉降分组计算任务超期策略dimensionId,taskId,acqTime,injectTime,deadlineTime****", task.stationGroup.Id, task.stationGroup.Name, task.dimensionId, task.taskId, task.acqTime, task.InjectTime, task.DeadlineTime) |
|
|
|
|
|
calcTasks.Store(key, *task) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|