Skip to content

Commit 6bd5ecc

Browse files
committedMay 19, 2017
Initial commit.
1 parent d770e0e commit 6bd5ecc

File tree

6 files changed

+634
-0
lines changed

6 files changed

+634
-0
lines changed
 

‎base_test.go

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package main
2+
3+
import (
4+
. "gopkg.in/check.v1"
5+
"testing"
6+
)
7+
8+
// Hook up gocheck into the "go test" runner.
9+
func Test(t *testing.T) { TestingT(t) }
10+
11+
type MySuite struct{}
12+
13+
var _ = Suite(&MySuite{})

‎command.go

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"fmt"
7+
"io"
8+
"os/exec"
9+
)
10+
11+
// runCommand executes the program at path script with the given args and
// returns whatever it wrote to stdout, together with any error.
//
// A non-nil error is returned when the pipes or the child cannot be set up,
// when the child exits with nonzero status or via signal, when the child
// writes anything to stderr, or when ctx reaches its Done state first. In the
// latter case the child is killed, the error is ctx.Err() (one of
// context.Canceled or context.DeadlineExceeded), and the returned stdout may
// be partial.
func runCommand(ctx context.Context, script string, args ...string) (string, error) {
	// Create a new context for the command so that we don't fight over
	// the Done() message. Its cancel func must run on return, otherwise the
	// derived context stays registered with ctx until ctx itself is done.
	cmdctx, cancel := context.WithCancel(ctx)
	defer cancel()
	cmd := exec.CommandContext(cmdctx, script, args...)

	// It'd be simpler to use cmd.Output(), which was what I tried first.
	// The problem is that due to https://github.com/golang/go/issues/18874
	// we then may fail to promptly timeout children that spawn their own
	// child processes.

	pstdout, err := cmd.StdoutPipe()
	if err != nil {
		return "", fmt.Errorf("unable to create stdout pipe: %v", err)
	}
	// Wait also closes the pipes; the extra Close only matters on the early
	// return paths and its error is deliberately ignored.
	defer pstdout.Close()

	pstderr, err := cmd.StderrPipe()
	if err != nil {
		return "", fmt.Errorf("unable to create stderr pipe: %v", err)
	}
	defer pstderr.Close()

	if err := cmd.Start(); err != nil {
		return "", fmt.Errorf("failed to start child: %v", err)
	}

	var stdout, stderr bytes.Buffer
	// Buffered for both copiers so that neither can ever block on its send.
	chdone := make(chan struct{}, 2)

	// Copy errors are ignored on purpose: when we stop early the pipes are
	// closed underneath the copiers and a read error is the expected outcome.
	go func() {
		io.Copy(&stdout, pstdout)
		chdone <- struct{}{}
	}()
	go func() {
		io.Copy(&stderr, pstderr)
		chdone <- struct{}{}
	}()

	// Block until both streams hit EOF or the caller's context is done,
	// whichever happens first.
	closed, ctxdone := 0, false
	for !ctxdone && closed < 2 {
		select {
		case <-ctx.Done():
			// We may get partial stdout in this case, which is fine.
			ctxdone = true
		case <-chdone:
			closed++
		}
	}

	// If ctx is done the child has been (or is being) killed through cmdctx,
	// so Wait returns once the process is gone.
	err = cmd.Wait()

	// Wait has closed our ends of both pipes, so a copier that is still
	// running gets unblocked and finishes. Collect the outstanding signals
	// before touching the buffers; reading them while a copier may still be
	// writing would be a data race.
	// NOTE(review): this relies on closing a pipe interrupting a pending
	// read, which holds for poller-backed pipes (Go 1.9+ on Unix) - confirm
	// for other targets.
	for ; closed < 2; closed++ {
		<-chdone
	}

	if ctxdone {
		err = ctx.Err()
	}
	if err == nil && stderr.Len() != 0 {
		err = fmt.Errorf("got stderr output: %v", stderr.String())
	}
	return stdout.String(), err
}

