-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathproxy.py
389 lines (322 loc) · 11.4 KB
/
proxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
from socket import socket, SOL_SOCKET, SO_REUSEADDR
from socket import error as socket_error
from select import select
from select import error as select_error
from subprocess import PIPE, Popen
import sys, os, time, traceback, errno, signal
import logging
import usrtrace
from packet_decoder import stateless_unpack as unpack
from packet_decoder import stateless_pack as pack
from packet_decoder import Packet
from config import *
# Shared proxy state. These module-level objects are filled in by
# new_connection(), emptied by drop_connection(), and read/written by
# send_packet() and the event loop in main().
# Note: the keys of all the maps below are socket objects (the names "fd"
# used throughout this file refer to sockets, not integer descriptors).
conn_map = {} # Map from in_sock to out_sock (for both directions)
user_map = {} # Map from sock to user data; each user appears twice, once per socket
user_socks = set() # Collection of socks to users (i.e. the user-facing half of each pair)
read_buffers = {} # Map from sock to a read buffer (bytes received but not yet decoded)
send_buffers = {} # Map from sock to a send buffer, if any. Only present while data is pending.
listener = None # the main bound listen socket (set in main())
ticks = 0 # Outstanding ticks to handle at next opportunity (incremented by the SIGALRM handler)
plugins = [] # Replaced in main() with the plugin list imported from the plugins module
def main():
    """Start the proxy and run its select() loop until a fatal error.

    Startup: binds the non-blocking listener on LISTEN_ADDR, configures logging,
    injects this module's helper functions into the helpers module, loads the
    plugins (removing any whose on_start raises), daemonises unless DEBUG is
    set, and arms a repeating SIGALRM timer that counts ticks.

    Main loop: handles any pending ticks, waits in select(), flushes send
    buffers for writable sockets, then reads from readable sockets, decodes
    packets, passes them through the plugins and queues the result on the
    opposite socket of the pair.

    Never returns normally: any exception that escapes the loop (including
    KeyboardInterrupt) is logged as critical, the listener is closed and the
    process exits with status 1.

    LISTEN_ADDR, LOG_FILE, LOG_LEVEL, LOG_FORMAT, DEBUG, PASSTHROUGH,
    PASSTHROUGH_LOG_FILE, TICK_INTERVAL, MAX_SEND and MAX_RECV are not defined
    in this file; presumably they come from `from config import *`.
    """
    global listener, ticks
    # Bind the listening socket that users connect to.
    listener = socket()
    listener.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    listener.bind(LISTEN_ADDR)
    listener.listen(128)
    listener.setblocking(0)

    logging.basicConfig(filename=LOG_FILE, level=LOG_LEVEL, format=LOG_FORMAT)
    if DEBUG:
        # In debug mode additionally mirror everything (down to DEBUG level) to the console.
        debug_handler = logging.StreamHandler() # defaults to stderr
        debug_handler.setFormatter(logging.Formatter(LOG_FORMAT))
        debug_handler.setLevel(logging.DEBUG)
        logging.root.addHandler(debug_handler)

    logging.info("Starting up")
    if PASSTHROUGH:
        # Only bound (and only used below) when PASSTHROUGH is set.
        passthrough_log = open(PASSTHROUGH_LOG_FILE, 'w')

    # Give the helpers module access to this module's functions by assignment,
    # so that it does not have to import this module itself.
    import helpers # Hax before import does important hax
    helpers.active_users = active_users
    helpers.send_packet = send_packet
    helpers.InvalidUserError = InvalidUserError

    from plugins import plugins as _plugins # Lazy import prevents circular references
    global plugins
    plugins = _plugins
    # Iterate over a copy because failing plugins are removed from the list as we go.
    for plugin in plugins[:]: # Note that x[:] is a copy of x
        try:
            logging.debug("Loading plugin: %s", plugin)
            if hasattr(plugin, 'on_start'):
                plugin.on_start()
        except Exception:
            logging.exception("Error initialising plugin %s", plugin)
            plugins.remove(plugin)

    if not DEBUG:
        print 'proxy: Daemonising...'
        daemonise()
        # daemonise() has already closed these; closing again is a no-op.
        sys.stdout.close()
        sys.stderr.close()

    logging.debug("Started up")

    def add_tick(sig, frame):
        """SIGALRM handler: just count the tick; the main loop does the real work."""
        global ticks
        ticks += 1

    signal.signal(signal.SIGALRM, add_tick)
    # First tick after TICK_INTERVAL, then repeating every TICK_INTERVAL.
    signal.setitimer(signal.ITIMER_REAL, TICK_INTERVAL, TICK_INTERVAL)

    try:
        while 1:
            # Run one handle_tick() per tick counted since the last pass.
            while ticks:
                handle_tick()
                ticks -= 1

            # Wait for any connection (or the listener) to become readable, or for
            # any socket with pending output to become writable.
            try:
                r, w, x = select(conn_map.keys() + [listener], send_buffers.keys(), [])
            except select_error, ex:
                ex_errno, ex_msg = ex.args
                if ex_errno == errno.EINTR:
                    continue # This lets us handle any tick that may have been queued and retry
                raise

            dead = [] # Keeps track of fds in r, w that get dropped, so we know when not to bother.

            # --- Writable sockets: flush as much of each send buffer as possible ---
            for fd in w:
                if fd in dead:
                    logging.debug("fd already down - skipping")
                    continue

                buf = send_buffers[fd]
                try:
                    n = fd.send(buf[:MAX_SEND])
                except socket_error, ex:
                    if ex.errno == errno.EINTR:
                        # Interrupted before anything was sent; keep the whole buffer for next time.
                        n = 0
                    elif ex.errno in (errno.ECONNRESET, errno.EPIPE, errno.ENETDOWN, errno.ENETUNREACH, errno.ENOBUFS):
                        # These are all socket failure conditions, drop the connection
                        user = user_map[fd]
                        if ex.errno == errno.ECONNRESET and fd in user_socks: # User CONNRESET is ok, means user closed program or lost conn. Server CONNRESET is NOT.
                            logging.info("Connection from %s closed by connection reset", user)
                        else:
                            logging.warning("Dropping connection for %s due to send error to %s", user, "user" if fd in user_socks else "server", exc_info=1)
                        # Both halves of the pair are gone once the connection is dropped.
                        dead += [fd, conn_map[fd]]
                        drop_connection(user)
                        continue
                    else:
                        raise

                assert n <= len(buf)
                if n < len(buf):
                    # Partial send: keep the unsent tail for the next writable event.
                    send_buffers[fd] = buf[n:]
                else:
                    # Fully flushed: removing the entry stops select() watching for writability.
                    del send_buffers[fd]

            # --- Readable sockets: accept, or read + decode + forward ---
            for fd in r:
                if fd in dead:
                    logging.debug("fd already down - skipping")
                    continue

                if fd is listener:
                    new_connection()
                    continue

                buf = read_buffers[fd]
                # Data read from a user socket is headed to the server, and vice versa.
                to_server = (fd in user_socks)
                user = user_map[fd]

                logging.debug("Buffer before read: length %d", len(buf))
                try:
                    read = fd.recv(MAX_RECV)
                except socket_error, ex:
                    if ex.errno == errno.EINTR:
                        continue
                    if ex.errno in (errno.ECONNRESET, errno.ETIMEDOUT, errno.ENOBUFS, errno.ENOMEM):
                        # These are all socket failure conditions, drop the connection
                        logging.warning("Dropping connection for %s due to recv error from %s", user, "user" if to_server else "server", exc_info=1)
                        dead += [fd, conn_map[fd]]
                        drop_connection(user)
                        continue
                    # NOTE(review): block nesting here was reconstructed. As written, an
                    # errno outside both checks above falls through with `read` unset or
                    # left over from an earlier socket - confirm whether the `continue`
                    # above belongs at this level instead, or whether a `raise` is missing.

                if not read:
                    # Empty read means EOF
                    if to_server:
                        logging.info("Connection from %s closed", user)
                    else:
                        logging.info("Server connection for %s closed", user)
                    dead += [fd, conn_map[fd]]
                    drop_connection(user)
                    continue

                # logging.debug("Read %s server for %s: %s", "to" if to_server else "from", user, repr(read))
                buf += read
                logging.debug("Buffer after read: length %d", len(buf))

                # Decode as many packets as we can
                while 1:
                    if PASSTHROUGH:
                        # Passthrough mode: forward the raw bytes untouched (no decoding,
                        # no plugins) and record them in the passthrough log.
                        if not buf:
                            break
                        out_bytestr = buf
                        logging.info("Passing through %s", repr(buf))
                        passthrough_log.write(buf)
                        passthrough_log.flush()
                        buf = ''
                    else:
                        try:
                            # unpack returns the decoded packet (or None) plus the unconsumed bytes.
                            packet, buf = unpack(buf, to_server)
                        except Exception: # Undefined exception inherited from packet_decoder
                            logging.exception("Bad packet %s %s:\n%s", "from" if to_server else "to", user, hexdump(buf))
                            logging.warning("Dropping connection for %s due to bad packet from %s", user, "user" if to_server else "server")
                            dead += [fd, conn_map[fd]]
                            drop_connection(user)
                            break
                        if packet is None:
                            # Couldn't decode, need more read first - we're done here.
                            break

                        # logging.debug("%s server for %s: %s", "to" if to_server else "from", user, packet)
                        # Plugins may replace the packet with zero or more packets.
                        packets = handle_packet(packet, user, to_server)
                        packed = []
                        for packet in packets:
                            try:
                                packed.append(pack(packet, to_server))
                            except Exception: # Undefined exception inherited from packet_decoder
                                # A packet that cannot be packed is logged and left out of the output.
                                logging.warning("Bad packet object while packing packet %s %s: %s", "from" if to_server else "to", user, packet, exc_info=1)
                        out_bytestr = ''.join(packed)

                    # Append resulting bytestr to write buffer, to be sent later.
                    send_fd = conn_map[fd]
                    write_buf = send_buffers.get(send_fd, '')
                    write_buf += out_bytestr
                    send_buffers[send_fd] = write_buf

                # If the connection was dropped above, its read buffer entry no longer exists.
                if fd not in dead:
                    logging.debug("Buffer after decode: length %d", len(buf))
                    read_buffers[fd] = buf

    except (Exception, KeyboardInterrupt):
        # Last-resort handler: record what killed the proxy, then shut down.
        logging.critical("Unhandled exception", exc_info=1)
        listener.close()
        sys.exit(1)
