Skip to content

Commit 42dc9a1

Browse files
committed
Handle workflow 'waiting' action, Introduce CREATE_VM_DELAY env
1 parent c1579f2 commit 42dc9a1

File tree

3 files changed

+42
-8
lines changed

3 files changed

+42
-8
lines changed

runner-autoscaler/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ The scaler is configured via the following environment variables:
3434
| PROJECT_ID | "" | The Google Cloud Project Id. |
3535
| ZONE | "" | The Google Cloud zone where the VM instance will be created. |
3636
| TASK_QUEUE | "" | The relative resource name of the Cloud Task queue. |
37+
| CREATE_VM_DELAY | "10" | The delay in seconds to wait before the VM is created. Useful for skipping the VM creation if the workflow job is canceled by the user shortly afterwards. |
3738
| INSTANCE_TEMPLATE | "" | The relative resource name of the instance template from which the VM instance will be created. |
3839
| SECRET_VERSION | "" | The relative resource name of the secret version which contains the PAT or PAT classic. |
3940
| RUNNER_PREFIX | "runner" | Prefix for the the name of a new VM instance. A random string (10 random lower case characters) will be added to make the name unique: "<prefix>-<random_string>". |

runner-autoscaler/main.go

+1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ func main() {
7777
RunnerLabels: []string{},
7878
RegisteredSources: map[string]pkg.Source{},
7979
SourceQueryParam: getEnvDefault("SOURCE_QUERY_PARAM_NAME", "src"),
80+
CreateVmDelay: getEnvDefaultInt64("CREATE_VM_DELAY", 10),
8081
}
8182

8283
if enterpriseEnv := strings.Split(getEnvDefault("GITHUB_ENTERPRISE", ""), ";"); len(enterpriseEnv) == 2 {

runner-autoscaler/pkg/srv.go

+40-8
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"net/url"
1515
"regexp"
1616
"strings"
17+
"time"
1718

1819
cloudtasks "cloud.google.com/go/cloudtasks/apiv2"
1920
taskspb "cloud.google.com/go/cloudtasks/apiv2/cloudtaskspb"
@@ -498,16 +499,17 @@ func (s *Autoscaler) GenerateRunnerJitConfig(ctx context.Context, url string, ru
498499
}
499500
}
500501

