Skip to content

Commit f28c532

Browse files
misc: refactor logger of job setup and cleanup (#29)
* add support for additional json fields to output setup clean and pre stage step status * fixes a finish job glitch as well
1 parent 635129e commit f28c532

File tree

2 files changed

+80
-17
lines changed

2 files changed

+80
-17
lines changed

pkg/common/executor.go

+45
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package common
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67

78
log "github.com/sirupsen/logrus"
@@ -130,6 +131,31 @@ func NewParallelExecutor(parallel int, executors ...Executor) Executor {
130131
}
131132
}
132133

134+
func NewFieldExecutor(name string, value interface{}, exec Executor) Executor {
135+
return func(ctx context.Context) error {
136+
return exec(WithLogger(ctx, Logger(ctx).WithField(name, value)))
137+
}
138+
}
139+
140+
// Then runs another executor if this executor succeeds
141+
func (e Executor) ThenError(then func(ctx context.Context, err error) error) Executor {
142+
return func(ctx context.Context) error {
143+
err := e(ctx)
144+
if err != nil {
145+
switch err.(type) {
146+
case Warning:
147+
Logger(ctx).Warning(err.Error())
148+
default:
149+
return then(ctx, err)
150+
}
151+
}
152+
if ctx.Err() != nil {
153+
return ctx.Err()
154+
}
155+
return then(ctx, err)
156+
}
157+
}
158+
133159
// Then runs another executor if this executor succeeds
134160
func (e Executor) Then(then Executor) Executor {
135161
return func(ctx context.Context) error {
@@ -149,6 +175,25 @@ func (e Executor) Then(then Executor) Executor {
149175
}
150176
}
151177

178+
// Then runs another executor if this executor succeeds
179+
func (e Executor) OnError(then Executor) Executor {
180+
return func(ctx context.Context) error {
181+
err := e(ctx)
182+
if err != nil {
183+
switch err.(type) {
184+
case Warning:
185+
Logger(ctx).Warning(err.Error())
186+
default:
187+
return errors.Join(err, then(ctx))
188+
}
189+
}
190+
if ctx.Err() != nil {
191+
return ctx.Err()
192+
}
193+
return nil
194+
}
195+
}
196+
152197
// If only runs this executor if conditional is true
153198
func (e Executor) If(conditional Conditional) Executor {
154199
return func(ctx context.Context) error {

pkg/runner/job_executor.go

+35-17
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
9494
}
9595
}
9696

97-
postExecutor = postExecutor.Finally(func(ctx context.Context) error {
97+
var stopContainerExecutor common.Executor = func(ctx context.Context) error {
9898
jobError := common.JobError(ctx)
9999
var err error
100100
if rc.Config.AutoRemove || jobError == nil {
@@ -108,30 +108,48 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
108108
logger.Errorf("Error while stop job container: %v", err)
109109
}
110110
}
111+
return err
112+
}
113+
114+
var setJobResultExecutor common.Executor = func(ctx context.Context) error {
115+
jobError := common.JobError(ctx)
111116
setJobResult(ctx, info, rc, jobError == nil)
112117
setJobOutputs(ctx, rc)
118+
return nil
119+
}
113120

114-
return err
115-
})
121+
var setJobError = func(ctx context.Context, err error) error {
122+
common.SetJobError(ctx, err)
123+
return nil
124+
}
116125

117126
pipeline := make([]common.Executor, 0)
118-
pipeline = append(pipeline, rc.InitializeNodeTool())
119127
pipeline = append(pipeline, preSteps...)
120128
pipeline = append(pipeline, steps...)
121129

122-
return common.NewPipelineExecutor(info.startContainer(), common.NewPipelineExecutor(pipeline...).
123-
Finally(func(ctx context.Context) error { //nolint:contextcheck
124-
var cancel context.CancelFunc
125-
if ctx.Err() == context.Canceled {
126-
// in case of an aborted run, we still should execute the
127-
// post steps to allow cleanup.
128-
ctx, cancel = context.WithTimeout(common.WithLogger(context.Background(), common.Logger(ctx)), 5*time.Minute)
129-
defer cancel()
130-
}
131-
return postExecutor(ctx)
132-
}).
133-
Finally(info.interpolateOutputs()).
134-
Finally(info.closeContainer()))
130+
return common.NewPipelineExecutor(
131+
common.NewFieldExecutor("step", "Set up job", common.NewFieldExecutor("stepid", []string{"--setup-job"},
132+
common.NewPipelineExecutor(common.NewInfoExecutor("\u2B50 Run Set up job"), info.startContainer(), rc.InitializeNodeTool()).
133+
Then(common.NewFieldExecutor("stepResult", model.StepStatusSuccess, common.NewInfoExecutor(" \u2705 Success - Set up job"))).
134+
OnError(common.NewFieldExecutor("stepResult", model.StepStatusFailure, common.NewInfoExecutor(" \u274C Failure - Set up job")).ThenError(setJobError)))),
135+
common.NewPipelineExecutor(pipeline...).
136+
Finally(func(ctx context.Context) error { //nolint:contextcheck
137+
var cancel context.CancelFunc
138+
if ctx.Err() == context.Canceled {
139+
// in case of an aborted run, we still should execute the
140+
// post steps to allow cleanup.
141+
ctx, cancel = context.WithTimeout(common.WithLogger(context.Background(), common.Logger(ctx)), 5*time.Minute)
142+
defer cancel()
143+
}
144+
return postExecutor(ctx)
145+
}).
146+
Finally(common.NewFieldExecutor("step", "Complete job", common.NewFieldExecutor("stepid", []string{"--complete-job"},
147+
common.NewInfoExecutor("\u2B50 Run Complete job").
148+
Finally(stopContainerExecutor).
149+
Finally(
150+
info.interpolateOutputs().Finally(info.closeContainer()).Then(common.NewFieldExecutor("stepResult", model.StepStatusSuccess, common.NewInfoExecutor(" \u2705 Success - Complete job"))).
151+
OnError(common.NewFieldExecutor("stepResult", model.StepStatusFailure, common.NewInfoExecutor(" \u274C Failure - Complete job"))),
152+
)))).Finally(setJobResultExecutor))
135153
}
136154

137155
func setJobResult(ctx context.Context, info jobInfo, rc *RunContext, success bool) {

0 commit comments

Comments
 (0)