Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jetstream example #81

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
77 changes: 77 additions & 0 deletions miscellaneous/message-queue-trigger/nats-jetstream/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Message Queue Trigger Demonstration - NATS Jetstream

## Create Nats Jetstream server
```
kubectl apply -f jetstream-server.yaml
```
## Create Producer

```
fission environment create --name go --image fission/go-env-1.16 --builder fission/go-builder-1.16
fission fn create --name producer --env go --src "producer/*" --entrypoint Handler
```

## Create Fission function
```
fission fn create --name helloworld --env go --src hello.go --entrypoint Handler
```

## Create Fission trigger

```
fission mqt create --name jetstreamtest --function helloworld --mqtype nats-jetstream --mqtkind keda --topic input.created --resptopic output.response-topic --errortopic erroutput.error-topic --maxretries 3 --metadata stream=input --metadata fissionConsumer=fission_consumer --metadata natsServerMonitoringEndpoint=nats-jetstream.default.svc.cluster.local:8222 --metadata natsServer=nats://nats-jetstream.default.svc.cluster.local:4222 --metadata consumer=fission_consumer
```
## Run the producer
```
fission fn test --name=producer
```
### Sample Output
```
Order with OrderID:1 has been published
Order with OrderID:2 has been published
Order with OrderID:3 has been published
Successfully sent to request-topic
```

## Check logs
To verify the status of trigger, we can-

- check for logs in the fission helloworld function's pod

```
$ fission fn pod --name=helloworld
NAME NAMESPACE READY STATUS IP EXECUTORTYPE MANAGED
poolmgr-go-default-6312601-6d6b85ff4f-b8m7g fission-function 2/2 Running 10.244.0.188 poolmgr false
```
or

```
$ kubectl -n fission-function get pod -l functionName=helloworld
NAME READY STATUS RESTARTS AGE
poolmgr-go-default-6312601-6d6b85ff4f-b8m7g 2/2 Terminating 0 30m
```
### sample output

```
$ kubectl -n fission-function logs -f -c go poolmgr-go-default-6312601-6d6b85ff4f-b8m7g
2022/08/24 06:16:17 listening on 8888 ...
2022/08/24 06:42:23 specializing ...
2022/08/24 06:42:23 loading plugin from /userfunc/deployarchive/helloworld-eb3f240a-d6bb-4728-b806-f426ce0e255a-vyh8tf-oa1sgs
2022/08/24 06:42:23 done
Hello Test1
Hello Test2
Hello Test3
```

- check jetstream pods logs-

```
$ kubectl logs deploy/jetstreamtest
{"level":"info","ts":1661322333.8198879,"caller":"app/main.go:90","msg":"Done processing message","messsage":"Hello Test1"}
{"level":"info","ts":1661322333.8208282,"caller":"app/main.go:90","msg":"Done processing message","messsage":"Hello Test2"}
{"level":"info","ts":1661322333.8217056,"caller":"app/main.go:90","msg":"Done processing message","messsage":"Hello Test3"}
```

NOTE:
- Jetstream connector creates a push based subscriber to get the data. Make sure the `consumer` provided in `mqt` is of type pull. Also if the consumer is not present connector will itself create the it.
- The connector needs all the stream mentioned(topic,respTopic,errTopic streams) to be present otherwise it will fail. For this example we have created all these streams in producer function. So before pusblisher publishes the messages it also creates the required stream if not present.
28 changes: 28 additions & 0 deletions miscellaneous/message-queue-trigger/nats-jetstream/hello.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package main

import (
"fmt"
"io"
"net/http"
)

// Handler is the entry point for this fission function
func Handler(w http.ResponseWriter, r *http.Request) { // nolint:unused,deadcode
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Error reading request body",
http.StatusInternalServerError)
}
results := string(body)
fmt.Println("Hello: ", results)
_, err = w.Write([]byte("Hello " + results))
if err != nil {
http.Error(w, "Error writing response", http.StatusInternalServerError)
}
}

