Skip to content

Commit 4ef50ee

Browse files
KnisterPeterZauberNerdPhilipp Hinrichsen
authored
feat: handle context cancelation during docker exec (#1170)
* feat: handle context cancelation during docker exec To allow interrupting docker exec (which could be long running) we process the log output in a go routine and handle context cancelation as well as command result. In case of context cancelation a CTRL+C is written into the docker container. This should be enough to terminate the running command. To make sure we do not get stuck during cleanup, we do set the cleanup contexts with a timeout of 5 minutes Co-authored-by: Björn Brauer <[email protected]> Co-authored-by: Philipp Hinrichsen <[email protected]> * feat: handle SIGTERM signal and abort run * test: on context cancel, abort running command This test makes sure that whenever the act Context was canceled, the currently running docker exec is sent a 0x03 (ctrl+c). Co-authored-by: Björn Brauer <[email protected]> * test: make sure the exec funcction handles command exit code This test makes sure that the exec function does handle docker command error results Co-authored-by: Björn Brauer <[email protected]> Co-authored-by: Philipp Hinrichsen <[email protected]> Co-authored-by: Björn Brauer <[email protected]>
1 parent 943a0e6 commit 4ef50ee

File tree

5 files changed

+182
-14
lines changed

5 files changed

+182
-14
lines changed

main.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"os"
66
"os/signal"
7+
"syscall"
78

89
"github.com/nektos/act/cmd"
910
)
@@ -16,7 +17,7 @@ func main() {
1617

1718
// trap Ctrl+C and call cancel on the context
1819
c := make(chan os.Signal, 1)
19-
signal.Notify(c, os.Interrupt)
20+
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
2021
defer func() {
2122
signal.Stop(c)
2223
cancel()

pkg/container/docker_run.go

+43-12
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func NewContainer(input *NewContainerInput) Container {
8989

9090
// supportsContainerImagePlatform returns true if the underlying Docker server
9191
// API version is 1.41 and beyond
92-
func supportsContainerImagePlatform(ctx context.Context, cli *client.Client) bool {
92+
func supportsContainerImagePlatform(ctx context.Context, cli client.APIClient) bool {
9393
logger := common.Logger(ctx)
9494
ver, err := cli.ServerVersion(ctx)
9595
if err != nil {
@@ -210,12 +210,12 @@ func (cr *containerReference) ReplaceLogWriter(stdout io.Writer, stderr io.Write
210210
}
211211

212212
type containerReference struct {
213-
cli *client.Client
213+
cli client.APIClient
214214
id string
215215
input *NewContainerInput
216216
}
217217

218-
func GetDockerClient(ctx context.Context) (cli *client.Client, err error) {
218+
func GetDockerClient(ctx context.Context) (cli client.APIClient, err error) {
219219
// TODO: this should maybe need to be a global option, not hidden in here?
220220
// though i'm not sure how that works out when there's another Executor :D
221221
// I really would like something that works on OSX native for eg
@@ -244,7 +244,7 @@ func GetDockerClient(ctx context.Context) (cli *client.Client, err error) {
244244
}
245245

246246
func GetHostInfo(ctx context.Context) (info types.Info, err error) {
247-
var cli *client.Client
247+
var cli client.APIClient
248248
cli, err = GetDockerClient(ctx)
249249
if err != nil {
250250
return info, err
@@ -558,6 +558,30 @@ func (cr *containerReference) exec(cmd []string, env map[string]string, user, wo
558558
}
559559
defer resp.Close()
560560

561+
err = cr.waitForCommand(ctx, isTerminal, resp, idResp, user, workdir)
562+
if err != nil {
563+
return err
564+
}
565+
566+
inspectResp, err := cr.cli.ContainerExecInspect(ctx, idResp.ID)
567+
if err != nil {
568+
return errors.WithStack(err)
569+
}
570+
571+
if inspectResp.ExitCode == 0 {
572+
return nil
573+
}
574+
575+
return fmt.Errorf("exit with `FAILURE`: %v", inspectResp.ExitCode)
576+
}
577+
}
578+
579+
func (cr *containerReference) waitForCommand(ctx context.Context, isTerminal bool, resp types.HijackedResponse, idResp types.IDResponse, user string, workdir string) error {
580+
logger := common.Logger(ctx)
581+
582+
cmdResponse := make(chan error)
583+
584+
go func() {
561585
var outWriter io.Writer
562586
outWriter = cr.input.Stdout
563587
if outWriter == nil {
@@ -568,25 +592,32 @@ func (cr *containerReference) exec(cmd []string, env map[string]string, user, wo
568592
errWriter = os.Stderr
569593
}
570594

595+
var err error
571596
if !isTerminal || os.Getenv("NORAW") != "" {
572597
_, err = stdcopy.StdCopy(outWriter, errWriter, resp.Reader)
573598
} else {
574599
_, err = io.Copy(outWriter, resp.Reader)
575600
}
576-
if err != nil {
577-
logger.Error(err)
578-
}
601+
cmdResponse <- err
602+
}()
579603

580-
inspectResp, err := cr.cli.ContainerExecInspect(ctx, idResp.ID)
604+
select {
605+
case <-ctx.Done():
606+
// send ctrl + c
607+
_, err := resp.Conn.Write([]byte{3})
581608
if err != nil {
582-
return errors.WithStack(err)
609+
logger.Warnf("Failed to send CTRL+C: %+s", err)
583610
}
584611

585-
if inspectResp.ExitCode == 0 {
586-
return nil
612+
// we return the context canceled error to prevent other steps
613+
// from executing
614+
return ctx.Err()
615+
case err := <-cmdResponse:
616+
if err != nil {
617+
logger.Error(err)
587618
}
588619

589-
return fmt.Errorf("exit with `FAILURE`: %v", inspectResp.ExitCode)
620+
return nil
590621
}
591622
}
592623

pkg/container/docker_run_test.go

+118
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
11
package container
22

33
import (
4+
"bufio"
45
"context"
6+
"io"
7+
"net"
8+
"strings"
59
"testing"
10+
"time"
611

12+
"github.com/docker/docker/api/types"
13+
"github.com/docker/docker/client"
714
"github.com/stretchr/testify/assert"
15+
"github.com/stretchr/testify/mock"
816
)
917

1018
func TestDocker(t *testing.T) {
@@ -45,3 +53,113 @@ func TestDocker(t *testing.T) {
4553
"CONFLICT_VAR": "I_EXIST_IN_MULTIPLE_PLACES",
4654
}, env)
4755
}
56+
57+
type mockDockerClient struct {
58+
client.APIClient
59+
mock.Mock
60+
}
61+
62+
func (m *mockDockerClient) ContainerExecCreate(ctx context.Context, id string, opts types.ExecConfig) (types.IDResponse, error) {
63+
args := m.Called(ctx, id, opts)
64+
return args.Get(0).(types.IDResponse), args.Error(1)
65+
}
66+
67+
func (m *mockDockerClient) ContainerExecAttach(ctx context.Context, id string, opts types.ExecStartCheck) (types.HijackedResponse, error) {
68+
args := m.Called(ctx, id, opts)
69+
return args.Get(0).(types.HijackedResponse), args.Error(1)
70+
}
71+
72+
func (m *mockDockerClient) ContainerExecInspect(ctx context.Context, execID string) (types.ContainerExecInspect, error) {
73+
args := m.Called(ctx, execID)
74+
return args.Get(0).(types.ContainerExecInspect), args.Error(1)
75+
}
76+
77+
type endlessReader struct {
78+
io.Reader
79+
}
80+
81+
func (r endlessReader) Read(p []byte) (n int, err error) {
82+
return 1, nil
83+
}
84+
85+
type mockConn struct {
86+
net.Conn
87+
mock.Mock
88+
}
89+
90+
func (m *mockConn) Write(b []byte) (n int, err error) {
91+
args := m.Called(b)
92+
return args.Int(0), args.Error(1)
93+
}
94+
95+
func (m *mockConn) Close() (err error) {
96+
return nil
97+
}
98+
99+
func TestDockerExecAbort(t *testing.T) {
100+
ctx, cancel := context.WithCancel(context.Background())
101+
102+
conn := &mockConn{}
103+
conn.On("Write", mock.AnythingOfType("[]uint8")).Return(1, nil)
104+
105+
client := &mockDockerClient{}
106+
client.On("ContainerExecCreate", ctx, "123", mock.AnythingOfType("types.ExecConfig")).Return(types.IDResponse{ID: "id"}, nil)
107+
client.On("ContainerExecAttach", ctx, "id", mock.AnythingOfType("types.ExecStartCheck")).Return(types.HijackedResponse{
108+
Conn: conn,
109+
Reader: bufio.NewReader(endlessReader{}),
110+
}, nil)
111+
112+
cr := &containerReference{
113+
id: "123",
114+
cli: client,
115+
input: &NewContainerInput{
116+
Image: "image",
117+
},
118+
}
119+
120+
channel := make(chan error)
121+
122+
go func() {
123+
channel <- cr.exec([]string{""}, map[string]string{}, "user", "workdir")(ctx)
124+
}()
125+
126+
time.Sleep(500 * time.Millisecond)
127+
128+
cancel()
129+
130+
err := <-channel
131+
assert.ErrorIs(t, err, context.Canceled)
132+
133+
conn.AssertExpectations(t)
134+
client.AssertExpectations(t)
135+
}
136+
137+
func TestDockerExecFailure(t *testing.T) {
138+
ctx := context.Background()
139+
140+
conn := &mockConn{}
141+
142+
client := &mockDockerClient{}
143+
client.On("ContainerExecCreate", ctx, "123", mock.AnythingOfType("types.ExecConfig")).Return(types.IDResponse{ID: "id"}, nil)
144+
client.On("ContainerExecAttach", ctx, "id", mock.AnythingOfType("types.ExecStartCheck")).Return(types.HijackedResponse{
145+
Conn: conn,
146+
Reader: bufio.NewReader(strings.NewReader("output")),
147+
}, nil)
148+
client.On("ContainerExecInspect", ctx, "id").Return(types.ContainerExecInspect{
149+
ExitCode: 1,
150+
}, nil)
151+
152+
cr := &containerReference{
153+
id: "123",
154+
cli: client,
155+
input: &NewContainerInput{
156+
Image: "image",
157+
},
158+
}
159+
160+
err := cr.exec([]string{""}, map[string]string{}, "user", "workdir")(ctx)
161+
assert.Error(t, err, "exit with `FAILURE`: 1")
162+
163+
conn.AssertExpectations(t)
164+
client.AssertExpectations(t)
165+
}

pkg/runner/job_executor.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package runner
33
import (
44
"context"
55
"fmt"
6+
"time"
67

78
"github.com/nektos/act/pkg/common"
89
"github.com/nektos/act/pkg/model"
@@ -100,7 +101,16 @@ func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executo
100101
pipeline = append(pipeline, steps...)
101102

102103
return common.NewPipelineExecutor(pipeline...).
103-
Finally(postExecutor).
104+
Finally(func(ctx context.Context) error {
105+
var cancel context.CancelFunc
106+
if ctx.Err() == context.Canceled {
107+
// in case of an aborted run, we still should execute the
108+
// post steps to allow cleanup.
109+
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute)
110+
defer cancel()
111+
}
112+
return postExecutor(ctx)
113+
}).
104114
Finally(info.interpolateOutputs()).
105115
Finally(info.closeContainer())
106116
}

pkg/runner/runner.go

+8
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"regexp"
99
"runtime"
1010
"strings"
11+
"time"
1112

1213
log "github.com/sirupsen/logrus"
1314

@@ -169,7 +170,14 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
169170
}
170171

171172
if runner.config.AutoRemove && isLastRunningContainer(s, r) {
173+
var cancel context.CancelFunc
174+
if ctx.Err() == context.Canceled {
175+
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute)
176+
defer cancel()
177+
}
178+
172179
log.Infof("Cleaning up container for job %s", rc.JobName)
180+
173181
if err := rc.stopJobContainer()(ctx); err != nil {
174182
log.Errorf("Error while cleaning container: %v", err)
175183
}

0 commit comments

Comments
 (0)