# event_processor.py

from __future__ import absolute_import

from collections import namedtuple
from email.utils import parsedate
import errno
import jsonpickle
import pylru
from threading import Event, Lock, Thread
import time

try:
    import queue
except ImportError:
    # Python 2 fallback
    # noinspection PyUnresolvedReferences,PyPep8Naming
    import Queue as queue

import requests
from requests.packages.urllib3.exceptions import ProtocolError

from ldclient.event_summarizer import EventSummarizer
from ldclient.fixed_thread_pool import FixedThreadPool
from ldclient.user_filter import UserFilter
from ldclient.interfaces import EventProcessor
from ldclient.repeating_timer import RepeatingTimer
from ldclient.util import _headers
from ldclient.util import log

__MAX_FLUSH_THREADS__ = 5
__CURRENT_EVENT_SCHEMA__ = 3


class NullEventProcessor(EventProcessor):
    def __init__(self):
        pass

    def start(self):
        pass

    def stop(self):
        pass

    def is_alive(self):
        return False

    def send_event(self, event):
        pass

    def flush(self):
        pass
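

# Internal message passed through the dispatcher queue. 'type' is one of
# 'event', 'flush', 'flush_users', 'test_sync', or 'stop'; 'param' carries
# the event dict, or a threading.Event used to signal completion.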
EventProcessorMessage = namedtuple('EventProcessorMessage', ['type', 'param'])


class EventOutputFormatter(object):
    def __init__(self, config):
        self._inline_users = config.inline_users_in_events
        self._user_filter = UserFilter(config)

    def make_output_events(self, events, summary):
        events_out = [ self.make_output_event(e) for e in events ]
        if len(summary.counters) > 0:
            events_out.append(self.make_summary_event(summary))
        return events_out

    def make_output_event(self, e):
        kind = e['kind']
        if kind == 'feature':
            is_debug = e.get('debug')
            out = {
                'kind': 'debug' if is_debug else 'feature',
                'creationDate': e['creationDate'],
                'key': e['key'],
                'version': e.get('version'),
                'variation': e.get('variation'),
                'value': e.get('value'),
                'default': e.get('default'),
                'prereqOf': e.get('prereqOf')
            }
            if self._inline_users or is_debug:
                out['user'] = self._user_filter.filter_user_props(e['user'])
            else:
                out['userKey'] = e['user'].get('key')
            return out
        elif kind == 'identify':
            return {
                'kind': 'identify',
                'creationDate': e['creationDate'],
                'key': e['user'].get('key'),
                'user': self._user_filter.filter_user_props(e['user'])
            }
        elif kind == 'custom':
            out = {
                'kind': 'custom',
                'creationDate': e['creationDate'],
                'key': e['key'],
                'data': e.get('data')
            }
            if self._inline_users:
                out['user'] = self._user_filter.filter_user_props(e['user'])
            else:
                out['userKey'] = e['user'].get('key')
            return out
        elif kind == 'index':
            return {
                'kind': 'index',
                'creationDate': e['creationDate'],
                'user': self._user_filter.filter_user_props(e['user'])
            }
        else:
            return e
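
    # For illustration (hypothetical values): a 'feature' event for user
    # {'key': 'user-1'}, with inline_users_in_events off and no debug flag,
    # is emitted as:
    #   {'kind': 'feature', 'creationDate': 1500000000000, 'key': 'my-flag',
    #    'version': 11, 'variation': 1, 'value': True, 'default': False,
    #    'prereqOf': None, 'userKey': 'user-1'}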
"""
Transform summarizer data into the format used for the event payload.
"""
def make_summary_event(self, summary):
flags_out = dict()
for ckey, cval in summary.counters.items():
flag_key, variation, version = ckey
flag_data = flags_out.get(flag_key)
if flag_data is None:
flag_data = { 'default': cval['default'], 'counters': [] }
flags_out[flag_key] = flag_data
counter = {
'count': cval['count'],
'value': cval['value']
}
if variation is not None:
counter['variation'] = variation
if version is None:
counter['unknown'] = True
else:
counter['version'] = version
flag_data['counters'].append(counter)
return {
'kind': 'summary',
'startDate': summary.start_date,
'endDate': summary.end_date,
'features': flags_out
}
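
    # For illustration (hypothetical values): a counters entry
    #   {('my-flag', 1, 11): {'count': 3, 'value': True, 'default': False}}
    # produces a summary event of the form
    #   {'kind': 'summary', 'startDate': ..., 'endDate': ...,
    #    'features': {'my-flag': {'default': False, 'counters': [
    #        {'count': 3, 'value': True, 'variation': 1, 'version': 11}]}}}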


class EventPayloadSendTask(object):
    def __init__(self, session, config, formatter, payload, response_fn):
        self._session = session
        self._config = config
        self._formatter = formatter
        self._payload = payload
        self._response_fn = response_fn

    def run(self):
        try:
            output_events = self._formatter.make_output_events(self._payload.events, self._payload.summary)
            resp = self._do_send(output_events, True)
            if resp is not None:
                self._response_fn(resp)
        except Exception:
            log.warning(
                'Unhandled exception in event processor. Analytics events were not processed.',
                exc_info=True)

    def _do_send(self, output_events, should_retry):
        # noinspection PyBroadException
        try:
            json_body = jsonpickle.encode(output_events, unpicklable=False)
            log.debug('Sending events payload: ' + json_body)
            hdrs = _headers(self._config.sdk_key)
            hdrs['X-LaunchDarkly-Event-Schema'] = str(__CURRENT_EVENT_SCHEMA__)
            uri = self._config.events_uri
            r = self._session.post(uri,
                                   headers=hdrs,
                                   timeout=(self._config.connect_timeout, self._config.read_timeout),
                                   data=json_body)
            r.raise_for_status()
            return r
        except ProtocolError as e:
            inner = e.args[1] if e.args is not None and len(e.args) > 1 else None
            if inner is not None and inner.errno == errno.ECONNRESET and should_retry:
                log.warning('ProtocolError exception caught while sending events. Retrying.')
                # Return the retried response so the caller can still inspect it.
                return self._do_send(output_events, False)
            log.warning(
                'Unhandled exception in event processor. Analytics events were not processed.',
                exc_info=True)
        except Exception:
            log.warning(
                'Unhandled exception in event processor. Analytics events were not processed.',
                exc_info=True)


FlushPayload = namedtuple('FlushPayload', ['events', 'summary'])


class EventBuffer(object):
    def __init__(self, capacity):
        self._capacity = capacity
        self._events = []
        self._summarizer = EventSummarizer()
        self._exceeded_capacity = False

    def add_event(self, event):
        if len(self._events) >= self._capacity:
            if not self._exceeded_capacity:
                log.warning("Event queue is full; dropped an event")
                self._exceeded_capacity = True
        else:
            self._events.append(event)
            self._exceeded_capacity = False

    def add_to_summary(self, event):
        self._summarizer.summarize_event(event)

    def get_payload(self):
        return FlushPayload(self._events, self._summarizer.snapshot())

    def clear(self):
        self._events = []
        self._summarizer.clear()


class EventDispatcher(object):
    def __init__(self, queue, config, session):
        self._queue = queue
        self._config = config
        self._session = requests.Session() if session is None else session
        self._close_session = (session is None)  # so we know whether to close it later
        self._disabled = False
        self._buffer = EventBuffer(config.events_max_pending)
        self._user_keys = pylru.lrucache(config.user_keys_capacity)
        self._formatter = EventOutputFormatter(config)
        self._last_known_past_time = 0

        self._flush_workers = FixedThreadPool(__MAX_FLUSH_THREADS__, "ldclient.flush")

        self._main_thread = Thread(target=self._run_main_loop)
        self._main_thread.daemon = True
        self._main_thread.start()

    def _run_main_loop(self):
        log.info("Starting event processor")
        while True:
            try:
                message = self._queue.get(block=True)
                if message.type == 'event':
                    self._process_event(message.param)
                elif message.type == 'flush':
                    self._trigger_flush()
                elif message.type == 'flush_users':
                    self._user_keys.clear()
                elif message.type == 'test_sync':
                    self._flush_workers.wait()
                    message.param.set()
                elif message.type == 'stop':
                    # _do_shutdown closes the session if we own it.
                    self._do_shutdown()
                    message.param.set()
                    return
            except Exception:
                log.error('Unhandled exception in event processor', exc_info=True)

    def _process_event(self, event):
        if self._disabled:
            return

        # Always record the event in the summarizer.
        self._buffer.add_to_summary(event)

        # Decide whether to add the event to the payload. Feature events may be added twice, once for
        # the event (if tracked) and once for debugging.
        add_full_event = False
        add_debug_event = False
        add_index_event = False
        if event['kind'] == "feature":
            add_full_event = event['trackEvents']
            add_debug_event = self._should_debug_event(event)
        else:
            add_full_event = True

        # For each user we haven't seen before, we add an index event - unless this is already
        # an identify event for that user.
        if not (add_full_event and self._config.inline_users_in_events):
            user = event.get('user')
            if user and not self.notice_user(user):
                if event['kind'] != 'identify':
                    add_index_event = True

        if add_index_event:
            ie = { 'kind': 'index', 'creationDate': event['creationDate'], 'user': user }
            self._buffer.add_event(ie)
        if add_full_event:
            self._buffer.add_event(event)
        if add_debug_event:
            debug_event = event.copy()
            debug_event['debug'] = True
            self._buffer.add_event(debug_event)

    # Add to the set of users we've noticed, and return true if the user was already known to us.
    def notice_user(self, user):
        if user is None or 'key' not in user:
            return False
        key = user['key']
        if key in self._user_keys:
            self._user_keys[key]  # refresh cache item
            return True
        self._user_keys[key] = True
        return False

    def _should_debug_event(self, event):
        debug_until = event.get('debugEventsUntilDate')
        if debug_until is not None:
            last_past = self._last_known_past_time
            now = int(time.time() * 1000)
            if debug_until > last_past and debug_until > now:
                return True
        return False
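
    # Debug events are emitted only while debugEventsUntilDate is still in the
    # future, judged against both the local clock and the most recent server
    # time seen in a response Date header (to tolerate local clock skew).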

    def _trigger_flush(self):
        if self._disabled:
            return
        payload = self._buffer.get_payload()
        if len(payload.events) > 0 or len(payload.summary.counters) > 0:
            task = EventPayloadSendTask(self._session, self._config, self._formatter, payload,
                                        self._handle_response)
            if self._flush_workers.execute(task.run):
                # The events have been handed off to a flush worker; clear them from our buffer.
                self._buffer.clear()
            else:
                # We're already at our limit of concurrent flushes; leave the events in the buffer.
                pass

    def _handle_response(self, r):
        server_date_str = r.headers.get('Date')
        if server_date_str is not None:
            server_date = parsedate(server_date_str)
            if server_date is not None:
                timestamp = int(time.mktime(server_date) * 1000)
                self._last_known_past_time = timestamp
        if r.status_code == 401:
            log.error('Received 401 error, no further events will be posted since SDK key is invalid')
            self._disabled = True
            return

    def _do_shutdown(self):
        self._flush_workers.stop()
        self._flush_workers.wait()
        if self._close_session:
            self._session.close()


class DefaultEventProcessor(EventProcessor):
    def __init__(self, config, session=None):
        self._queue = queue.Queue(config.events_max_pending)
        self._flush_timer = RepeatingTimer(config.flush_interval, self.flush)
        self._users_flush_timer = RepeatingTimer(config.user_keys_flush_interval, self._flush_users)
        self._flush_timer.start()
        self._users_flush_timer.start()
        self._close_lock = Lock()
        self._closed = False
        # The dispatcher owns its own daemon thread and reads from our queue,
        # so we don't need to keep a reference to it here.
        EventDispatcher(self._queue, config, session)

    def send_event(self, event):
        event['creationDate'] = int(time.time() * 1000)
        self._queue.put(EventProcessorMessage('event', event))

    def flush(self):
        self._queue.put(EventProcessorMessage('flush', None))

    def stop(self):
        with self._close_lock:
            if self._closed:
                return
            self._closed = True
            self._flush_timer.stop()
            self._users_flush_timer.stop()
            self.flush()
            self._post_message_and_wait('stop')

    def _flush_users(self):
        self._queue.put(EventProcessorMessage('flush_users', None))

    # Used only in tests
    def _wait_until_inactive(self):
        self._post_message_and_wait('test_sync')

    def _post_message_and_wait(self, msg_type):
        reply = Event()
        self._queue.put(EventProcessorMessage(msg_type, reply))
        reply.wait()
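

# Usage sketch (illustrative only). In normal use, LDClient constructs the
# event processor from its Config; the stub below is a hypothetical stand-in
# carrying just the attributes this module and UserFilter are assumed to read.
if __name__ == '__main__':
    class _StubConfig(object):
        sdk_key = 'fake-sdk-key'                   # hypothetical key
        events_uri = 'http://localhost:8030/bulk'  # hypothetical endpoint
        events_max_pending = 100
        flush_interval = 5                    # seconds between automatic flushes
        user_keys_flush_interval = 300        # seconds between user-key cache resets
        user_keys_capacity = 1000
        inline_users_in_events = False
        connect_timeout = 1
        read_timeout = 1
        all_attributes_private = False        # assumed to be read by UserFilter
        private_attribute_names = []          # assumed to be read by UserFilter

    ep = DefaultEventProcessor(_StubConfig())
    ep.send_event({'kind': 'identify', 'user': {'key': 'user-1'}})
    ep.flush()  # enqueue a flush; delivery happens on a worker thread
    ep.stop()   # final flush, then shut the dispatcher down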