diff --git a/README.md b/README.md
index 0e9674c3..01e2ffff 100644
--- a/README.md
+++ b/README.md
@@ -37,12 +37,9 @@ Your first feature flag
     else:
         # the code to run if the feature is off
 
-Python 2.6
+Supported Python versions
 ----------
 
-Python 2.6 requires an extra dependency. Here's how to set it up:
-
-1. Use the `python2.6` extra in your requirements.txt:
-   `ldclient-py[python2.6]`
+The SDK is tested with the most recent patch releases of Python 2.7, 3.3, 3.4, 3.5, and 3.6. Python 2.6 is no longer supported.
 
 Learn more
 -----------
diff --git a/ldclient/client.py b/ldclient/client.py
index 14a87e04..22d63ea8 100644
--- a/ldclient/client.py
+++ b/ldclient/client.py
@@ -3,12 +3,12 @@
 import hashlib
 import hmac
 import threading
-import time
 
 import requests
 from builtins import object
 
 from ldclient.config import Config as Config
+from ldclient.event_processor import NullEventProcessor
 from ldclient.feature_requester import FeatureRequesterImpl
 from ldclient.flag import evaluate
 from ldclient.polling import PollingUpdateProcessor
@@ -21,7 +21,7 @@
     import queue
 except:
     # noinspection PyUnresolvedReferences,PyPep8Naming
-    import Queue as queue
+    import Queue as queue  # Python 2
 
 from cachecontrol import CacheControl
 from threading import Lock
@@ -43,46 +43,46 @@ def __init__(self, sdk_key=None, config=None, start_wait=5):
         self._config._validate()
 
         self._session = CacheControl(requests.Session())
-        self._queue = queue.Queue(self._config.events_max_pending)
-        self._event_consumer = None
+        self._event_processor = None
         self._lock = Lock()
 
         self._store = self._config.feature_store
         """ :type: FeatureStore """
 
+        if self._config.offline or not self._config.send_events:
+            self._event_processor = NullEventProcessor()
+        else:
+            self._event_processor = self._config.event_processor_class(self._config)
+
         if self._config.offline:
             log.info("Started LaunchDarkly Client in offline mode")
             return
 
-        if self._config.send_events:
-            self._event_consumer = self._config.event_consumer_class(self._queue, self._config)
-            self._event_consumer.start()
-
         if self._config.use_ldd:
             log.info("Started LaunchDarkly Client in LDD mode")
             return
 
-        if self._config.feature_requester_class:
-            self._feature_requester = self._config.feature_requester_class(self._config)
-        else:
-            self._feature_requester = FeatureRequesterImpl(self._config)
-        """ :type: FeatureRequester """
-
         update_processor_ready = threading.Event()
 
         if self._config.update_processor_class:
             log.info("Using user-specified update processor: " + str(self._config.update_processor_class))
             self._update_processor = self._config.update_processor_class(
-                self._config, self._feature_requester, self._store, update_processor_ready)
+                self._config, self._store, update_processor_ready)
         else:
