diff --git a/node/stages/stageManage.go b/node/stages/stageManage.go index 1434fd7..2482696 100644 --- a/node/stages/stageManage.go +++ b/node/stages/stageManage.go @@ -3,46 +3,73 @@ package stages import ( "gitea.anxinyun.cn/container/common_models" "log" + "sync" ) type StageManager struct { - in <-chan *common_models.ProcessData - Stages []Stage - out <-chan *common_models.ProcessData + in chan []*common_models.ProcessData + Stages []Stage + out chan []*common_models.ProcessData + resultChan chan []*common_models.ProcessData // 下一个处理环节的输入通道 } -func NewStageManager() *StageManager { - return &StageManager{} +func NewStageManager(resultChan chan []*common_models.ProcessData) *StageManager { + return &StageManager{ + resultChan: resultChan, + } } -func (the *StageManager) Run() { +func (the *StageManager) RunStages() { if len(the.Stages) == 0 { log.Panicf("Stages.len=%d 无有效处理流程", len(the.Stages)) } + + // 连接阶段 for i := 0; i < len(the.Stages); i++ { if i == 0 { the.Stages[i].In = the.in } else { the.Stages[i].In = the.Stages[i-1].Out } - the.Stages[i].StageRun() + the.Stages[i].StageRun() // 将启动环节协程 } the.out = the.Stages[len(the.Stages)-1].Out - //todo 替换为sink + log.Printf("********* stageManage.go the.out=%p", the.out) + + var wg sync.WaitGroup + wg.Add(1) go func() { + defer wg.Done() for { - <-the.out + select { + case _, ok := <-the.out: + if !ok { + log.Printf("StageManager: out channel closed") + return + } + } } }() + + wg.Wait() } -func (the *StageManager) AddSource(source <-chan *common_models.ProcessData) { +func (the *StageManager) AddSource(source chan []*common_models.ProcessData) { + log.Printf("stageManager add source=%p 通道数据:%d/%d", source, len(source), cap(source)) the.in = source } -func (the *StageManager) AddOut(source chan *common_models.ProcessData) { +func (the *StageManager) GetOut() chan []*common_models.ProcessData { + return the.out +} + +func (the *StageManager) GetIn() chan []*common_models.ProcessData { + return the.in +} + +func (the *StageManager) AddOut(source chan []*common_models.ProcessData) { the.out = source }