Skip to content

Commit 9c7777c

Browse files
authored
fix: Set explicit names for each spawned thread (#311)
1 parent cde6cfb commit 9c7777c

File tree

10 files changed

+35
-28
lines changed

10 files changed

+35
-28
lines changed

ldclient/client.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ def __update_availability(self, available: bool):
108108
return
109109

110110
log.warn("Detected persistent store unavailability; updates will be cached until it recovers")
111-
task = RepeatingTask(0.5, 0, self.__check_availability)
111+
task = RepeatingTask("ldclient.check-availability", 0.5, 0, self.__check_availability)
112112

113113
self.__lock.lock()
114114
self.__poller = task
@@ -172,6 +172,7 @@ class LDClient:
172172
173173
Client instances are thread-safe.
174174
"""
175+
175176
def __init__(self, config: Config, start_wait: float=5):
176177
"""Constructs a new LDClient instance.
177178
@@ -248,7 +249,7 @@ def _set_event_processor(self, config):
248249
if not config.event_processor_class:
249250
diagnostic_id = create_diagnostic_id(config)
250251
diagnostic_accumulator = None if config.diagnostic_opt_out else _DiagnosticAccumulator(diagnostic_id)
251-
self._event_processor = DefaultEventProcessor(config, diagnostic_accumulator = diagnostic_accumulator)
252+
self._event_processor = DefaultEventProcessor(config, diagnostic_accumulator=diagnostic_accumulator)
252253
return diagnostic_accumulator
253254
self._event_processor = config.event_processor_class(config)
254255
return None
@@ -340,7 +341,7 @@ def track(self, event_name: str, context: Context, data: Optional[Any]=None,
340341
log.warning("Invalid context for track (%s)" % context.error)
341342
else:
342343
self._send_event(self._event_factory_default.new_custom_event(event_name,
343-
context, data, metric_value))
344+
context, data, metric_value))
344345

