package et_worker import ( "context" "node/et_worker/processors" "node/stages" ) type ProcessorManager struct { source processors.ISource sink processors.ISink ps []processors.IProcessor } func NewProcessorManager() *ProcessorManager { return &ProcessorManager{} } func (the *ProcessorManager) AddSource(source processors.ISource) { the.source = source } func (the *ProcessorManager) AddProcessor(processor processors.IProcessor) { the.ps = append(the.ps, processor) } func (the *ProcessorManager) AddSink(sink processors.ISink) { the.sink = sink } func (the *ProcessorManager) Run(ctx context.Context) error { var err error //in, err := the.source.Process(ctx) if err != nil { return err } //stage1 := stages.NewStage("测点信息获取") stage2 := stages.NewStage("单测点计算") out := stage2.StageRun() for { the.sink.Process(ctx, <-out) } //stage1.AddProcess(Add1) //stage1.AddProcess(Add1000) // pipeline构建和执行 //for data := range in { // for _, v := range the.ps { // data, err = v.Process(ctx, data) // // // 错误集中处理,这里暂跳过 // if err != nil { // log.Printf("Process err %s\n", err) // break // } // } // // err := the.sink.Process(ctx, data) // if err != nil { // log.Printf("Sink err %s\n", err) // return nil // } //} return nil } //func (the *ProcessorManager) RunN(ctx context.Context, maxCnt int) error { // var err error // // in, err := the.source.Process(ctx) // if err != nil { // return err // } // // // pipeline构建和执行 // syncProcess := func(data any) { // for _, v := range the.ps { // data, err = v.Process(ctx, data) // // // 错误集中处理,这里选择提前退出 // if err != nil { // log.Printf("Process err %s\n", err) // return // } // } // // err := the.sink.Process(ctx, data) // if err != nil { // log.Printf("Sink err %s\n", err) // return // } // } // // wg := sync.WaitGroup{} // wg.Add(maxCnt) // // // 多个协程消费同一个channel // for i := 0; i < maxCnt; i++ { // go func() { // defer wg.Done() // // for data := range in { // syncProcess(data) // } // }() // } // // wg.Wait() // // return nil //}