et-go 20240919重建
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

209 lines
6.8 KiB

package group
import (
"errors"
"et_sink"
"gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_models/constant/groupType"
"gitea.anxinyun.cn/container/common_utils"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"log"
"node/stages"
"sync"
"time"
)
// Package-level state shared by every GroupCalc in this process.
var (
	// configHelperInstance is the lazily created helper handed out by
	// GetConfigHelper.
	configHelperInstance *common_utils.ConfigHelper

	// once guards the one-time creation of configHelperInstance.
	once sync.Once

	// calcTasks caches group calculation tasks that are still waiting for
	// station data. This file stores CalcTask values under GroupCalcTaskKey
	// keys.
	calcTasks sync.Map
)
// GetConfigHelper returns the process-wide ConfigHelper. The first call loads
// the configuration and builds the helper from its "redis.address" entry;
// every later call returns that same instance.
func GetConfigHelper() *common_utils.ConfigHelper {
	initHelper := func() {
		cfg := configLoad.LoadConfig()
		configHelperInstance = common_utils.NewConfigHelper(cfg.GetString("redis.address"))
	}
	once.Do(initHelper)
	return configHelperInstance
}
// GroupStation pairs a station with one of the groups it belongs to.
type GroupStation struct {
	Group   common_models.StationGroup // the group
	Station common_models.Station      // the station that is a member of Group
}
// GetGroupInfo lists the group memberships of a station. The result holds one
// GroupStation for station.Info.Group followed by one for each entry of
// station.Info.CorrGroups, in order. The returned slice is never nil.
func GetGroupInfo(station common_models.Station) []GroupStation {
	groups := append([]common_models.StationGroup{station.Info.Group}, station.Info.CorrGroups...)

	pairs := make([]GroupStation, 0)
	for _, g := range groups {
		pairs = append(pairs, GroupStation{Group: g, Station: station})
	}
	return pairs
}
// GroupCalc is the pipeline component that performs station group
// calculation and sinks the group results to ES.
type GroupCalc struct {
	stage        *stages.Stage              // stage that runs processData as its process step
	configHelper *common_utils.ConfigHelper // shared helper obtained from GetConfigHelper
	signCalc     chan bool                  // unbuffered channel created in NewGroupCalc; not used by the code visible here
	esHandler    *et_sink.SinkHandler       // sink used by dumpGroupTheme to write group data to ES
}
// NewGroupCalc builds a GroupCalc and registers its processData method as a
// process step of the newly created stage.
func NewGroupCalc() *GroupCalc {
	gc := new(GroupCalc)
	gc.stage = stages.NewStage("测点分组计算")
	gc.configHelper = GetConfigHelper()
	gc.signCalc = make(chan bool)
	gc.esHandler = et_sink.NewSinkGroupHandler()

	// Hook into the etNode processing chain: cache each group member's theme
	// data, then run the group calculation and store the group data in ES.
	gc.stage.AddProcess(gc.processData)
	return gc
}
// GetStage returns a copy of the stage value held by this GroupCalc.
func (gc *GroupCalc) GetStage() stages.Stage {
	stage := gc.stage
	return *stage
}
// processData is the stage process step for group calculation. It replaces
// inData.Stations with the stations that should continue down the pipeline
// and returns inData itself.
//
// The output is assembled from, in order:
//   - results of cached tasks that have timed out (processTimeoutTasks);
//   - input stations whose primary group is unset (Id == 0) or is not a
//     Settlement group, passed through unchanged;
//   - results returned by cacheAndCalc for Settlement-group stations.
//
// Stations whose Data.ThemeData is empty are then logged and dropped, so the
// resulting inData.Stations may be empty.
func (gc *GroupCalc) processData(inData *common_models.ProcessData) *common_models.ProcessData {
	log.Printf("************* 82行 分组一次处理开始 len(inData.Stations)=%d *************", len(inData.Stations))

	resultStations := make([]common_models.Station, 0)

	// Tasks whose waiting period has expired are calculated first.
	resultStations = append(resultStations, gc.processTimeoutTasks()...)

	// Regular group calculation.
	for _, station := range inData.Stations {
		if station.Info.Group.Id == 0 || station.Info.Group.GroupType != groupType.Settlement {
			// Not a settlement group: pass the station through untouched.
			resultStations = append(resultStations, station)
			continue
		}

		calcRet, err := gc.cacheAndCalc(&station, inData.DeviceData.DimensionId, inData.DeviceData.TaskId, inData.DeviceData.AcqTime)
		if err != nil {
			// The station is dropped from this round; record why instead of
			// discarding the error silently.
			log.Printf("[groupCalc.processData] cacheAndCalc failed, station=%v %v: %v", station.Info.Id, station.Info.Name, err)
		} else {
			log.Printf("************* 95行 沉降分组计算成功,返回记录数 len(calcRet)={%d} *************", len(calcRet))
			resultStations = append(resultStations, calcRet...)
		}
		log.Printf("************* 98行 分组一次处理,缓存记录数 len(resultStations)={%d} *************", len(resultStations))
	}

	// Stations without theme data (failed calculation) are not returned.
	result := make([]common_models.Station, 0)
	for _, station := range resultStations {
		if len(station.Data.ThemeData) == 0 {
			log.Printf("*************** 119行 分组计算异常的测点数据 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data)
			continue
		}
		//// TODO #TEST BEGIN | filterSensorIds lists the station IDs under
		//// investigation, mainly for tracking down group station theme data
		//// that never reached ES.
		//filterSensorIds := []int{42, 15, 32, 43, 13, 33, 17, 14}
		//for _, number := range filterSensorIds {
		//	if number == station.Info.Id {
		//		log.Printf("*************** line 116: settlement group station returned %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data)
		//		break
		//	}
		//}
		//// #TEST END

		// Station to be returned.
		result = append(result, station)
	}

	// result may be empty after the calculation.
	log.Printf("************* 124行 分组一次处理结束 len(inData.Stations)=%d *************", len(result))
	inData.Stations = result
	return inData
}
// processTimeoutTasks walks every cached task in calcTasks. For each task
// that reports IsTimeout it runs the calculation, sinks the group theme data
// through dumpGroupTheme and removes the task from the cache. The stations
// produced by all such calculations are returned; the slice is never nil.
func (gc *GroupCalc) processTimeoutTasks() []common_models.Station {
	expired := make([]common_models.Station, 0)

	visit := func(key, value interface{}) bool {
		task := value.(CalcTask)
		if !task.IsTimeout() {
			return true // not due yet, keep iterating
		}
		stations := task.Calc()
		expired = append(expired, stations...)
		// Store the group theme data in ES.
		gc.dumpGroupTheme(&task, stations)
		calcTasks.Delete(key)
		return true // true continues the iteration, false would stop it
	}
	calcTasks.Range(visit)

	return expired
}
// dumpGroupTheme logs the dump summary produced by task.ToDump for the
// calculated stations and, when the resulting document carries any data,
// hands the document to the ES sink handler.
func (gc *GroupCalc) dumpGroupTheme(task *CalcTask, calcedStations []common_models.Station) {
	summary, doc := task.ToDump(calcedStations)
	log.Println(summary)

	if len(doc.Data) == 0 {
		return // nothing to store
	}
	gc.esHandler.SinkGroupDataToES(doc)
}
// cacheAndCalc caches the station's data in the calculation task of every
// group the station belongs to (see GetGroupInfo) and runs the group
// calculation for each task that has become complete.
//
// Parameters:
//   - station:     the station whose data is being added
//   - dimensionId: acquisition strategy (dimension); used when a new task is created
//   - taskId:      identifier of one periodic acquisition task; part of the cache key
//   - acqTime:     acquisition time; used when a new task is created
//
// It returns the stations produced by all calculations that ran (nil when
// none ran). If any of the station's groups has Id == 0 or is not a
// Settlement group, it returns (nil, error) immediately.
func (gc *GroupCalc) cacheAndCalc(station *common_models.Station, dimensionId string, taskId string, acqTime time.Time) ([]common_models.Station, error) {
	var resultStations []common_models.Station

	for _, groupStation := range GetGroupInfo(*station) {
		sGroup := groupStation.Group
		if sGroup.Id == 0 || sGroup.GroupType != groupType.Settlement {
			// NOTE(review): returning here discards results already collected
			// for earlier groups in this loop, although their tasks may have
			// been calculated, dumped and removed. Confirm this is intended.
			return nil, errors.New("非【液压传感器测量沉降】分组")
		}

		key := GroupCalcTaskKey{GroupId: sGroup.Id, TaskId: taskId}
		value, ok := calcTasks.Load(key)
		if !ok {
			// New task: it takes the acquisition strategy (dimension) of the
			// first station seen, which is later used for the expiry duration.
			task := NewGroupCalcTask(&sGroup, dimensionId, taskId, acqTime)
			task.AddStationData(*station)
			if task.CheckIntegrity() {
				// Already complete: calculate now, nothing to cache.
				resultStations = append(resultStations, gc.calcAndDump(task)...)
				continue
			}
			task.SetTimeout()
			log.Println("****沉降分组计算任务超期策略dimensionId,taskId,acqTime,injectTime,deadlineTime****", task.stationGroup.Id, task.stationGroup.Name, task.dimensionId, task.taskId, task.acqTime, task.InjectTime, task.DeadlineTime)
			calcTasks.Store(key, *task)
			continue
		}

		// Existing task. calcTask is a copy of the cached value and is not
		// stored back after AddStationData.
		// NOTE(review): this presumably relies on CalcTask keeping its station
		// data behind a reference (e.g. a map) shared with the cached copy;
		// confirm against the CalcTask definition.
		calcTask := value.(CalcTask)
		calcTask.AddStationData(*station)
		if !calcTask.CheckIntegrity() {
			continue // still waiting for other stations of the group
		}
		if secs := calcTask.ElapsedSecs(); secs > 10 {
			log.Printf("[groupCalc.processData] wait %f秒, %s\n", secs, key.R())
		}
		resultStations = append(resultStations, gc.calcAndDump(&calcTask)...)
		calcTasks.Delete(key)
	}

	return resultStations, nil
}

// calcAndDump runs the group calculation of task, stores the group theme data
// in ES through dumpGroupTheme and returns the calculated stations.
func (gc *GroupCalc) calcAndDump(task *CalcTask) []common_models.Station {
	calcRet := task.Calc()
	gc.dumpGroupTheme(task, calcRet)
	return calcRet
}