successfully able to run simple workflows
Signed-off-by: Casey Lee <cplee@nektos.com>
This commit is contained in:
25
pkg/common/dryrun.go
Normal file
25
pkg/common/dryrun.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type dryrunContextKey string
|
||||
|
||||
const dryrunContextKeyVal = dryrunContextKey("dryrun")
|
||||
|
||||
// Dryrun returns true if the current context is dryrun
|
||||
func Dryrun(ctx context.Context) bool {
|
||||
val := ctx.Value(dryrunContextKeyVal)
|
||||
if val != nil {
|
||||
if dryrun, ok := val.(bool); ok {
|
||||
return dryrun
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// WithDryrun adds a value to the context for dryrun
|
||||
func WithDryrun(ctx context.Context, dryrun bool) context.Context {
|
||||
return context.WithValue(ctx, dryrunContextKeyVal, dryrun)
|
||||
}
|
@@ -1,6 +1,7 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
@@ -25,26 +26,76 @@ func Warningf(format string, args ...interface{}) Warning {
|
||||
}
|
||||
|
||||
// Executor define contract for the steps of a workflow
|
||||
type Executor func() error
|
||||
type Executor func(ctx context.Context) error
|
||||
|
||||
// Conditional define contract for the conditional predicate
|
||||
type Conditional func() bool
|
||||
type Conditional func(ctx context.Context) bool
|
||||
|
||||
// NewInfoExecutor is an executor that logs messages
|
||||
func NewInfoExecutor(format string, args ...interface{}) Executor {
|
||||
return func(ctx context.Context) error {
|
||||
logger := Logger(ctx)
|
||||
logger.Infof(format, args...)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// NewPipelineExecutor creates a new executor from a series of other executors
|
||||
func NewPipelineExecutor(executors ...Executor) Executor {
|
||||
return func() error {
|
||||
for _, executor := range executors {
|
||||
if executor == nil {
|
||||
continue
|
||||
if executors == nil {
|
||||
return func(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
var rtn Executor
|
||||
for _, executor := range executors {
|
||||
if rtn == nil {
|
||||
rtn = executor
|
||||
} else {
|
||||
rtn = rtn.Then(executor)
|
||||
}
|
||||
}
|
||||
return rtn
|
||||
}
|
||||
|
||||
// NewConditionalExecutor creates a new executor based on conditions
|
||||
func NewConditionalExecutor(conditional Conditional, trueExecutor Executor, falseExecutor Executor) Executor {
|
||||
return func(ctx context.Context) error {
|
||||
if conditional(ctx) {
|
||||
if trueExecutor != nil {
|
||||
return trueExecutor(ctx)
|
||||
}
|
||||
err := executor()
|
||||
if err != nil {
|
||||
switch err.(type) {
|
||||
case Warning:
|
||||
log.Warning(err.Error())
|
||||
return nil
|
||||
default:
|
||||
log.Debugf("%+v", err)
|
||||
} else {
|
||||
if falseExecutor != nil {
|
||||
return falseExecutor(ctx)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// NewErrorExecutor creates a new executor that always errors out
|
||||
func NewErrorExecutor(err error) Executor {
|
||||
return func(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// NewParallelExecutor creates a new executor from a parallel of other executors
|
||||
func NewParallelExecutor(executors ...Executor) Executor {
|
||||
return func(ctx context.Context) error {
|
||||
errChan := make(chan error)
|
||||
|
||||
for _, executor := range executors {
|
||||
go executor.ChannelError(errChan)(ctx)
|
||||
}
|
||||
|
||||
for i := 0; i < len(executors); i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case err := <-errChan:
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -53,48 +104,76 @@ func NewPipelineExecutor(executors ...Executor) Executor {
|
||||
}
|
||||
}
|
||||
|
||||
// NewConditionalExecutor creates a new executor based on conditions
|
||||
func NewConditionalExecutor(conditional Conditional, trueExecutor Executor, falseExecutor Executor) Executor {
|
||||
return func() error {
|
||||
if conditional() {
|
||||
if trueExecutor != nil {
|
||||
return trueExecutor()
|
||||
}
|
||||
} else {
|
||||
if falseExecutor != nil {
|
||||
return falseExecutor()
|
||||
// ChannelError sends error to errChan rather than returning error
|
||||
func (e Executor) ChannelError(errChan chan error) Executor {
|
||||
return func(ctx context.Context) error {
|
||||
errChan <- e(ctx)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Then runs another executor if this executor succeeds
|
||||
func (e Executor) Then(then Executor) Executor {
|
||||
return func(ctx context.Context) error {
|
||||
err := e(ctx)
|
||||
if err != nil {
|
||||
switch err.(type) {
|
||||
case Warning:
|
||||
log.Warning(err.Error())
|
||||
default:
|
||||
log.Debugf("%+v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
return then(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
// If only runs this executor if conditional is true
|
||||
func (e Executor) If(conditional Conditional) Executor {
|
||||
return func(ctx context.Context) error {
|
||||
if conditional(ctx) {
|
||||
return e(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func executeWithChan(executor Executor, errChan chan error) {
|
||||
errChan <- executor()
|
||||
// IfNot only runs this executor if conditional is true
|
||||
func (e Executor) IfNot(conditional Conditional) Executor {
|
||||
return func(ctx context.Context) error {
|
||||
if !conditional(ctx) {
|
||||
return e(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// NewErrorExecutor creates a new executor that always errors out
|
||||
func NewErrorExecutor(err error) Executor {
|
||||
return func() error {
|
||||
// IfBool only runs this executor if conditional is true
|
||||
func (e Executor) IfBool(conditional bool) Executor {
|
||||
return e.If(func(ctx context.Context) bool {
|
||||
return conditional
|
||||
})
|
||||
}
|
||||
|
||||
// Finally adds an executor to run after other executor
|
||||
func (e Executor) Finally(finally Executor) Executor {
|
||||
return func(ctx context.Context) error {
|
||||
err := e(ctx)
|
||||
err2 := finally(ctx)
|
||||
if err2 != nil {
|
||||
return fmt.Errorf("Error occurred running finally: %v (original error: %v)", err2, err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// NewParallelExecutor creates a new executor from a parallel of other executors
|
||||
func NewParallelExecutor(executors ...Executor) Executor {
|
||||
return func() error {
|
||||
errChan := make(chan error)
|
||||
|
||||
for _, executor := range executors {
|
||||
go executeWithChan(executor, errChan)
|
||||
}
|
||||
|
||||
for i := 0; i < len(executors); i++ {
|
||||
err := <-errChan
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
// Not return an inverted conditional
|
||||
func (c Conditional) Not() Conditional {
|
||||
return func(ctx context.Context) bool {
|
||||
return !c(ctx)
|
||||
}
|
||||
}
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
@@ -10,58 +11,62 @@ import (
|
||||
func TestNewWorkflow(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// empty
|
||||
emptyWorkflow := NewPipelineExecutor()
|
||||
assert.Nil(emptyWorkflow())
|
||||
assert.Nil(emptyWorkflow(ctx))
|
||||
|
||||
// error case
|
||||
errorWorkflow := NewErrorExecutor(fmt.Errorf("test error"))
|
||||
assert.NotNil(errorWorkflow())
|
||||
assert.NotNil(errorWorkflow(ctx))
|
||||
|
||||
// multiple success case
|
||||
runcount := 0
|
||||
successWorkflow := NewPipelineExecutor(
|
||||
func() error {
|
||||
func(ctx context.Context) error {
|
||||
runcount++
|
||||
return nil
|
||||
},
|
||||
func() error {
|
||||
func(ctx context.Context) error {
|
||||
runcount++
|
||||
return nil
|
||||
})
|
||||
assert.Nil(successWorkflow())
|
||||
assert.Nil(successWorkflow(ctx))
|
||||
assert.Equal(2, runcount)
|
||||
}
|
||||
|
||||
func TestNewConditionalExecutor(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
trueCount := 0
|
||||
falseCount := 0
|
||||
|
||||
err := NewConditionalExecutor(func() bool {
|
||||
err := NewConditionalExecutor(func(ctx context.Context) bool {
|
||||
return false
|
||||
}, func() error {
|
||||
}, func(ctx context.Context) error {
|
||||
trueCount++
|
||||
return nil
|
||||
}, func() error {
|
||||
}, func(ctx context.Context) error {
|
||||
falseCount++
|
||||
return nil
|
||||
})()
|
||||
})(ctx)
|
||||
|
||||
assert.Nil(err)
|
||||
assert.Equal(0, trueCount)
|
||||
assert.Equal(1, falseCount)
|
||||
|
||||
err = NewConditionalExecutor(func() bool {
|
||||
err = NewConditionalExecutor(func(ctx context.Context) bool {
|
||||
return true
|
||||
}, func() error {
|
||||
}, func(ctx context.Context) error {
|
||||
trueCount++
|
||||
return nil
|
||||
}, func() error {
|
||||
}, func(ctx context.Context) error {
|
||||
falseCount++
|
||||
return nil
|
||||
})()
|
||||
})(ctx)
|
||||
|
||||
assert.Nil(err)
|
||||
assert.Equal(1, trueCount)
|
||||
@@ -71,13 +76,15 @@ func TestNewConditionalExecutor(t *testing.T) {
|
||||
func TestNewParallelExecutor(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
count := 0
|
||||
emptyWorkflow := NewPipelineExecutor(func() error {
|
||||
emptyWorkflow := NewPipelineExecutor(func(ctx context.Context) error {
|
||||
count++
|
||||
return nil
|
||||
})
|
||||
|
||||
err := NewParallelExecutor(emptyWorkflow, emptyWorkflow)()
|
||||
err := NewParallelExecutor(emptyWorkflow, emptyWorkflow)(ctx)
|
||||
assert.Equal(2, count)
|
||||
|
||||
assert.Nil(err)
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
@@ -190,7 +191,7 @@ type NewGitCloneExecutorInput struct {
|
||||
|
||||
// NewGitCloneExecutor creates an executor to clone git repos
|
||||
func NewGitCloneExecutor(input NewGitCloneExecutorInput) Executor {
|
||||
return func() error {
|
||||
return func(ctx context.Context) error {
|
||||
input.Logger.Infof("git clone '%s' # ref=%s", input.URL, input.Ref)
|
||||
input.Logger.Debugf(" cloning %s to %s", input.URL, input.Dir)
|
||||
|
||||
|
27
pkg/common/logger.go
Normal file
27
pkg/common/logger.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type loggerContextKey string
|
||||
|
||||
const loggerContextKeyVal = loggerContextKey("logrus.FieldLogger")
|
||||
|
||||
// Logger returns the appropriate logger for current context
|
||||
func Logger(ctx context.Context) logrus.FieldLogger {
|
||||
val := ctx.Value(loggerContextKeyVal)
|
||||
if val != nil {
|
||||
if logger, ok := val.(logrus.FieldLogger); ok {
|
||||
return logger
|
||||
}
|
||||
}
|
||||
return logrus.StandardLogger()
|
||||
}
|
||||
|
||||
// WithLogger adds a value to the context for the logger
|
||||
func WithLogger(ctx context.Context, logger logrus.FieldLogger) context.Context {
|
||||
return context.WithValue(ctx, loggerContextKeyVal, logger)
|
||||
}
|
Reference in New Issue
Block a user