-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparse-samlrequest-logs
executable file
·114 lines (90 loc) · 3.16 KB
/
parse-samlrequest-logs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#!/usr/bin/env python3
"""
parse-samlrequest-logs
This script searches web server logs for requests with a SAMLRequest parameter.
Matching log entries have their SAMLRequest parameter parsed, and the following
data reported.
1. Date and time of the request
2. Server hostname, if it could be extracted from the log file path
3. Issuer entity ID
4. Time between the SAML request being issued and the web request
5. HTTP status code
Usage:
parse-samlrequest-logs LOG...
"""
import base64
import re
import sys
import urllib.parse
import zlib
from datetime import datetime

import defusedxml.ElementTree
from docopt import docopt
# Captures a path component that starts with "ip-", up to the next "." or "/".
# Applied to the log file path to label rows with a host name.
host_regex = r'/(ip-[^\./]+)'

# Captures the raw (still URL-encoded) value of a SAMLRequest parameter, i.e.
# everything after "SAMLRequest=" up to the next "&" or space.
saml_request_regex = r'SAMLRequest=([^& ]+)'

# Captures the bracketed request timestamp of a log line, shaped like
# "[dd/Mon/yyyy:hh:mm:ss +zzzz]" (every position is a wildcard; the value is
# validated later when it is parsed as a date).
timestamp_regex = r'\[(../.../....:..:..:.. .....)\]'

# Namespace prefixes for ElementTree lookups on SAML 2.0 documents.  The
# assertion and protocol schemas live in two distinct namespaces.
xmlns = {
    'assertion': 'urn:oasis:names:tc:SAML:2.0:assertion',
    'protocol': 'urn:oasis:names:tc:SAML:2.0:protocol',
}
# strptime format of the bracketed timestamp captured by timestamp_regex.
LOG_TIMESTAMP_FORMAT = '%d/%b/%Y:%H:%M:%S %z'

# Fallback for log lines without a recognisable timestamp.  It has to be
# parseable with LOG_TIMESTAMP_FORMAT, otherwise such lines abort the run.
DEFAULT_TIMESTAMP = '01/Jan/1970:00:00:00 +0000'

# Accepted IssueInstant layouts: whole seconds, and fractional seconds.
ISSUE_INSTANT_FORMATS = (
    '%Y-%m-%dT%H:%M:%S%z',
    '%Y-%m-%dT%H:%M:%S.%f%z',
)


def parse_issue_instant(value):
    """Parse an IssueInstant attribute value into an aware datetime.

    Raises ValueError when the value matches none of ISSUE_INSTANT_FORMATS.
    """
    for fmt in ISSUE_INSTANT_FORMATS:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    raise ValueError('unrecognised IssueInstant: {!r}'.format(value))


def decode_saml_request(encoded):
    """Return the XML text carried by a SAMLRequest parameter value.

    The value is URL-unquoted, base64-decoded, inflated as a raw DEFLATE
    stream (wbits=-15, i.e. no zlib header) and decoded as UTF-8.

    Raises ValueError (bad base64 / bad UTF-8) or zlib.error (bad stream).
    """
    compressed = base64.b64decode(urllib.parse.unquote(encoded))
    return zlib.decompress(compressed, -15).decode('utf-8')


def parse_entry(entry, host):
    """Build a report row from one log line.

    Returns None when the line has no SAMLRequest parameter.  Otherwise
    returns a dict with the keys 'host', 'sp', 'since_epoch', 'timestamp',
    'delta_seconds' and 'status_code'.

    Raises ValueError, KeyError, zlib.error or ParseError when the
    SAMLRequest or a timestamp cannot be decoded.
    """
    match = re.search(saml_request_regex, entry)
    if not match:
        return None

    element = defusedxml.ElementTree.fromstring(
        decode_saml_request(match[1]), forbid_dtd=True)
    issue_dt = parse_issue_instant(element.attrib['IssueInstant'])

    # An absent or empty Issuer is reported as 'unknown' rather than None,
    # which could not be measured or formatted in the report.
    issuer = element.find('assertion:Issuer', xmlns)
    sp = issuer.text if issuer is not None and issuer.text else 'unknown'

    match = re.search(timestamp_regex, entry)
    timestamp = match[1] if match else DEFAULT_TIMESTAMP
    log_dt = datetime.strptime(timestamp, LOG_TIMESTAMP_FORMAT)

    # The status code is read from the second-to-last space-separated field.
    # NOTE(review): this presumes the log line ends "<status> <size>" -
    # confirm against the actual log format.
    fields = entry.split(' ')
    status_code = fields[-2] if len(fields) >= 2 else 'unknown'

    return {
        'host': host,
        'sp': sp,
        'since_epoch': log_dt.timestamp(),
        # Drop the UTC offset for display.
        'timestamp': timestamp.split(' ')[0],
        'delta_seconds': int((log_dt - issue_dt).total_seconds()),
        'status_code': status_code,
    }


def collect_entries(paths):
    """Scan every log file in paths and return the list of report rows.

    Lines whose SAMLRequest cannot be decoded are reported on stderr and
    skipped, so one malformed request does not abort the whole report.
    """
    entries = []
    for path in paths:
        # The host depends only on the file path, so resolve it once per file.
        match = re.search(host_regex, path)
        host = match[1] if match else 'unknown'

        # errors='replace' keeps stray non-decodable bytes in a log line from
        # aborting the scan; the fields parsed here are plain ASCII.
        with open(path, 'r', errors='replace') as log:
            for line_no, entry in enumerate(log, start=1):
                try:
                    row = parse_entry(entry, host)
                except (ValueError, KeyError, zlib.error,
                        defusedxml.ElementTree.ParseError) as err:
                    print('{}:{}: skipping unparsable entry: {!r}'.format(
                        path, line_no, err), file=sys.stderr)
                    continue
                if row is not None:
                    entries.append(row)
    return entries


def format_report(entries):
    """Return the report lines for entries, ordered by request time.

    Columns are padded to the widest value they contain (minimum width 1);
    the status code column has a fixed minimum width of 3.
    """
    def width(key):
        return max([1] + [len(str(row[key])) for row in entries])

    w_timestamp = width('timestamp')
    w_host = width('host')
    w_sp = width('sp')
    w_delta = width('delta_seconds')

    output_line = '{:%d} {:%d} {:%d} {:%d} {:3}' % (
        w_timestamp, w_host, w_sp, w_delta)

    return [
        output_line.format(
            row['timestamp'],
            row['host'],
            row['sp'],
            row['delta_seconds'],
            row['status_code'])
        for row in sorted(entries, key=lambda row: row['since_epoch'])
    ]


def main():
    """Parse the command line, scan the given logs and print the report."""
    args = docopt(__doc__)
    for line in format_report(collect_entries(args['LOG'])):
        print(line)


if __name__ == '__main__':
    main()