def drop_connection(user):
    """Close both sockets belonging to user and forget everything about them.

    user: The user object whose user_sock and srv_sock should be torn down.

    Removes the pair from conn_map, user_map, read_buffers and user_socks, and
    discards any data still waiting in send_buffers for either socket.
    """
    client_end = user.user_sock
    server_end = user.srv_sock
    pair = (client_end, server_end)

    for sock in pair:
        repeat_on_eintr(sock.close)

    # Every one of these tables holds an entry for each half of the pair.
    for table in (conn_map, user_map, read_buffers):
        for sock in pair:
            del table[sock]
    user_socks.remove(client_end)

    # A send buffer only exists while output is pending, so it may be absent.
    for sock in pair:
        send_buffers.pop(sock, None)

    logging.info("Removed socket pair for %s", user)
def new_connection():
    """Accept a pending user connection and pair it with a new server connection.

    Accepts one connection from the listener, opens a socket to SERVER_ADDR,
    switches both sockets to non-blocking mode and registers the pair in the
    global connection tables.

    If the server connection cannot be set up, the failure is logged, both
    sockets are closed and nothing is registered. Without this, the socket
    error would propagate out of the loop in main(), which treats any
    exception as fatal and exits the whole proxy.
    """
    user_sock, addr = repeat_on_eintr(listener.accept)
    logging.info("New connection from address %s", str(addr))

    # Setup objects
    srv_sock = socket()
    try:
        repeat_on_eintr(lambda: srv_sock.connect(SERVER_ADDR))
        repeat_on_eintr(lambda: user_sock.setblocking(0))
        repeat_on_eintr(lambda: srv_sock.setblocking(0))
    except socket_error:
        logging.warning("Rejecting connection from address %s: could not set up server connection", str(addr), exc_info=1)
        # Nothing has been registered yet, so closing the sockets is all the cleanup needed.
        repeat_on_eintr(srv_sock.close)
        repeat_on_eintr(user_sock.close)
        return
    user = User(addr=addr, user_sock=user_sock, srv_sock=srv_sock)

    # Add things to global data structures
    user_map[user_sock] = user
    user_map[srv_sock] = user
    conn_map[user_sock] = srv_sock
    conn_map[srv_sock] = user_sock
    user_socks.add(user_sock)
    read_buffers[user_sock] = ''
    read_buffers[srv_sock] = ''

    logging.debug("Now accepting packets from address %s", str(addr))
