Skip to content

Commit 08df269

Browse files
authored
feat: add avro support (Telefonica#16)
* feat: add avro support * chore: add line break * fix: always get a serializer * fix: timestamp to epoch * fix: timestamp example in docs
1 parent 04fbed1 commit 08df269

14 files changed

+294
-50
lines changed

Diff for: .dockerignore

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Dockerfile

Diff for: .gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,5 @@
1010

1111
# Output of the go coverage tool, specifically when used with LiteIDE
1212
*.out
13+
14+
prometheus-kafka-adapter

Diff for: .travis.yml

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ sudo: required
22

33
language: minimal
44

5+
# TODO use language go, test and build the code after building/publishing the docker image
6+
57
services:
68
- docker
79

Diff for: Dockerfile

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@ ADD . /src/prometheus-kafka-adapter
77

88
RUN go build -o /prometheus-kafka-adapter
99

10-
FROM alpine
10+
FROM alpine:3.8
1111

1212
RUN apk add --no-cache librdkafka
1313

14+
COPY --from=build /src/prometheus-kafka-adapter/schemas/metric.avsc /schemas/metric.avsc
1415
COPY --from=build /prometheus-kafka-adapter /
1516

1617
CMD /prometheus-kafka-adapter

Diff for: README.md

+25-11
Original file line numberDiff line numberDiff line change
@@ -10,34 +10,41 @@ We use `prometheus-kafka-adapter` internally at Telefonica for dumping Prometheu
1010

1111
## output
1212

13-
It produces the following messages in a kafka topic:
13+
It is able to write JSON or Avro-JSON messages in a kafka topic, depending on the `SERIALIZATION_FORMAT` configuration variable.
14+
15+
### JSON
1416

1517
```json
1618
{
17-
"__timestamp__": 1234567890,
18-
"__value__": 9876543210,
19-
20-
"__name__": "up",
21-
"job": "federation",
22-
23-
"label1": "value1",
24-
"label2": "value2"
19+
"timestamp": "1970-01-01T00:00:00Z",
20+
"value": "9876543210",
21+
"name": "up",
22+
23+
"labels": {
24+
"__name__": "up",
25+
"label1": "value1",
26+
"label2": "value2"
27+
}
2528
}
2629
```
2730

28-
`__timestamp__` and `__value__` are reserved values, and can't be used as label names. `__name__` defines the name of the metric.
31+
`timestamp` and `value` are reserved values, and can't be used as label names. `__name__` is a special label that defines the name of the metric and is copied as `name` to the top level for convenience.
32+
33+
### Avro JSON
2934

35+
The Avro-JSON serialization is the same. See the [Avro schema](./schemas/metric.avsc).
3036

3137
## configuration
3238

3339
### prometheus-kafka-adapter
3440

35-
There is a docker image `telefonica/prometheus-kafka-adapter:1.1.0-dev-4` [available on Docker Hub](https://hub.docker.com/r/telefonica/prometheus-kafka-adapter/).
41+
There is a docker image `telefonica/prometheus-kafka-adapter:1.1.0` [available on Docker Hub](https://hub.docker.com/r/telefonica/prometheus-kafka-adapter/).
3642

3743
Prometheus-kafka-adapter listens for metrics coming from Prometheus and sends them to Kafka. This behaviour can be configured with the following environment variables:
3844

3945
- `KAFKA_BROKER_LIST`: defines kafka endpoint and port, defaults to `kafka:9092`.
4046
- `KAFKA_TOPIC`: defines kafka topic to be used, defaults to `metrics`.
47+
- `SERIALIZATION_FORMAT`: defines the serialization format, can be `json`, `avro-json`, defaults to `json`.
4148
- `PORT`: defines http port to listen, defaults to `8080`, used directly by [gin](https://github.com/gin-gonic/gin).
4249
- `LOG_LEVEL`: defines log level for [`logrus`](https://github.com/sirupsen/logrus), can be `debug`, `info`, `warn`, `error`, `fatal` or `panic`, defaults to `info`.
4350
- `GIN_MODE`: manage [gin](https://github.com/gin-gonic/gin) debug logging, can be `debug` or `release`.
@@ -51,6 +58,13 @@ remote_write:
5158
- url: "http://prometheus-kafka-adapter:8080/receive"
5259
```
5360
61+
## development
62+
63+
```
64+
go test
65+
go build
66+
```
67+
5468
## contributing
5569

5670
With issues:

Diff for: config.go

+19
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ var (
2828
Topic: &kafkaTopic,
2929
Partition: kafka.PartitionAny,
3030
}
31+
serializer Serializer
3132
)
3233

3334
func init() {
@@ -50,6 +51,12 @@ func init() {
5051
Partition: kafka.PartitionAny,
5152
}
5253
}
54+
55+
var err error
56+
serializer, err = parseSerializationFormat(os.Getenv("SERIALIZATION_FORMAT"))
57+
if err != nil {
58+
logrus.WithError(err).Fatalln("couldn't create a metrics serializer")
59+
}
5360
}
5461

5562
func parseLogLevel(value string) logrus.Level {
@@ -62,3 +69,15 @@ func parseLogLevel(value string) logrus.Level {
6269

6370
return level
6471
}
72+
73+
func parseSerializationFormat(value string) (Serializer, error) {
74+
switch value {
75+
case "json":
76+
return NewJSONSerializer()
77+
case "avro-json":
78+
return NewAvroJSONSerializer("schemas/metric.avsc")
79+
default:
80+
logrus.WithField("serialization-format-value", value).Warningln("invalid serialization format, using json")
81+
return NewJSONSerializer()
82+
}
83+
}

Diff for: go.mod

+4
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
module github.com/Telefonica/prometheus-kafka-adapter
22

33
require (
4+
github.com/actgardner/gogen-avro v5.1.0+incompatible // indirect
45
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
56
github.com/confluentinc/confluent-kafka-go v0.11.4
67
github.com/containous/traefik v1.7.1
8+
github.com/fatih/structs v1.1.0
79
github.com/gin-contrib/sse v0.0.0-20170109093832-22d885f9ecc7 // indirect
810
github.com/gin-gonic/contrib v0.0.0-20180614032058-39cfb9727134
911
github.com/gin-gonic/gin v1.3.0
1012
github.com/gogo/protobuf v1.1.1
1113
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
1214
github.com/grpc-ecosystem/grpc-gateway v1.5.0 // indirect
15+
github.com/linkedin/goavro v2.1.0+incompatible
1316
github.com/mattn/go-isatty v0.0.4 // indirect
1417
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
1518
github.com/prometheus/client_golang v0.8.0
@@ -18,6 +21,7 @@ require (
1821
github.com/prometheus/procfs v0.0.0-20180920065004-418d78d0b9a7 // indirect
1922
github.com/prometheus/prometheus v2.4.2+incompatible
2023
github.com/sirupsen/logrus v1.1.0
24+
github.com/stretchr/testify v1.2.2
2125
github.com/ugorji/go/codec v0.0.0-20180927125128-99ea80c8b19a // indirect
2226
golang.org/x/net v0.0.0-20180926154720-4dfa2610cdf3 // indirect
2327
google.golang.org/genproto v0.0.0-20180928223349-c7e5094acea1 // indirect

Diff for: go.sum

+9
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
2+
github.com/actgardner/gogen-avro v5.1.0+incompatible h1:FifTNNceWAXLIgeLiLaFzLcJ9NyBqh59g113kgOmqvo=
3+
github.com/actgardner/gogen-avro v5.1.0+incompatible/go.mod h1:N2PzqZtS+5w9xxGp2daeykhWdTL0lBiRhbbvkVj4Yd8=
24
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
35
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
46
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
57
github.com/confluentinc/confluent-kafka-go v0.11.4 h1:uH5doflVcMn+2G/ECv0wxpgmVkvEpTwYFW57V2iLqHo=
68
github.com/confluentinc/confluent-kafka-go v0.11.4/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
79
github.com/containous/traefik v1.7.1 h1:8fZ0MIRANiu39sBo/sIVy1EV1hRKcdz1Nc1QQjpL5zM=
810
github.com/containous/traefik v1.7.1/go.mod h1:epDRqge3JzKOhlSWzOpNYEEKXmM6yfN5tPzDGKk3ljo=
11+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
912
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
13+
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
14+
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
1015
github.com/gin-contrib/sse v0.0.0-20170109093832-22d885f9ecc7 h1:AzN37oI0cOS+cougNAV9szl6CVoj2RYwzS3DpUQNtlY=
1116
github.com/gin-contrib/sse v0.0.0-20170109093832-22d885f9ecc7/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s=
1217
github.com/gin-gonic/contrib v0.0.0-20180614032058-39cfb9727134 h1:xgqFZVwmmtWiuq5LUZ/wa34hJR2Dm9NZAH+Cj9a7Hu0=
@@ -26,10 +31,13 @@ github.com/grpc-ecosystem/grpc-gateway v1.5.0 h1:WcmKMm43DR7RdtlkEXQJyo5ws8iTp98
2631
github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
2732
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
2833
github.com/konsorten/go-windows-terminal-sequences v0.0.0-20180402223658-b729f2633dfe/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
34+
github.com/linkedin/goavro v2.1.0+incompatible h1:DV2aUlj2xZiuxQyvag8Dy7zjY69ENjS66bWkSfdpddY=
35+
github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM=
2936
github.com/mattn/go-isatty v0.0.4 h1:bnP0vzxcAdeI1zdubAl5PjU6zsERjGZb7raWodagDYs=
3037
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
3138
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
3239
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
40+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
3341
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
3442
github.com/prometheus/client_golang v0.8.0 h1:1921Yw9Gc3iSc4VQh3PIoOqgPCZS7G/4xQNVUp8Mda8=
3543
github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
@@ -43,6 +51,7 @@ github.com/prometheus/prometheus v2.4.2+incompatible h1:IpbpeZAXsg39pqRThfPHoNRY
4351
github.com/prometheus/prometheus v2.4.2+incompatible/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s=
4452
github.com/sirupsen/logrus v1.1.0 h1:65VZabgUiV9ktjGM5nTq0+YurgTyX+YI2lSSfDjI+qU=
4553
github.com/sirupsen/logrus v1.1.0/go.mod h1:zrgwTnHtNr00buQ1vSptGe8m1f/BbgsPukg8qsT7A+A=
54+
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
4655
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
4756
github.com/ugorji/go/codec v0.0.0-20180927125128-99ea80c8b19a h1:BgdofUvNP/srMxiUUpGyZm+WjX/qXpMXdl3edRf1Ta0=
4857
github.com/ugorji/go/codec v0.0.0-20180927125128-99ea80c8b19a/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=

Diff for: handlers.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ import (
2424
"github.com/confluentinc/confluent-kafka-go/kafka"
2525
"github.com/golang/snappy"
2626

27-
"github.com/prometheus/prometheus/prompb"
2827
"github.com/gogo/protobuf/proto"
28+
"github.com/prometheus/prometheus/prompb"
2929
)
3030

31-
func receiveHandler(p *kafka.Producer) func(c *gin.Context) {
31+
func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin.Context) {
3232
return func(c *gin.Context) {
3333
httpRequestsTotal.Add(float64(1))
3434

@@ -61,11 +61,11 @@ func receiveHandler(p *kafka.Producer) func(c *gin.Context) {
6161
}
6262

6363
for _, metric := range metrics {
64-
err := p.Produce(&kafka.Message{
64+
err := producer.Produce(&kafka.Message{
6565
TopicPartition: kafkaPartition,
6666
Value: metric,
6767
}, nil)
68-
68+
6969
if err != nil {
7070
c.AbortWithStatus(http.StatusInternalServerError)
7171
logrus.WithError(err).Error("couldn't produce message in kafka")

Diff for: main.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,17 @@ import (
1818
"time"
1919

2020
"github.com/confluentinc/confluent-kafka-go/kafka"
21-
"github.com/prometheus/client_golang/prometheus"
2221
"github.com/containous/traefik/log"
2322
"github.com/gin-gonic/contrib/ginrus"
2423
"github.com/gin-gonic/gin"
24+
"github.com/prometheus/client_golang/prometheus"
2525
"github.com/sirupsen/logrus"
2626
)
2727

2828
func main() {
2929
log.Info("creating kafka producer")
3030

31-
p, err := kafka.NewProducer(&kafka.ConfigMap{
31+
producer, err := kafka.NewProducer(&kafka.ConfigMap{
3232
"bootstrap.servers": kafkaBrokerList,
3333
"go.batch.producer": true, // Enable batch producer (for increased performance).
3434
"go.delivery.reports": false, // per-message delivery reports to the Events() channel
@@ -42,7 +42,7 @@ func main() {
4242

4343
r.Use(ginrus.Ginrus(logrus.StandardLogger(), time.RFC3339, true), gin.Recovery())
4444

45-
r.POST("/receive", receiveHandler(p))
45+
r.POST("/receive", receiveHandler(producer, serializer))
4646
r.GET("/metrics", gin.WrapH(prometheus.UninstrumentedHandler()))
4747

4848
r.Run()

Diff for: prometheus.go

+1-31
Original file line numberDiff line numberDiff line change
@@ -15,41 +15,11 @@
1515
package main
1616

1717
import (
18-
"encoding/json"
19-
20-
"github.com/prometheus/common/model"
2118
"github.com/prometheus/prometheus/prompb"
2219
"github.com/sirupsen/logrus"
2320
)
2421

2522
func processWriteRequest(req *prompb.WriteRequest) ([][]byte, error) {
2623
logrus.WithField("var", req).Debugln()
27-
result := [][]byte{}
28-
29-
for _, ts := range req.Timeseries {
30-
labels := make(model.Metric, len(ts.Labels))
31-
32-
for _, l := range ts.Labels {
33-
labels[model.LabelName(l.Name)] = model.LabelValue(l.Value)
34-
}
35-
36-
for _, sample := range ts.Samples {
37-
metric := make(map[string]interface{}, len(labels)+2)
38-
metric["__value__"] = sample.Value
39-
metric["__timestamp__"] = sample.Timestamp
40-
41-
for key, value := range labels {
42-
metric[string(key)] = value
43-
}
44-
45-
data, err := json.Marshal(metric)
46-
if err != nil {
47-
logrus.WithError(err).Errorln("couldn't proccess timeseries")
48-
}
49-
50-
result = append(result, data)
51-
}
52-
}
53-
54-
return result, nil
24+
return Serialize(serializer, req)
5525
}

Diff for: schemas/metric.avsc

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"namespace": "io.prometheus",
3+
"type": "record",
4+
"name": "Metric",
5+
"doc:" : "A basic schema for representing Prometheus metrics",
6+
"fields": [
7+
{"name": "timestamp", "type": "string"},
8+
{"name": "value", "type": "string"},
9+
{"name": "name", "type": "string"},
10+
{"name": "labels", "type": { "type": "map", "values": "string"} }
11+
]
12+
}

0 commit comments

Comments
 (0)