-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_alerts.py
387 lines (311 loc) · 12.2 KB
/
test_alerts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
#!/usr/bin/python3
#
# Copyright 2024 Wikimedia Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Use pytest to validate/test alerting rules files and their tests.
# For each directory of ALERTSDIR the following tests are run:
# - each *_test.yaml file is an alerts unit test file, validate it with promtool
# - each non-test *.yaml file is an alerting rule file, validate it with promtool
# - additionally, each alerting rule file is checked for missing labels and annotations
import fnmatch
import os
import pathlib
import re
import string
import subprocess
import urllib
import warnings
from pathlib import Path
from urllib.parse import quote, unquote
import pytest
import requests
import yaml
# Root directory to scan for alerting rules; defaults to the current directory.
_ALERTS_ROOT = os.environ.get("ALERTSDIR", ".")

# Non-hidden sub-directories of the root, as path strings usable from the
# current working directory. Entries returned by os.listdir() are bare names,
# so they must be joined to the root before being tested with isdir(); with
# the default root (".") the resulting strings are the bare directory names.
# Sorted to get a reproducible list.
SUBDIRS = sorted(
    str(pathlib.Path(_ALERTS_ROOT, entry))
    for entry in os.listdir(_ALERTS_ROOT)
    if not entry.startswith(".") and os.path.isdir(os.path.join(_ALERTS_ROOT, entry))
)

# Matches a 'prometheus' or 'site' label followed by a matching operator
# character inside a selector of an expression.
EXT_LABELS_RE = re.compile(r"{.*(prometheus|site)\s*[=!~]")

# Names accepted (possibly via glob pattern) as 'deploy-tag' values.
PROMETHEUS_INSTANCES = [
    "analytics",
    "cloud",
    "ext",
    "k8s",
    "k8s-aux",
    "k8s-dse",
    "k8s-mlserve",
    "k8s-mlstaging",
    "k8s-staging",
    "ops",
    "services",
]
def all_testfiles(paths):
    """Collect the alerting rules unit test files found below paths.

    Each entry of paths is searched recursively for files whose name ends in
    '_test.yaml'. The matches are returned as a list of pathlib.Path objects.
    """
    return [
        testfile
        for root in paths
        for testfile in pathlib.Path(root).glob("**/*_test.yaml")
    ]
def all_rulefiles(paths):
    """Collect the alerting rule files found below paths.

    A rule file is any '*.yaml' file, searched recursively, that is not also
    matched as a '*_test.yaml' unit test file. The result is a sorted list of
    pathlib.Path objects.
    """
    collected = []
    for root in map(pathlib.Path, paths):
        tests = set(root.glob("**/*_test.yaml"))
        collected += [
            candidate
            for candidate in set(root.glob("**/*.yaml"))
            if candidate not in tests
        ]
    # sorting makes the list reproducible
    collected.sort()
    return collected
def test_yml_extension():
    """Fail when any file below SUBDIRS uses the 'yml' extension."""
    for subdir in SUBDIRS:
        offenders = set(pathlib.Path(subdir).glob("**/*.yml"))
        assert not offenders, "use yaml extension not yml"
@pytest.mark.parametrize("testfile", all_testfiles(SUBDIRS), ids=str)
def test_alerts(testfile):
    """Execute the promtool unit tests contained in testfile.

    promtool receives the file's base name only; the full path is handed to
    _run_promtool as well. A non-zero exit status fails the test and reports
    promtool's stdout and stderr.
    """
    posix_path = testfile.as_posix()
    result = _run_promtool(
        ["test", "rules", os.path.basename(posix_path)], posix_path
    )
    failure = "promtool test rules failed: %s\n%s" % (result.stdout, result.stderr)
    assert result.returncode == 0, failure
