Skip to content

Commit e6071f7

Browse files
committed
Add main_thread_only execmodel
In order to prevent tasks from running in a non-main thread, wait for the previous task inside _try_send_to_primary_thread, then schedule the next task. Add a main_thread_only execmodel to distinguish this new behavior from the existing thread execmodel, since users of the thread execmodel expect that tasks can run in multiple threads concurrently.
1 parent 372168e commit e6071f7

File tree

7 files changed

+71
-16
lines changed

7 files changed

+71
-16
lines changed

doc/basics.rst

+5-5
Original file line numberDiff line numberDiff line change
@@ -138,14 +138,14 @@ processes then you often want to call ``group.terminate()``
138138
yourself and specify a larger or not timeout.
139139

140140

141-
threading models: gevent, eventlet, thread
142-
===========================================
141+
threading models: gevent, eventlet, thread, main_thread_only
142+
====================================================================
143143

144144
.. versionadded:: 1.2 (status: experimental!)
145145

146-
execnet supports "thread", "eventlet" and "gevent" as thread models
147-
on each of the two sides. You need to decide which model to use
148-
before you create any gateways::
146+
execnet supports "main_thread_only", "thread", "eventlet" and "gevent"
147+
as thread models on each of the two sides. You need to decide which
148+
model to use before you create any gateways::
149149

150150
# content of threadmodel.py
151151
import execnet

src/execnet/gateway_base.py

+27-4
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@ def Event(self):
134134
return threading.Event()
135135

136136

137+
class MainThreadOnlyExecModel(ThreadExecModel):
138+
backend = "main_thread_only"
139+
140+
137141
class EventletExecModel(ExecModel):
138142
backend = "eventlet"
139143

@@ -254,6 +258,8 @@ def get_execmodel(backend):
254258
return backend
255259
if backend == "thread":
256260
return ThreadExecModel()
261+
elif backend == "main_thread_only":
262+
return MainThreadOnlyExecModel()
257263
elif backend == "eventlet":
258264
return EventletExecModel()
259265
elif backend == "gevent":
@@ -322,7 +328,7 @@ def __init__(self, execmodel, hasprimary=False):
322328
self._shuttingdown = False
323329
self._waitall_events = []
324330
if hasprimary:
325-
if self.execmodel.backend != "thread":
331+
if self.execmodel.backend not in ("thread", "main_thread_only"):
326332
raise ValueError("hasprimary=True requires thread model")
327333
self._primary_thread_task_ready = self.execmodel.Event()
328334
else:
@@ -332,7 +338,7 @@ def integrate_as_primary_thread(self):
332338
"""integrate the thread with which we are called as a primary
333339
thread for executing functions triggered with spawn().
334340
"""
335-
assert self.execmodel.backend == "thread", self.execmodel
341+
assert self.execmodel.backend in ("thread", "main_thread_only"), self.execmodel
336342
primary_thread_task_ready = self._primary_thread_task_ready
337343
# interacts with code at REF1
338344
while 1:
@@ -345,7 +351,11 @@ def integrate_as_primary_thread(self):
345351
with self._running_lock:
346352
if self._shuttingdown:
347353
break
348-
primary_thread_task_ready.clear()
354+
# Only clear if _try_send_to_primary_thread has not
355+
# yet set the next self._primary_thread_task reply
356+
# after waiting for this one to complete.
357+
if reply is self._primary_thread_task:
358+
primary_thread_task_ready.clear()
349359

350360
def trigger_shutdown(self):
351361
with self._running_lock:
@@ -376,6 +386,19 @@ def _try_send_to_primary_thread(self, reply):
376386
# wake up primary thread
377387
primary_thread_task_ready.set()
378388
return True
389+
elif (
390+
self.execmodel.backend == "main_thread_only"
391+
and self._primary_thread_task is not None
392+
):
393+
self._primary_thread_task.waitfinish()
394+
self._primary_thread_task = reply
395+
# wake up primary thread (it's okay if this is already set
396+
# because we waited for the previous task to finish above
397+
# and integrate_as_primary_thread will not clear it when
398+
# it enters self._running_lock if it detects that a new
399+
# task is available)
400+
primary_thread_task_ready.set()
401+
return True
379402
return False
380403

381404
def spawn(self, func, *args, **kwargs):
@@ -1132,7 +1155,7 @@ def serve(self):
11321155
def trace(msg):
11331156
self._trace("[serve] " + msg)
11341157