def daemonise():
    """Detach from current session and run in background.

    Closes the standard streams, changes the working directory to "/" and
    forks twice, starting a new session in between. Only the final
    (grandchild) process returns from this function; the original process and
    the intermediate child both exit with status 0.
    """
    sys.stdin.close()
    sys.stdout.close()
    sys.stderr.close()
    os.chdir("/")
    if os.fork():
        sys.exit(0)
    # Start a new session so the process really is detached from the session
    # (and controlling terminal) it was launched from. This must happen in the
    # first child: setsid() fails for a process group leader.
    os.setsid()
    # Fork again so the surviving process is not a session leader.
    if os.fork():
        sys.exit(0)
def repeat_on_eintr(fn):
    """Call fn() and return its result, retrying for as long as it fails with EINTR.

    fn: A zero-argument callable.

    Any socket error other than EINTR is re-raised to the caller.
    """
    while True:
        try:
            result = fn()
        except socket_error as ex:
            if ex.errno == errno.EINTR:
                # Interrupted by a signal before completing: just try again.
                continue
            raise
        else:
            return result
def handle_tick():
    """Give every plugin that defines on_tick a chance to run.

    Each such plugin is called with a fresh set of the currently known users.
    An exception raised by one plugin is logged and does not stop the others.
    """
    for plugin in plugins:
        if not hasattr(plugin, 'on_tick'):
            continue
        try:
            # user_map lists each user under both of its sockets; the set removes the duplicates.
            current_users = set(user_map.values())
            plugin.on_tick(current_users)
        except Exception:
            logging.exception("Error in plugin %s" % plugin)
