Browse Source

processFuncs入参和输出都改为数组

dev
yfh 2 months ago
parent
commit
7a07f7fe96
  1. 43
      node/stages/stage.go

43
node/stages/stage.go

@ -1,7 +1,9 @@
package stages package stages
import ( import (
"fmt"
"gitea.anxinyun.cn/container/common_models" "gitea.anxinyun.cn/container/common_models"
"gitea.anxinyun.cn/container/common_utils/configLoad"
"log" "log"
"time" "time"
) )
@ -9,31 +11,35 @@ import (
// 阶段处理 // 阶段处理
type Stage struct { type Stage struct {
Name string Name string
In <-chan *common_models.ProcessData In <-chan []*common_models.ProcessData
processFuncs []func(*common_models.ProcessData) *common_models.ProcessData processFuncs []func([]*common_models.ProcessData) []*common_models.ProcessData
Out chan *common_models.ProcessData Out chan []*common_models.ProcessData
execOver chan bool //阶段执行完毕,用以排查超时 execOver chan bool //阶段执行完毕,用以排查超时
} }
func NewStage(name string) *Stage { func NewStage(name string) *Stage {
stageBufSize := configLoad.LoadConfig().GetInt64("performance.node.stageBufSize")
return &Stage{ return &Stage{
Name: name, Name: name,
processFuncs: make([]func(*common_models.ProcessData) *common_models.ProcessData, 0), processFuncs: make([]func([]*common_models.ProcessData) []*common_models.ProcessData, 0),
In: make(<-chan *common_models.ProcessData, 1), In: make(<-chan []*common_models.ProcessData, stageBufSize),
Out: make(chan *common_models.ProcessData, 1), Out: make(chan []*common_models.ProcessData, stageBufSize),
execOver: make(chan bool, 1), execOver: make(chan bool, 1),
} }
} }
func (s *Stage) StageRun() <-chan *common_models.ProcessData { func (s *Stage) StageRun() <-chan []*common_models.ProcessData {
go func() { go func() {
defer func() { defer func() {
close(s.Out) close(s.Out)
log.Printf("[%s]关闭out", s.Name) log.Printf("[%s]关闭out", s.Name)
}() }()
for n := range s.In { for n := range s.In {
//log.Printf("[%s]处理数据 %v", s.Name, n.DeviceData.Name) log.Printf("[%s]接收数据 In[%p] 通道长度=%d/%d", s.Name, s.In, len(s.In), cap(s.In))
s.Out <- s.process(n) result := s.process(n)
s.Out <- result
} }
log.Printf("%s over", s.Name) log.Printf("%s over", s.Name)
}() }()
@ -41,27 +47,30 @@ func (s *Stage) StageRun() <-chan *common_models.ProcessData {
return s.Out return s.Out
} }
// AddProcess 添加处理者。处理函数定义 func(*ProcessData) *ProcessData func (s *Stage) AddProcess(fun func([]*common_models.ProcessData) []*common_models.ProcessData) {
func (s *Stage) AddProcess(fun func(*common_models.ProcessData) *common_models.ProcessData) {
s.processFuncs = append(s.processFuncs, fun) s.processFuncs = append(s.processFuncs, fun)
} }
func (s *Stage) process(data *common_models.ProcessData) *common_models.ProcessData { func (s *Stage) process(data []*common_models.ProcessData) []*common_models.ProcessData {
go s.handlerTimeOutCheck(s.Name, data.DeviceData.DeviceId) go s.handlerTimeOutCheck(s.Name, "批量处理耗时跟踪")
for _, processFunc := range s.processFuncs { for _, processFunc := range s.processFuncs {
//tag := fmt.Sprintf("%d/%d", i+1, len(s.processFuncs)) //tag := fmt.Sprintf("%d/%d", i+1, len(s.processFuncs))
//log.Printf("stage[%s]流程处理 start=> %s", s.Name, data.DeviceData.Name) log.Printf("stage[%s] start, len(data)=%d", s.Name, len(data))
func() { func() {
defer timeCost(s.Name, data.DeviceData.DeviceId, time.Now()) defer timeCost(s.Name, fmt.Sprintf("ProcessData数组元素个数[%d]", len(data)), time.Now())
data = processFunc(data) data = processFunc(data)
}() }()
//log.Printf("stage[%s]流程处理 over=> %s", s.Name, data.DeviceData.Name) log.Printf("stage[%s] over, len(data)=%d", s.Name, len(data))
} }
s.execOver <- true s.execOver <- true
return data return data
} }
func (s *Stage) handlerTimeOutCheck(stageName, deviceId string) { func (s *Stage) handlerTimeOutCheck(stageName, deviceId string) {
defaultTimeout := 240 * time.Second // 4分钟 stageTimeout := configLoad.LoadConfig().GetInt64("performance.node.stageTimeout")
defaultTimeout := time.Duration(stageTimeout) * time.Second
select { select {
case <-s.execOver: case <-s.execOver:
case <-time.After(defaultTimeout): case <-time.After(defaultTimeout):

Loading…
Cancel
Save