Skip to content

Commit 3ac2b72

Browse files
psaPaul Armstrongwolfogre
authored
Fix bug in processing jobs on platforms without Docker (#1834)
* Log incoming jobs. Log the full contents of the job protobuf to make debugging jobs easier * Ensure that the parallel executor always uses at least one thread. The caller may mis-calculate the number of CPUs as zero, in which case ensure that at least one thread is spawned. * Use runtime.NumCPU for CPU counts. For hosts without docker, GetHostInfo() returns a blank struct which has zero CPUs and causes downstream trouble. --------- Co-authored-by: Paul Armstrong <[email protected]> Co-authored-by: Jason Song <[email protected]>
1 parent c70a674 commit 3ac2b72

File tree

4 files changed

+56
-8
lines changed

4 files changed

+56
-8
lines changed

pkg/common/executor.go

+7
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package common
33
import (
44
"context"
55
"fmt"
6+
7+
log "github.com/sirupsen/logrus"
68
)
79

810
// Warning that implements `error` but safe to ignore
@@ -94,6 +96,11 @@ func NewParallelExecutor(parallel int, executors ...Executor) Executor {
9496
work := make(chan Executor, len(executors))
9597
errs := make(chan error, len(executors))
9698

99+
if 1 > parallel {
100+
log.Infof("Parallel tasks (%d) below minimum, setting to 1", parallel)
101+
parallel = 1
102+
}
103+
97104
for i := 0; i < parallel; i++ {
98105
go func(work <-chan Executor, errs chan<- error) {
99106
for executor := range work {

pkg/common/executor_test.go

+11
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,17 @@ func TestNewParallelExecutor(t *testing.T) {
100100
assert.Equal(3, count, "should run all 3 executors")
101101
assert.Equal(2, maxCount, "should run at most 2 executors in parallel")
102102
assert.Nil(err)
103+
104+
// Reset to test running the executor with 0 parallelism
105+
count = 0
106+
activeCount = 0
107+
maxCount = 0
108+
109+
errSingle := NewParallelExecutor(0, emptyWorkflow, emptyWorkflow, emptyWorkflow)(ctx)
110+
111+
assert.Equal(3, count, "should run all 3 executors")
112+
assert.Equal(1, maxCount, "should run at most 1 executors in parallel")
113+
assert.Nil(errSingle)
103114
}
104115

105116
func TestNewParallelExecutorFailed(t *testing.T) {

pkg/model/workflow.go

+1
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,7 @@ func (j *Job) GetMatrixes() ([]map[string]interface{}, error) {
417417
}
418418
} else {
419419
matrixes = append(matrixes, make(map[string]interface{}))
420+
log.Debugf("Empty Strategy, matrixes=%v", matrixes)
420421
}
421422
return matrixes, nil
422423
}

pkg/runner/runner.go

+37-8
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ import (
55
"encoding/json"
66
"fmt"
77
"os"
8+
"runtime"
89

910
log "github.com/sirupsen/logrus"
1011

1112
"github.com/nektos/act/pkg/common"
12-
"github.com/nektos/act/pkg/container"
1313
"github.com/nektos/act/pkg/model"
1414
)
1515

@@ -103,15 +103,45 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
103103
maxJobNameLen := 0
104104

105105
stagePipeline := make([]common.Executor, 0)
106+
log.Debugf("Plan Stages: %v", plan.Stages)
107+
106108
for i := range plan.Stages {
107109
stage := plan.Stages[i]
108110
stagePipeline = append(stagePipeline, func(ctx context.Context) error {
109111
pipeline := make([]common.Executor, 0)
110112
for _, run := range stage.Runs {
113+
log.Debugf("Stages Runs: %v", stage.Runs)
111114
stageExecutor := make([]common.Executor, 0)
112115
job := run.Job()
116+
log.Debugf("Job.Name: %v", job.Name)
117+
log.Debugf("Job.RawNeeds: %v", job.RawNeeds)
118+
log.Debugf("Job.RawRunsOn: %v", job.RawRunsOn)
119+
log.Debugf("Job.Env: %v", job.Env)
120+
log.Debugf("Job.If: %v", job.If)
121+
for step := range job.Steps {
122+
if nil != job.Steps[step] {
123+
log.Debugf("Job.Steps: %v", job.Steps[step].String())
124+
}
125+
}
126+
log.Debugf("Job.TimeoutMinutes: %v", job.TimeoutMinutes)
127+
log.Debugf("Job.Services: %v", job.Services)
128+
log.Debugf("Job.Strategy: %v", job.Strategy)
129+
log.Debugf("Job.RawContainer: %v", job.RawContainer)
130+
log.Debugf("Job.Defaults.Run.Shell: %v", job.Defaults.Run.Shell)
131+
log.Debugf("Job.Defaults.Run.WorkingDirectory: %v", job.Defaults.Run.WorkingDirectory)
132+
log.Debugf("Job.Outputs: %v", job.Outputs)
133+
log.Debugf("Job.Uses: %v", job.Uses)
134+
log.Debugf("Job.With: %v", job.With)
135+
// log.Debugf("Job.RawSecrets: %v", job.RawSecrets)
136+
log.Debugf("Job.Result: %v", job.Result)
113137

114138
if job.Strategy != nil {
139+
log.Debugf("Job.Strategy.FailFast: %v", job.Strategy.FailFast)
140+
log.Debugf("Job.Strategy.MaxParallel: %v", job.Strategy.MaxParallel)
141+
log.Debugf("Job.Strategy.FailFastString: %v", job.Strategy.FailFastString)
142+
log.Debugf("Job.Strategy.MaxParallelString: %v", job.Strategy.MaxParallelString)
143+
log.Debugf("Job.Strategy.RawMatrix: %v", job.Strategy.RawMatrix)
144+
115145
strategyRc := runner.newRunContext(ctx, run, nil)
116146
if err := strategyRc.NewExpressionEvaluator(ctx).EvaluateYamlNode(ctx, &job.Strategy.RawMatrix); err != nil {
117147
log.Errorf("Error while evaluating matrix: %v", err)
@@ -122,6 +152,8 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
122152
if m, err := job.GetMatrixes(); err != nil {
123153
log.Errorf("Error while get job's matrix: %v", err)
124154
} else {
155+
log.Debugf("Job Matrices: %v", m)
156+
log.Debugf("Runner Matrices: %v", runner.config.Matrix)
125157
matrixes = selectMatrixes(m, runner.config.Matrix)
126158
}
127159
log.Debugf("Final matrix after applying user inclusions '%v'", matrixes)
@@ -152,14 +184,11 @@ func (runner *runnerImpl) NewPlanExecutor(plan *model.Plan) common.Executor {
152184
}
153185
pipeline = append(pipeline, common.NewParallelExecutor(maxParallel, stageExecutor...))
154186
}
155-
var ncpu int
156-
info, err := container.GetHostInfo(ctx)
157-
if err != nil {
158-
log.Errorf("failed to obtain container engine info: %s", err)
159-
ncpu = 1 // sane default?
160-
} else {
161-
ncpu = info.NCPU
187+
ncpu := runtime.NumCPU()
188+
if 1 > ncpu {
189+
ncpu = 1
162190
}
191+
log.Debugf("Detected CPUs: %d", ncpu)
163192
return common.NewParallelExecutor(ncpu, pipeline...)(ctx)
164193
})
165194
}

0 commit comments

Comments
 (0)