def handle_packet(packet, user, to_server):
    """Pass a packet through the on_packet hook of every plugin, in plugin order.

    packet: The packet object received.
    user: The user the packet is being sent from/to.
    to_server: True if packet is user->server, else False.

    Each plugin sees every packet produced by the plugins before it, and may
    return either a single packet or a list of packets (an empty list drops
    the packet). If a plugin raises, or returns anything else, the error is
    logged and the packet it was given is passed on unchanged.

    Return a list of packet objects to send to out stream (normally [the same packet])."""
    packets = [packet]
    # Snapshot the interested plugins up front.
    interested = [p for p in plugins if hasattr(p, 'on_packet')]
    for plugin in interested:
        incoming, packets = packets, []
        for current in incoming:
            try:
                ret = plugin.on_packet(current, user, to_server)
                # Validate with real exceptions rather than assert, so the checks
                # still run when Python is started with -O.
                if isinstance(ret, Packet):
                    packets.append(ret)
                elif isinstance(ret, list):
                    if not all(isinstance(x, Packet) for x in ret):
                        raise TypeError("Return value not list of packets: %s" % repr(ret))
                    packets += ret
                else:
                    raise TypeError("Return value not packet or list: %s" % repr(ret))
            except Exception:
                logging.exception("Error in plugin %s", plugin)
                # Fall back to forwarding the packet as this plugin received it.
                packets.append(current)
    return packets