345346
def identify(self, context: Context):
346347
"""Reports details about an evaluation context.
@@ -711,5 +712,4 @@ def flag_tracker(self) -> FlagTracker:
711712
return self.__flag_tracker
712713

713714

714-
715715
__all__ = ['LDClient', 'Config']

ldclient/impl/big_segments.py

+11-7
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,17 @@
1515
class BigSegmentStoreStatusProviderImpl(BigSegmentStoreStatusProvider):
1616
"""
1717
Default implementation of the BigSegmentStoreStatusProvider interface.
18-
18+
1919
The real implementation of getting the status is in BigSegmentStoreManager - we pass in a lambda that
2020
allows us to get the current status from that class. So this class provides a facade for that, and
2121
also adds the listener mechanism.
2222
"""
23+
2324
def __init__(self, status_getter: Callable[[], BigSegmentStoreStatus]):
2425
self.__status_getter = status_getter
2526
self.__status_listeners = Listeners()
2627
self.__last_status = None # type: Optional[BigSegmentStoreStatus]
27-
28+
2829
@property
2930
def status(self) -> BigSegmentStoreStatus:
3031
return self.__status_getter()
@@ -43,15 +44,17 @@ def _update_status(self, new_status: BigSegmentStoreStatus):
4344
self.__last_status = new_status
4445
self.__status_listeners.notify(new_status)
4546

47+
4648
class BigSegmentStoreManager:
4749
# use EMPTY_MEMBERSHIP as a singleton whenever a membership query returns None; it's safe to reuse it
4850
# because we will never modify the membership properties after they're queried
4951
EMPTY_MEMBERSHIP = {} # type: dict
50-
52+
5153
"""
5254
Internal component that decorates the Big Segment store with caching behavior, and also polls the
5355
store to track its status.
5456
"""
57+
5558
def __init__(self, config: BigSegmentsConfig):
5659
self.__store = config.store
5760

@@ -61,8 +64,8 @@ def __init__(self, config: BigSegmentsConfig):
6164
self.__poll_task = None # type: Optional[RepeatingTask]
6265

6366
if self.__store:
64-
self.__cache = ExpiringDict(max_len = config.context_cache_size, max_age_seconds=config.context_cache_time)
65-
self.__poll_task = RepeatingTask(config.status_poll_interval, 0, self.poll_store_and_update_status)
67+
self.__cache = ExpiringDict(max_len=config.context_cache_size, max_age_seconds=config.context_cache_time)
68+
self.__poll_task = RepeatingTask("ldclient.bigsegment.status-poll", config.status_poll_interval, 0, self.poll_store_and_update_status)
6669
self.__poll_task.start()
6770

6871
def stop(self):
@@ -74,7 +77,7 @@ def stop(self):
7477
@property
7578
def status_provider(self) -> BigSegmentStoreStatusProvider:
7679
return self.__status_provider
77-
80+
7881
def get_user_membership(self, user_key: str) -> Tuple[Optional[dict], str]:
7982
if not self.__store:
8083
return (None, BigSegmentsStatus.NOT_CONFIGURED)
@@ -101,7 +104,7 @@ def get_status(self) -> BigSegmentStoreStatus:
101104
return status if status else self.poll_store_and_update_status()
102105

103106
def poll_store_and_update_status(self) -> BigSegmentStoreStatus:
104-
new_status = BigSegmentStoreStatus(False, False) # default to "unavailable" if we don't get a new status below
107+
new_status = BigSegmentStoreStatus(False, False) # default to "unavailable" if we don't get a new status below
105108
if self.__store:
106109
try:
107110
metadata = self.__store.get_metadata()
@@ -115,5 +118,6 @@ def poll_store_and_update_status(self) -> BigSegmentStoreStatus:
115118
def is_stale(self, timestamp) -> bool:
116119
return (timestamp is None) or ((int(time.time() * 1000) - timestamp) >= self.__stale_after_millis)
117120

121+
118122
def _hash_for_user_key(user_key: str) -> str:
119123
return base64.b64encode(sha256(user_key.encode('utf-8')).digest()).decode('utf-8')

ldclient/impl/datasource/polling.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ def __init__(self, config: Config, requester: FeatureRequester, store: FeatureSt
2121
self._requester = requester
2222
self._store = store
2323
self._ready = ready
24-
self._task = RepeatingTask(config.poll_interval, 0, self._poll)
24+
self._task = RepeatingTask("ldclient.datasource.polling", config.poll_interval, 0, self._poll)
2525

2626
def start(self):
2727
log.info("Starting PollingUpdateProcessor with request interval: " + str(self._config.poll_interval))

ldclient/impl/datasource/streaming.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131

3232
class StreamingUpdateProcessor(Thread, UpdateProcessor):
3333
def __init__(self, config, store, ready, diagnostic_accumulator):
34-
Thread.__init__(self)
34+
Thread.__init__(self, name="ldclient.datasource.streaming")
3535
self.daemon = True
3636
self._uri = config.stream_base_uri + STREAM_ALL_PATH
3737
self._config = config

ldclient/impl/events/event_processor.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -344,15 +344,15 @@ def __init__(self, inbox, config, http_client, diagnostic_accumulator=None):
344344
self._omit_anonymous_contexts = config.omit_anonymous_contexts
345345

346346
self._flush_workers = FixedThreadPool(__MAX_FLUSH_THREADS__, "ldclient.flush")
347-
self._diagnostic_flush_workers = None if self._diagnostic_accumulator is None else FixedThreadPool(1, "ldclient.diag_flush")
347+
self._diagnostic_flush_workers = None if self._diagnostic_accumulator is None else FixedThreadPool(1, "ldclient.events.diag_flush")
348348
if self._diagnostic_accumulator is not None:
349349
init_event = create_diagnostic_init(self._diagnostic_accumulator.data_since_date,
350350
self._diagnostic_accumulator.diagnostic_id,
351351
config)
352352
task = DiagnosticEventSendTask(self._http, self._config, init_event)
353353
self._diagnostic_flush_workers.execute(task.run)
354354

355-
self._main_thread = Thread(target=self._run_main_loop)
355+
self._main_thread = Thread(target=self._run_main_loop, name="ldclient.events.processor")
356356
self._main_thread.daemon = True
357357
self._main_thread.start()
358358

@@ -504,13 +504,13 @@ class DefaultEventProcessor(EventProcessor):
504504
def __init__(self, config, http=None, dispatcher_class=None, diagnostic_accumulator=None):
505505
self._inbox = queue.Queue(config.events_max_pending)
506506
self._inbox_full = False
507-
self._flush_timer = RepeatingTask(config.flush_interval, config.flush_interval, self.flush)
508-
self._contexts_flush_timer = RepeatingTask(config.context_keys_flush_interval, config.context_keys_flush_interval, self._flush_contexts)
507+
self._flush_timer = RepeatingTask("ldclient.events.flush", config.flush_interval, config.flush_interval, self.flush)
508+
self._contexts_flush_timer = RepeatingTask("ldclient.events.context-flush", config.context_keys_flush_interval, config.context_keys_flush_interval, self._flush_contexts)
509509
self._flush_timer.start()
510510
self._contexts_flush_timer.start()
511511
if diagnostic_accumulator is not None:
512-
self._diagnostic_event_timer = RepeatingTask(config.diagnostic_recording_interval,
513-
config.diagnostic_recording_interval, self._send_diagnostic)
512+
self._diagnostic_event_timer = RepeatingTask("ldclient.events.send-diagnostic", config.diagnostic_recording_interval,
513+
config.diagnostic_recording_interval, self._send_diagnostic)
514514
self._diagnostic_event_timer.start()
515515
else:
516516
self._diagnostic_event_timer = None

ldclient/impl/integrations/files/file_data_source.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ def __init__(self, resolved_paths, reloader, interval):
189189
self._paths = resolved_paths
190190
self._reloader = reloader
191191
self._file_times = self._check_file_times()
192-
self._timer = RepeatingTask(interval, interval, self._poll)
192+
self._timer = RepeatingTask("ldclient.datasource.file.poll", interval, interval, self._poll)
193193
self._timer.start()
194194

195195
def stop(self):

ldclient/impl/repeating_task.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,16 @@
44
import time
55
from typing import Callable
66

7+
78
class RepeatingTask:
89
"""
910
A generic mechanism for calling a callback repeatedly at fixed intervals on a worker thread.
1011
"""
11-
def __init__(self, interval: float, initial_delay: float, callable: Callable):
12+
13+
def __init__(self, label, interval: float, initial_delay: float, callable: Callable):
1214
"""
1315
Creates the task, but does not start the worker thread yet.
14-
16+
1517
:param interval: maximum time in seconds between invocations of the callback
1618
:param initial_delay: time in seconds to wait before the first invocation
1719
:param callable: the function to execute repeatedly
@@ -20,7 +22,7 @@ def __init__(self, interval: float, initial_delay: float, callable: Callable):
2022
self.__initial_delay = initial_delay
2123
self.__action = callable
2224
self.__stop = Event()
23-
self.__thread = Thread(target=self._run)
25+
self.__thread = Thread(target=self._run, name=f"{label}.repeating")
2426
self.__thread.daemon = True
2527

