diff --git a/node/stages/stage.go b/node/stages/stage.go index 3ca1860..1725a91 100644 --- a/node/stages/stage.go +++ b/node/stages/stage.go @@ -1,7 +1,9 @@ package stages import ( + "fmt" "gitea.anxinyun.cn/container/common_models" + "gitea.anxinyun.cn/container/common_utils/configLoad" "log" "time" ) @@ -9,31 +11,35 @@ import ( // 阶段处理 type Stage struct { Name string - In <-chan *common_models.ProcessData - processFuncs []func(*common_models.ProcessData) *common_models.ProcessData - Out chan *common_models.ProcessData + In <-chan []*common_models.ProcessData + processFuncs []func([]*common_models.ProcessData) []*common_models.ProcessData + Out chan []*common_models.ProcessData execOver chan bool //阶段执行完毕,用以排查超时 } func NewStage(name string) *Stage { + stageBufSize := configLoad.LoadConfig().GetInt64("performance.node.stageBufSize") + return &Stage{ Name: name, - processFuncs: make([]func(*common_models.ProcessData) *common_models.ProcessData, 0), - In: make(<-chan *common_models.ProcessData, 1), - Out: make(chan *common_models.ProcessData, 1), + processFuncs: make([]func([]*common_models.ProcessData) []*common_models.ProcessData, 0), + In: make(<-chan []*common_models.ProcessData, stageBufSize), + Out: make(chan []*common_models.ProcessData, stageBufSize), execOver: make(chan bool, 1), } } -func (s *Stage) StageRun() <-chan *common_models.ProcessData { +func (s *Stage) StageRun() <-chan []*common_models.ProcessData { go func() { defer func() { close(s.Out) log.Printf("[%s]关闭out", s.Name) }() + for n := range s.In { - //log.Printf("[%s]处理数据 %v", s.Name, n.DeviceData.Name) - s.Out <- s.process(n) + log.Printf("[%s]接收数据 In[%p] 通道长度=%d/%d", s.Name, s.In, len(s.In), cap(s.In)) + result := s.process(n) + s.Out <- result } log.Printf("%s over", s.Name) }() @@ -41,27 +47,30 @@ func (s *Stage) StageRun() <-chan *common_models.ProcessData { return s.Out } -// AddProcess 添加处理者。处理函数定义 func(*ProcessData) *ProcessData -func (s *Stage) AddProcess(fun func(*common_models.ProcessData) *common_models.ProcessData) { +func (s *Stage) AddProcess(fun func([]*common_models.ProcessData) []*common_models.ProcessData) { s.processFuncs = append(s.processFuncs, fun) } -func (s *Stage) process(data *common_models.ProcessData) *common_models.ProcessData { - go s.handlerTimeOutCheck(s.Name, data.DeviceData.DeviceId) +func (s *Stage) process(data []*common_models.ProcessData) []*common_models.ProcessData { + go s.handlerTimeOutCheck(s.Name, "批量处理耗时跟踪") + for _, processFunc := range s.processFuncs { //tag := fmt.Sprintf("%d/%d", i+1, len(s.processFuncs)) - //log.Printf("stage[%s]流程处理 start=> %s", s.Name, data.DeviceData.Name) + log.Printf("stage[%s] start, len(data)=%d", s.Name, len(data)) func() { - defer timeCost(s.Name, data.DeviceData.DeviceId, time.Now()) + defer timeCost(s.Name, fmt.Sprintf("ProcessData数组元素个数[%d]", len(data)), time.Now()) data = processFunc(data) }() - //log.Printf("stage[%s]流程处理 over=> %s", s.Name, data.DeviceData.Name) + log.Printf("stage[%s] over, len(data)=%d", s.Name, len(data)) } + s.execOver <- true return data } + func (s *Stage) handlerTimeOutCheck(stageName, deviceId string) { - defaultTimeout := 240 * time.Second // 4分钟 + stageTimeout := configLoad.LoadConfig().GetInt64("performance.node.stageTimeout") + defaultTimeout := time.Duration(stageTimeout) * time.Second select { case <-s.execOver: case <-time.After(defaultTimeout):