et-go 20240919重建
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

115 lines
2.1 KiB

1 month ago
package et_worker
import (
"context"
"node/et_worker/processors"
"node/stages"
)
// ProcessorManager groups the parts of a processing pipeline: one source,
// one sink, and a list of processors. The zero value has none of them set;
// they are attached with AddSource, AddSink and AddProcessor.
type ProcessorManager struct {
// source is the value most recently passed to AddSource.
source processors.ISource
// sink is the value most recently passed to AddSink; Run hands stage output to it.
sink processors.ISink
// ps holds the processors in the order they were passed to AddProcessor.
ps []processors.IProcessor
}
// NewProcessorManager returns a pointer to a zero-valued ProcessorManager.
// No source, sink or processor is attached yet; use the Add* methods to
// configure it before calling Run.
func NewProcessorManager() *ProcessorManager {
	mgr := new(ProcessorManager)
	return mgr
}
// AddSource stores source as the manager's source, replacing any source that
// was set by an earlier call.
func (the *ProcessorManager) AddSource(source processors.ISource) {
the.source = source
}
// AddProcessor appends processor to the end of the manager's processor list,
// so processors are kept in the order in which they were added.
func (the *ProcessorManager) AddProcessor(processor processors.IProcessor) {
the.ps = append(the.ps, processor)
}
// AddSink stores sink as the manager's sink, replacing any sink that was set
// by an earlier call. Run calls this sink, so it must be set before Run.
func (the *ProcessorManager) AddSink(sink processors.ISink) {
the.sink = sink
}
// Run creates the stage named "单测点计算", starts it with StageRun, and passes
// every value received from the stage's output channel to the sink.
//
// Run blocks until one of the following happens:
//   - ctx is cancelled, in which case ctx.Err() is returned;
//   - the stage's output channel is closed, in which case nil is returned.
//
// A sink must have been set with AddSink before Run is called.
func (the *ProcessorManager) Run(ctx context.Context) error {
	// NOTE(review): the configured source (the.source) and processors (the.ps)
	// are not wired into this pipeline; only the stage output reaches the sink.
	// An earlier "测点信息获取" stage and a source -> processors -> sink loop
	// were disabled here — confirm whether they should be restored.
	stage := stages.NewStage("单测点计算")
	out := stage.StageRun()

	for {
		select {
		case <-ctx.Done():
			// Stop promptly on cancellation instead of blocking on the channel forever.
			return ctx.Err()
		case data, ok := <-out:
			if !ok {
				// A closed channel would otherwise yield zero values in a tight
				// loop; treat it as normal end of input.
				return nil
			}
			// NOTE(review): whatever the sink returns is still not inspected,
			// matching the previous behavior — confirm against ISink whether
			// an error should be logged or propagated here.
			the.sink.Process(ctx, data)
		}
	}
}
//func (the *ProcessorManager) RunN(ctx context.Context, maxCnt int) error {
// var err error
//
// in, err := the.source.Process(ctx)
// if err != nil {
// return err
// }
//
// // pipeline构建和执行
// syncProcess := func(data any) {
// for _, v := range the.ps {
// data, err = v.Process(ctx, data)
//
// // 错误集中处理,这里选择提前退出
// if err != nil {
// log.Printf("Process err %s\n", err)
// return
// }
// }
//
// err := the.sink.Process(ctx, data)
// if err != nil {
// log.Printf("Sink err %s\n", err)
// return
// }
// }
//
// wg := sync.WaitGroup{}
// wg.Add(maxCnt)
//
// // 多个协程消费同一个channel
// for i := 0; i < maxCnt; i++ {
// go func() {
// defer wg.Done()
//
// for data := range in {
// syncProcess(data)
// }
// }()
// }
//
// wg.Wait()
//
// return nil
//}