2628
def start(self):

ldclient/testing/http_util.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def start_secure_server():
4242

4343
class MockServerWrapper(Thread):
4444
def __init__(self, port, secure):
45-
Thread.__init__(self)
45+
Thread.__init__(self, name="ldclient.testing.mock-server-wrapper")
4646
self.port = port
4747
self.uri = '%s://localhost:%d' % ('https' if secure else 'http', port)
4848
self.server = HTTPServer(('localhost', port), MockServerRequestHandler)

ldclient/testing/impl/events/test_event_processor.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -646,8 +646,9 @@ def event_consumer():
646646
if message.type == 'stop':
647647
message.param.set()
648648
return
649+
649650
def start_consuming_events():
650-
Thread(target=event_consumer).start()
651+
Thread(target=event_consumer, name="ldclient.testing.events.consumer").start()
651652

652653
with DefaultEventProcessor(config, mock_http, dispatcher_factory) as ep:
653654
ep_inbox = ep_inbox_holder[0]

ldclient/testing/impl/test_repeating_task.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
def test_task_does_not_start_when_created():
99
signal = Event()
10-
task = RepeatingTask(0.01, 0, lambda: signal.set())
10+
task = RepeatingTask("ldclient.testing.set-signal", 0.01, 0, lambda: signal.set())
1111
try:
1212
signal_was_set = signal.wait(0.1)
1313
assert signal_was_set == False
@@ -16,7 +16,7 @@ def test_task_does_not_start_when_created():
1616

1717
def test_task_executes_until_stopped():
1818
queue = Queue()
19-
task = RepeatingTask(0.1, 0, lambda: queue.put(time.time()))
19+
task = RepeatingTask("ldclient.testing.enqueue-time", 0.1, 0, lambda: queue.put(time.time()))
2020
try:
2121
last = None
2222
task.start()
@@ -47,7 +47,7 @@ def do_task():
4747
if counter >= 2:
4848
task.stop()
4949
stopped.set()
50-
task = RepeatingTask(0.01, 0, do_task)
50+
task = RepeatingTask("ldclient.testing.task-runner", 0.01, 0, do_task)
5151
try:
5252
task.start()
5353
assert stopped.wait(0.1) == True

0 commit comments

Comments
 (0)