Skip to content

Commit a9c1354

Browse files
committed
Add Serializer plugins, and 'file' output plugin
1 parent 72f5c9b commit a9c1354

File tree

22 files changed

+660
-155
lines changed

22 files changed

+660
-155
lines changed

Godeps

+1-2
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ github.com/gorilla/context 1c83b3eabd45b6d76072b66b746c20815fb2872d
1919
github.com/gorilla/mux 26a6070f849969ba72b72256e9f14cf519751690
2020
github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478
2121
github.com/influxdata/config bae7cb98197d842374d3b8403905924094930f24
22-
github.com/influxdata/influxdb a9552fdd91361819a792f337e5d9998859732a67
23-
github.com/influxdb/influxdb a9552fdd91361819a792f337e5d9998859732a67
22+
github.com/influxdata/influxdb ef571fc104dc24b77cd3710c156cd95e5cfd7aa5
2423
github.com/jmespath/go-jmespath c01cf91b011868172fdcd9f41838e80c9d716264
2524
github.com/klauspost/crc32 999f3125931f6557b991b2f8472172bdfa578d38
2625
github.com/lib/pq 8ad2b298cadd691a77015666a5372eae5dbfac8f

internal/config/config.go

+43
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/influxdata/telegraf/plugins/inputs"
1717
"github.com/influxdata/telegraf/plugins/outputs"
1818
"github.com/influxdata/telegraf/plugins/parsers"
19+
"github.com/influxdata/telegraf/plugins/serializers"
1920

2021
"github.com/influxdata/config"
2122
"github.com/naoina/toml/ast"
@@ -398,6 +399,17 @@ func (c *Config) addOutput(name string, table *ast.Table) error {
398399
}
399400
output := creator()
400401

402+
// If the output has a SetSerializer function, then this means it can write
403+
// arbitrary types of output, so build the serializer and set it.
404+
switch t := output.(type) {
405+
case serializers.SerializerOutput:
406+
serializer, err := buildSerializer(name, table)
407+
if err != nil {
408+
return err
409+
}
410+
t.SetSerializer(serializer)
411+
}
412+
401413
outputConfig, err := buildOutput(name, table)
402414
if err != nil {
403415
return err
@@ -660,6 +672,37 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
660672
return parsers.NewParser(c)
661673
}
662674

675+
// buildSerializer grabs the necessary entries from the ast.Table for creating
676+
// a serializers.Serializer object, and creates it, which can then be added onto
677+
// an Output object.
678+
func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error) {
679+
c := &serializers.Config{}
680+
681+
if node, ok := tbl.Fields["data_format"]; ok {
682+
if kv, ok := node.(*ast.KeyValue); ok {
683+
if str, ok := kv.Value.(*ast.String); ok {
684+
c.DataFormat = str.Value
685+
}
686+
}
687+
}
688+
689+
if c.DataFormat == "" {
690+
c.DataFormat = "influx"
691+
}
692+
693+
if node, ok := tbl.Fields["prefix"]; ok {
694+
if kv, ok := node.(*ast.KeyValue); ok {
695+
if str, ok := kv.Value.(*ast.String); ok {
696+
c.Prefix = str.Value
697+
}
698+
}
699+
}
700+
701+
delete(tbl.Fields, "data_format")
702+
delete(tbl.Fields, "prefix")
703+
return serializers.NewSerializer(c)
704+
}
705+
663706
// buildOutput parses output specific items from the ast.Table, builds the filter and returns an
664707
// internal_models.OutputConfig to be inserted into internal_models.RunningInput
665708
// Note: error exists in the return for future calls that might require error

plugins/outputs/all/all.go

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
_ "github.com/influxdata/telegraf/plugins/outputs/amqp"
66
_ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch"
77
_ "github.com/influxdata/telegraf/plugins/outputs/datadog"
8+
_ "github.com/influxdata/telegraf/plugins/outputs/file"
89
_ "github.com/influxdata/telegraf/plugins/outputs/graphite"
910
_ "github.com/influxdata/telegraf/plugins/outputs/influxdb"
1011
_ "github.com/influxdata/telegraf/plugins/outputs/kafka"

plugins/outputs/amqp/amqp.go

+26-6
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"github.com/influxdata/telegraf"
1111
"github.com/influxdata/telegraf/internal"
1212
"github.com/influxdata/telegraf/plugins/outputs"
13+
"github.com/influxdata/telegraf/plugins/serializers"
14+
1315
"github.com/streadway/amqp"
1416
)
1517

@@ -39,6 +41,8 @@ type AMQP struct {
3941
channel *amqp.Channel
4042
sync.Mutex
4143
headers amqp.Table
44+
45+
serializer serializers.Serializer
4246
}
4347