‎command_test.go

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package main
2+
3+
import (
4+
// "github.com/kylelemons/godebug/pretty"
5+
"fmt"
6+
"github.com/prometheus/client_golang/prometheus"
7+
dto "github.com/prometheus/client_model/go"
8+
. "gopkg.in/check.v1"
9+
"sort"
10+
"time"
11+
)
12+
13+
func (s MySuite) TestTranslateOpentsdb(c *C) {
14+
now := time.Now().Unix()
15+
ot := fmt.Sprintf("a.a %d 9 l1=v1\na.b %d 99 l2=v2 l3=v3", now, now+1)
16+
pms, err := translateOpenTsdb(ot)
17+
c.Assert(err, IsNil)
18+
c.Assert(len(pms), Equals, 2)
19+
20+
c.Check(pms[0].Desc().String(), Equals, `Desc{fqName: "a_a", help: "help", constLabels: {l1="v1"}, variableLabels: []}`)
21+
22+
var met1 dto.Metric
23+
pms[0].Write(&met1)
24+
// c.Check(met1.GetTimestampMs(), Equals, 0)
25+
g1 := met1.GetGauge()
26+
c.Assert(g1, Not(IsNil))
27+
c.Check(g1.GetValue(), Equals, 9.0)
28+
sort.Sort(prometheus.LabelPairSorter(met1.Label))
29+
c.Assert(len(met1.Label), Equals, 1)
30+
c.Check(met1.Label[0].GetName(), Equals, "l1")
31+
c.Check(met1.Label[0].GetValue(), Equals, "v1")
32+
33+
c.Check(pms[1].Desc().String(), Equals, `Desc{fqName: "a_b", help: "help", constLabels: {l2="v2",l3="v3"}, variableLabels: []}`)
34+
35+
var met2 dto.Metric
36+
pms[1].Write(&met2)
37+
// c.Check(met2.GetTimestampMs(), Equals, 0)
38+
g2 := met2.GetGauge()
39+
c.Assert(g2, Not(IsNil))
40+
c.Check(g2.GetValue(), Equals, 99.0)
41+
sort.Sort(prometheus.LabelPairSorter(met2.Label))
42+
c.Assert(len(met2.Label), Equals, 2)
43+
c.Check(met2.Label[0].GetName(), Equals, "l2")
44+
c.Check(met2.Label[0].GetValue(), Equals, "v2")
45+
c.Check(met2.Label[1].GetName(), Equals, "l3")
46+
c.Check(met2.Label[1].GetValue(), Equals, "v3")
47+
}

‎main.go