// ErrorHandler is the entry point for this fission function
func ErrorHandler(w http.ResponseWriter, r *http.Request) { // nolint:unused,deadcode

http.Error(w, "Error reading request body", http.StatusBadRequest)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: nats-jetstream-deployment
labels:
app: nats-jetstream
spec:
replicas: 1
selector:
matchLabels:
app: nats-jetstream
template:
metadata:
labels:
app: nats-jetstream
spec:
containers:
- name: nats-jetstream
# docker run --network host -p 4222:4222 nats -js -m 8222
image: nats:latest
args: ["-js","-m", "8222" ]
---
apiVersion: v1
kind: Service
metadata:
name: nats-jetstream
labels:
app: nats-jetstream
spec:
selector:
app: nats-jetstream
clusterIP: None
ports:
- name: client
port: 4222
- name: cluster
port: 6222
- name: monitor
port: 8222
- name: metrics
port: 7777
- name: leafnodes
port: 7422
- name: gateways
port: 7522
10 changes: 10 additions & 0 deletions miscellaneous/message-queue-trigger/nats-jetstream/producer/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module github.com/fission/mqtrigger

go 1.12

require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/nats-io/nats-server/v2 v2.8.4 // indirect
github.com/nats-io/nats.go v1.16.0
google.golang.org/protobuf v1.28.1 // indirect
)
58 changes: 58 additions & 0 deletions miscellaneous/message-queue-trigger/nats-jetstream/producer/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4=
github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28gth9P+wV2K/zYUUAkJ+55U8cpS0p5I=
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBriPUtluB4=
github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4=
github.com/nats-io/nats.go v1.15.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.16.0 h1:zvLE7fGBQYW6MWaFaRdsgm9qT39PJDQoju+DS8KsO1g=
github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd h1:XcWmESyNjXJMLahc3mqVQJcgSTDxFxhETVlfk9uGc38=
golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220111092808-5a964db01320 h1:0jf+tOCoZ3LyutmCOWpVni1chK4VfFLhRsDK7MhqGRY=
golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M=
golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
111 changes: 111 additions & 0 deletions miscellaneous/message-queue-trigger/nats-jetstream/producer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package main

import (
"fmt"
"log"
"net/http"
"os"
"strconv"

"github.com/nats-io/nats.go"
)

const (
streamName = "input"
streamSubjects = "input.*"
subjectName = "input.created"
responseStreamName = "output"
responseStreamSubject = "output.response-topic"
errorStreamName = "erroutput"
errorstreamSubjects = "erroutput.error-topic"
)

// Handler is the entry point for this fission function
func Handler(w http.ResponseWriter, r *http.Request) { // nolint:unused,deadcode

// Connect to NATS
host := "nats://nats-jetstream.default.svc.cluster.local:4222"

nc, err := nats.Connect(host)
if err != nil {
w.Write([]byte(fmt.Sprintf("error connecting to host: %v", err.Error())))
return
}
// Creates JetStreamContext
js, err := nc.JetStream()
if err != nil {
w.Write([]byte(fmt.Sprintf("error getting context: %v", err.Error())))
return
}

// Creates stream
err = createStream(js, streamName, streamSubjects)
if err != nil {
w.Write([]byte(fmt.Sprintf("error create stream: %v", err.Error())))
return
}

// Creates stream
err = createStream(js, responseStreamName, responseStreamSubject)
if err != nil {
w.Write([]byte(fmt.Sprintf("error create stream: %v", err.Error())))
return
}

// create output & err stream
err = createStream(js, errorStreamName, errorstreamSubjects)
if err != nil {
w.Write([]byte(fmt.Sprintf("error create stream: %v", err.Error())))
return
}

// Create records by publishing messages
err = publishdata(w, js)
if err != nil {
w.Write([]byte(fmt.Sprintf("error in publishing stream: %v", err.Error())))
return
}
fmt.Println("Published all the messages")

w.Write([]byte("Successfully sent to request-topic"))
}

// publishdata publishes data to input stream
func publishdata(w http.ResponseWriter, js nats.JetStreamContext) error {

no, err := strconv.Atoi(os.Getenv("COUNT"))
if err != nil {
log.Println("invalid count provided. Err: ", err)
no = 3
err = nil
}
for i := 1; i <= no; i++ {
_, err := js.Publish(subjectName, []byte("Test"+strconv.Itoa(i)))
if err != nil {
log.Println("Error found: ", err)
return err
}
w.Write([]byte(fmt.Sprintf("Order with OrderID:%d has been published\n", i)))
}
return nil
}

// createStream creates a stream by using JetStreamContext
func createStream(js nats.JetStreamContext, streamName, streamSubjects string) error {
stream, err := js.StreamInfo(streamName)
if err != nil {
log.Println(err)
err = nil
}
if stream == nil {
log.Printf("creating stream %q and subjects %q", streamName, streamSubjects)
_, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{streamSubjects},
})
if err != nil {
return err
}
}
return nil
}