4448
const (
@@ -69,8 +73,18 @@ var sampleConfig = `
6973
# ssl_key = "/etc/telegraf/key.pem"
7074
### Use SSL but skip chain & host verification
7175
# insecure_skip_verify = false
76+
77+
### Data format to output. This can be "influx" or "graphite"
78+
### Each data format has it's own unique set of configuration options, read
79+
### more about them here:
80+
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md
81+
data_format = "influx"
7282
`
7383

84+
func (a *AMQP) SetSerializer(serializer serializers.Serializer) {
85+
a.serializer = serializer
86+
}
87+
7488
func (q *AMQP) Connect() error {
7589
q.Lock()
7690
defer q.Unlock()
@@ -147,18 +161,24 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
147161
}
148162
var outbuf = make(map[string][][]byte)
149163

150-
for _, p := range metrics {
151-
var value, key string
152-
value = p.String()
153-
164+
for _, metric := range metrics {
165+
var key string
154166
if q.RoutingTag != "" {
155-
if h, ok := p.Tags()[q.RoutingTag]; ok {
167+
if h, ok := metric.Tags()[q.RoutingTag]; ok {
156168
key = h
157169
}
158170
}
159-
outbuf[key] = append(outbuf[key], []byte(value))
160171

172+
values, err := q.serializer.Serialize(metric)
173+
if err != nil {
174+
return err
175+
}
176+
177+
for _, value := range values {
178+
outbuf[key] = append(outbuf[key], []byte(value))
179+
}
161180
}
181+
162182
for key, buf := range outbuf {
163183
err := q.channel.Publish(
164184
q.Exchange, // exchange

plugins/outputs/amqp/amqp_test.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package amqp
33
import (
44
"testing"
55

6+
"github.com/influxdata/telegraf/plugins/serializers"
67
"github.com/influxdata/telegraf/testutil"
78
"github.com/stretchr/testify/require"
89
)
@@ -13,9 +14,11 @@ func TestConnectAndWrite(t *testing.T) {
1314
}
1415

1516
var url = "amqp://" + testutil.GetLocalHost() + ":5672/"
17+
s, _ := serializers.NewInfluxSerializer()
1618
q := &AMQP{
17-
URL: url,
18-
Exchange: "telegraf_test",
19+
URL: url,
20+
Exchange: "telegraf_test",
21+
serializer: s,
1922
}
2023

2124
// Verify that we can connect to the AMQP broker

plugins/outputs/file/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# file Output Plugin

plugins/outputs/file/file.go

+109
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package file
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"os"
7+
8+
"github.com/influxdata/telegraf"
9+
"github.com/influxdata/telegraf/plugins/outputs"
10+
"github.com/influxdata/telegraf/plugins/serializers"
11+
)
12+
13+
type File struct {
14+
Files []string
15+
16+
writer io.Writer
17+
closers []io.Closer
18+
19+
serializer serializers.Serializer
20+
}
21+
22+
var sampleConfig = `
23+
### Files to write to, "stdout" is a specially handled file.
24+
files = ["stdout", "/tmp/metrics.out"]
25+
26+
### Data format to output. This can be "influx" or "graphite"
27+
### Each data format has it's own unique set of configuration options, read
28+
### more about them here:
29+
### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_OUTPUT.md
30+
data_format = "influx"
31+
`
32+
33+
func (f *File) SetSerializer(serializer serializers.Serializer) {
34+
f.serializer = serializer
35+
}
36+
37+
func (f *File) Connect() error {
38+
writers := []io.Writer{}
39+
for _, file := range f.Files {
40+
if file == "stdout" {
41+
writers = append(writers, os.Stdout)
42+
f.closers = append(f.closers, os.Stdout)
43+
} else {
44+
var of *os.File
45+
var err error
46+
if _, err := os.Stat(file); os.IsNotExist(err) {
47+
of, err = os.Create(file)
48+
} else {
49+
of, err = os.OpenFile(file, os.O_APPEND|os.O_WRONLY, os.ModeAppend)
50+
}
51+
52+
if err != nil {
53+
return err
54+
}
55+
writers = append(writers, of)
56+
f.closers = append(f.closers, of)
57+
}
58+
}
59+
f.writer = io.MultiWriter(writers...)
60+
return nil
61+
}
62+
63+
func (f *File) Close() error {
64+
var errS string
65+
for _, c := range f.closers {
66+
if err := c.Close(); err != nil {
67+
errS += err.Error() + "\n"
68+
}
69+
}
70+
if errS != "" {
71+
return fmt.Errorf(errS)
72+
}
73+
return nil
74+
}
75+
76+
func (f *File) SampleConfig() string {
77+
return sampleConfig
78+
}
79+
80+
func (f *File) Description() string {
81+
return "Send telegraf metrics to file(s)"
82+
}
83+
84+
func (f *File) Write(metrics []telegraf.Metric) error {
85+
if len(metrics) == 0 {
86+
return nil
87+
}
88+
89+
for _, metric := range metrics {
90+
values, err := f.serializer.Serialize(metric)
91+
if err != nil {
92+
return err
93+
}
94+
95+
for _, value := range values {
96+
_, err = f.writer.Write([]byte(value + "\n"))
97+
if err != nil {
98+
return fmt.Errorf("FAILED to write message: %s, %s", value, err)
99+
}
100+
}
101+
}
102+
return nil
103+
}
104+
105+
func init() {
106+
outputs.Add("file", func() telegraf.Output {
107+
return &File{}
108+
})
109+
}

plugins/outputs/file/file_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package file

plugins/outputs/graphite/graphite.go

+15-65
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ package graphite
33
import (
44
"errors"
55
"fmt"
6-
"github.com/influxdata/telegraf"
7-
"github.com/influxdata/telegraf/plugins/outputs"
86
"log"
97
"math/rand"
108
"net"
11-
"sort"
129
"strings"
1310
"time"
11+
12+
"github.com/influxdata/telegraf"
13+
"github.com/influxdata/telegraf/plugins/outputs"
14+
"github.com/influxdata/telegraf/plugins/serializers"
1415
)
1516

1617
type Graphite struct {
@@ -71,42 +72,22 @@ func (g *Graphite) Description() string {
7172
func (g *Graphite) Write(metrics []telegraf.Metric) error {
7273
// Prepare data
7374
var bp []string
74-
for _, metric := range metrics {
75-
// Get name
76-
name := metric.Name()
77-
// Convert UnixNano to Unix timestamps
78-
timestamp := metric.UnixNano() / 1000000000
79-
tag_str := buildTags(metric)
75+
s, err := serializers.NewGraphiteSerializer(g.Prefix)
76+
if err != nil {
77+
return err
78+
}
8079

81-
for field_name, value := range metric.Fields() {
82-
// Convert value
83-
value_str := fmt.Sprintf("%#v", value)
84-
// Write graphite metric
85-
var graphitePoint string
86-
if name == field_name {
87-
graphitePoint = fmt.Sprintf("%s.%s %s %d\n",
88-
tag_str,
89-
strings.Replace(name, ".", "_", -1),
90-
value_str,
91-
timestamp)
92-
} else {
93-
graphitePoint = fmt.Sprintf("%s.%s.%s %s %d\n",
94-
tag_str,
95-
strings.Replace(name, ".", "_", -1),
96-
strings.Replace(field_name, ".", "_", -1),
97-
value_str,
98-
timestamp)
99-
}
100-
if g.Prefix != "" {
101-
graphitePoint = fmt.Sprintf("%s.%s", g.Prefix, graphitePoint)
102-
}
103-
bp = append(bp, graphitePoint)
80+
for _, metric := range metrics {
81+
gMetrics, err := s.Serialize(metric)
82+
if err != nil {
83+
log.Printf("Error serializing some metrics to graphite: %s", err.Error())
10484
}
85+
bp = append(bp, gMetrics...)
10586
}
106-
graphitePoints := strings.Join(bp, "")
87+
graphitePoints := strings.Join(bp, "\n") + "\n"
10788

10889
// This will get set to nil if a successful write occurs
109-
err := errors.New("Could not write to any Graphite server in cluster\n")
90+
err = errors.New("Could not write to any Graphite server in cluster\n")
11091

11192
// Send data to a random server
11293
p := rand.Perm(len(g.conns))
@@ -128,37 +109,6 @@ func (g *Graphite) Write(metrics []telegraf.Metric) error {
128109
return err
129110
}
130111

131-
func buildTags(metric telegraf.Metric) string {
132-
var keys []string
133-
tags := metric.Tags()
134-
for k := range tags {
135-
if k == "host" {
136-
continue
137-
}
138-
keys = append(keys, k)
139-
}
140-
sort.Strings(keys)
141-
142-
var tag_str string
143-
if host, ok := tags["host"]; ok {
144-
if len(keys) > 0 {
145-
tag_str = strings.Replace(host, ".", "_", -1) + "."
146-
} else {
147-
tag_str = strings.Replace(host, ".", "_", -1)
148-
}
149-
}
150-
151-
for i, k := range keys {
152-
tag_value := strings.Replace(tags[k], ".", "_", -1)
153-
if i == 0 {
154-
tag_str += tag_value
155-
} else {
156-
tag_str += "." + tag_value
157-
}
158-
}
159-
return tag_str
160-
}
161-
162112
func init() {
163113
outputs.Add("graphite", func() telegraf.Output {
164114
return &Graphite{}

0 commit comments

Comments
 (0)