+            if self._config.feature_requester_class:
+                feature_requester = self._config.feature_requester_class(self._config)
+            else:
+                feature_requester = FeatureRequesterImpl(self._config)
+            """ :type: FeatureRequester """
+
             if self._config.stream:
                 self._update_processor = StreamingUpdateProcessor(
-                    self._config, self._feature_requester, self._store, update_processor_ready)
+                    self._config, feature_requester, self._store, update_processor_ready)
             else:
                 log.info("Disabling streaming API")
                 log.warn("You should only disable the streaming API if instructed to do so by LaunchDarkly support")
                 self._update_processor = PollingUpdateProcessor(
-                    self._config, self._feature_requester, self._store, update_processor_ready)
+                    self._config, feature_requester, self._store, update_processor_ready)
         """ :type: UpdateProcessor
""" self._update_processor.start() @@ -102,19 +102,13 @@ def close(self): log.info("Closing LaunchDarkly client..") if self.is_offline(): return - if self._event_consumer and self._event_consumer.is_alive(): - self._event_consumer.stop() + if self._event_processor: + self._event_processor.stop() if self._update_processor and self._update_processor.is_alive(): self._update_processor.stop() def _send_event(self, event): - if self._config.offline or not self._config.send_events: - return - event['creationDate'] = int(time.time() * 1000) - if self._queue.full(): - log.warning("Event queue is full-- dropped an event") - else: - self._queue.put(event) + self._event_processor.send_event(event) def track(self, event_name, user, data=None): self._sanitize_user(user) @@ -135,9 +129,9 @@ def is_initialized(self): return self.is_offline() or self._config.use_ldd or self._update_processor.initialized() def flush(self): - if self._config.offline or not self._config.send_events: + if self._config.offline: return - return self._event_consumer.flush() + return self._event_processor.flush() def toggle(self, key, user, default): log.warn("Deprecated method: toggle() called. Use variation() instead.") @@ -145,14 +139,16 @@ def toggle(self, key, user, default): def variation(self, key, user, default): default = self._config.get_default(key, default) - self._sanitize_user(user) + if user is not None: + self._sanitize_user(user) if self._config.offline: return default def send_event(value, version=None): - self._send_event({'kind': 'feature', 'key': key, - 'user': user, 'value': value, 'default': default, 'version': version}) + self._send_event({'kind': 'feature', 'key': key, 'user': user, 'variation': None, + 'value': value, 'default': default, 'version': version, + 'trackEvents': False, 'debugEventsUntilDate': None}) if not self.is_initialized(): if self._store.initialized: @@ -163,12 +159,7 @@ def send_event(value, version=None): send_event(default) return default - if user is None or user.get('key') is None: - log.warn("Missing user or user key when evaluating Feature Flag key: " + key + ". Returning default.") - send_event(default) - return default - - if user.get('key', "") == "": + if user is not None and user.get('key', "") == "": log.warn("User key is blank. Flag evaluation will proceed, but the user will not be stored in LaunchDarkly.") def cb(flag): @@ -182,6 +173,7 @@ def cb(flag): except Exception as e: log.error("Exception caught in variation: " + e.message + " for flag key: " + key + " and user: " + str(user)) + send_event(default) return default @@ -191,14 +183,22 @@ def _evaluate(self, flag, user): return evaluate(flag, user, self._store) def _evaluate_and_send_events(self, flag, user, default): - value, events = self._evaluate(flag, user) - for event in events or []: - self._send_event(event) - - if value is None: + if user is None or user.get('key') is None: + log.warn("Missing user or user key when evaluating Feature Flag key: " + flag.get('key') + ". 
Returning default.") value = default + variation = None + else: + result = evaluate(flag, user, self._store) + for event in result.events or []: + self._send_event(event) + value = default if result.value is None else result.value + variation = result.variation + self._send_event({'kind': 'feature', 'key': flag.get('key'), - 'user': user, 'value': value, 'default': default, 'version': flag.get('version')}) + 'user': user, 'variation': variation, 'value': value, + 'default': default, 'version': flag.get('version'), + 'trackEvents': flag.get('trackEvents'), + 'debugEventsUntilDate': flag.get('debugEventsUntilDate')}) return value def all_flags(self, user): @@ -227,7 +227,7 @@ def cb(all_flags): return self._store.all(FEATURES, cb) def _evaluate_multi(self, user, flags): - return dict([(k, self._evaluate(v, user)[0]) for k, v in flags.items() or {}]) + return dict([(k, self._evaluate(v, user).value) for k, v in flags.items() or {}]) def secure_mode_hash(self, user): if user.get('key') is None or self._config.sdk_key is None: diff --git a/ldclient/config.py b/ldclient/config.py index 8abd96a8..b730fb09 100644 --- a/ldclient/config.py +++ b/ldclient/config.py @@ -1,4 +1,4 @@ -from ldclient.event_consumer import EventConsumerImpl +from ldclient.event_processor import DefaultEventProcessor from ldclient.feature_store import InMemoryFeatureStore from ldclient.util import log @@ -13,8 +13,8 @@ def __init__(self, events_uri='https://events.launchdarkly.com', connect_timeout=10, read_timeout=15, - events_upload_max_batch_size=100, events_max_pending=10000, + flush_interval=5, stream_uri='https://stream.launchdarkly.com', stream=True, verify_ssl=True, @@ -26,10 +26,13 @@ def __init__(self, use_ldd=False, feature_store=InMemoryFeatureStore(), feature_requester_class=None, - event_consumer_class=None, + event_processor_class=None, private_attribute_names=(), all_attributes_private=False, - offline=False): + offline=False, + user_keys_capacity=1000, + user_keys_flush_interval=300, + inline_users_in_events=False): """ :param string sdk_key: The SDK key for your LaunchDarkly account. :param string base_uri: The base URL for the LaunchDarkly server. Most users should use the default @@ -43,6 +46,8 @@ def __init__(self, :param int events_max_pending: The capacity of the events buffer. The client buffers up to this many events in memory before flushing. If the capacity is exceeded before the buffer is flushed, events will be discarded. + : param float flush_interval: The number of seconds in between flushes of the events buffer. Decreasing + the flush interval means that the event buffer is less likely to reach capacity. :param string stream_uri: The URL for the LaunchDarkly streaming events server. Most users should use the default value. :param bool stream: Whether or not the streaming API should be used to receive flag updates. By @@ -66,10 +71,17 @@ def __init__(self, private, not just the attributes specified in `private_attribute_names`. :param feature_store: A FeatureStore implementation :type feature_store: FeatureStore + :param int user_keys_capacity: The number of user keys that the event processor can remember at any + one time, so that duplicate user details will not be sent in analytics events. + :param float user_keys_flush_interval: The interval in seconds at which the event processor will + reset its set of known user keys. + :param bool inline_users_in_events: Whether to include full user details in every analytics event. 
+ By default, events will only include the user key, except for one "index" event that provides the + full details for the user. :param feature_requester_class: A factory for a FeatureRequester implementation taking the sdk key and config :type feature_requester_class: (str, Config, FeatureStore) -> FeatureRequester - :param event_consumer_class: A factory for an EventConsumer implementation taking the event queue, sdk key, and config - :type event_consumer_class: (queue.Queue, str, Config) -> EventConsumer + :param event_processor_class: A factory for an EventProcessor implementation taking the config + :type event_processor_class: (Config) -> EventProcessor :param update_processor_class: A factory for an UpdateProcessor implementation taking the sdk key, config, and FeatureStore implementation """ @@ -86,12 +98,12 @@ def __init__(self, self.__poll_interval = max(poll_interval, 30) self.__use_ldd = use_ldd self.__feature_store = InMemoryFeatureStore() if not feature_store else feature_store - self.__event_consumer_class = EventConsumerImpl if not event_consumer_class else event_consumer_class + self.__event_processor_class = DefaultEventProcessor if not event_processor_class else event_processor_class self.__feature_requester_class = feature_requester_class self.__connect_timeout = connect_timeout self.__read_timeout = read_timeout - self.__events_upload_max_batch_size = events_upload_max_batch_size self.__events_max_pending = events_max_pending + self.__flush_interval = flush_interval self.__verify_ssl = verify_ssl self.__defaults = defaults if offline is True: @@ -100,6 +112,9 @@ def __init__(self, self.__private_attribute_names = private_attribute_names self.__all_attributes_private = all_attributes_private self.__offline = offline + self.__user_keys_capacity = user_keys_capacity + self.__user_keys_flush_interval = user_keys_flush_interval + self.__inline_users_in_events = inline_users_in_events @classmethod def default(cls): @@ -111,8 +126,8 @@ def copy_with_new_sdk_key(self, new_sdk_key): events_uri=self.__events_uri, connect_timeout=self.__connect_timeout, read_timeout=self.__read_timeout, - events_upload_max_batch_size=self.__events_upload_max_batch_size, events_max_pending=self.__events_max_pending, + flush_interval=self.__flush_interval, stream_uri=self.__stream_uri, stream=self.__stream, verify_ssl=self.__verify_ssl, @@ -123,10 +138,13 @@ def copy_with_new_sdk_key(self, new_sdk_key): use_ldd=self.__use_ldd, feature_store=self.__feature_store, feature_requester_class=self.__feature_requester_class, - event_consumer_class=self.__event_consumer_class, + event_processor_class=self.__event_processor_class, private_attribute_names=self.__private_attribute_names, all_attributes_private=self.__all_attributes_private, - offline=self.__offline) + offline=self.__offline, + user_keys_capacity=self.__user_keys_capacity, + user_keys_flush_interval=self.__user_keys_flush_interval, + inline_users_in_events=self.__inline_users_in_events) def get_default(self, key, default): return default if key not in self.__defaults else self.__defaults[key] @@ -176,8 +194,8 @@ def feature_store(self): return self.__feature_store @property - def event_consumer_class(self): - return self.__event_consumer_class + def event_processor_class(self): + return self.__event_processor_class @property def feature_requester_class(self): @@ -199,14 +217,14 @@ def events_enabled(self): def send_events(self): return self.__send_events - @property - def events_upload_max_batch_size(self): - return 
self.__events_upload_max_batch_size - @property def events_max_pending(self): return self.__events_max_pending + @property + def flush_interval(self): + return self.__flush_interval + @property def verify_ssl(self): return self.__verify_ssl @@ -223,6 +241,18 @@ def all_attributes_private(self): def offline(self): return self.__offline + @property + def user_keys_capacity(self): + return self.__user_keys_capacity + + @property + def user_keys_flush_interval(self): + return self.__user_keys_flush_interval + + @property + def inline_users_in_events(self): + return self.__inline_users_in_events + def _validate(self): if self.offline is False and self.sdk_key is None or self.sdk_key is '': log.warn("Missing or blank sdk_key.") diff --git a/ldclient/event_consumer.py b/ldclient/event_consumer.py deleted file mode 100644 index 4d8f671b..00000000 --- a/ldclient/event_consumer.py +++ /dev/null @@ -1,111 +0,0 @@ -from __future__ import absolute_import - -import errno -from threading import Thread - -import requests -from requests.packages.urllib3.exceptions import ProtocolError - -from ldclient.event_serializer import EventSerializer -from ldclient.interfaces import EventConsumer -from ldclient.util import _headers -from ldclient.util import log - - -class EventConsumerImpl(Thread, EventConsumer): - def __init__(self, event_queue, config): - Thread.__init__(self) - self._session = requests.Session() - self.daemon = True - self._config = config - self._queue = event_queue - self._serializer = EventSerializer(config) - self._running = True - - def run(self): - log.info("Starting event consumer") - self._running = True - while self._running: - try: - self.send() - except Exception: - log.warning( - 'Unhandled exception in event consumer') - - def stop(self): - self._running = False - - def flush(self): - self._queue.join() - - def send_batch(self, events): - def do_send(should_retry): - # noinspection PyBroadException - try: - json_body = self._serializer.serialize_events(events) - log.debug('Sending events payload: ' + json_body) - hdrs = _headers(self._config.sdk_key) - uri = self._config.events_uri - r = self._session.post(uri, - headers=hdrs, - timeout=(self._config.connect_timeout, self._config.read_timeout), - data=json_body) - if r.status_code == 401: - log.error('Received 401 error, no further events will be posted since SDK key is invalid') - self.stop() - return - r.raise_for_status() - except ProtocolError as e: - if e.args is not None and len(e.args) > 1 and e.args[1] is not None: - inner = e.args[1] - if inner.errno is not None and inner.errno == errno.ECONNRESET and should_retry: - log.warning( - 'ProtocolError exception caught while sending events. Retrying.') - do_send(False) - else: - log.warning( - 'Unhandled exception in event consumer. Analytics events were not processed.', - exc_info=True) - except: - log.warning( - 'Unhandled exception in event consumer. 
Analytics events were not processed.', - exc_info=True) - - try: - do_send(True) - finally: - for _ in events: - self._queue.task_done() - - def send(self): - events = self.next() - - if len(events) == 0: - return - else: - self.send_batch(events) - - def next(self): - q = self._queue - items = [] - - item = self.next_item() - if item is None: - return items - - items.append(item) - while len(items) < self._config.events_upload_max_batch_size and not q.empty(): - item = self.next_item() - if item: - items.append(item) - - return items - - def next_item(self): - q = self._queue - # noinspection PyBroadException - try: - item = q.get(block=True, timeout=5) - return item - except Exception: - return None diff --git a/ldclient/event_processor.py b/ldclient/event_processor.py new file mode 100644 index 00000000..1ef54f0a --- /dev/null +++ b/ldclient/event_processor.py @@ -0,0 +1,393 @@ +from __future__ import absolute_import + +from collections import namedtuple +from email.utils import parsedate +import errno +import jsonpickle +import pylru +from threading import Event, Lock, Thread +import time + +# noinspection PyBroadException +try: + import queue +except: + # noinspection PyUnresolvedReferences,PyPep8Naming + import Queue as queue + +import requests +from requests.packages.urllib3.exceptions import ProtocolError + +import six + +from ldclient.event_summarizer import EventSummarizer +from ldclient.fixed_thread_pool import FixedThreadPool +from ldclient.user_filter import UserFilter +from ldclient.interfaces import EventProcessor +from ldclient.repeating_timer import RepeatingTimer +from ldclient.util import _headers +from ldclient.util import log + + +__MAX_FLUSH_THREADS__ = 5 +__CURRENT_EVENT_SCHEMA__ = 3 + +class NullEventProcessor(EventProcessor): + def __init__(self): + pass + + def start(self): + pass + + def stop(self): + pass + + def is_alive(self): + return False + + def send_event(self, event): + pass + + def flush(self): + pass + + +EventProcessorMessage = namedtuple('EventProcessorMessage', ['type', 'param']) + + +class EventOutputFormatter(object): + def __init__(self, config): + self._inline_users = config.inline_users_in_events + self._user_filter = UserFilter(config) + + def make_output_events(self, events, summary): + events_out = [ self.make_output_event(e) for e in events ] + if len(summary.counters) > 0: + events_out.append(self.make_summary_event(summary)) + return events_out + + def make_output_event(self, e): + kind = e['kind'] + if kind == 'feature': + is_debug = e.get('debug') + out = { + 'kind': 'debug' if is_debug else 'feature', + 'creationDate': e['creationDate'], + 'key': e['key'], + 'version': e.get('version'), + 'variation': e.get('variation'), + 'value': e.get('value'), + 'default': e.get('default'), + 'prereqOf': e.get('prereqOf') + } + if self._inline_users or is_debug: + out['user'] = self._user_filter.filter_user_props(e['user']) + else: + out['userKey'] = e['user'].get('key') + return out + elif kind == 'identify': + return { + 'kind': 'identify', + 'creationDate': e['creationDate'], + 'key': e['user'].get('key'), + 'user': self._user_filter.filter_user_props(e['user']) + } + elif kind == 'custom': + out = { + 'kind': 'custom', + 'creationDate': e['creationDate'], + 'key': e['key'], + 'data': e.get('data') + } + if self._inline_users: + out['user'] = self._user_filter.filter_user_props(e['user']) + else: + out['userKey'] = e['user'].get('key') + return out + elif kind == 'index': + return { + 'kind': 'index', + 'creationDate': e['creationDate'], + 
'user': self._user_filter.filter_user_props(e['user']) + } + else: + return e + + """ + Transform summarizer data into the format used for the event payload. + """ + def make_summary_event(self, summary): + flags_out = dict() + for ckey, cval in summary.counters.items(): + flag_key, variation, version = ckey + flag_data = flags_out.get(flag_key) + if flag_data is None: + flag_data = { 'default': cval['default'], 'counters': [] } + flags_out[flag_key] = flag_data + counter = { + 'count': cval['count'], + 'value': cval['value'] + } + if variation is not None: + counter['variation'] = variation + if version is None: + counter['unknown'] = True + else: + counter['version'] = version + flag_data['counters'].append(counter) + return { + 'kind': 'summary', + 'startDate': summary.start_date, + 'endDate': summary.end_date, + 'features': flags_out + } + + +class EventPayloadSendTask(object): + def __init__(self, session, config, formatter, payload, response_fn): + self._session = session + self._config = config + self._formatter = formatter + self._payload = payload + self._response_fn = response_fn + + def run(self): + try: + output_events = self._formatter.make_output_events(self._payload.events, self._payload.summary) + resp = self._do_send(output_events, True) + if resp is not None: + self._response_fn(resp) + except Exception: + log.warning( + 'Unhandled exception in event processor. Analytics events were not processed.', + exc_info=True) + + def _do_send(self, output_events, should_retry): + # noinspection PyBroadException + try: + json_body = jsonpickle.encode(output_events, unpicklable=False) + log.debug('Sending events payload: ' + json_body) + hdrs = _headers(self._config.sdk_key) + hdrs['X-LaunchDarkly-Event-Schema'] = str(__CURRENT_EVENT_SCHEMA__) + uri = self._config.events_uri + r = self._session.post(uri, + headers=hdrs, + timeout=(self._config.connect_timeout, self._config.read_timeout), + data=json_body) + r.raise_for_status() + return r + except ProtocolError as e: + if e.args is not None and len(e.args) > 1 and e.args[1] is not None: + inner = e.args[1] + if inner.errno is not None and inner.errno == errno.ECONNRESET and should_retry: + log.warning( + 'ProtocolError exception caught while sending events. Retrying.') + self._do_send(output_events, False) + else: + log.warning( + 'Unhandled exception in event processor. Analytics events were not processed.', + exc_info=True) + except Exception: + log.warning( + 'Unhandled exception in event processor. 
Analytics events were not processed.', + exc_info=True) + + +FlushPayload = namedtuple('FlushPayload', ['events', 'summary']) + + +class EventBuffer(object): + def __init__(self, capacity): + self._capacity = capacity + self._events = [] + self._summarizer = EventSummarizer() + self._exceeded_capacity = False + + def add_event(self, event): + if len(self._events) >= self._capacity: + if not self._exceeded_capacity: + log.warning("Event queue is full-- dropped an event") + self._exceeded_capacity = True + else: + self._events.append(event) + self._exceeded_capacity = False + + def add_to_summary(self, event): + self._summarizer.summarize_event(event) + + def get_payload(self): + return FlushPayload(self._events, self._summarizer.snapshot()) + + def clear(self): + self._events = [] + self._summarizer.clear() + + +class EventDispatcher(object): + def __init__(self, queue, config, session): + self._queue = queue + self._config = config + self._session = requests.Session() if session is None else session + self._close_session = (session is None) # so we know whether to close it later + self._disabled = False + self._buffer = EventBuffer(config.events_max_pending) + self._user_keys = pylru.lrucache(config.user_keys_capacity) + self._formatter = EventOutputFormatter(config) + self._last_known_past_time = 0 + + self._flush_workers = FixedThreadPool(__MAX_FLUSH_THREADS__, "ldclient.flush") + + self._main_thread = Thread(target=self._run_main_loop) + self._main_thread.daemon = True + self._main_thread.start() + + def _run_main_loop(self): + log.info("Starting event processor") + while True: + try: + message = self._queue.get(block=True) + if message.type == 'event': + self._process_event(message.param) + elif message.type == 'flush': + self._trigger_flush() + elif message.type == 'flush_users': + self._user_keys.clear() + elif message.type == 'test_sync': + self._flush_workers.wait() + message.param.set() + elif message.type == 'stop': + self._do_shutdown() + message.param.set() + return + except Exception: + log.error('Unhandled exception in event processor', exc_info=True) + self._session.close() + + def _process_event(self, event): + if self._disabled: + return + + # Always record the event in the summarizer. + self._buffer.add_to_summary(event) + + # Decide whether to add the event to the payload. Feature events may be added twice, once for + # the event (if tracked) and once for debugging. + add_full_event = False + add_debug_event = False + add_index_event = False + if event['kind'] == "feature": + add_full_event = event['trackEvents'] + add_debug_event = self._should_debug_event(event) + else: + add_full_event = True + + # For each user we haven't seen before, we add an index event - unless this is already + # an identify event for that user. + if not (add_full_event and self._config.inline_users_in_events): + user = event.get('user') + if user and not self.notice_user(user): + if event['kind'] != 'identify': + add_index_event = True + + if add_index_event: + ie = { 'kind': 'index', 'creationDate': event['creationDate'], 'user': user } + self._buffer.add_event(ie) + if add_full_event: + self._buffer.add_event(event) + if add_debug_event: + debug_event = event.copy() + debug_event['debug'] = True + self._buffer.add_event(debug_event) + + # Add to the set of users we've noticed, and return true if the user was already known to us. 
+ def notice_user(self, user): + if user is None or 'key' not in user: + return False + key = user['key'] + if key in self._user_keys: + self._user_keys[key] # refresh cache item + return True + self._user_keys[key] = True + return False + + def _should_debug_event(self, event): + debug_until = event.get('debugEventsUntilDate') + if debug_until is not None: + last_past = self._last_known_past_time + now = int(time.time() * 1000) + if debug_until > last_past and debug_until > now: + return True + return False + + def _trigger_flush(self): + if self._disabled: + return + payload = self._buffer.get_payload() + if len(payload.events) > 0 or len(payload.summary.counters) > 0: + task = EventPayloadSendTask(self._session, self._config, self._formatter, payload, + self._handle_response) + if self._flush_workers.execute(task.run): + # The events have been handed off to a flush worker; clear them from our buffer. + self._buffer.clear() + else: + # We're already at our limit of concurrent flushes; leave the events in the buffer. + pass + + def _handle_response(self, r): + server_date_str = r.headers.get('Date') + if server_date_str is not None: + server_date = parsedate(server_date_str) + if server_date is not None: + timestamp = int(time.mktime(server_date) * 1000) + self._last_known_past_time = timestamp + if r.status_code == 401: + log.error('Received 401 error, no further events will be posted since SDK key is invalid') + self._disabled = True + return + + def _do_shutdown(self): + self._flush_workers.stop() + self._flush_workers.wait() + if self._close_session: + self._session.close() + + +class DefaultEventProcessor(EventProcessor): + def __init__(self, config, session=None): + self._queue = queue.Queue(config.events_max_pending) + self._flush_timer = RepeatingTimer(config.flush_interval, self.flush) + self._users_flush_timer = RepeatingTimer(config.user_keys_flush_interval, self._flush_users) + self._flush_timer.start() + self._users_flush_timer.start() + self._close_lock = Lock() + self._closed = False + EventDispatcher(self._queue, config, session) + + def send_event(self, event): + event['creationDate'] = int(time.time() * 1000) + self._queue.put(EventProcessorMessage('event', event)) + + def flush(self): + self._queue.put(EventProcessorMessage('flush', None)) + + def stop(self): + with self._close_lock: + if self._closed: + return + self._closed = True + self._flush_timer.stop() + self._users_flush_timer.stop() + self.flush() + self._post_message_and_wait('stop') + + def _flush_users(self): + self._queue.put(EventProcessorMessage('flush_users', None)) + + # Used only in tests + def _wait_until_inactive(self): + self._post_message_and_wait('test_sync') + + def _post_message_and_wait(self, type): + reply = Event() + self._queue.put(EventProcessorMessage(type, reply)) + reply.wait() diff --git a/ldclient/event_serializer.py b/ldclient/event_serializer.py deleted file mode 100644 index c833e80b..00000000 --- a/ldclient/event_serializer.py +++ /dev/null @@ -1,48 +0,0 @@ -import jsonpickle -import six - - -class EventSerializer: - IGNORE_ATTRS = frozenset(['key', 'custom', 'anonymous']) - ALLOWED_TOP_LEVEL_ATTRS = frozenset(['key', 'secondary', 'ip', 'country', 'email', - 'firstName', 'lastName', 'avatar', 'name', 'anonymous', 'custom']) - - def __init__(self, config): - self._private_attribute_names = config.private_attribute_names - self._all_attributes_private = config.all_attributes_private - - def serialize_events(self, events): - body = [events] if isinstance(events, dict) else events - 
filtered = [ self._filter_event(e) for e in body ] - return jsonpickle.encode(filtered, unpicklable=False) - - def _is_private_attr(self, name, user_private_attrs): - if name in EventSerializer.IGNORE_ATTRS: - return False - elif self._all_attributes_private: - return True - else: - return (name in self._private_attribute_names) or (name in user_private_attrs) - - def _filter_event(self, e): - def filter_user_props(user_props): - all_private_attrs = set() - user_private_attrs = user_props.get('privateAttributeNames', []) - - def filter_private_attrs(attrs, allowed_attrs = frozenset()): - for key, value in six.iteritems(attrs): - if (not allowed_attrs) or (key in allowed_attrs): - if self._is_private_attr(key, user_private_attrs): - all_private_attrs.add(key) - else: - yield key, value - - ret = dict(filter_private_attrs(user_props, EventSerializer.ALLOWED_TOP_LEVEL_ATTRS)) - if 'custom' in user_props: - ret['custom'] = dict(filter_private_attrs(user_props['custom'])) - - if all_private_attrs: - ret['privateAttrs'] = sorted(list(all_private_attrs)) # note, only sorting to make tests reliable - return ret - - return dict((key, filter_user_props(value) if key == 'user' else value) for (key, value) in six.iteritems(e)) diff --git a/ldclient/event_summarizer.py b/ldclient/event_summarizer.py new file mode 100644 index 00000000..abdafc7d --- /dev/null +++ b/ldclient/event_summarizer.py @@ -0,0 +1,40 @@ +from collections import namedtuple + + +EventSummary = namedtuple('EventSummary', ['start_date', 'end_date', 'counters']) + + +class EventSummarizer(object): + def __init__(self): + self.start_date = 0 + self.end_date = 0 + self.counters = dict() + + """ + Add this event to our counters, if it is a type of event we need to count. + """ + def summarize_event(self, event): + if event['kind'] == 'feature': + counter_key = (event['key'], event['variation'], event['version']) + counter_val = self.counters.get(counter_key) + if counter_val is None: + counter_val = { 'count': 1, 'value': event['value'], 'default': event['default'] } + self.counters[counter_key] = counter_val + else: + counter_val['count'] = counter_val['count'] + 1 + date = event['creationDate'] + if self.start_date == 0 or date < self.start_date: + self.start_date = date + if date > self.end_date: + self.end_date = date + + """ + Return the current summarized event data. + """ + def snapshot(self): + return EventSummary(start_date = self.start_date, end_date = self.end_date, counters = self.counters) + + def clear(self): + self.start_date = 0 + self.end_date = 0 + self.counters = dict() diff --git a/ldclient/expiringdict.py b/ldclient/expiringdict.py index 8823be19..4b244c21 100644 --- a/ldclient/expiringdict.py +++ b/ldclient/expiringdict.py @@ -23,11 +23,7 @@ import time from threading import RLock -try: - from collections import OrderedDict -except ImportError: - # Python < 2.7 - from ordereddict import OrderedDict +from collections import OrderedDict class ExpiringDict(OrderedDict): diff --git a/ldclient/fixed_thread_pool.py b/ldclient/fixed_thread_pool.py new file mode 100644 index 00000000..a3c769e4 --- /dev/null +++ b/ldclient/fixed_thread_pool.py @@ -0,0 +1,69 @@ +from threading import Event, Lock, Thread + +# noinspection PyBroadException +try: + import queue +except: + # noinspection PyUnresolvedReferences,PyPep8Naming + import Queue as queue + +from ldclient.util import log + +""" +A simple fixed-size thread pool that rejects jobs when its limit is reached. 
+""" +class FixedThreadPool(object): + def __init__(self, size, name): + self._size = size + self._lock = Lock() + self._busy_count = 0 + self._event = Event() + self._job_queue = queue.Queue() + for i in range(0, size): + thread = Thread(target = self._run_worker) + thread.name = "%s.%d" % (name, i + 1) + thread.daemon = True + thread.start() + + """ + Schedules a job for execution if there is an available worker thread, and returns + true if successful; returns false if all threads are busy. + """ + def execute(self, jobFn): + with self._lock: + if self._busy_count >= self._size: + return False + self._busy_count = self._busy_count + 1 + self._job_queue.put(jobFn) + return True + + """ + Waits until all currently busy worker threads have completed their jobs. + """ + def wait(self): + while True: + with self._lock: + if self._busy_count == 0: + return + self._event.clear() + self._event.wait() + + """ + Tells all the worker threads to terminate once all active jobs have completed. + """ + def stop(self): + for i in range(0, self._size): + self._job_queue.put('stop') + + def _run_worker(self): + while True: + item = self._job_queue.get(block = True) + if item is 'stop': + return + try: + item() + except Exception: + log.warning('Unhandled exception in worker thread', exc_info=True) + with self._lock: + self._busy_count = self._busy_count - 1 + self._event.set() diff --git a/ldclient/flag.py b/ldclient/flag.py index ccc994a3..7b0e9ed3 100644 --- a/ldclient/flag.py +++ b/ldclient/flag.py @@ -1,3 +1,4 @@ +from collections import namedtuple import hashlib import logging @@ -15,19 +16,25 @@ log = logging.getLogger(sys.modules[__name__].__name__) +EvalResult = namedtuple('EvalResult', ['variation', 'value', 'events']) + + def evaluate(flag, user, store): prereq_events = [] if flag.get('on', False): - value, prereq_events = _evaluate(flag, user, store) + variation, value, prereq_events = _evaluate(flag, user, store) if value is not None: - return value, prereq_events + return EvalResult(variation = variation, value = value, events = prereq_events) - return _get_off_variation(flag), prereq_events + off_var = flag.get('offVariation') + off_value = None if off_var is None else _get_variation(flag, off_var) + return EvalResult(variation = off_var, value = off_value, events = prereq_events) def _evaluate(flag, user, store, prereq_events=None): events = prereq_events or [] failed_prereq = None + prereq_var = None prereq_value = None for prereq in flag.get('prerequisites') or []: prereq_flag = store.get(FEATURES, prereq.get('key'), lambda x: x) @@ -36,22 +43,23 @@ def _evaluate(flag, user, store, prereq_events=None): failed_prereq = prereq break if prereq_flag.get('on', False) is True: - prereq_value, events = _evaluate(prereq_flag, user, store, events) - variation = _get_variation(prereq_flag, prereq.get('variation')) - if prereq_value is None or not prereq_value == variation: + prereq_var, prereq_value, events = _evaluate(prereq_flag, user, store, events) + if prereq_var is None or not prereq_var == prereq.get('variation'): failed_prereq = prereq else: failed_prereq = prereq - event = {'kind': 'feature', 'key': prereq.get('key'), 'user': user, - 'value': prereq_value, 'version': prereq_flag.get('version'), 'prereqOf': flag.get('key')} + event = {'kind': 'feature', 'key': prereq.get('key'), 'user': user, 'variation': prereq_var, + 'value': prereq_value, 'version': prereq_flag.get('version'), 'prereqOf': flag.get('key'), + 'trackEvents': prereq_flag.get('trackEvents'), + 'debugEventsUntilDate': 
prereq_flag.get('debugEventsUntilDate')} events.append(event) if failed_prereq is not None: - return None, events + return None, None, events index = _evaluate_index(flag, user, store) - return _get_variation(flag, index), events + return index, _get_variation(flag, index), events def _evaluate_index(feature, user, store): diff --git a/ldclient/interfaces.py b/ldclient/interfaces.py index af1caa86..39898408 100644 --- a/ldclient/interfaces.py +++ b/ldclient/interfaces.py @@ -113,16 +113,31 @@ def initialized(self): """ -class EventConsumer(BackgroundOperation): +class EventProcessor(object): """ - Consumes events from the client and sends them to LaunchDarkly + Buffers analytics events and sends them to LaunchDarkly """ __metaclass__ = ABCMeta + @abstractmethod + def send_event(self, event): + """ + Processes an event to be sent at some point. + """ + @abstractmethod def flush(self): """ - Flushes any outstanding events immediately. + Specifies that any buffered events should be sent as soon as possible, rather than waiting + for the next flush interval. This method is asynchronous, so events still may not be sent + until a later time. However, calling stop() will synchronously deliver any events that were + not yet delivered prior to shutting down. + """ + + @abstractmethod + def stop(self): + """ + Shuts down the event processor after first delivering all pending events. """ diff --git a/ldclient/polling.py b/ldclient/polling.py index 4b71f668..8efa5913 100644 --- a/ldclient/polling.py +++ b/ldclient/polling.py @@ -34,7 +34,7 @@ def run(self): log.error('Received 401 error, no further polling requests will be made since SDK key is invalid') self.stop() break - except: + except Exception: log.exception( 'Error: Exception encountered when updating flags.') diff --git a/ldclient/repeating_timer.py b/ldclient/repeating_timer.py new file mode 100644 index 00000000..a1e393ea --- /dev/null +++ b/ldclient/repeating_timer.py @@ -0,0 +1,16 @@ +from threading import Event, Thread + +class RepeatingTimer(Thread): + def __init__(self, interval, callable): + Thread.__init__(self) + self.daemon = True + self._interval = interval + self._action = callable + self._stop = Event() + + def run(self): + while not self._stop.wait(self._interval): + self._action() + + def stop(self): + self._stop.set() diff --git a/ldclient/user_filter.py b/ldclient/user_filter.py new file mode 100644 index 00000000..d48ab23f --- /dev/null +++ b/ldclient/user_filter.py @@ -0,0 +1,40 @@ +import jsonpickle +import six + + +class UserFilter: + IGNORE_ATTRS = frozenset(['key', 'custom', 'anonymous']) + ALLOWED_TOP_LEVEL_ATTRS = frozenset(['key', 'secondary', 'ip', 'country', 'email', + 'firstName', 'lastName', 'avatar', 'name', 'anonymous', 'custom']) + + def __init__(self, config): + self._private_attribute_names = config.private_attribute_names + self._all_attributes_private = config.all_attributes_private + + def _is_private_attr(self, name, user_private_attrs): + if name in UserFilter.IGNORE_ATTRS: + return False + elif self._all_attributes_private: + return True + else: + return (name in self._private_attribute_names) or (name in user_private_attrs) + + def filter_user_props(self, user_props): + all_private_attrs = set() + user_private_attrs = user_props.get('privateAttributeNames', []) + + def filter_private_attrs(attrs, allowed_attrs = frozenset()): + for key, value in six.iteritems(attrs): + if (not allowed_attrs) or (key in allowed_attrs): + if self._is_private_attr(key, user_private_attrs): + all_private_attrs.add(key) + 
else: + yield key, value + + ret = dict(filter_private_attrs(user_props, UserFilter.ALLOWED_TOP_LEVEL_ATTRS)) + if 'custom' in user_props: + ret['custom'] = dict(filter_private_attrs(user_props['custom'])) + + if all_private_attrs: + ret['privateAttrs'] = sorted(list(all_private_attrs)) # note, only sorting to make tests reliable + return ret diff --git a/python2.6-requirements.txt b/python2.6-requirements.txt deleted file mode 100644 index d73f64f0..00000000 --- a/python2.6-requirements.txt +++ /dev/null @@ -1 +0,0 @@ -ordereddict>=1.1 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index ebdbadf1..c74c7469 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ six>=1.10.0 pyRFC3339>=1.0 jsonpickle==0.9.3 semver>=2.7.9 +pylru>=1.0.9 diff --git a/setup.py b/setup.py index 9391437a..b2d19ced 100644 --- a/setup.py +++ b/setup.py @@ -17,14 +17,12 @@ def parse_requirements(filename): # parse_requirements() returns generator of pip.req.InstallRequirement objects install_reqs = parse_requirements('requirements.txt') -python26_reqs = parse_requirements('python2.6-requirements.txt') test_reqs = parse_requirements('test-requirements.txt') redis_reqs = parse_requirements('redis-requirements.txt') # reqs is a list of requirement # e.g. ['django==1.5.1', 'mezzanine==1.4.6'] reqs = [ir for ir in install_reqs] -python26reqs = [ir for ir in python26_reqs] testreqs = [ir for ir in test_reqs] redisreqs = [ir for ir in redis_reqs] @@ -59,17 +57,17 @@ def run(self): 'License :: OSI Approved :: Apache Software License', 'Operating System :: OS Independent', 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.6', 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3.3', 'Programming Language :: Python :: 3.4', + 'Programming Language :: Python :: 3.5', + 'Programming Language :: Python :: 3.6', 'Topic :: Software Development', 'Topic :: Software Development :: Libraries', ], extras_require={ - "redis": redisreqs, - "python2.6": python26reqs + "redis": redisreqs }, tests_require=testreqs, cmdclass={'test': PyTest}, diff --git a/testing/test_event_processor.py b/testing/test_event_processor.py new file mode 100644 index 00000000..bb307773 --- /dev/null +++ b/testing/test_event_processor.py @@ -0,0 +1,435 @@ +from email.utils import formatdate +import json +import pytest +from requests.structures import CaseInsensitiveDict +import time + +from ldclient.config import Config +from ldclient.event_processor import DefaultEventProcessor + +from ldclient.util import log + +default_config = Config() +user = { + 'key': 'userkey', + 'name': 'Red' +} +filtered_user = { + 'key': 'userkey', + 'privateAttrs': [ 'name' ] +} + +ep = None +mock_session = None + + +class MockResponse(object): + def __init__(self, status, headers): + self._status = status + self._headers = headers + + @property + def status_code(self): + return self._status + + @property + def headers(self): + return self._headers + + def raise_for_status(self): + pass + +class MockSession(object): + def __init__(self): + self._request_data = None + self._request_headers = None + self._response_status = 200 + self._server_time = None + + def post(self, uri, headers, timeout, data): + self._request_headers = headers + self._request_data = data + resp_hdr = CaseInsensitiveDict() + if self._server_time is not None: + resp_hdr['Date'] = formatdate(self._server_time / 1000, localtime=False, usegmt=True) + return 
MockResponse(self._response_status, resp_hdr) + + def close(self): + pass + + @property + def request_data(self): + return self._request_data + + @property + def request_headers(self): + return self._request_headers + + def set_response_status(self, status): + self._response_status = status + + def set_server_time(self, timestamp): + self._server_time = timestamp + + def clear(self): + self._request_headers = None + self._request_data = None + + +def setup_function(): + global mock_session + mock_session = MockSession() + +def teardown_function(): + if ep is not None: + ep.stop() + +def setup_processor(config): + global ep + ep = DefaultEventProcessor(config, mock_session) + + +def test_identify_event_is_queued(): + setup_processor(Config()) + + e = { 'kind': 'identify', 'user': user } + ep.send_event(e) + + output = flush_and_get_events() + assert len(output) == 1 + assert output == [{ + 'kind': 'identify', + 'creationDate': e['creationDate'], + 'key': user['key'], + 'user': user + }] + +def test_user_is_filtered_in_identify_event(): + setup_processor(Config(all_attributes_private = True)) + + e = { 'kind': 'identify', 'user': user } + ep.send_event(e) + + output = flush_and_get_events() + assert len(output) == 1 + assert output == [{ + 'kind': 'identify', + 'creationDate': e['creationDate'], + 'key': user['key'], + 'user': filtered_user + }] + +def test_individual_feature_event_is_queued_with_index_event(): + setup_processor(Config()) + + e = { + 'kind': 'feature', 'key': 'flagkey', 'version': 11, 'user': user, + 'variation': 1, 'value': 'value', 'default': 'default', 'trackEvents': True + } + ep.send_event(e) + + output = flush_and_get_events() + assert len(output) == 3 + check_index_event(output[0], e, user) + check_feature_event(output[1], e, False, None) + check_summary_event(output[2]) + +def test_user_is_filtered_in_index_event(): + setup_processor(Config(all_attributes_private = True)) + + e = { + 'kind': 'feature', 'key': 'flagkey', 'version': 11, 'user': user, + 'variation': 1, 'value': 'value', 'default': 'default', 'trackEvents': True + } + ep.send_event(e) + + output = flush_and_get_events() + assert len(output) == 3 + check_index_event(output[0], e, filtered_user) + check_feature_event(output[1], e, False, None) + check_summary_event(output[2]) + +def test_feature_event_can_contain_inline_user(): + setup_processor(Config(inline_users_in_events = True)) + + e = { + 'kind': 'feature', 'key': 'flagkey', 'version': 11, 'user': user, + 'variation': 1, 'value': 'value', 'default': 'default', 'trackEvents': True + } + ep.send_event(e) + + output = flush_and_get_events() + assert len(output) == 2 + check_feature_event(output[0], e, False, user) + check_summary_event(output[1]) + +def test_user_is_filtered_in_feature_event(): + setup_processor(Config(inline_users_in_events = True, all_attributes_private = True)) + + e = { + 'kind': 'feature', 'key': 'flagkey', 'version': 11, 'user': user, + 'variation': 1, 'value': 'value', 'default': 'default', 'trackEvents': True + } + ep.send_event(e) + + output = flush_and_get_events() + assert len(output) == 2 + check_feature_event(output[0], e, False, filtered_user) + check_summary_event(output[1]) + +def test_index_event_is_still_generated_if_inline_users_is_true_but_feature_event_is_not_tracked(): + setup_processor(Config(inline_users_in_events = True)) + + e = { + 'kind': 'feature', 'key': 'flagkey', 'version': 11, 'user': user, + 'variation': 1, 'value': 'value', 'default': 'default', 'trackEvents': False + } + ep.send_event(e) + + output = 
flush_and_get_events() + assert len(output) == 2 + check_index_event(output[0], e, user) + check_summary_event(output[1]) + +def test_event_kind_is_debug_if_flag_is_temporarily_in_debug_mode(): + setup_processor(Config()) + + future_time = now() + 100000 + e = { + 'kind': 'feature', 'key': 'flagkey', 'version': 11, 'user': user, + 'variation': 1, 'value': 'value', 'default': 'default', + 'trackEvents': False, 'debugEventsUntilDate': future_time + } + ep.send_event(e) + + output = flush_and_get_events() + assert len(output) == 3 + check_index_event(output[0], e, user) + check_feature_event(output[1], e, True, user) + check_summary_event(output[2]) + +def test_event_can_be_both_tracked_and_debugged(): + setup_processor(Config()) + + future_time = now() + 100000 + e = { + 'kind': 'feature', 'key': 'flagkey', 'version': 11, 'user': user, + 'variation': 1, 'value': 'value', 'default': 'default', + 'trackEvents': True, 'debugEventsUntilDate': future_time + } + ep.send_event(e) + + output = flush_and_get_events() + assert len(output) == 4 + check_index_event(output[0], e, user) + check_feature_event(output[1], e, False, None) + check_feature_event(output[2], e, True, user) + check_summary_event(output[3]) + +def test_debug_mode_expires_based_on_client_time_if_client_time_is_later_than_server_time(): + setup_processor(Config()) + + # Pick a server time that is somewhat behind the client time + server_time = now() - 20000 + + # Send and flush an event we don't care about, just to set the last server time + mock_session.set_server_time(server_time) + ep.send_event({ 'kind': 'identify', 'user': { 'key': 'otherUser' }}) + flush_and_get_events() + + # Now send an event with debug mode on, with a "debug until" time that is further in + # the future than the server time, but in the past compared to the client. + debug_until = server_time + 1000 + e = { + 'kind': 'feature', 'key': 'flagkey', 'version': 11, 'user': user, + 'variation': 1, 'value': 'value', 'default': 'default', + 'trackEvents': False, 'debugEventsUntilDate': debug_until + } + ep.send_event(e) + + # Should get a summary event only, not a full feature event + output = flush_and_get_events() + assert len(output) == 2 + check_index_event(output[0], e, user) + check_summary_event(output[1]) + +def test_debug_mode_expires_based_on_server_time_if_server_time_is_later_than_client_time(): + setup_processor(Config()) + + # Pick a server time that is somewhat ahead of the client time + server_time = now() + 20000 + + # Send and flush an event we don't care about, just to set the last server time + mock_session.set_server_time(server_time) + ep.send_event({ 'kind': 'identify', 'user': { 'key': 'otherUser' }}) + flush_and_get_events() + + # Now send an event with debug mode on, with a "debug until" time that is further in + # the future than the client time, but in the past compared to the server. 
+ debug_until = server_time - 1000 + e = { + 'kind': 'feature', 'key': 'flagkey', 'version': 11, 'user': user, + 'variation': 1, 'value': 'value', 'default': 'default', + 'trackEvents': False, 'debugEventsUntilDate': debug_until + } + ep.send_event(e) + + # Should get a summary event only, not a full feature event + output = flush_and_get_events() + assert len(output) == 2 + check_index_event(output[0], e, user) + check_summary_event(output[1]) + +def test_two_feature_events_for_same_user_generate_only_one_index_event(): + setup_processor(Config()) + + e1 = { + 'kind': 'feature', 'key': 'flagkey', 'version': 11, 'user': user, + 'variation': 1, 'value': 'value1', 'default': 'default', 'trackEvents': False + } + e2 = { + 'kind': 'feature', 'key': 'flagkey', 'version': 11, 'user': user, + 'variation': 2, 'value': 'value2', 'default': 'default', 'trackEvents': False + } + ep.send_event(e1) + ep.send_event(e2) + + output = flush_and_get_events() + assert len(output) == 2 + check_index_event(output[0], e1, user) + check_summary_event(output[1]) + +def test_nontracked_events_are_summarized(): + setup_processor(Config()) + + e1 = { + 'kind': 'feature', 'key': 'flagkey1', 'version': 11, 'user': user, + 'variation': 1, 'value': 'value1', 'default': 'default1', 'trackEvents': False + } + e2 = { + 'kind': 'feature', 'key': 'flagkey2', 'version': 22, 'user': user, + 'variation': 2, 'value': 'value2', 'default': 'default2', 'trackEvents': False + } + ep.send_event(e1) + ep.send_event(e2) + + output = flush_and_get_events() + assert len(output) == 2 + check_index_event(output[0], e1, user) + se = output[1] + assert se['kind'] == 'summary' + assert se['startDate'] == e1['creationDate'] + assert se['endDate'] == e2['creationDate'] + assert se['features'] == { + 'flagkey1': { + 'default': 'default1', + 'counters': [ { 'version': 11, 'variation': 1, 'value': 'value1', 'count': 1 } ] + }, + 'flagkey2': { + 'default': 'default2', + 'counters': [ { 'version': 22, 'variation': 2, 'value': 'value2', 'count': 1 } ] + } + } + +def test_custom_event_is_queued_with_user(): + setup_processor(Config()) + + e = { 'kind': 'custom', 'key': 'eventkey', 'user': user, 'data': { 'thing': 'stuff '} } + ep.send_event(e) + + output = flush_and_get_events() + assert len(output) == 2 + check_index_event(output[0], e, user) + check_custom_event(output[1], e, None) + +def test_custom_event_can_contain_inline_user(): + setup_processor(Config(inline_users_in_events = True)) + + e = { 'kind': 'custom', 'key': 'eventkey', 'user': user, 'data': { 'thing': 'stuff '} } + ep.send_event(e) + + output = flush_and_get_events() + assert len(output) == 1 + check_custom_event(output[0], e, user) + +def test_user_is_filtered_in_custom_event(): + setup_processor(Config(inline_users_in_events = True, all_attributes_private = True)) + + e = { 'kind': 'custom', 'key': 'eventkey', 'user': user, 'data': { 'thing': 'stuff '} } + ep.send_event(e) + + output = flush_and_get_events() + assert len(output) == 1 + check_custom_event(output[0], e, filtered_user) + +def test_nothing_is_sent_if_there_are_no_events(): + setup_processor(Config()) + ep.flush() + ep._wait_until_inactive() + assert mock_session.request_data is None + +def test_sdk_key_is_sent(): + setup_processor(Config(sdk_key = 'SDK_KEY')) + + ep.send_event({ 'kind': 'identify', 'user': user }) + ep.flush() + ep._wait_until_inactive() + + assert mock_session.request_headers.get('Authorization') is 'SDK_KEY' + +def test_no_more_payloads_are_sent_after_401_error(): + setup_processor(Config(sdk_key = 
'SDK_KEY')) + + mock_session.set_response_status(401) + ep.send_event({ 'kind': 'identify', 'user': user }) + ep.flush() + ep._wait_until_inactive() + mock_session.clear() + + ep.send_event({ 'kind': 'identify', 'user': user }) + ep.flush() + ep._wait_until_inactive() + assert mock_session.request_data is None + + +def flush_and_get_events(): + ep.flush() + ep._wait_until_inactive() + if mock_session.request_data is None: + raise AssertionError('Expected to get an HTTP request but did not get one') + else: + return json.loads(mock_session.request_data) + +def check_index_event(data, source, user): + assert data['kind'] == 'index' + assert data['creationDate'] == source['creationDate'] + assert data['user'] == user + +def check_feature_event(data, source, debug, inline_user): + assert data['kind'] == ('debug' if debug else 'feature') + assert data['creationDate'] == source['creationDate'] + assert data['key'] == source['key'] + assert data.get('version') == source.get('version') + assert data.get('variation') == source.get('variation') + assert data.get('value') == source.get('value') + assert data.get('default') == source.get('default') + if inline_user is None: + assert data['userKey'] == source['user']['key'] + else: + assert data['user'] == inline_user + +def check_custom_event(data, source, inline_user): + assert data['kind'] == 'custom' + assert data['creationDate'] == source['creationDate'] + assert data['key'] == source['key'] + assert data['data'] == source['data'] + if inline_user is None: + assert data['userKey'] == source['user']['key'] + else: + assert data['user'] == inline_user + +def check_summary_event(data): + assert data['kind'] == 'summary' + +def now(): + return int(time.time() * 1000) diff --git a/testing/test_event_serializer.py b/testing/test_event_serializer.py deleted file mode 100644 index fd84ecac..00000000 --- a/testing/test_event_serializer.py +++ /dev/null @@ -1,148 +0,0 @@ -from builtins import object -import json -from ldclient.client import Config -from ldclient.event_serializer import EventSerializer - - -base_config = Config() -config_with_all_attrs_private = Config(all_attributes_private = True) -config_with_some_attrs_private = Config(private_attribute_names=[u'firstName', u'bizzle']) - -# users to serialize - -user = { - u'key': u'abc', - u'firstName': u'Sue', - u'custom': { - u'bizzle': u'def', - u'dizzle': u'ghi' - } -} - -user_specifying_own_private_attr = { - u'key': u'abc', - u'firstName': u'Sue', - u'custom': { - u'bizzle': u'def', - u'dizzle': u'ghi' - }, - u'privateAttributeNames': [ u'dizzle', u'unused' ] -} - -user_with_unknown_top_level_attrs = { - u'key': u'abc', - u'firstName': u'Sue', - u'species': u'human', - u'hatSize': 6, - u'custom': { - u'bizzle': u'def', - u'dizzle': u'ghi' - } -} - -anon_user = { - u'key': u'abc', - u'anonymous': True, - u'custom': { - u'bizzle': u'def', - u'dizzle': u'ghi' - } -} - -# expected results from serializing user - -user_with_all_attrs_hidden = { - u'key': u'abc', - u'custom': { }, - u'privateAttrs': [ u'bizzle', u'dizzle', u'firstName' ] -} - -user_with_some_attrs_hidden = { - u'key': u'abc', - u'custom': { - u'dizzle': u'ghi' - }, - u'privateAttrs': [ u'bizzle', u'firstName' ] -} - -user_with_own_specified_attr_hidden = { - u'key': u'abc', - u'firstName': u'Sue', - u'custom': { - u'bizzle': u'def' - }, - u'privateAttrs': [ u'dizzle' ] -} - -anon_user_with_all_attrs_hidden = { - u'key': u'abc', - u'anonymous': True, - u'custom': { }, - u'privateAttrs': [ u'bizzle', u'dizzle' ] -} - -def make_event(u, 
key = u'xyz'): - return { - u'creationDate': 1000000, - u'key': key, - u'kind': u'thing', - u'user': u - } - - -def test_all_user_attrs_serialized(): - es = EventSerializer(base_config) - event = make_event(user) - j = es.serialize_events(event) - assert json.loads(j) == [event] - -def test_all_user_attrs_private(): - es = EventSerializer(config_with_all_attrs_private) - event = make_event(user) - filtered_event = make_event(user_with_all_attrs_hidden) - j = es.serialize_events(event) - assert json.loads(j) == [filtered_event] - -def test_some_user_attrs_private(): - es = EventSerializer(config_with_some_attrs_private) - event = make_event(user) - filtered_event = make_event(user_with_some_attrs_hidden) - j = es.serialize_events(event) - assert json.loads(j) == [filtered_event] - -def test_per_user_private_attr(): - es = EventSerializer(base_config) - event = make_event(user_specifying_own_private_attr) - filtered_event = make_event(user_with_own_specified_attr_hidden) - j = es.serialize_events(event) - assert json.loads(j) == [filtered_event] - -def test_per_user_private_attr_plus_global_private_attrs(): - es = EventSerializer(config_with_some_attrs_private) - event = make_event(user_specifying_own_private_attr) - filtered_event = make_event(user_with_all_attrs_hidden) - j = es.serialize_events(event) - assert json.loads(j) == [filtered_event] - -def test_all_events_serialized(): - es = EventSerializer(config_with_all_attrs_private) - event0 = make_event(user, 'key0') - event1 = make_event(user, 'key1') - filtered0 = make_event(user_with_all_attrs_hidden, 'key0') - filtered1 = make_event(user_with_all_attrs_hidden, 'key1') - j = es.serialize_events([event0, event1]) - assert json.loads(j) == [filtered0, filtered1] - -def test_unknown_top_level_attrs_stripped(): - es = EventSerializer(base_config) - event = make_event(user_with_unknown_top_level_attrs) - filtered_event = make_event(user) - j = es.serialize_events(event) - assert json.loads(j) == [filtered_event] - -def test_leave_anonymous_attr_as_is(): - es = EventSerializer(config_with_all_attrs_private) - event = make_event(anon_user) - filtered_event = make_event(anon_user_with_all_attrs_hidden) - j = es.serialize_events(event) - assert json.loads(j) == [filtered_event] diff --git a/testing/test_event_summarizer.py b/testing/test_event_summarizer.py new file mode 100644 index 00000000..ae411aaf --- /dev/null +++ b/testing/test_event_summarizer.py @@ -0,0 +1,63 @@ +import pytest + +from ldclient.event_summarizer import EventSummarizer + + +user = { 'key': 'user1' } + +def test_summarize_event_does_nothing_for_identify_event(): + es = EventSummarizer() + snapshot = es.snapshot() + es.summarize_event({ 'kind': 'identify', 'creationDate': 1000, 'user': user }) + + assert es.snapshot() == snapshot + +def test_summarize_event_does_nothing_for_custom_event(): + es = EventSummarizer() + snapshot = es.snapshot() + es.summarize_event({ 'kind': 'custom', 'creationDate': 1000, 'key': 'eventkey', 'user': user }) + + assert es.snapshot() == snapshot + +def test_summarize_event_sets_start_and_end_dates(): + es = EventSummarizer() + event1 = { 'kind': 'feature', 'creationDate': 2000, 'key': 'flag', 'user': user, + 'version': 1, 'variation': 0, 'value': '', 'default': None } + event2 = { 'kind': 'feature', 'creationDate': 1000, 'key': 'flag', 'user': user, + 'version': 1, 'variation': 0, 'value': '', 'default': None } + event3 = { 'kind': 'feature', 'creationDate': 1500, 'key': 'flag', 'user': user, + 'version': 1, 'variation': 0, 'value': '', 
'default': None } + es.summarize_event(event1) + es.summarize_event(event2) + es.summarize_event(event3) + data = es.snapshot() + + assert data.start_date == 1000 + assert data.end_date == 2000 + +def test_summarize_event_increments_counters(): + es = EventSummarizer() + event1 = { 'kind': 'feature', 'creationDate': 1000, 'key': 'flag1', 'user': user, + 'version': 11, 'variation': 1, 'value': 'value1', 'default': 'default1' } + event2 = { 'kind': 'feature', 'creationDate': 1000, 'key': 'flag1', 'user': user, + 'version': 11, 'variation': 2, 'value': 'value2', 'default': 'default1' } + event3 = { 'kind': 'feature', 'creationDate': 1000, 'key': 'flag2', 'user': user, + 'version': 22, 'variation': 1, 'value': 'value99', 'default': 'default2' } + event4 = { 'kind': 'feature', 'creationDate': 1000, 'key': 'flag1', 'user': user, + 'version': 11, 'variation': 1, 'value': 'value1', 'default': 'default1' } + event5 = { 'kind': 'feature', 'creationDate': 1000, 'key': 'badkey', 'user': user, + 'version': None, 'variation': None, 'value': 'default3', 'default': 'default3' } + es.summarize_event(event1) + es.summarize_event(event2) + es.summarize_event(event3) + es.summarize_event(event4) + es.summarize_event(event5) + data = es.snapshot() + + expected = { + ('flag1', 1, 11): { 'count': 2, 'value': 'value1', 'default': 'default1' }, + ('flag1', 2, 11): { 'count': 1, 'value': 'value2', 'default': 'default1' }, + ('flag2', 1, 22): { 'count': 1, 'value': 'value99', 'default': 'default2' }, + ('badkey', None, None): { 'count': 1, 'value': 'default3', 'default': 'default3' } + } + assert data.counters == expected diff --git a/testing/test_flag.py b/testing/test_flag.py index 8b9740aa..29d2bb61 100644 --- a/testing/test_flag.py +++ b/testing/test_flag.py @@ -1,6 +1,6 @@ import pytest from ldclient.feature_store import InMemoryFeatureStore -from ldclient.flag import _bucket_user, evaluate +from ldclient.flag import EvalResult, _bucket_user, evaluate from ldclient.versioned_data_kind import FEATURES, SEGMENTS @@ -16,7 +16,7 @@ def test_flag_returns_off_variation_if_flag_is_off(): 'variations': ['a', 'b', 'c'] } user = { 'key': 'x' } - assert evaluate(flag, user, empty_store) == ('b', []) + assert evaluate(flag, user, empty_store) == EvalResult(1, 'b', []) def test_flag_returns_none_if_flag_is_off_and_off_variation_is_unspecified(): flag = { @@ -26,7 +26,7 @@ def test_flag_returns_none_if_flag_is_off_and_off_variation_is_unspecified(): 'variations': ['a', 'b', 'c'] } user = { 'key': 'x' } - assert evaluate(flag, user, empty_store) == (None, []) + assert evaluate(flag, user, empty_store) == EvalResult(None, None, []) def test_flag_returns_off_variation_if_prerequisite_not_found(): flag = { @@ -38,7 +38,7 @@ def test_flag_returns_off_variation_if_prerequisite_not_found(): 'variations': ['a', 'b', 'c'] } user = { 'key': 'x' } - assert evaluate(flag, user, empty_store) == ('b', []) + assert evaluate(flag, user, empty_store) == EvalResult(1, 'b', []) def test_flag_returns_off_variation_and_event_if_prerequisite_is_not_met(): store = InMemoryFeatureStore() @@ -56,13 +56,14 @@ def test_flag_returns_off_variation_and_event_if_prerequisite_is_not_met(): 'on': True, 'fallthrough': { 'variation': 0 }, 'variations': ['d', 'e'], - 'version': 2 + 'version': 2, + 'trackEvents': False } store.upsert(FEATURES, flag1) user = { 'key': 'x' } - events_should_be = [{'kind': 'feature', 'key': 'feature1', 'value': 'd', 'version': 2, - 'user': user, 'prereqOf': 'feature0'}] - assert evaluate(flag, user, store) == ('b', 
events_should_be) + events_should_be = [{'kind': 'feature', 'key': 'feature1', 'variation': 0, 'value': 'd', + 'version': 2, 'user': user, 'prereqOf': 'feature0', 'trackEvents': False, 'debugEventsUntilDate': None}] + assert evaluate(flag, user, store) == EvalResult(1, 'b', events_should_be) def test_flag_returns_fallthrough_and_event_if_prereq_is_met_and_there_are_no_rules(): store = InMemoryFeatureStore() @@ -80,13 +81,14 @@ def test_flag_returns_fallthrough_and_event_if_prereq_is_met_and_there_are_no_ru 'on': True, 'fallthrough': { 'variation': 1 }, 'variations': ['d', 'e'], - 'version': 2 + 'version': 2, + 'trackEvents': False } store.upsert(FEATURES, flag1) user = { 'key': 'x' } - events_should_be = [{'kind': 'feature', 'key': 'feature1', 'value': 'e', 'version': 2, - 'user': user, 'prereqOf': 'feature0'}] - assert evaluate(flag, user, store) == ('a', events_should_be) + events_should_be = [{'kind': 'feature', 'key': 'feature1', 'variation': 1, 'value': 'e', + 'version': 2, 'user': user, 'prereqOf': 'feature0', 'trackEvents': False, 'debugEventsUntilDate': None}] + assert evaluate(flag, user, store) == EvalResult(0, 'a', events_should_be) def test_flag_matches_user_from_targets(): flag = { @@ -98,7 +100,7 @@ def test_flag_matches_user_from_targets(): 'variations': ['a', 'b', 'c'] } user = { 'key': 'userkey' } - assert evaluate(flag, user, empty_store) == ('c', []) + assert evaluate(flag, user, empty_store) == EvalResult(2, 'c', []) def test_flag_matches_user_from_rules(): flag = { @@ -121,7 +123,7 @@ def test_flag_matches_user_from_rules(): 'variations': ['a', 'b', 'c'] } user = { 'key': 'userkey' } - assert evaluate(flag, user, empty_store) == ('c', []) + assert evaluate(flag, user, empty_store) == EvalResult(2, 'c', []) def test_segment_match_clause_retrieves_segment_from_store(): store = InMemoryFeatureStore() @@ -152,7 +154,7 @@ def test_segment_match_clause_retrieves_segment_from_store(): ] } - assert evaluate(flag, user, store) == (True, []) + assert evaluate(flag, user, store) == EvalResult(1, True, []) def test_segment_match_clause_falls_through_with_no_errors_if_segment_not_found(): user = { "key": "foo" } @@ -175,7 +177,7 @@ def test_segment_match_clause_falls_through_with_no_errors_if_segment_not_found( ] } - assert evaluate(flag, user, empty_store) == (False, []) + assert evaluate(flag, user, empty_store) == EvalResult(0, False, []) def test_clause_matches_builtin_attribute(): clause = { @@ -185,7 +187,7 @@ def test_clause_matches_builtin_attribute(): } user = { 'key': 'x', 'name': 'Bob' } flag = _make_bool_flag_from_clause(clause) - assert evaluate(flag, user, empty_store) == (True, []) + assert evaluate(flag, user, empty_store) == EvalResult(1, True, []) def test_clause_matches_custom_attribute(): clause = { @@ -195,7 +197,7 @@ def test_clause_matches_custom_attribute(): } user = { 'key': 'x', 'name': 'Bob', 'custom': { 'legs': 4 } } flag = _make_bool_flag_from_clause(clause) - assert evaluate(flag, user, empty_store) == (True, []) + assert evaluate(flag, user, empty_store) == EvalResult(1, True, []) def test_clause_returns_false_for_missing_attribute(): clause = { @@ -205,7 +207,7 @@ def test_clause_returns_false_for_missing_attribute(): } user = { 'key': 'x', 'name': 'Bob' } flag = _make_bool_flag_from_clause(clause) - assert evaluate(flag, user, empty_store) == (False, []) + assert evaluate(flag, user, empty_store) == EvalResult(0, False, []) def test_clause_can_be_negated(): clause = { @@ -216,7 +218,7 @@ def test_clause_can_be_negated(): } user = { 'key': 'x', 
'name': 'Bob' } flag = _make_bool_flag_from_clause(clause) - assert evaluate(flag, user, empty_store) == (False, []) + assert evaluate(flag, user, empty_store) == EvalResult(0, False, []) def _make_bool_flag_from_clause(clause): diff --git a/testing/test_ldclient.py b/testing/test_ldclient.py index 8ed0dbe3..b05a0057 100644 --- a/testing/test_ldclient.py +++ b/testing/test_ldclient.py @@ -1,7 +1,9 @@ from builtins import object from ldclient.client import LDClient, Config +from ldclient.event_processor import NullEventProcessor from ldclient.feature_store import InMemoryFeatureStore from ldclient.interfaces import FeatureRequester, FeatureStore, UpdateProcessor +from ldclient.versioned_data_kind import FEATURES import pytest from testing.sync_util import wait_until @@ -11,71 +13,53 @@ import Queue as queue -class MockUpdateProcessor(UpdateProcessor): - def __init__(self, config, requestor, store, ready): - ready.set() - - def start(self): - pass +class MockEventProcessor(object): + def __init__(self, *_): + self._running = False + self._events = [] + mock_event_processor = self def stop(self): - pass + self._running = False + + def start(self): + self._running = True def is_alive(self): - return True + return self._running + def send_event(self, event): + self._events.append(event) -class MockFeatureStore(FeatureStore): - def delete(self, key, version): + def flush(self): pass - @property - def initialized(self): - pass - def init(self, features): - pass +class MockUpdateProcessor(UpdateProcessor): + def __init__(self, config, store, ready): + ready.set() - def all(self): + def start(self): pass - def upsert(self, key, feature): + def stop(self): pass - def __init__(self, *_): - pass + def is_alive(self): + return True + + def initialized(self): + return True + - def get(self, key): - if key == "feature.key": - return { - u'key': u'feature.key', - u'salt': u'abc', - u'on': True, - u'variations': [ - { - u'value': True, - u'weight': 100, - u'targets': [] - }, - { - u'value': False, - u'weight': 0, - u'targets': [] - } - ] - } - else: - return None - - -client = LDClient(config=Config(base_uri="http://localhost:3000", feature_store=MockFeatureStore(), - update_processor_class=MockUpdateProcessor)) +client = LDClient(config=Config(base_uri="http://localhost:3000", + event_processor_class = MockEventProcessor, update_processor_class = MockUpdateProcessor)) offline_client = LDClient(config= - Config(sdk_key="secret", base_uri="http://localhost:3000", feature_store=MockFeatureStore(), + Config(sdk_key="secret", base_uri="http://localhost:3000", offline=True)) no_send_events_client = LDClient(config= - Config(sdk_key="secret", base_uri="http://localhost:3000", feature_store=MockFeatureStore(), - send_events=False, update_processor_class=MockUpdateProcessor)) + Config(sdk_key="secret", base_uri="http://localhost:3000", + update_processor_class = MockUpdateProcessor, send_events=False)) user = { u'key': u'xyz', @@ -94,39 +78,6 @@ def get(self, key): } -class MockConsumer(object): - def __init__(self, *_): - self._running = False - - def stop(self): - self._running = False - - def start(self): - self._running = True - - def is_alive(self): - return self._running - - def flush(self): - pass - - -class MockFeatureRequester(FeatureRequester): - def __init__(self, *_): - pass - - def get_all(self): - pass - - -def mock_consumer(): - return MockConsumer() - - -def noop_consumer(): - return - - def setup_function(function): global numeric_key_user numeric_key_user = { @@ -135,13 +86,18 @@ def 
setup_function(function): u'bizzle': u'def' } } - client._queue = queue.Queue(10) - client._event_consumer = mock_consumer() -def wait_for_event(c, cb): - e = c._queue.get(False) - return cb(e) +def make_client(store): + return LDClient(config=Config(sdk_key = 'SDK_KEY', + base_uri="http://localhost:3000", + event_processor_class=MockEventProcessor, + update_processor_class=MockUpdateProcessor, + feature_store=store)) + + +def get_first_event(c): + return c._event_processor._events.pop(0) def test_ctor_both_sdk_keys_set(): @@ -150,6 +106,14 @@ def test_ctor_both_sdk_keys_set(): LDClient(sdk_key="sdk key b", config=config) +def test_client_has_null_event_processor_if_offline(): + assert isinstance(offline_client._event_processor, NullEventProcessor) + + +def test_client_has_null_event_processor_if_send_events_off(): + assert isinstance(no_send_events_client._event_processor, NullEventProcessor) + + def test_toggle_offline(): assert offline_client.variation('feature.key', user, default=None) is None @@ -159,140 +123,181 @@ def test_sanitize_user(): assert numeric_key_user == sanitized_numeric_key_user -def test_toggle_event_offline(): - offline_client.variation('feature.key', user, default=None) - assert offline_client._queue.empty() - - -def test_toggle_event_with_send_events_off(): - no_send_events_client.variation('feature.key', user, default=None) - assert no_send_events_client._queue.empty() - - def test_identify(): client.identify(user) - def expected_event(e): - return e['kind'] == 'identify' and e['key'] == u'xyz' and e['user'] == user - - assert expected_event(client._queue.get(False)) + e = get_first_event(client) + assert e['kind'] == 'identify' and e['key'] == u'xyz' and e['user'] == user def test_identify_numeric_key_user(): client.identify(numeric_key_user) - def expected_event(e): - return e['kind'] == 'identify' and e['key'] == '33' and e['user'] == sanitized_numeric_key_user - - assert expected_event(client._queue.get(False)) - - -def test_identify_offline(): - offline_client.identify(numeric_key_user) - assert offline_client._queue.empty() - - -def test_identify_with_send_events_off(): - no_send_events_client.identify(numeric_key_user) - assert no_send_events_client._queue.empty() + e = get_first_event(client) + assert e['kind'] == 'identify' and e['key'] == '33' and e['user'] == sanitized_numeric_key_user def test_track(): client.track('my_event', user, 42) - def expected_event(e): - return e['kind'] == 'custom' and e['key'] == 'my_event' and e['user'] == user and e['data'] == 42 - - assert expected_event(client._queue.get(False)) + e = get_first_event(client) + assert e['kind'] == 'custom' and e['key'] == 'my_event' and e['user'] == user and e['data'] == 42 def test_track_numeric_key_user(): client.track('my_event', numeric_key_user, 42) - def expected_event(e): - return e['kind'] == 'custom' and e['key'] == 'my_event' and e['user'] == sanitized_numeric_key_user \ - and e['data'] == 42 - - assert expected_event(client._queue.get(False)) - - -def test_track_offline(): - offline_client.track('my_event', user, 42) - assert offline_client._queue.empty() - - -def test_track_with_send_events_off(): - no_send_events_client.track('my_event', user, 42) - assert no_send_events_client._queue.empty() + e = get_first_event(client) + assert e['kind'] == 'custom' and e['key'] == 'my_event' and e['user'] == sanitized_numeric_key_user \ + and e['data'] == 42 def test_defaults(): - client = LDClient(config=Config(base_uri="http://localhost:3000", - defaults={"foo": "bar"}, - offline=True)) 
- assert "bar" == client.variation('foo', user, default=None) + my_client = LDClient(config=Config(base_uri="http://localhost:3000", + defaults={"foo": "bar"}, + offline=True)) + assert "bar" == my_client.variation('foo', user, default=None) def test_defaults_and_online(): expected = "bar" my_client = LDClient(config=Config(base_uri="http://localhost:3000", defaults={"foo": expected}, - event_consumer_class=MockConsumer, - feature_requester_class=MockFeatureRequester, + event_processor_class=MockEventProcessor, update_processor_class=MockUpdateProcessor, feature_store=InMemoryFeatureStore())) actual = my_client.variation('foo', user, default="originalDefault") assert actual == expected - assert wait_for_event(my_client, lambda e: e['kind'] == 'feature' and e['key'] == u'foo' and e['user'] == user) + e = get_first_event(my_client) + assert e['kind'] == 'feature' and e['key'] == u'foo' and e['user'] == user def test_defaults_and_online_no_default(): - client = LDClient(config=Config(base_uri="http://localhost:3000", - defaults={"foo": "bar"}, - event_consumer_class=MockConsumer, - update_processor_class=MockUpdateProcessor, - feature_requester_class=MockFeatureRequester)) - assert "jim" == client.variation('baz', user, default="jim") - assert wait_for_event(client, lambda e: e['kind'] == 'feature' and e['key'] == u'baz' and e['user'] == user) + my_client = LDClient(config=Config(base_uri="http://localhost:3000", + defaults={"foo": "bar"}, + event_processor_class=MockEventProcessor, + update_processor_class=MockUpdateProcessor)) + assert "jim" == my_client.variation('baz', user, default="jim") + e = get_first_event(my_client) + assert e['kind'] == 'feature' and e['key'] == u'baz' and e['user'] == user -def test_exception_in_retrieval(): - class ExceptionFeatureRequester(FeatureRequester): - def __init__(self, *_): - pass +def test_no_defaults(): + assert "bar" == offline_client.variation('foo', user, default="bar") - def get_all(self): - raise Exception("blah") - client = LDClient(config=Config(base_uri="http://localhost:3000", - defaults={"foo": "bar"}, - feature_store=InMemoryFeatureStore(), - feature_requester_class=ExceptionFeatureRequester, +def test_event_for_existing_feature(): + feature = { + u'key': u'feature.key', + u'salt': u'abc', + u'on': True, + u'variations': ['a', 'b'], + u'fallthrough': { + u'variation': 1 + }, + u'trackEvents': True + } + store = InMemoryFeatureStore() + store.init({FEATURES: {'feature.key': feature}}) + client = make_client(store) + assert 'b' == client.variation('feature.key', user, default='c') + e = get_first_event(client) + assert (e['kind'] == 'feature' and + e['key'] == 'feature.key' and + e['user'] == user and + e['value'] == 'b' and + e['variation'] == 1 and + e['default'] == 'c' and + e['trackEvents'] == True) + + +def test_event_for_unknown_feature(): + store = InMemoryFeatureStore() + store.init({FEATURES: {}}) + client = make_client(store) + assert 'c' == client.variation('feature.key', user, default='c') + e = get_first_event(client) + assert (e['kind'] == 'feature' and + e['key'] == 'feature.key' and + e['user'] == user and + e['value'] == 'c' and + e['variation'] == None and + e['default'] == 'c') + + +def test_event_for_existing_feature_with_no_user(): + feature = { + u'key': u'feature.key', + u'salt': u'abc', + u'on': True, + u'variations': ['a', 'b'], + u'fallthrough': { + u'variation': 1 + }, + u'trackEvents': True + } + store = InMemoryFeatureStore() + store.init({FEATURES: {'feature.key': feature}}) + client = make_client(store) + 
assert 'c' == client.variation('feature.key', None, default='c') + e = get_first_event(client) + assert (e['kind'] == 'feature' and + e['key'] == 'feature.key' and + e['user'] == None and + e['value'] == 'c' and + e['variation'] == None and + e['default'] == 'c' and + e['trackEvents'] == True) + + +def test_event_for_existing_feature_with_no_user_key(): + feature = { + u'key': u'feature.key', + u'salt': u'abc', + u'on': True, + u'variations': ['a', 'b'], + u'fallthrough': { + u'variation': 1 + }, + u'trackEvents': True + } + store = InMemoryFeatureStore() + store.init({FEATURES: {'feature.key': feature}}) + client = make_client(store) + bad_user = { u'name': u'Bob' } + assert 'c' == client.variation('feature.key', bad_user, default='c') + e = get_first_event(client) + assert (e['kind'] == 'feature' and + e['key'] == 'feature.key' and + e['user'] == bad_user and + e['value'] == 'c' and + e['variation'] == None and + e['default'] == 'c' and + e['trackEvents'] == True) + + +def test_all_flags(): + feature = { + u'key': u'feature.key', + u'salt': u'abc', + u'on': True, + u'variations': ['a', 'b'], + u'fallthrough': { + u'variation': 1 + } + } + store = InMemoryFeatureStore() + store.init({FEATURES: {'feature.key': feature}}) + client = LDClient(config=Config(sdk_key = 'SDK_KEY', + base_uri="http://localhost:3000", + event_processor_class=MockEventProcessor, update_processor_class=MockUpdateProcessor, - event_consumer_class=MockConsumer)) - assert "bar" == client.variation('foo', user, default="jim") - assert wait_for_event(client, lambda e: e['kind'] == 'feature' and e['key'] == u'foo' and e['user'] == user) - - -def test_no_defaults(): - assert "bar" == offline_client.variation('foo', user, default="bar") + feature_store=store)) + result = client.all_flags(user) + assert (len(result) == 1 and + result.get('feature.key') == 'b') def test_secure_mode_hash(): user = {'key': 'Message'} assert offline_client.secure_mode_hash(user) == "aa747c502a898200f9e4fa21bac68136f886a0e27aec70ba06daf2e2a5cb5597" - - -def drain(queue): - while not queue.empty(): - queue.get() - queue.task_done() - return - - -def test_flush_empties_queue(): - client.track('my_event', user, 42) - client.track('my_event', user, 33) - drain(client._queue) - client.flush() - assert client._queue.empty() diff --git a/testing/test_user_filter.py b/testing/test_user_filter.py new file mode 100644 index 00000000..15550541 --- /dev/null +++ b/testing/test_user_filter.py @@ -0,0 +1,118 @@ +from builtins import object +import json +from ldclient.client import Config +from ldclient.user_filter import UserFilter + + +base_config = Config() +config_with_all_attrs_private = Config(all_attributes_private = True) +config_with_some_attrs_private = Config(private_attribute_names=[u'firstName', u'bizzle']) + +# users to serialize + +user = { + u'key': u'abc', + u'firstName': u'Sue', + u'custom': { + u'bizzle': u'def', + u'dizzle': u'ghi' + } +} + +user_specifying_own_private_attr = { + u'key': u'abc', + u'firstName': u'Sue', + u'custom': { + u'bizzle': u'def', + u'dizzle': u'ghi' + }, + u'privateAttributeNames': [ u'dizzle', u'unused' ] +} + +user_with_unknown_top_level_attrs = { + u'key': u'abc', + u'firstName': u'Sue', + u'species': u'human', + u'hatSize': 6, + u'custom': { + u'bizzle': u'def', + u'dizzle': u'ghi' + } +} + +anon_user = { + u'key': u'abc', + u'anonymous': True, + u'custom': { + u'bizzle': u'def', + u'dizzle': u'ghi' + } +} + +# expected results from serializing user + +user_with_all_attrs_hidden = { + u'key': u'abc', + u'custom': 
{ }, + u'privateAttrs': [ u'bizzle', u'dizzle', u'firstName' ] +} + +user_with_some_attrs_hidden = { + u'key': u'abc', + u'custom': { + u'dizzle': u'ghi' + }, + u'privateAttrs': [ u'bizzle', u'firstName' ] +} + +user_with_own_specified_attr_hidden = { + u'key': u'abc', + u'firstName': u'Sue', + u'custom': { + u'bizzle': u'def' + }, + u'privateAttrs': [ u'dizzle' ] +} + +anon_user_with_all_attrs_hidden = { + u'key': u'abc', + u'anonymous': True, + u'custom': { }, + u'privateAttrs': [ u'bizzle', u'dizzle' ] +} + + +def test_all_user_attrs_serialized(): + uf = UserFilter(base_config) + j = uf.filter_user_props(user) + assert j == user + +def test_all_user_attrs_private(): + uf = UserFilter(config_with_all_attrs_private) + j = uf.filter_user_props(user) + assert j == user_with_all_attrs_hidden + +def test_some_user_attrs_private(): + uf = UserFilter(config_with_some_attrs_private) + j = uf.filter_user_props(user) + assert j == user_with_some_attrs_hidden + +def test_per_user_private_attr(): + uf = UserFilter(base_config) + j = uf.filter_user_props(user_specifying_own_private_attr) + assert j == user_with_own_specified_attr_hidden + +def test_per_user_private_attr_plus_global_private_attrs(): + uf = UserFilter(config_with_some_attrs_private) + j = uf.filter_user_props(user_specifying_own_private_attr) + assert j == user_with_all_attrs_hidden + +def test_unknown_top_level_attrs_stripped(): + uf = UserFilter(base_config) + j = uf.filter_user_props(user_with_unknown_top_level_attrs) + assert j == user + +def test_leave_anonymous_attr_as_is(): + uf = UserFilter(config_with_all_attrs_private) + j = uf.filter_user_props(anon_user) + assert j == anon_user_with_all_attrs_hidden