// Package stages provides a small channel-driven pipeline stage: items are
// read from an input channel, passed through an ordered list of handler
// functions, and written to an output channel.
package stages

import (
	"log"
	"time"

	"gitea.anxinyun.cn/container/common_models"
)

// Stage is a single processing step of a pipeline.
type Stage struct {
	// Name identifies the stage in log output.
	Name string

	// In is the channel the stage reads from. StageRun keeps consuming until
	// this channel is closed.
	In <-chan *common_models.ProcessData

	// processFuncs are the handlers applied, in registration order, to every
	// item read from In.
	processFuncs []func(*common_models.ProcessData) *common_models.ProcessData

	// Out receives the result of the handler chain for every item read from
	// In. StageRun closes it once In has been drained.
	Out chan *common_models.ProcessData
}

// NewStage returns a Stage called name with no handlers registered and a
// buffered (capacity 1) Out channel.
//
// NOTE: In is initialised with a receive-only channel. Nothing can send on or
// close such a channel, so the caller must assign a real source to In (for
// example the Out of the previous stage) before calling StageRun; otherwise
// the stage goroutine blocks forever.
func NewStage(name string) *Stage {
	return &Stage{
		Name:         name,
		processFuncs: make([]func(*common_models.ProcessData) *common_models.ProcessData, 0),
		In:           make(<-chan *common_models.ProcessData, 1),
		Out:          make(chan *common_models.ProcessData, 1),
	}
}

// StageRun starts a goroutine that reads every item from s.In, runs it through
// the registered handlers and sends the result on s.Out. When s.In is closed
// and drained the goroutine closes s.Out and exits.
//
// It returns s.Out as a receive-only channel so it can be used directly as the
// In of a following stage.
func (s *Stage) StageRun() <-chan *common_models.ProcessData {
	go func() {
		// Close Out on the way out so downstream range loops terminate.
		defer func() {
			close(s.Out)
			log.Printf("[%s]关闭out", s.Name)
		}()

		for item := range s.In {
			//log.Printf("[%s] processing data %v", s.Name, item.DeviceData.Name)
			s.Out <- s.process(item)
		}
		log.Printf("%s over", s.Name)
	}()
	return s.Out
}

// AddProcess appends a handler to the stage. Handlers have the signature
// func(*ProcessData) *ProcessData and are invoked in the order they were added.
func (s *Stage) AddProcess(fun func(*common_models.ProcessData) *common_models.ProcessData) {
	s.processFuncs = append(s.processFuncs, fun)
}

// process runs data through every registered handler in order, feeding each
// handler's return value to the next, and logs the time each handler took.
//
// If data is nil (either on entry or because a handler returned nil) the
// remaining handlers are skipped and nil is returned; without this guard the
// timing code below would dereference a nil pointer and panic inside the
// stage goroutine.
func (s *Stage) process(data *common_models.ProcessData) *common_models.ProcessData {
	for _, processFunc := range s.processFuncs {
		if data == nil {
			log.Printf("stage[%s] got nil data, skipping remaining handlers", s.Name)
			return nil
		}

		//tag := fmt.Sprintf("%d/%d", i+1, len(s.processFuncs))
		//log.Printf("stage[%s][%s] handler start=> %s", s.Name, tag, data.DeviceData.Name)

		// Wrap each call in its own function so the deferred timeCost fires
		// right after this handler instead of when process returns. The
		// arguments to timeCost (device id, start time) are evaluated here,
		// before the handler runs.
		func() {
			defer timeCost(s.Name, data.DeviceData.DeviceId, time.Now())
			data = processFunc(data)
		}()

		//log.Printf("stage[%s] handler over=> %s", s.Name, data.DeviceData.Name)
	}
	return data
}

// timeCost logs the time elapsed since start for the given stage name
// (nodeId) and device id.
func timeCost(nodeId, deviceID string, start time.Time) {
	elapsed := time.Since(start)
	log.Printf("stage[%s] ->[%s] 耗时 = %v", nodeId, deviceID, elapsed)
}