Skip to content

Commit 783f133

Browse files
authored
Merge pull request #44 from ripienaar/43
(#43) support packaging handlers into docker containers
2 parents 4ea9e69 + 904bbfc commit 783f133

17 files changed

+384
-25
lines changed

Dockerfile.goreleaser

+2
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,7 @@
22

33
FROM alpine:latest
44

5+
RUN apk --no-cache add ca-certificates
6+
57
ENTRYPOINT ["/usr/bin/ajc"]
68
COPY ajc /usr/bin/ajc

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ task, _ := asyncjobs.NewTask("email:new", newEmail())
5050
client.EnqueueTask(ctx, task)
5151
```
5252

53-
Tasks are processes by horizontally and vertically scalable. Typically, a Handler handles one type of Task. We have Prometheus
53+
Tasks are processes by horizontally and vertically scalable processes. Typically, a Handler handles one type of Task. We have Prometheus
5454
integration, concurrency and backoffs configured.
5555

5656
```go
@@ -64,7 +64,7 @@ client, _ := asyncjobs.NewClient(
6464
asyncjobs.RetryBackoffPolicy(asyncjobs.RetryLinearTenMinutes))
6565

6666
router := asyncjobs.NewTaskRouter()
67-
router.Handler("email:new", func(ctx context.Context, task *asyncjobs.Task) (interface{}, error) {
67+
router.Handler("email:new", func(ctx context.Context, log asyncjobs.Logger, task *asyncjobs.Task) (interface{}, error) {
6868
log.Printf("Processing task %s", task.ID)
6969

7070
// do work here using task.Payload

ajc/main.go

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ func main() {
4141
configureInfoCommand(ajc)
4242
configureTaskCommand(ajc)
4343
configureQueueCommand(ajc)
44+
configurePackagesCommand(ajc)
4445

4546
_, err := ajc.Parse(os.Args[1:])
4647
if err != nil {

ajc/package_command.go

+78
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Copyright (c) 2022, R.I. Pienaar and the Project contributors
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package main
6+
7+
import (
8+
"fmt"
9+
"os"
10+
"path/filepath"
11+
12+
"github.com/choria-io/asyncjobs/generators"
13+
"gopkg.in/alecthomas/kingpin.v2"
14+
"gopkg.in/yaml.v2"
15+
)
16+
17+
type packageCommand struct {
18+
file string
19+
}
20+
21+
func configurePackagesCommand(app *kingpin.Application) {
22+
c := &packageCommand{}
23+
24+
pkg := app.Command("package", "Creates packages hosting handlers").Alias("pkg")
25+
26+
pkg.Command("docker", "Creates a Docker Container hosting handlers based on handlers.yaml").Action(c.dockerAction)
27+
pkg.Flag("file", "Use a specific configuration file rather than asyncjobs.yaml").Default("asyncjobs.yaml").ExistingFileVar(&c.file)
28+
}
29+
30+
func (c *packageCommand) dockerAction(_ *kingpin.ParseContext) error {
31+
createLogger()
32+
33+
_, err := os.Stat(c.file)
34+
if os.IsNotExist(err) {
35+
return fmt.Errorf("handlers.yaml does not exist")
36+
}
37+
38+
hb, err := os.ReadFile(c.file)
39+
if err != nil {
40+
return err
41+
}
42+
43+
h := &generators.Package{}
44+
err = yaml.Unmarshal(hb, h)
45+
if err != nil {
46+
return fmt.Errorf("invalid handlers file: %v", err)
47+
}
48+
49+
if h.AJVersion == "" {
50+
h.AJVersion = version
51+
}
52+
if h.Name == "" {
53+
h.Name = "choria.io/asyncjobs/handlers"
54+
}
55+
56+
if len(h.TaskHandlers) == 0 {
57+
return fmt.Errorf("no task handlers specified in %s", c.file)
58+
}
59+
60+
generator, err := generators.NewGoContainer(h)
61+
if err != nil {
62+
return err
63+
}
64+
65+
path, err := filepath.Abs(".")
66+
if err != nil {
67+
return err
68+
}
69+
70+
err = generator.RenderToDirectory(path)
71+
if err != nil {
72+
return err
73+
}
74+
75+
log.Printf("Run docker build to build your package\n")
76+
77+
return nil
78+
}

ajc/task_command.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ func (c *taskCommand) processAction(_ *kingpin.ParseContext) error {
188188
}
189189

190190
router := asyncjobs.NewTaskRouter()
191-
err = router.HandleFunc(c.ttype, func(ctx context.Context, task *asyncjobs.Task) (interface{}, error) {
191+
err = router.HandleFunc(c.ttype, func(ctx context.Context, log asyncjobs.Logger, task *asyncjobs.Task) (interface{}, error) {
192192
tj, err := json.Marshal(task)
193193
if err != nil {
194194
return nil, err

ajc/util.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
"golang.org/x/term"
2222
)
2323

24-
func prepare(copts ...asyncjobs.ClientOpt) error {
24+
func createLogger() {
2525
logger := logrus.New()
2626
if debug {
2727
logger.SetLevel(logrus.DebugLevel)
@@ -33,7 +33,11 @@ func prepare(copts ...asyncjobs.ClientOpt) error {
3333
FullTimestamp: true,
3434
TimestampFormat: "15:04:05",
3535
})
36+
3637
log = logrus.NewEntry(logger)
38+
}
39+
func prepare(copts ...asyncjobs.ClientOpt) error {
40+
createLogger()
3741

3842
if nctx == "" {
3943
return fmt.Errorf("no NATS Context specified")

client_examples_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func ExampleClient_consumer() {
7171
panicIfErr(err)
7272

7373
router := NewTaskRouter()
74-
err = router.HandleFunc("email:send", func(_ context.Context, t *Task) (interface{}, error) {
74+
err = router.HandleFunc("email:send", func(_ context.Context, _ Logger, t *Task) (interface{}, error) {
7575
log.Printf("Processing task: %s", t.ID)
7676

7777
// handle task.Payload which is a JSON encoded email

client_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -138,14 +138,14 @@ var _ = Describe("Client", func() {
138138
handled := int32(0)
139139

140140
router := NewTaskRouter()
141-
router.HandleFunc("test", func(ctx context.Context, t *Task) (interface{}, error) {
141+
router.HandleFunc("test", func(ctx context.Context, log Logger, t *Task) (interface{}, error) {
142142
if t.Tries > 1 {
143-
log.Printf("Try %d for task %s", t.Tries, t.ID)
143+
log.Infof("Try %d for task %s", t.Tries, t.ID)
144144
}
145145

146146
done := atomic.AddInt32(&handled, 1)
147147
if done == int32(testCount)+10 {
148-
log.Printf("Processed all messages")
148+
log.Infof("Processed all messages")
149149
time.AfterFunc(50*time.Millisecond, func() {
150150
cancel()
151151
})
@@ -198,10 +198,10 @@ var _ = Describe("Client", func() {
198198
var tries []time.Time
199199

200200
router := NewTaskRouter()
201-
router.HandleFunc("ginkgo", func(ctx context.Context, t *Task) (interface{}, error) {
201+
router.HandleFunc("ginkgo", func(ctx context.Context, log Logger, t *Task) (interface{}, error) {
202202
tries = append(tries, time.Now())
203203

204-
log.Printf("Trying task %s on try %d\n", t.ID, t.Tries)
204+
log.Infof("Trying task %s on try %d\n", t.ID, t.Tries)
205205

206206
if t.Tries < 2 {
207207
return "fail", fmt.Errorf("simulated failure")
+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
FROM golang:latest AS builder
2+
3+
WORKDIR /usr/src/app
4+
5+
COPY main.go /usr/src/app/main.go
6+
7+
RUN go mod init "{{ .Package.Name }}" && \
8+
go mod tidy -compat=1.17 && \
9+
go get github.com/choria-io/asyncjobs@{{ .Package.AJVersion }} && \
10+
go mod download
11+
12+
RUN go build -v -o /app -ldflags="-s -w"
13+
14+
FROM alpine:latest
15+
16+
RUN adduser -g "Choria Async Jobs" choria && \
17+
mkdir /lib64 && \
18+
ln -s /lib/libc.musl-x86_64.so.1 /lib64/ld-linux-x86-64.so.2 && \
19+
apk --no-cache add ca-certificates && \
20+
mkdir -p /handler/config
21+
22+
COPY --from=builder /app /handler/app
23+
24+
EXPOSE 8080/tcp
25+
26+
USER choria
27+
28+
ENV XDG_CONFIG_HOME "/handler/config"
29+
ENV AJ_WORK_QUEUE "{{ .Package.WorkQueue }}"
30+
{{- if .Package.ContextName }}
31+
ENV AJ_NATS_CONTEXT "{{ .Package.ContextName }}"
32+
{{- else }}
33+
ENV AJ_NATS_CONTEXT "AJ"
34+
{{- end }}
35+
36+
ENTRYPOINT ["/handler/app"]

generators/fs/godocker/main.go.templ

+115
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Copyright (c) 2022, R.I. Pienaar and the Project contributors
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package main
6+
7+
import (
8+
"context"
9+
"fmt"
10+
"os"
11+
"runtime"
12+
"strconv"
13+
14+
"github.com/sirupsen/logrus"
15+
aj "github.com/choria-io/asyncjobs"
16+
17+
{{ range $handler := .Package.TaskHandlers }}
18+
{{ $handler.TaskType | TypeToPackageName }} "{{ $handler.Package }}"
19+
{{- end }}
20+
)
21+
22+
var usage = `
23+
Choria Async Jobs Handler Failed: %v
24+
25+
This is a generated Handler for the Choria Async Jobs Project.
26+
27+
It hosts the following handlers:
28+
{{ range $handler := .Package.TaskHandlers }}
29+
- {{ $handler.TaskType }}: {{ $handler.Package }}.AsyncJobHandler
30+
{{- end }}
31+
32+
To run this you need to prepare a NATS context using the nats
33+
CLI and then mount it into the container on /handler/config/nats.
34+
35+
The following Environment variables are supported:
36+
37+
- AJ_WORK_QUEUE: The Work Queue to consume from, defaults to DEFAULT
38+
- AJ_NATS_CONTEXT: The name of a NATS Context to use for connections
39+
- AJ_CONCURRENCY: The number of concurrent handlers that can be run
40+
41+
Prometheus statistics are Exposed on port 8080 as /metrics
42+
43+
For further information see the project wiki at:
44+
45+
https://github.com/choria-io/asyncjobs
46+
47+
Build Information:
48+
49+
- Build Time: {{ .BuildTime }}
50+
- Async Jobs Package Version: {{ .Package.AJVersion }}
51+
52+
`
53+
54+
func usageIfError(err error) {
55+
if err == nil {
56+
return
57+
}
58+
59+
fmt.Printf(usage, err)
60+
os.Exit(1)
61+
}
62+
63+
func main() {
64+
wq := os.Getenv("AJ_WORK_QUEUE")
65+
if wq == "" {
66+
usageIfError(fmt.Errorf("AJ_WORK_QUEUE is required"))
67+
}
68+
69+
nctx := os.Getenv("AJ_NATS_CONTEXT")
70+
if nctx == "" {
71+
usageIfError(fmt.Errorf("AJ_NATS_CONTEXT is required"))
72+
}
73+
74+
var err error
75+
concurrencyS := os.Getenv("AJ_CONCURRENCY")
76+
concurrency := runtime.NumCPU()
77+
if concurrencyS != "" {
78+
concurrency, err = strconv.Atoi(concurrencyS)
79+
if err != nil {
80+
usageIfError(fmt.Errorf("AJ_CONCURRENCY must be numeric"))
81+
}
82+
}
83+
84+
logger := logrus.New()
85+
if os.Getenv("AJ_DEBUG") == "1" {
86+
logger.SetLevel(logrus.DebugLevel)
87+
} else {
88+
logger.SetLevel(logrus.InfoLevel)
89+
}
90+
logger.SetFormatter(&logrus.TextFormatter{
91+
FullTimestamp: true,
92+
TimestampFormat: "15:04:05",
93+
})
94+
95+
log := logrus.NewEntry(logger)
96+
97+
log.Printf("Connecting using Context %s consuming work queue %s with concurrency %d", nctx, wq, concurrency)
98+
99+
client, err := aj.NewClient(
100+
aj.NatsContext(nctx),
101+
aj.BindWorkQueue(wq),
102+
aj.ClientConcurrency(concurrency),
103+
aj.CustomLogger(log),
104+
aj.PrometheusListenPort(8080))
105+
usageIfError(err)
106+
107+
router := aj.NewTaskRouter()
108+
{{ range $handler := .Package.TaskHandlers }}
109+
err = router.HandleFunc("{{ $handler.TaskType }}", {{ $handler.TaskType | TypeToPackageName }}.AsyncJobHandler)
110+
usageIfError(err)
111+
{{- end }}
112+
113+
err = client.Run(context.Background(), router)
114+
usageIfError(err)
115+
}

0 commit comments

Comments
 (0)