@pytest.mark.parametrize("testfile", all_testfiles(SUBDIRS), ids=str)
def test_rule_test_file_references(testfile):
    """Check that testfile references exactly one existing, matching rule file.

    The unit test file must have a 'rule_files' list with a single entry, the
    entry must exist relative to the test file's directory, and the test file
    must be named '<rule file stem>_test.yaml'.
    """
    testfile_path = testfile.as_posix()
    spec = yaml.load(testfile.read_text(), Loader=yaml.FullLoader)

    assert "rule_files" in spec, "'rule_files' not found in %s" % testfile_path

    referenced = spec["rule_files"]
    assert len(referenced) == 1, (
        "Multiple rule_files not allowed in %s" % testfile_path
    )

    rule_file = testfile.parent / referenced[0]
    assert rule_file.exists(), "%s references non existing rule %s" % (
        testfile_path,
        rule_file.as_posix(),
    )

    expected_name = f"{rule_file.stem}_test.yaml"
    assert (
        expected_name == testfile.name
    ), "test files must be named after the rule file they are testing"
@pytest.mark.parametrize("rulefile", all_rulefiles(SUBDIRS), ids=str)
def test_valid_rule(rulefile):
    """Check rulefile with 'promtool check rules'.

    A non-zero exit status fails the test and reports promtool's stdout and
    stderr.
    """
    posix_path = rulefile.as_posix()
    result = _run_promtool(
        ["check", "rules", os.path.basename(posix_path)], posix_path
    )
    failure = "promtool check rules failed: %s\n%s" % (result.stdout, result.stderr)
    assert result.returncode == 0, failure
@pytest.mark.parametrize("rulefile", all_rulefiles(SUBDIRS), ids=str)
def test_lint_rule(rulefile):
    """Run 'pint lint' on rulefile.

    A non-zero exit status fails the test. Lines of pint's stderr that start
    with a 'filename:linenumber: ' prefix are re-emitted as UserWarning.
    """
    result = subprocess.run(
        ["pint", "lint", rulefile.as_posix()],
        capture_output=True,
        encoding="utf8",
    )

    # A non-zero exit status means a severe problem: fail right away
    assert result.returncode == 0, "pint lint failed: %s\n%s" % (
        result.stdout,
        result.stderr,
    )

    # Less severe problems carry a filename:linenumber: prefix, surface them
    # as warnings
    minor_problem = re.compile(r"^.+:\d+: ")
    for finding in filter(minor_problem.match, result.stderr.splitlines()):
        warnings.warn(UserWarning(finding))
@pytest.mark.parametrize("rulefile", all_rulefiles(SUBDIRS), ids=str)
def test_rule_metadata(rulefile):
    """Validate labels and annotations of every alerting rule in rulefile."""
    document = yaml.load(rulefile.read_text(), Loader=yaml.FullLoader)
    # Recording rules have no "alert" key and are left out
    alerting_rules = (
        rule
        for group in document["groups"]
        for rule in group["rules"]
        if "alert" in rule
    )
    for rule in alerting_rules:
        _validate_rule_metadata(rule)
@pytest.mark.parametrize("rulefile", all_rulefiles(SUBDIRS), ids=str)
def test_deploy_metadata(rulefile):
    """Check that rulefile carries a valid 'deploy-tag' header.

    The tag is a comma separated list. 'global' and 'local' are only accepted
    on their own; any other value is used as a glob pattern that has to match
    at least one entry of PROMETHEUS_INSTANCES.
    """
    special_values = ("global", "local")

    tag = _get_tag(rulefile.read_text(), "deploy-tag")
    assert (
        tag is not None
    ), "Please pick Prometheus instance(s) via 'deploy-tag'. See docs at https://wikitech.wikimedia.org/wiki/Alertmanager#deploy-tag"

    tags = re.split(r",\s*", tag)
    if len(tags) > 1:
        for value in special_values:
            assert value not in tags, "%r not allowed with multiple tags" % value

    for t in tags:
        accepted = t in special_values or bool(
            fnmatch.filter(PROMETHEUS_INSTANCES, t)
        )
        assert accepted, (
            "%r is not a valid value for deploy-tag. See docs at https://wikitech.wikimedia.org/wiki/Alertmanager#deploy-tag"
            % t
        )
