* feat: run jobs in parallel This change fixes and restructures the parallel execution of jobs. The previous changes limiting parallel execution broke this and allowed only one job to run at a time. While we now run #CPU jobs in parallel, the jobs added per job matrix add to this. So we might over-commit capacity, but at least it is limited. * fix: correctly build job pipeline The job pipeline should just append all required pipeline steps. The parallelism will be handled by the ParallelExecutor, and we shouldn't handle it while building the pipelines. This also adds a test that the ParallelExecutor runs a limited number of parallel goroutines. * test: correct test implementation Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
		
			
				
	
	
		
			142 lines
		
	
	
		
			2.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			142 lines
		
	
	
		
			2.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package common
 | |
| 
 | |
import (
	"context"
	"fmt"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)
 | |
| 
 | |
| func TestNewWorkflow(t *testing.T) {
 | |
| 	assert := assert.New(t)
 | |
| 
 | |
| 	ctx := context.Background()
 | |
| 
 | |
| 	// empty
 | |
| 	emptyWorkflow := NewPipelineExecutor()
 | |
| 	assert.Nil(emptyWorkflow(ctx))
 | |
| 
 | |
| 	// error case
 | |
| 	errorWorkflow := NewErrorExecutor(fmt.Errorf("test error"))
 | |
| 	assert.NotNil(errorWorkflow(ctx))
 | |
| 
 | |
| 	// multiple success case
 | |
| 	runcount := 0
 | |
| 	successWorkflow := NewPipelineExecutor(
 | |
| 		func(ctx context.Context) error {
 | |
| 			runcount++
 | |
| 			return nil
 | |
| 		},
 | |
| 		func(ctx context.Context) error {
 | |
| 			runcount++
 | |
| 			return nil
 | |
| 		})
 | |
| 	assert.Nil(successWorkflow(ctx))
 | |
| 	assert.Equal(2, runcount)
 | |
| }
 | |
| 
 | |
| func TestNewConditionalExecutor(t *testing.T) {
 | |
| 	assert := assert.New(t)
 | |
| 
 | |
| 	ctx := context.Background()
 | |
| 
 | |
| 	trueCount := 0
 | |
| 	falseCount := 0
 | |
| 
 | |
| 	err := NewConditionalExecutor(func(ctx context.Context) bool {
 | |
| 		return false
 | |
| 	}, func(ctx context.Context) error {
 | |
| 		trueCount++
 | |
| 		return nil
 | |
| 	}, func(ctx context.Context) error {
 | |
| 		falseCount++
 | |
| 		return nil
 | |
| 	})(ctx)
 | |
| 
 | |
| 	assert.Nil(err)
 | |
| 	assert.Equal(0, trueCount)
 | |
| 	assert.Equal(1, falseCount)
 | |
| 
 | |
| 	err = NewConditionalExecutor(func(ctx context.Context) bool {
 | |
| 		return true
 | |
| 	}, func(ctx context.Context) error {
 | |
| 		trueCount++
 | |
| 		return nil
 | |
| 	}, func(ctx context.Context) error {
 | |
| 		falseCount++
 | |
| 		return nil
 | |
| 	})(ctx)
 | |
| 
 | |
| 	assert.Nil(err)
 | |
| 	assert.Equal(1, trueCount)
 | |
| 	assert.Equal(1, falseCount)
 | |
| }
 | |
| 
 | |
| func TestNewParallelExecutor(t *testing.T) {
 | |
| 	assert := assert.New(t)
 | |
| 
 | |
| 	ctx := context.Background()
 | |
| 
 | |
| 	count := 0
 | |
| 	activeCount := 0
 | |
| 	maxCount := 0
 | |
| 	emptyWorkflow := NewPipelineExecutor(func(ctx context.Context) error {
 | |
| 		count++
 | |
| 
 | |
| 		activeCount++
 | |
| 		if activeCount > maxCount {
 | |
| 			maxCount = activeCount
 | |
| 		}
 | |
| 		time.Sleep(2 * time.Second)
 | |
| 		activeCount--
 | |
| 
 | |
| 		return nil
 | |
| 	})
 | |
| 
 | |
| 	err := NewParallelExecutor(2, emptyWorkflow, emptyWorkflow, emptyWorkflow)(ctx)
 | |
| 
 | |
| 	assert.Equal(3, count, "should run all 3 executors")
 | |
| 	assert.Equal(2, maxCount, "should run at most 2 executors in parallel")
 | |
| 	assert.Nil(err)
 | |
| }
 | |
| 
 | |
| func TestNewParallelExecutorFailed(t *testing.T) {
 | |
| 	assert := assert.New(t)
 | |
| 
 | |
| 	ctx, cancel := context.WithCancel(context.Background())
 | |
| 	cancel()
 | |
| 
 | |
| 	count := 0
 | |
| 	errorWorkflow := NewPipelineExecutor(func(ctx context.Context) error {
 | |
| 		count++
 | |
| 		return fmt.Errorf("fake error")
 | |
| 	})
 | |
| 	err := NewParallelExecutor(1, errorWorkflow)(ctx)
 | |
| 	assert.Equal(1, count)
 | |
| 	assert.ErrorIs(context.Canceled, err)
 | |
| }
 | |
| 
 | |
| func TestNewParallelExecutorCanceled(t *testing.T) {
 | |
| 	assert := assert.New(t)
 | |
| 
 | |
| 	ctx, cancel := context.WithCancel(context.Background())
 | |
| 	cancel()
 | |
| 
 | |
| 	errExpected := fmt.Errorf("fake error")
 | |
| 
 | |
| 	count := 0
 | |
| 	successWorkflow := NewPipelineExecutor(func(ctx context.Context) error {
 | |
| 		count++
 | |
| 		return nil
 | |
| 	})
 | |
| 	errorWorkflow := NewPipelineExecutor(func(ctx context.Context) error {
 | |
| 		count++
 | |
| 		return errExpected
 | |
| 	})
 | |
| 	err := NewParallelExecutor(3, errorWorkflow, successWorkflow, successWorkflow)(ctx)
 | |
| 	assert.Equal(3, count)
 | |
| 	assert.Error(errExpected, err)
 | |
| }
 |