1135-
hasprimary = self.execmodel.backend == "thread"
1158+
hasprimary = self.execmodel.backend in ("thread", "main_thread_only")
11361159
self._execpool = WorkerPool(self.execmodel, hasprimary=hasprimary)
11371160
trace("spawning receiver thread")
11381161
self._initreceive()

src/execnet/multi.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ def makegateway(self, spec=None):
107107
108108
id=<string> specifies the gateway id
109109
python=<path> specifies which python interpreter to execute
110-
execmodel=model 'thread', 'eventlet', 'gevent' model for execution
110+
execmodel=model 'thread', 'main_thread_only', 'eventlet', 'gevent' model for execution
111111
chdir=<path> specifies to which directory to change
112112
nice=<path> specifies process priority of new process
113113
env:NAME=value specifies a remote environment variable setting.

testing/conftest.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ def anypython(request):
124124
pytest.skip(f"no {name} found")
125125
if "execmodel" in request.fixturenames and name != "sys.executable":
126126
backend = request.getfixturevalue("execmodel").backend
127-
if backend != "thread":
127+
if backend not in ("thread", "main_thread_only"):
128128
pytest.xfail(f"cannot run {backend!r} execmodel with bare {name}")
129129
return executable
130130

@@ -173,9 +173,11 @@ def gw(request, execmodel, group):
173173
return gw
174174

175175

176-
@pytest.fixture(params=["thread", "eventlet", "gevent"], scope="session")
176+
@pytest.fixture(
177+
params=["thread", "main_thread_only", "eventlet", "gevent"], scope="session"
178+
)
177179
def execmodel(request):
178-
if request.param != "thread":
180+
if request.param not in ("thread", "main_thread_only"):
179181
pytest.importorskip(request.param)
180182
if request.param in ("eventlet", "gevent") and sys.platform == "win32":
181183
pytest.xfail(request.param + " does not work on win32")

testing/test_gateway.py

+30
Original file line numberDiff line numberDiff line change
@@ -525,3 +525,33 @@ def sendback(channel):
525525
if interleave_getstatus:
526526
print(gw.remote_status())
527527
assert ch.receive(timeout=0.5) == 1234
528+
529+
530+
def test_assert_main_thread_only(execmodel, makegateway):
531+
if execmodel.backend != "main_thread_only":
532+
pytest.skip("can only run with main_thread_only")
533+
534+
gw = makegateway(spec=f"execmodel={execmodel.backend}//popen")
535+
536+
# Create multiple channels at once and assert that all tasks
537+
# execute in the main thread.
538+
channels = []
539+
for i in range(10):
540+
channels.append(
541+
gw.remote_exec(
542+
"""
543+
import time, threading
544+
time.sleep(0.02)
545+
channel.send(threading.current_thread() is threading.main_thread())
546+
"""
547+
)
548+
)
549+
550+
for ch in channels:
551+
res = ch.receive()
552+
ch.close()
553+
if res is not True:
554+
pytest.fail("remote raised\n%s" % res)
555+
556+
gw.exit()
557+
gw.join()

testing/test_termination.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def doit():
3636

3737

3838
def test_endmarker_delivery_on_remote_killterm(makegateway, execmodel):
39-
if execmodel.backend != "thread":
39+
if execmodel.backend not in ("thread", "main_thread_only"):
4040
pytest.xfail("test and execnet not compatible to greenlets yet")
4141
gw = makegateway("popen")
4242
q = execmodel.queue.Queue()

testing/test_threadpool.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ def wait_then_put():
164164

165165

166166
def test_primary_thread_integration(execmodel):
167-
if execmodel.backend != "thread":
167+
if execmodel.backend not in ("thread", "main_thread_only"):
168168
with pytest.raises(ValueError):
169169
WorkerPool(execmodel=execmodel, hasprimary=True)
170170
return
@@ -188,7 +188,7 @@ def func():
188188

189189

190190
def test_primary_thread_integration_shutdown(execmodel):
191-
if execmodel.backend != "thread":
191+
if execmodel.backend not in ("thread", "main_thread_only"):
192192
pytest.skip("can only run with threading")
193193
pool = WorkerPool(execmodel=execmodel, hasprimary=True)
194194
queue = execmodel.queue.Queue()

0 commit comments

Comments
 (0)