501-
func (s *Autoscaler) createCallbackTaskWithToken(ctx context.Context, url string, secret string, job Job) error {
502+
func (s *Autoscaler) createCallbackTaskWithToken(ctx context.Context, url string, secret string, job Job, delay time.Duration) error {
502503

503504
data, _ := json.Marshal(job)
504505
now := timestamppb.Now()
505-
now.Seconds += 1 // delay the callback a little bit
506+
now.Seconds += int64(delay.Seconds())
506507
req := &taskspb.CreateTaskRequest{
507508
Parent: s.conf.TaskQueue,
508509
Task: &taskspb.Task{
510+
Name: fmt.Sprintf("%s/tasks/%d", s.conf.TaskQueue, job.Id),
509511
DispatchDeadline: &durationpb.Duration{
510-
Seconds: 120, // the timeout of the cloud task callback
512+
Seconds: 120, // the timeout of the cloud task callback - must be greater the time it takes to start the VM
511513
Nanos: 0,
512514
},
513515
ScheduleTime: now,
@@ -529,11 +531,25 @@ func (s *Autoscaler) createCallbackTaskWithToken(ctx context.Context, url string
529531
defer client.Close()
530532
_, err := client.CreateTask(ctx, req)
531533
if err != nil {
532-
return fmt.Errorf("cloudtasks.CreateTask failed: %v", err)
534+
return fmt.Errorf("cloudtasks.CreateTask failed for job Id %d: %v", job.Id, err)
533535
} else {
534-
log.Infof("Created cloud task callback with url \"%s\" and payload \"%s\"", url, data)
536+
log.Infof("Created cloud task callback for workflow job Id %d with url \"%s\" and payload \"%s\"", job.Id, url, data)
535537
}
538+
return nil
539+
}
536540

541+
func (s *Autoscaler) deleteCallbackTask(ctx context.Context, job Job) error {
542+
543+
client := newTaskClient(ctx)
544+
defer client.Close()
545+
err := client.DeleteTask(ctx, &taskspb.DeleteTaskRequest{
546+
Name: fmt.Sprintf("%s/tasks/%d", s.conf.TaskQueue, job.Id),
547+
})
548+
if err != nil {
549+
return fmt.Errorf("cloudtasks.DeleteTask failed for job Id %d: %v", job.Id, err)
550+
} else {
551+
log.Infof("Deleted cloud task callback for workflow job Id %d", job.Id)
552+
}
537553
return nil
538554
}
539555

@@ -609,7 +625,7 @@ func (s *Autoscaler) handleCreateVm(ctx *gin.Context) {
609625
}, job.Labels)
610626
case TypeRepository:
611627
log.Infof("Using jit config for runner registration for repository: %s", src.Name)
612-
// For repositories there is an implicit runner group with id 1
628+
// for repositories there is an implicit runner group with id 1
613629
s.createVmWithJitConfig(ctx, fmt.Sprintf(RUNNER_REPO_JIT_CONFIG_ENDPOINT, src.Name), 1, VmSettings{
614630
Name: fmt.Sprintf("%s-%s", s.conf.RunnerPrefix, RandStringRunes(10)),
615631
MachineType: job.GetMagicLabelValue(MagicLabelMachine),
@@ -653,23 +669,38 @@ func (s *Autoscaler) handleWebhook(ctx *gin.Context) {
653669
if payload.Action == QUEUED {
654670
if ok, missingLabels := payload.Job.HasAllLabels(s.conf.RunnerLabels); ok {
655671
createUrl := createCallbackUrl(ctx, s.conf.RouteCreateVm, s.conf.SourceQueryParam, src.Name)
656-
if err := s.createCallbackTaskWithToken(ctx, createUrl, src.Secret, payload.Job); err != nil {
672+
// delay the create vm callback so we have a chance to delete it if the workflow job is changing its state to 'waiting'
673+
if err := s.createCallbackTaskWithToken(ctx, createUrl, src.Secret, payload.Job, time.Duration(s.conf.CreateVmDelay)*time.Second); err != nil {
657674
log.Errorf("Can not enqueue create-vm cloud task callback: %s", err.Error())
658675
ctx.AbortWithError(http.StatusInternalServerError, err)
659676
return
660677
}
661678
} else {
662679
log.Warnf("Webhook requested to start a runner that is missing the label(s) \"%s\" - ignoring", strings.Join(missingLabels, ", "))
663680
}
681+
} else if payload.Action == WAITING {
682+
// the waiting action happens if a deployment environment is configured in the workflow that requires a review. We have to cancel the cloud task callback
683+
if ok, missingLabels := payload.Job.HasAllLabels(s.conf.RunnerLabels); ok {
684+
if err := s.deleteCallbackTask(ctx, payload.Job); err != nil {
685+
// best effort - this is not considered an error
686+
log.Warnf("Can not delete create-vm cloud task callback: %s", err.Error())
687+
}
688+
} else {
689+
log.Warnf("Webhook signals 'wait' but is missing the label(s) \"%s\" - ignoring", strings.Join(missingLabels, ", "))
690+
}
664691
} else if payload.Action == COMPLETED {
665692
runnerGroupId := s.conf.RunnerGroupId
666693
if src.SourceType == TypeRepository {
667694
runnerGroupId = 1
668695
}
669696
if payload.Job.RunnerGroupId == runnerGroupId {
670697
if ok, missingLabels := payload.Job.HasAllLabels(s.conf.RunnerLabels); ok {
698+
699+
// if the user immediately cancels a workflow we have the chance to delete the callback if not older than 10 seconds - best effort, ignore all errors
700+
s.deleteCallbackTask(ctx, payload.Job)
701+
671702
deleteUrl := createCallbackUrl(ctx, s.conf.RouteDeleteVm, s.conf.SourceQueryParam, src.Name)
672-
if err := s.createCallbackTaskWithToken(ctx, deleteUrl, src.Secret, payload.Job); err != nil {
703+
if err := s.createCallbackTaskWithToken(ctx, deleteUrl, src.Secret, payload.Job, 1*time.Second); err != nil {
673704
log.Errorf("Can not enqueue delete-vm cloud task callback: %s", err.Error())
674705
ctx.AbortWithError(http.StatusInternalServerError, err)
675706
return
@@ -713,6 +744,7 @@ type AutoscalerConfig struct {
713744
RunnerLabels []string
714745
RegisteredSources map[string]Source
715746
SourceQueryParam string
747+
CreateVmDelay int64
716748
}
717749

718750
type Autoscaler struct {

0 commit comments

Comments
 (0)