et-go 20240919重建
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

51 lines
987 B

package stages
import (
"gitea.anxinyun.cn/container/common_models"
"log"
)
type StageManager struct {
in <-chan *common_models.ProcessData
Stages []Stage
out <-chan *common_models.ProcessData
}
func NewStageManager() *StageManager {
return &StageManager{}
}
func (the *StageManager) Run() {
if len(the.Stages) == 0 {
log.Panicf("Stages.len=%d 无有效处理流程", len(the.Stages))
}
for i := 0; i < len(the.Stages); i++ {
if i == 0 {
the.Stages[i].In = the.in
} else {
the.Stages[i].In = the.Stages[i-1].Out
}
the.Stages[i].StageRun()
}
the.out = the.Stages[len(the.Stages)-1].Out
//todo 替换为sink
go func() {
for {
<-the.out
}
}()
}
func (the *StageManager) AddSource(source <-chan *common_models.ProcessData) {
the.in = source
}
func (the *StageManager) AddOut(source chan *common_models.ProcessData) {
the.out = source
}
func (the *StageManager) AddStages(stages ...Stage) {
the.Stages = append(the.Stages, stages...)
}