@pytest.mark.parametrize("rulefile", all_rulefiles(SUBDIRS), ids=str)
def test_local_labels_references(rulefile):
    """Check that non-global alerts do not reference external labels.

    Such an alert will never fire because external labels don't show up
    when evaluating non-global (i.e Prometheus, not Thanos) alerts.
    Files without a 'deploy-tag', or tagged 'global', are not checked.
    """
    content = rulefile.read_text()
    if _get_tag(content, "deploy-tag") in (None, "global"):
        return

    document = yaml.load(content, Loader=yaml.SafeLoader)
    for group in document["groups"]:
        for rule in group["rules"]:
            # Only alerting rules are of interest, recording rules are not
            if "alert" in rule:
                assert EXT_LABELS_RE.search(rule["expr"]) is None, (
                    "Alert %s is not going to fire: external label reference in non-global alert (%s)"
                    % (
                        rule["alert"],
                        rulefile.as_posix(),
                    )
                )
@pytest.mark.ci()
@pytest.mark.parametrize("rulefile", all_rulefiles(SUBDIRS), ids=str)
def test_runbook_exists(rulefile):
    """Ensure that if the alert has a runbook, it actually exists.

    Every rule with a 'runbook' annotation other than "TODO" has its runbook
    URL fetched. The fetch must return status 200 with a non-empty body,
    except for 401 responses from a '.google.com' URL which are accepted as
    private runbooks.
    """
    groups = yaml.load(rulefile.read_text(), Loader=yaml.FullLoader)
    for group in groups["groups"]:
        for rule in group["rules"]:
            runbook = rule.get("annotations", {}).get("runbook", None)
            if runbook is None or runbook == "TODO":
                continue
            # Without a timeout requests waits forever on an unresponsive
            # host, which would hang the whole test run.
            response = requests.get(runbook, timeout=30)
            # Private runbooks are okay
            if response.status_code == 401 and ".google.com" in response.url:
                continue
            assert response.status_code == 200 and response.text != "", (
                f"Unable to fetch runbook {runbook}, please make sure that it exists and "
                f"it's reachable. Got {response.status_code} with body {response.text!r}"
            )
@pytest.mark.parametrize(
    "rulefile",
    [
        rulefile
        for rulefile in all_rulefiles(SUBDIRS)
        if "team-wmcs" == rulefile.parent.parent.name
    ],
    ids=str,
)
def test_wmcs_runbook_is_defined(rulefile):
    """Make sure that there's a runbook defined for all WMCS alerts.

    Only rule files whose grandparent directory is named 'team-wmcs' are
    checked. Every alerting rule must have a non-empty 'runbook' annotation.
    """
    groups = yaml.load(rulefile.read_text(), Loader=yaml.FullLoader)
    for group in groups["groups"]:
        for index, rule in enumerate(group["rules"]):
            # Consider only alerting rules, not recording rules: the latter
            # have no "alert" key to report in the failure message.
            if "alert" not in rule:
                continue
            runbook = rule.get("annotations", {}).get("runbook", None)
            assert (
                runbook
            ), f"Rule #{index} - alertname:{rule['alert']} has no runbook defined, please add one."
def _get_tag(text, name):
    """Read tag 'name' from text's "header".

    The header is the leading run of comment lines (starting with '#'), empty
    or whitespace-only lines, and lines starting with a space. A tag is a
    header line of the form '# name: value'. Anything after the first line
    that is not part of the header is ignored.

    Return the tag's value, or None on tag not found.
    """
    # The name is escaped so it is always matched literally
    tag_re = re.compile(rf"^# *{re.escape(name)}: *(.+)$")

    for line in text.splitlines():
        m = tag_re.match(line)
        if m:
            return m.group(1)

        # keep looking through comments and empty lines, stop at anything else
        if line.startswith(("#", " ")) or not line.strip():
            continue
        return None

    return None
