Browse Source

processFuncs入参和输出都改为数组

dev
yfh 2 months ago
parent
commit
a68d032eb8
  1. 49
      node/stages/stageManage.go

49
node/stages/stageManage.go

@ -3,46 +3,73 @@ package stages
import ( import (
"gitea.anxinyun.cn/container/common_models" "gitea.anxinyun.cn/container/common_models"
"log" "log"
"sync"
) )
type StageManager struct { type StageManager struct {
in <-chan *common_models.ProcessData in chan []*common_models.ProcessData
Stages []Stage Stages []Stage
out <-chan *common_models.ProcessData out chan []*common_models.ProcessData
resultChan chan []*common_models.ProcessData // 下一个处理环节的输入通道
} }
func NewStageManager() *StageManager { func NewStageManager(resultChan chan []*common_models.ProcessData) *StageManager {
return &StageManager{} return &StageManager{
resultChan: resultChan,
}
} }
func (the *StageManager) Run() { func (the *StageManager) RunStages() {
if len(the.Stages) == 0 { if len(the.Stages) == 0 {
log.Panicf("Stages.len=%d 无有效处理流程", len(the.Stages)) log.Panicf("Stages.len=%d 无有效处理流程", len(the.Stages))
} }
// 连接阶段
for i := 0; i < len(the.Stages); i++ { for i := 0; i < len(the.Stages); i++ {
if i == 0 { if i == 0 {
the.Stages[i].In = the.in the.Stages[i].In = the.in
} else { } else {
the.Stages[i].In = the.Stages[i-1].Out the.Stages[i].In = the.Stages[i-1].Out
} }
the.Stages[i].StageRun()
the.Stages[i].StageRun() // 将启动环节协程
} }
the.out = the.Stages[len(the.Stages)-1].Out the.out = the.Stages[len(the.Stages)-1].Out
//todo 替换为sink log.Printf("********* stageManage.go the.out=%p", the.out)
var wg sync.WaitGroup
wg.Add(1)
go func() { go func() {
defer wg.Done()
for { for {
<-the.out select {
case _, ok := <-the.out:
if !ok {
log.Printf("StageManager: out channel closed")
return
}
}
} }
}() }()
wg.Wait()
} }
func (the *StageManager) AddSource(source <-chan *common_models.ProcessData) { func (the *StageManager) AddSource(source chan []*common_models.ProcessData) {
log.Printf("stageManager add source=%p 通道数据:%d/%d", source, len(source), cap(source))
the.in = source the.in = source
} }
func (the *StageManager) AddOut(source chan *common_models.ProcessData) { func (the *StageManager) GetOut() chan []*common_models.ProcessData {
return the.out
}
func (the *StageManager) GetIn() chan []*common_models.ProcessData {
return the.in
}
func (the *StageManager) AddOut(source chan []*common_models.ProcessData) {
the.out = source the.out = source
} }

Loading…
Cancel
Save