-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathparsing.py
152 lines (132 loc) · 5.34 KB
/
parsing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import os
import json
import argparse
import zipfile
import pathlib
from zipfile import ZipFile
import re
import glob
import importlib
import importlib.util
import subprocess
from subprocess import call
from concurrent.futures import ThreadPoolExecutor, wait
import shutil
import gc
import boto3
from boto3 import Session
import requests
import datetime
from constants import FILE_TYPES
import secretvalidation
def hunt(threads, deldownloads, getversions, profile=None):
"""
Main function
Sets the stage for everything!
Variables -
threads: number of threads for downloads
deldownloads: YES or NO
getversions: YES or NO
profile: the AWS profile lambdas are downloaded from
"""
sigs = getSigs()
print(profile)
if profile is None:
# if user doesn't supply a profile, we will grab every file within /loot and search through it.
rootdir = './loot'
print(rootdir)
print(glob.glob(rootdir + r'/*', recursive=False))
for file in glob.glob(rootdir + r'/*', recursive=False):
print(file)
threadSecrets(threads, deldownloads, file.split('/')[-1], sigs)
if profile is not None:
# user supplied single aws profile, lets roll
threadSecrets(threads, deldownloads, profile, sigs)
def getSigs():
#
# This will create an array of json signature objects to iterate through based on the files starting with sig_
# In future we can add a flag to ask for the file or naming scheme of the file and have the signatures loaded that way.
#______________________________________________________________________________
sigs = []
sigfiles = os.listdir(os.path.join(os.path.dirname(os.path.realpath(__file__)), "signatures"))
for sigfile in sigfiles:
try:
if sigfile.startswith('sig_'):
#pull in all sig files from the signature dir
sigfilePath = os.path.join(os.path.dirname(os.path.realpath(__file__)), "signatures/" + sigfile)
jsonSigs = json.load(open(sigfilePath))
for sigType in jsonSigs[0]["sigs"]:
sigs.append(sigType)
except Exception as e:
print("Failed to import sigfiles." + str(e))
return(sigs)
def threadSecrets(threads, deldownloads, profile, sigs):
"""
Variables -
threads: number of threads for downloads
deldownloads: YES or NO
profile: the AWS profile to interact with
sigs: Json values of all the signatures
"""
print("Scanning for Secrets in " + profile)
rootdir = './loot'
files = glob.glob(rootdir + r'/' + profile + '/*/*.zip', recursive=True)
files = files + glob.glob(rootdir + r'/' + profile + '/*.zip', recursive=True)
for f in files:
#print(f)
checkSecrets(f, deldownloads, profile, sigs)
def regexChecker(pattern, fileread):
#print('regexChecker()')
returnvalue = re.finditer(b"%b" % pattern.encode(), fileread, re.MULTILINE | re.IGNORECASE)
return returnvalue
def checkSecrets(f,deldownloads, profile, sigs):
"""
Search through lambda zip for secrets based on signatures
Variables -
f: zip file to search through
"""
print("checkSecrets()")
print(f) # nice looking single line per file
with ZipFile(f, "r") as inzip:
for name in inzip.namelist():
if pathlib.Path(name).suffix in FILE_TYPES:
try:
with inzip.open(name) as ziptry:
pass
del ziptry
except Exception as e:
with open(f'./logs/failures.log', "a") as code:
code.write(f"Failed to read zipfile, {profile}, {f}, " + str(e) + "\n")
continue
with inzip.open(name) as zipfile:
try:
a = zipfile.read()
except Exception as e:
with open(f'./logs/failures.log', "a") as code:
code.write(f"Failed to read zipfile, {profile}, {f}, " + str(e) + "\n")
continue
for sigType in sigs:
if sigType['type'] == 'regex':
for outp in regexChecker(sigType['pattern'], a):
start = outp.span()[0]
line_no = a[:start].count(b"\n") + 1
try:
output = str(outp.group(), 'UTF-8')
except:
print("Way too ugly...moving on")
#print(output)
secretvalidation.validate(profile, {
'zip': f,
'name': name,
'description': sigType['caption'],
'output': output.strip('"').strip("'").strip(' '),
'line_no': line_no,
'fileread': a,
'pattern': sigType['pattern']
})
else:
continue
del a
gc.collect()
if deldownloads:
os.remove(f)