Browse Source

环节处理改为批处理

dev
yfh 2 months ago
parent
commit
da861825af
  1. 29
      et_calc/dataCalc.go
  2. 36
      et_calc/group/groupCalc.go

29
et_calc/dataCalc.go

@ -17,6 +17,7 @@ import (
"node/stages" "node/stages"
"sort" "sort"
"strings" "strings"
"sync"
) )
type CalcHandler struct { type CalcHandler struct {
@ -26,25 +27,37 @@ type CalcHandler struct {
stage *stages.Stage stage *stages.Stage
} }
func NewCalcHandler() *CalcHandler { func NewCalcHandler(configHelper *common_utils.ConfigHelper, cacheServer *cacheSer.CacheServer, esESHelper *dbHelper.ESHelper) *CalcHandler {
redisAddr := configLoad.LoadConfig().GetString("redis.address")
esAddresses := configLoad.LoadConfig().GetStringSlice("es.addresses")
configHp := common_utils.NewConfigHelper(redisAddr)
the := &CalcHandler{ the := &CalcHandler{
cacheServer: cacheSer.NewCacheServer(configHp), cacheServer: cacheServer,
unitHelper: common_utils.NewUnitHelper(), esESHelper: esESHelper,
esESHelper: dbHelper.NewESHelper(esAddresses, "", ""), unitHelper: common_utils.NewUnitHelper(configHelper),
stage: stages.NewStage("单测点计算"), stage: stages.NewStage("单测点计算"),
} }
the.stage.AddProcess(the.calcFormula) the.stage.AddProcess(the.calcFormulaForStations)
return the return the
} }
func (the *CalcHandler) GetStage() stages.Stage { func (the *CalcHandler) GetStage() stages.Stage {
return *the.stage return *the.stage
} }
func (the *CalcHandler) calcFormulaForStations(data []*common_models.ProcessData) []*common_models.ProcessData {
var wg sync.WaitGroup // 初始化 WaitGroup
for _, processData := range data {
wg.Add(1)
go func(pd *common_models.ProcessData) {
defer wg.Done()
the.calcFormula(pd)
}(processData)
}
wg.Wait()
return data
}
// 单设备测点 // 单设备测点
func (the *CalcHandler) calcFormula(p *common_models.ProcessData) *common_models.ProcessData { func (the *CalcHandler) calcFormula(p *common_models.ProcessData) *common_models.ProcessData {
for i := range p.Stations { for i := range p.Stations {
for _, device := range p.Stations[i].Info.Devices { for _, device := range p.Stations[i].Info.Devices {
//计算结果 //计算结果

36
et_calc/group/groupCalc.go

@ -62,16 +62,16 @@ type GroupCalc struct {
esHandler *et_sink.SinkHandler esHandler *et_sink.SinkHandler
} }
func NewGroupCalc() *GroupCalc { func NewGroupCalc(configHelper *common_utils.ConfigHelper) *GroupCalc {
calcTaskManager := &GroupCalc{ calcTaskManager := &GroupCalc{
stage: stages.NewStage("测点分组计算"), stage: stages.NewStage("测点分组计算"),
configHelper: GetConfigHelper(), configHelper: configHelper,
signCalc: make(chan bool), signCalc: make(chan bool),
esHandler: et_sink.NewSinkGroupHandler(), esHandler: et_sink.NewSinkGroupHandler(),
} }
// 添加到 etNode 处理环节,实现数据加工 (缓存group各分项的主题数据 -> 分组计算、分组数据存储ES) // 添加到 etNode 处理环节,实现数据加工 (缓存group各分项的主题数据 -> 分组计算、分组数据存储ES)
calcTaskManager.stage.AddProcess(calcTaskManager.processData) calcTaskManager.stage.AddProcess(calcTaskManager.calcGroups)
return calcTaskManager return calcTaskManager
} }
@ -79,9 +79,25 @@ func (gc *GroupCalc) GetStage() stages.Stage {
return *gc.stage return *gc.stage
} }
func (gc *GroupCalc) calcGroups(data []*common_models.ProcessData) []*common_models.ProcessData {
var wg sync.WaitGroup
result := make([]*common_models.ProcessData, len(data))
for i, p := range data {
wg.Add(1)
go func(i int, p *common_models.ProcessData) {
defer wg.Done()
result[i] = gc.processData(p)
}(i, p)
}
wg.Wait()
return result
}
// processData 的 stations 被改变了 // processData 的 stations 被改变了
func (gc *GroupCalc) processData(inData *common_models.ProcessData) *common_models.ProcessData { func (gc *GroupCalc) processData(inData *common_models.ProcessData) *common_models.ProcessData {
log.Printf("************* 82行 分组一次处理开始 len(inData.Stations)=%d *************", len(inData.Stations)) log.Printf("********* 分组一次处理开始 len(inData.Stations)=%d *************", len(inData.Stations))
resultStations := make([]common_models.Station, 0) resultStations := make([]common_models.Station, 0)
// 分组超时任务 // 分组超时任务
resultStations = append(resultStations, gc.processTimeoutTasks()...) resultStations = append(resultStations, gc.processTimeoutTasks()...)
@ -95,12 +111,12 @@ func (gc *GroupCalc) processData(inData *common_models.ProcessData) *common_mode
calcRet, err := gc.cacheAndCalc(&station, inData.DeviceData.DimensionId, inData.DeviceData.TaskId, inData.DeviceData.AcqTime) calcRet, err := gc.cacheAndCalc(&station, inData.DeviceData.DimensionId, inData.DeviceData.TaskId, inData.DeviceData.AcqTime)
if err == nil { if err == nil {
log.Printf("************* 95行 沉降分组计算成功,返回记录数 len(calcRet)={%d} *************", len(calcRet)) log.Printf("******* 沉降分组计算成功,返回记录数 len(calcRet)={%d} *************", len(calcRet))
resultStations = append(resultStations, calcRet...) resultStations = append(resultStations, calcRet...)
} else { } else {
println(err) println(err)
} }
log.Printf("************* 98行 分组一次处理,缓存记录数 len(resultStations)={%d} *************", len(resultStations)) log.Printf("******** 分组一次处理,缓存记录数 len(resultStations)={%d} *************", len(resultStations))
} }
} }
@ -112,22 +128,22 @@ func (gc *GroupCalc) processData(inData *common_models.ProcessData) *common_mode
//filterSensorIds := []int{18, 20, 21} //filterSensorIds := []int{18, 20, 21}
//for _, number := range filterSensorIds { //for _, number := range filterSensorIds {
// if number == station.Info.Id { // if number == station.Info.Id {
// log.Printf("*************** 110行 沉降分组测点有返回 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data) // log.Printf("******* 122行 沉降分组测点有返回 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data)
// break // break
// } // }
//} //}
//// #TEST END //// #TEST END
// 需要返回的测点 // 需要返回的测点
//log.Printf("*************** 121行 沉降分组测点有返回 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data) //log.Printf("******* 128行 沉降分组测点有返回 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data)
result = append(result, station) result = append(result, station)
} else { } else {
log.Printf("*************** 119行 分组计算异常的测点数据 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data) log.Printf("****** 分组计算异常的测点数据 %v %v %v %+v *************", station.Info.Id, station.Info.Name, station.Data.CollectTime, station.Data)
} }
} }
// 计算后的 result 可能为空 // 计算后的 result 可能为空
log.Printf("************* 124行 分组一次处理结束 len(inData.Stations)=%d *************", len(result)) log.Printf("****** 分组一次处理结束 len(inData.Stations)=%d *************", len(result))
inData.Stations = result inData.Stations = result
return inData return inData
} }

Loading…
Cancel
Save