From 18792f9620fd2fd7dacc53ae30a234fbfc7c81b1 Mon Sep 17 00:00:00 2001
From: Markus Wolf
Date: Fri, 25 Feb 2022 19:47:16 +0100
Subject: [PATCH] feat: run jobs in parallel (#1003)

* feat: run jobs in parallel

This change fixes and restructures the parallel execution of jobs.
The previous change limiting parallel execution broke this and allowed
only one job to run at a time.
While we now run as many jobs in parallel as there are CPUs, the jobs
added per job matrix add to this, so we might over-commit the capacity,
but at least it is bounded.

* fix: correctly build job pipeline

The job pipeline should just append all required pipeline steps.
Parallelism is handled by the ParallelExecutor, so we should not handle
it while building the pipelines.

This also adds a test that the ParallelExecutor runs only a limited
number of goroutines in parallel.

* test: correct test implementation

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
---
 pkg/common/executor.go      | 34 ++++++++++++++++------------------
 pkg/common/executor_test.go | 20 ++++++++++++++++----
 pkg/runner/runner.go        | 12 +++---------
 3 files changed, 35 insertions(+), 31 deletions(-)
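
The reworked NewParallelExecutor below follows the classic bounded
worker-pool pattern: a fixed number of goroutines drain a buffered work
channel, every result is collected, and the first non-nil error wins.
The standalone sketch below illustrates that pattern; Task and
runParallel are illustrative names used only for this note and are not
part of this patch or of act's API.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Task mirrors the shape of act's Executor: a function that takes a
// context and returns an error.
type Task func(ctx context.Context) error

// runParallel sketches the bounded worker-pool pattern: `parallel`
// worker goroutines drain a buffered work channel and every result is
// collected. Assumes parallel >= 1; passing 0 would block forever.
func runParallel(ctx context.Context, parallel int, tasks ...Task) error {
	work := make(chan Task, len(tasks))
	errs := make(chan error, len(tasks))

	// Start a fixed number of workers; each pulls tasks until the
	// work channel is closed.
	for i := 0; i < parallel; i++ {
		go func() {
			for task := range work {
				errs <- task(ctx)
			}
		}()
	}

	// Queue every task, then close the channel so the workers exit.
	for _, task := range tasks {
		work <- task
	}
	close(work)

	// Wait for all tasks and keep the first non-nil error.
	var firstErr error
	for i := 0; i < len(tasks); i++ {
		if err := <-errs; err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func main() {
	tasks := []Task{
		func(ctx context.Context) error { time.Sleep(10 * time.Millisecond); return nil },
		func(ctx context.Context) error { return errors.New("boom") },
		func(ctx context.Context) error { return nil },
	}

	// Three tasks, at most two in flight at any time.
	fmt.Println(runParallel(context.Background(), 2, tasks...))
}

Because both channels are buffered to len(tasks), queueing never blocks
and the collection loop can simply receive exactly len(tasks) results.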
diff --git a/pkg/common/executor.go b/pkg/common/executor.go
index 5e9b28d..fb76d0b 100644
--- a/pkg/common/executor.go
+++ b/pkg/common/executor.go
@@ -91,27 +91,33 @@ func NewErrorExecutor(err error) Executor {
 	}
 }
 
 // NewParallelExecutor creates a new executor from a parallel of other executors
-func NewParallelExecutor(executors ...Executor) Executor {
+func NewParallelExecutor(parallel int, executors ...Executor) Executor {
 	return func(ctx context.Context) error {
-		errChan := make(chan error)
+		work := make(chan Executor, len(executors))
+		errs := make(chan error, len(executors))
 
-		for _, executor := range executors {
-			e := executor
-			go func() {
-				err := e.ChannelError(errChan)(ctx)
-				if err != nil {
-					log.Fatal(err)
+		for i := 0; i < parallel; i++ {
+			go func(work <-chan Executor, errs chan<- error) {
+				for executor := range work {
+					errs <- executor(ctx)
 				}
-			}()
+			}(work, errs)
 		}
 
+		for i := 0; i < len(executors); i++ {
+			work <- executors[i]
+		}
+		close(work)
+
 		// Executor waits all executors to cleanup these resources.
 		var firstErr error
 		for i := 0; i < len(executors); i++ {
-			if err := <-errChan; err != nil && firstErr == nil {
+			err := <-errs
+			if firstErr == nil {
 				firstErr = err
 			}
 		}
+
 		if err := ctx.Err(); err != nil {
 			return err
 		}
@@ -119,14 +125,6 @@ func NewParallelExecutor(executors ...Executor) Executor {
 	}
 }
 
-// ChannelError sends error to errChan rather than returning error
-func (e Executor) ChannelError(errChan chan error) Executor {
-	return func(ctx context.Context) error {
-		errChan <- e(ctx)
-		return nil
-	}
-}
-
 // Then runs another executor if this executor succeeds
 func (e Executor) Then(then Executor) Executor {
 	return func(ctx context.Context) error {
diff --git a/pkg/common/executor_test.go b/pkg/common/executor_test.go
index 17df3b7..7f691e4 100644
--- a/pkg/common/executor_test.go
+++ b/pkg/common/executor_test.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 )
@@ -79,14 +80,25 @@ func TestNewParallelExecutor(t *testing.T) {
 	ctx := context.Background()
 
 	count := 0
+	activeCount := 0
+	maxCount := 0
 	emptyWorkflow := NewPipelineExecutor(func(ctx context.Context) error {
 		count++
+
+		activeCount++
+		if activeCount > maxCount {
+			maxCount = activeCount
+		}
+		time.Sleep(2 * time.Second)
+		activeCount--
+
 		return nil
 	})
 
-	err := NewParallelExecutor(emptyWorkflow, emptyWorkflow)(ctx)
-	assert.Equal(2, count)
+	err := NewParallelExecutor(2, emptyWorkflow, emptyWorkflow, emptyWorkflow)(ctx)
+	assert.Equal(3, count, "should run all 3 executors")
+	assert.Equal(2, maxCount, "should run at most 2 executors in parallel")
 	assert.Nil(err)
 }
@@ -101,7 +113,7 @@ func TestNewParallelExecutorFailed(t *testing.T) {
 		count++
 		return fmt.Errorf("fake error")
 	})
-	err := NewParallelExecutor(errorWorkflow)(ctx)
+	err := NewParallelExecutor(1, errorWorkflow)(ctx)
 	assert.Equal(1, count)
 	assert.ErrorIs(context.Canceled, err)
 }
@@ -123,7 +135,7 @@ func TestNewParallelExecutorCanceled(t *testing.T) {
 		count++
 		return errExpected
 	})
-	err := NewParallelExecutor(errorWorkflow, successWorkflow, successWorkflow)(ctx)
+	err := NewParallelExecutor(3, errorWorkflow, successWorkflow, successWorkflow)(ctx)
 	assert.Equal(3, count)
 	assert.Error(errExpected, err)
 }
diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go
index 7b26922..d7020dd 100644
--- a/pkg/runner/runner.go
+++ b/pkg/runner/runner.go
@@ -121,8 +121,8 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
 		stage := plan.Stages[i]
 		stagePipeline = append(stagePipeline, func(ctx context.Context) error {
 			pipeline := make([]common.Executor, 0)
-			stageExecutor := make([]common.Executor, 0)
 			for r, run := range stage.Runs {
+				stageExecutor := make([]common.Executor, 0)
 				job := run.Job()
 				if job.Strategy != nil {
 					strategyRc := runner.newRunContext(run, nil)
@@ -140,7 +140,6 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
 					maxParallel = len(matrixes)
 				}
 
-				b := 0
 				for i, matrix := range matrixes {
 					rc := runner.newRunContext(run, matrix)
 					rc.JobName = rc.Name
@@ -167,15 +166,10 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
 							return nil
 						})(common.WithJobErrorContainer(WithJobLogger(ctx, jobName, rc.Config.Secrets, rc.Config.InsecureSecrets)))
 					})
-					b++
-					if b == maxParallel {
-						pipeline = append(pipeline, common.NewParallelExecutor(stageExecutor...))
-						stageExecutor = make([]common.Executor, 0)
-						b = 0
-					}
 				}
+				pipeline = append(pipeline, common.NewParallelExecutor(maxParallel, stageExecutor...))
 			}
 
-			return common.NewPipelineExecutor(pipeline...)(ctx)
+			return common.NewParallelExecutor(runtime.NumCPU(), pipeline...)(ctx)
 		})
 	}
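
With the new signature, callers pass the parallelism limit explicitly:
the runner wraps each run's matrix executors with
NewParallelExecutor(maxParallel, ...) and the stage pipeline with
runtime.NumCPU(). A hypothetical caller sketch, assuming the import
path github.com/nektos/act/pkg/common and the
NewParallelExecutor(parallel int, executors ...Executor) signature from
this patch; the jobs themselves are illustrative no-ops:

package main

import (
	"context"
	"fmt"
	"runtime"

	"github.com/nektos/act/pkg/common"
)

func main() {
	// A common.Executor is a func(context.Context) error; these three
	// stand in for the per-matrix job executors built by the runner.
	job := func(name string) common.Executor {
		return func(ctx context.Context) error {
			fmt.Println("running", name)
			return nil
		}
	}

	// Mirrors the runner change: at most runtime.NumCPU() executors run
	// at the same time, the rest wait in the work queue.
	err := common.NewParallelExecutor(runtime.NumCPU(), job("a"), job("b"), job("c"))(context.Background())
	if err != nil {
		fmt.Println("error:", err)
	}
}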