def _validate_rule_metadata(rule):
    """Assert that the alerting rule has the expected labels and annotations.

    Checks performed on rule (a mapping with at least "alert" and "expr"):
    - the 'severity' label is present
    - the 'team' label is present, unless expr contains 'group_left(team)'
    - the summary, description, dashboard and runbook annotations are present
    - severity 'page' and '#page' in the summary always come together
    - the alert name contains no whitespace
    - runbook and dashboard URLs are quoted
    - the dashboard does not link to grafana-rw.wikimedia.org

    Raises AssertionError on the first failed check.
    """
    required_labels = ("severity",)
    required_annotations = ("summary", "description", "dashboard", "runbook")

    alertname = rule["alert"]
    # A missing or empty 'labels'/'annotations' mapping is reported by the
    # checks below like any other missing metadata, instead of crashing with
    # KeyError/TypeError.
    labels = rule.get("labels") or {}
    annotations = rule.get("annotations") or {}

    for label in required_labels:
        assert label in labels, "Label %r not found for alert %r" % (
            label,
            alertname,
        )

    if "group_left(team)" not in rule["expr"]:
        assert (
            "team" in labels
        ), '"team" label required, unless you are getting the team from "role_owner" metric + group_left(team)'

    for annotation in required_annotations:
        assert annotation in annotations, (
            "Annotation %r not found for alert %r. Consider adding a string/URL or TODO."
            % (annotation, alertname)
        )

    if labels["severity"] == "page":
        assert (
            "#page" in annotations["summary"]
        ), "severity is 'page' but summary does not contain #page"

    if "#page" in annotations["summary"]:
        assert (
            labels["severity"] == "page"
        ), "summary contains #page but severity is not 'page'"

    assert not any(ws in alertname for ws in string.whitespace), (
        "Alert names with spaces are hard to address and silence: %r" % alertname
    )

    for name in ("runbook", "dashboard"):
        if name in annotations:
            assert _url_is_quoted(annotations[name]), (
                "URL in %s contains unquoted characters, check warnings for details"
                % name
            )

    assert (
        "grafana-rw.wikimedia.org" not in annotations["dashboard"]
    ), "Link to public dashboards on grafana.w.org, not grafana-rw.w.org"
def _untemplate(string):
    """Return string without golang text/template markers.

    Every '{{ ... }}' marker is removed on its own: the match is non-greedy so
    that literal text found between two markers is kept in the result.
    """
    return re.sub(r"\{\{.*?\}\}", "", string)
def _url_is_quoted(url):
    """Check if url has been URL quoted.

    Only the query string is inspected: each variable name and value must be
    unchanged by an unquote/quote round trip. The check will ignore template
    markers since no quoting can be checked at this stage. A URL with no query
    string, or with no '=' in it, is considered quoted.

    Return True when the url is quoted. Otherwise emit a UserWarning describing
    the first offending name or value and return False.
    """
    u = urllib.parse.urlparse(url)
    if not u.query or "=" not in u.query:
        return True

    # Basic split/parsing of qs variables, urllib.parse.parse_qs unquotes the result, which we don't want
    for qs_var in u.query.split("&"):
        # A qs variable with no value (e.g. 'fullscreen' in Grafana) yields an
        # empty value here, which always passes the check below
        raw_name, _, raw_value = qs_var.partition("=")

        name = _untemplate(raw_name)
        expected_name = quote(unquote(name))
        if expected_name != name:
            warnings.warn(
                UserWarning(
                    "Unquoted query string variable %r: expected %r"
                    % (name, expected_name)
                )
            )
            return False

        value = _untemplate(raw_value)
        # Report the same normalised form the comparison uses; quoting the
        # raw value again would double-encode its existing escapes
        expected_value = quote(unquote(value))
        if expected_value != value:
            warnings.warn(
                UserWarning(
                    "Unquoted value for query string variable %r. Expected: %r Got: %r"
                    % (name, expected_value, value)
                )
            )
            return False

    return True
def _run_promtool(args, workdir):
    """Run promtool with args from the directory containing workdir.

    args is the list of promtool arguments (it is not modified), workdir is
    the path of the file being processed: promtool is started in that file's
    directory, or in the current directory when the path has none. Only PATH
    is passed on in the environment.

    Return the subprocess.CompletedProcess, with stdout/stderr captured as
    text.
    """
    # Build a new list: inserting into args would alter the caller's list
    command = ["promtool", *args]
    return subprocess.run(
        command,
        # dirname is "" for a bare file name, which is not a valid cwd
        cwd=os.path.dirname(workdir) or None,
        capture_output=True,
        encoding="utf8",
        env={"PATH": os.environ.get("PATH", "")},
    )