def send_packet(packet, user, to_server):
    """Inject a packet as though it had just been received for user.

    packet: The packet object to send.
    user: The user object the packet belongs to.
    to_server: True to send towards the server (as though from the user),
        False to send towards the user.

    The packet goes through the whole list of plugins like any other packet,
    and the packed result is appended to the send buffer of the target socket.
    Raises InvalidUserError if user no longer exists. Packing errors are
    logged and re-raised.
    """
    known_users = user_map.values()
    if user not in known_users:
        raise InvalidUserError(user)

    outgoing = handle_packet(packet, user, to_server)
    encoded = []
    try:
        # The loop deliberately rebinds `packet`, so the log line below names
        # the packet that actually failed to pack.
        for packet in outgoing:
            encoded.append(pack(packet, to_server))
        out_bytestr = ''.join(encoded)
    except Exception: # Undefined exception inherited from packet_decoder
        logging.exception("Bad packet object while packing generated packet %s %s: %s", "from" if to_server else "to", user, packet)
        raise # Will be caught as a failure of the plugin sending it.

    if to_server:
        target = user.srv_sock
    else:
        target = user.user_sock
    send_buffers[target] = send_buffers.get(target, '') + out_bytestr
def hexdump(s, step=18):
    """Returns a string representation of a bytestring.

    s: The string (or byte string) to dump.
    step: Number of bytes shown per output line. The default of 18 is
        optimal for a screen width of 80. Must be a positive integer.

    Each output line holds the bytes as two-digit hex values separated by
    spaces, then a space, then the same bytes as text with every non-printing
    character replaced by '.', then a newline. Returns '' for empty input.
    """
    rows = []
    for offset in range(0, len(s), step):
        chunk = s[offset:offset + step]
        # Byte strings may iterate as ints or as 1-character strings; normalise to ints.
        codes = [c if isinstance(c, int) else ord(c) for c in chunk]
        hex_part = ' '.join('%02x' % code for code in codes)
        # Printable ASCII is the range 32..126 inclusive.
        text_part = ''.join(chr(code) if 32 <= code < 127 else '.' for code in codes)
        rows.append(hex_part + ' ' + text_part + '\n')
    return ''.join(rows)
class User(object):
    """Per-connection record for one proxied user.

    Created with keyword arguments, each of which becomes an attribute.
    Expected to carry addr = (ip, port) plus user_sock and srv_sock.
    Further fields (eg. username) can be attached later simply by assigning
    to them, eg. user.username = "example"
    """

    def __init__(self, **kwargs):
        # Every keyword argument becomes an instance attribute of the same name.
        vars(self).update(kwargs)

    def __str__(self):
        fields = vars(self)
        if 'addr' not in fields:
            # Nothing meaningful to show without an address.
            return repr(self)
        name = self.username if 'username' in fields else "<unknown>"
        return "%s@%s:%s" % ((name,) + self.addr)
class InvalidUserError(Exception):
    """Raised by send_packet when the given user is no longer in user_map."""
def active_users():
    """Return a list of the usernames of connected users that have one set.

    A hack to allow a "logged in" helper.

    user_map holds every user twice (once for its user socket and once for its
    server socket), so the users are de-duplicated first; otherwise each name
    would be reported twice. The order of the returned names is unspecified.
    """
    unique_users = set(user_map.values())
    return [user.username for user in unique_users if hasattr(user, 'username')]
if __name__=='__main__':
    # Python idiom meaning: if not imported, but run directly.
    # Importing this file as a module therefore does not start the proxy.
    main()