You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
115 lines
2.1 KiB
115 lines
2.1 KiB
1 month ago
|
package et_worker
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"node/et_worker/processors"
|
||
|
"node/stages"
|
||
|
)
|
||
|
|
||
|
type ProcessorManager struct {
|
||
|
source processors.ISource
|
||
|
sink processors.ISink
|
||
|
ps []processors.IProcessor
|
||
|
}
|
||
|
|
||
|
func NewProcessorManager() *ProcessorManager {
|
||
|
return &ProcessorManager{}
|
||
|
}
|
||
|
|
||
|
func (the *ProcessorManager) AddSource(source processors.ISource) {
|
||
|
the.source = source
|
||
|
}
|
||
|
|
||
|
func (the *ProcessorManager) AddProcessor(processor processors.IProcessor) {
|
||
|
the.ps = append(the.ps, processor)
|
||
|
}
|
||
|
|
||
|
func (the *ProcessorManager) AddSink(sink processors.ISink) {
|
||
|
the.sink = sink
|
||
|
}
|
||
|
|
||
|
func (the *ProcessorManager) Run(ctx context.Context) error {
|
||
|
var err error
|
||
|
|
||
|
//in, err := the.source.Process(ctx)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
//stage1 := stages.NewStage("测点信息获取")
|
||
|
stage2 := stages.NewStage("单测点计算")
|
||
|
out := stage2.StageRun()
|
||
|
for {
|
||
|
the.sink.Process(ctx, <-out)
|
||
|
}
|
||
|
|
||
|
//stage1.AddProcess(Add1)
|
||
|
//stage1.AddProcess(Add1000)
|
||
|
// pipeline构建和执行
|
||
|
//for data := range in {
|
||
|
// for _, v := range the.ps {
|
||
|
// data, err = v.Process(ctx, data)
|
||
|
//
|
||
|
// // 错误集中处理,这里暂跳过
|
||
|
// if err != nil {
|
||
|
// log.Printf("Process err %s\n", err)
|
||
|
// break
|
||
|
// }
|
||
|
// }
|
||
|
//
|
||
|
// err := the.sink.Process(ctx, data)
|
||
|
// if err != nil {
|
||
|
// log.Printf("Sink err %s\n", err)
|
||
|
// return nil
|
||
|
// }
|
||
|
//}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
//func (the *ProcessorManager) RunN(ctx context.Context, maxCnt int) error {
|
||
|
// var err error
|
||
|
//
|
||
|
// in, err := the.source.Process(ctx)
|
||
|
// if err != nil {
|
||
|
// return err
|
||
|
// }
|
||
|
//
|
||
|
// // pipeline构建和执行
|
||
|
// syncProcess := func(data any) {
|
||
|
// for _, v := range the.ps {
|
||
|
// data, err = v.Process(ctx, data)
|
||
|
//
|
||
|
// // 错误集中处理,这里选择提前退出
|
||
|
// if err != nil {
|
||
|
// log.Printf("Process err %s\n", err)
|
||
|
// return
|
||
|
// }
|
||
|
// }
|
||
|
//
|
||
|
// err := the.sink.Process(ctx, data)
|
||
|
// if err != nil {
|
||
|
// log.Printf("Sink err %s\n", err)
|
||
|
// return
|
||
|
// }
|
||
|
// }
|
||
|
//
|
||
|
// wg := sync.WaitGroup{}
|
||
|
// wg.Add(maxCnt)
|
||
|
//
|
||
|
// // 多个协程消费同一个channel
|
||
|
// for i := 0; i < maxCnt; i++ {
|
||
|
// go func() {
|
||
|
// defer wg.Done()
|
||
|
//
|
||
|
// for data := range in {
|
||
|
// syncProcess(data)
|
||
|
// }
|
||
|
// }()
|
||
|
// }
|
||
|
//
|
||
|
// wg.Wait()
|
||
|
//
|
||
|
// return nil
|
||
|
//}
|