+225
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
// +build go1.8
2+
3+
package main
4+
5+
import (
6+
"context"
7+
"flag"
8+
"fmt"
9+
"github.com/prometheus/client_golang/prometheus"
10+
"github.com/prometheus/client_golang/prometheus/promhttp"
11+
"log"
12+
"net/http"
13+
_ "net/http/pprof"
14+
"path"
15+
"strings"
16+
"sync"
17+
"time"
18+
)
19+
20+
var (
21+
mDuration = prometheus.NewCounterVec(prometheus.CounterOpts{
22+
Name: "script_duration_seconds_total",
23+
Help: "time elapsed executing script",
24+
}, []string{"script_name"})
25+
mConcExceeds = prometheus.NewCounterVec(prometheus.CounterOpts{
26+
Name: "script_concurrency_exceeds_total",
27+
Help: "number of times script was not executed because there were already too many executions ongoing",
28+
}, []string{"script_name"})
29+
mRuns = prometheus.NewCounterVec(prometheus.CounterOpts{
30+
Name: "script_runs_total",
31+
Help: "number of times script execution attempted",
32+
}, []string{"script_name"})
33+
mErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
34+
Name: "script_errors_total",
35+
Help: "number of script executions that ended with an error",
36+
}, []string{"script_name"})
37+
mParseErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
38+
Name: "script_parse_errors_total",
39+
Help: "number of script executions that ended without error but produced unparseable output",
40+
}, []string{"script_name"})
41+
mTimeouts = prometheus.NewCounterVec(prometheus.CounterOpts{
42+
Name: "script_timeouts_total",
43+
Help: "number of script executions that were killed due to timeout",
44+
}, []string{"script_name"})
45+
mRunning = prometheus.NewGaugeVec(prometheus.GaugeOpts{
46+
Name: "script_running",
47+
Help: "number of executions ongoing",
48+
}, []string{"script_name"})
49+
)
50+
51+
func init() {
52+
prometheus.MustRegister(mDuration)
53+
prometheus.MustRegister(mConcExceeds)
54+
prometheus.MustRegister(mRuns)
55+
prometheus.MustRegister(mErrors)
56+
prometheus.MustRegister(mParseErrors)
57+
prometheus.MustRegister(mTimeouts)
58+
prometheus.MustRegister(mRunning)
59+
}
60+
61+
// runresult carries the outcome of one script execution back to the
// requester.
type runresult struct {
	// output is the script's stdout. Stderr is not carried here; any stderr
	// output results in an error instead.
	output string

	// err is the error the invocation produced, or nil.
	err error
}
68+
69+
// A runreq is a request to run a script and capture its output
70+
type runreq struct {
71+
// Context to run in (allows for cancelling requests)
72+
ctx context.Context
73+
74+
// Script to run, relative to scriptPath.
75+
script string
76+
77+
// Result of running script.
78+
result chan runresult
79+
}
80+
81+
// ScriptHandler is the core of this app.
82+
type ScriptHandler struct {
83+
// if opentsdb is true, interpret script output as opentsdb text format instead of Prometheus'
84+
opentsdb bool
85+
86+
// Prefix of request path to strip off
87+
metricsPath string
88+
89+
// Filesystem path prefix to prepend to all scripts.
90+
scriptPath string
91+
92+
// Used internally to manage concurrency.
93+
reqchan chan runreq
94+
95+
// Max number of concurrent requests per script
96+
scriptWorkers int
97+
98+
// Max duration of any script invocation
99+
timeout time.Duration
100+
101+
// mtx must be locked before modifying any fields below it (preceding
102+
// fields are not supposed to be modifyied.)
103+
mtx sync.Mutex
104+
105+
// Count of running script invocations by script name.
106+
numChildren map[string]int
107+
}
108+
109+
func NewScriptHandler(metricsPath, scriptPath string, opentsdb bool, scriptWorkers int, timeout time.Duration) *ScriptHandler {
110+
return &ScriptHandler{
111+
metricsPath: metricsPath,
112+
scriptPath: scriptPath,
113+
opentsdb: opentsdb,
114+
numChildren: make(map[string]int),
115+
reqchan: make(chan runreq),
116+
scriptWorkers: scriptWorkers,
117+
timeout: timeout,
118+
}
119+
}
120+
121+
// ServeHTTP implements http.Handler. It handles incoming HTTP requests by
122+
// stripping off the metricsPath prefix, executing scriptPath + the remaining
123+
// script name, interpreting the output as metrics, then publishing the result
124+
// as a regular Prometheus metrics response.
125+
func (sh *ScriptHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
126+
script := strings.TrimPrefix(strings.TrimPrefix(r.URL.Path, sh.metricsPath), "/")
127+
if script == "" {
128+
promhttp.Handler().ServeHTTP(w, r)
129+
} else {
130+
reschan := make(chan runresult)
131+
ctx, _ := context.WithDeadline(r.Context(), time.Now().Add(sh.timeout))
132+
sh.reqchan <- runreq{script: script, result: reschan, ctx: ctx}
133+
result := <-reschan
134+
135+
if result.err != nil {
136+
log.Printf("error running script '%s': %v", script, result.err)
137+
} else if err := serveMetricsFromText(sh.opentsdb, w, r, result.output); err != nil {
138+
log.Printf("error parsing output from script '%s': %v", script, err)
139+
mParseErrors.WithLabelValues(script).Add(1)
140+
}
141+
}
142+
}
143+
144+
// Start will run forever, handling incoming runreqs.
145+
func (sh *ScriptHandler) Start() {
146+
for req := range sh.reqchan {
147+
sh.mtx.Lock()
148+
curChildCount := sh.numChildren[req.script]
149+
sh.mtx.Unlock()
150+
151+
if curChildCount >= sh.scriptWorkers {
152+
mConcExceeds.WithLabelValues(req.script).Add(1)
153+
err := fmt.Errorf("can't spawn a new instance of script '%s': already have %d running", req.script, curChildCount)
154+
req.result <- runresult{err: err}
155+
continue
156+
}
157+
158+
sh.mtx.Lock()
159+
sh.numChildren[req.script]++
160+
sh.mtx.Unlock()
161+
162+
mRunning.WithLabelValues(req.script).Add(1)
163+
164+
go func() {
165+
mRuns.WithLabelValues(req.script).Add(1)
166+
start := time.Now()
167+
ctx, _ := context.WithCancel(req.ctx)
168+
output, err := runCommand(ctx, path.Join(sh.scriptPath, req.script))
169+
elapsed := time.Since(start)
170+
mDuration.WithLabelValues(req.script).Add(float64(elapsed) / float64(time.Second))
171+
172+
if err != nil {
173+
mErrors.WithLabelValues(req.script).Add(1)
174+
}
175+
if err == context.DeadlineExceeded {
176+
mTimeouts.WithLabelValues(req.script).Add(1)
177+
}
178+
179+
sh.mtx.Lock()
180+
sh.numChildren[req.script]--
181+
sh.mtx.Unlock()
182+
mRunning.WithLabelValues(req.script).Add(-1)
183+
184+
req.result <- runresult{output: output, err: err}
185+
}()
186+
}
187+
}
188+
189+
func main() {
190+
var (
191+
listenAddress = flag.String("web.listen-address", ":9661",
192+
"Address on which to expose metrics and web interface.")
193+
metricsPath = flag.String("web.telemetry-path", "/metrics",
194+
"Path under which to expose metrics.")
195+
scriptPath = flag.String("script.path", "",
196+
"path under which scripts are located")
197+
opentsdb = flag.Bool("opentsdb", false,
198+
"expect opentsdb-format metrics from script output")
199+
timeout = flag.Duration("timeout", time.Minute,
200+
"how long a script can run before being cancelled")
201+
scworkers = flag.Int("script-workers", 1,
202+
"allow this many concurrent requests per script")
203+
)
204+
flag.Parse()
205+
206+
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
207+
w.Write([]byte(`<html>
208+
<head><title>Script Exporter</title></head>
209+
<body>
210+
<h1>Script Exporter</h1>
211+
<p><a href="` + *metricsPath + `">Metrics</a></p>
212+
</body>
213+
</html>`))
214+
})
215+
216+
sh := NewScriptHandler(*metricsPath, *scriptPath, *opentsdb, *scworkers, *timeout)
217+
go sh.Start()
218+
http.Handle(*metricsPath+"/", sh)
219+
http.Handle(*metricsPath, promhttp.Handler())
220+
221+
srv := &http.Server{Addr: *listenAddress, ReadTimeout: 5 * time.Second, WriteTimeout: 5 * time.Second}
222+
if err := srv.ListenAndServe(); err != nil {
223+
log.Fatalf("Unable to setup HTTP server: %v", err)
224+
}
225+
}

0 commit comments

Comments
 (0)
Please sign in to comment.