Skip to content

Properly await coroutines registered with app.on_shutdown #4641

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 60 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
8ad812f
ignore warnings from upstream packages in pytest
rodja Apr 18, 2025
97bf33c
screen tests should not be async
rodja Apr 18, 2025
ebf0878
write backup synchronously if we do not have an event loop
rodja Apr 18, 2025
a9e78dd
rewrite deprecated utcfromtimestamp
rodja Apr 19, 2025
4cc1def
make utc compatible with lower python versions
rodja Apr 19, 2025
8955ff9
improve naming and simplify coroutine closing
rodja Apr 19, 2025
d6010f9
register storage.on_shutdown at startup not in app.__init__
rodja Apr 19, 2025
562ed11
clean up all sheduled coros and tasks on shutdown
rodja Apr 19, 2025
cd5b7c1
properly cancel remaining background tasks on shutdown
rodja Apr 19, 2025
2ef1095
fix typing by ensuring we always store coroutines
rodja Apr 19, 2025
2d20265
make sure list is not modified while iterating
rodja Apr 19, 2025
9a35eba
do not cancel our own shutdown
rodja Apr 19, 2025
52bf435
show test output when it happens
rodja Apr 20, 2025
af9b967
await app stopping on shutdown
rodja Apr 20, 2025
cafeff4
reactivate clean test output
rodja Apr 20, 2025
6cc0288
better naming
rodja Apr 20, 2025
f9936fa
wait for all tasks to complete within time limit
rodja Apr 20, 2025
03922f2
name background tasks where ever possible
rodja Apr 20, 2025
7967bfe
only stop for unexpected ERROR logs
rodja Apr 20, 2025
0648f54
increase execution limit to ensure tests do not get aborted if github…
rodja Apr 20, 2025
3731875
give the loop opportunity to cancel all tasks before stopping
rodja Apr 20, 2025
89afe58
cleanup
rodja Apr 20, 2025
0fdb23b
fix typing
rodja Apr 20, 2025
9b74daa
fix background task name
rodja Apr 20, 2025
f09e1a3
break on cancellation
rodja Apr 20, 2025
714f227
formatting
rodja Apr 20, 2025
fc058f0
do not cancel ongoing backups
rodja Apr 20, 2025
c8419bb
only execute user simulation tests for now
rodja Apr 21, 2025
2b4f4e0
introduce wait-for2 package to properly handle cancellation
rodja Apr 21, 2025
83dfe7b
replace all asyncio.wait_for with wait_for2
rodja Apr 21, 2025
deb1e36
mypy ignore wait_for2 module
rodja Apr 21, 2025
50782f9
properly cancel all tasks and wait for storage tasks
rodja Apr 21, 2025
78eb0ca
fix wait_for call
rodja Apr 21, 2025
6ab210e
reactivate all tests
rodja Apr 22, 2025
e3d4b56
more generic way to mark functions to be awaited on shutdown (instead…
rodja Apr 22, 2025
2987f5e
add some minimal documentation
rodja Apr 22, 2025
6c8af39
use public coro accessor
rodja Apr 22, 2025
fb8a956
fix mypy issue and reorganize
rodja Apr 22, 2025
c516d03
fix "leaked semaphore" warnings as hinted in #4131
rodja Apr 23, 2025
04bd6d3
prevent mypy warning
rodja Apr 23, 2025
d8f1c88
use existing multiprocessing import
rodja Apr 23, 2025
c4e8425
Revert "use existing multiprocessing import"
falkoschindler Apr 23, 2025
f514c66
Revert "prevent mypy warning"
falkoschindler Apr 23, 2025
ff1257a
Revert "fix "leaked semaphore" warnings as hinted in #4131"
falkoschindler Apr 23, 2025
c079949
Revert "increase execution limit to ensure tests do not get aborted i…
falkoschindler Apr 23, 2025
14a6536
isolate usage of wait_for2 for easier removal in Python 3.13
falkoschindler Apr 23, 2025
5732f97
make sure to run shutdown handlers in auto-index context
falkoschindler Apr 23, 2025
d3f5650
revert renaming to avoid breaking changes
falkoschindler Apr 23, 2025
353dbca
fix type annotation
falkoschindler Apr 23, 2025
6dc2d75
Merge branch 'main' into pytest-warnings
falkoschindler Apr 23, 2025
32b6b03
minor cleanup
falkoschindler Apr 23, 2025
deb5f87
code review
falkoschindler Apr 24, 2025
049a1bf
convert code snippets into an interactive demo
falkoschindler Apr 24, 2025
27a0f0d
rewrite @await_on_shutdown to not produce memory leaks
rodja Apr 25, 2025
b48ba1a
write backup with aiofiles
rodja Apr 25, 2025
cafbf4b
Revert "rewrite @await_on_shutdown to not produce memory leaks"
rodja Apr 28, 2025
323490e
use weakSet
rodja Apr 28, 2025
247a276
Merge branch 'main' into pytest-warnings
falkoschindler Apr 28, 2025
0fc1d92
cleanup
rodja Apr 29, 2025
9d4a001
add test to demonstrate problem with click handler
rodja Apr 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/update_version.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
import sys
from datetime import UTC, datetime
from datetime import datetime, timezone
from pathlib import Path

if __name__ == '__main__':
Expand All @@ -20,5 +20,5 @@
if line.startswith('version: '):
lines[i] = f'version: {version}'
if line.startswith('date-released: '):
lines[i] = f'date-released: "{datetime.now(UTC).strftime(r"%Y-%m-%d")}"'
lines[i] = f'date-released: "{datetime.now(timezone.utc).strftime(r"%Y-%m-%d")}"'
path.write_text('\n'.join(lines) + '\n', encoding='utf-8')
7 changes: 3 additions & 4 deletions nicegui/air.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import asyncio
import gzip
import json
import logging
Expand Down Expand Up @@ -225,7 +224,7 @@ async def connect(self) -> None:
self.connecting = True
try:
if self.relay.connected:
await asyncio.wait_for(self.disconnect(), timeout=5)
await helpers.wait_for(self.disconnect(), timeout=5)
self.log.debug('Connecting...')
await self.relay.connect(
f'{RELAY_HOST}?device_token={self.token}',
Expand Down Expand Up @@ -270,10 +269,10 @@ def is_air_target(target_id: str) -> bool:
def connect() -> None:
"""Connect to the NiceGUI On Air server if there is an air instance."""
if core.air:
background_tasks.create(core.air.connect())
background_tasks.create(core.air.connect(), name='On Air connect')


def disconnect() -> None:
"""Disconnect from the NiceGUI On Air server if there is an air instance."""
if core.air:
background_tasks.create(core.air.disconnect())
background_tasks.create(core.air.disconnect(), name='On Air disconnect')
18 changes: 12 additions & 6 deletions nicegui/app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ def __init__(self, **kwargs) -> None:
self._disconnect_handlers: List[Union[Callable[..., Any], Awaitable]] = []
self._exception_handlers: List[Callable[..., Any]] = [log.exception]

self.on_shutdown(self.storage.on_shutdown)

@property
def is_starting(self) -> bool:
"""Return whether NiceGUI is starting."""
Expand All @@ -73,13 +71,21 @@ def start(self) -> None:
self._state = State.STARTING
for t in self._startup_handlers:
Client.auto_index_client.safe_invoke(t)
self.on_shutdown(self.storage.on_shutdown)
self.on_shutdown(background_tasks.teardown)
self._state = State.STARTED

def stop(self) -> None:
async def stop(self) -> None:
"""Stop NiceGUI. (For internal use only.)"""
self._state = State.STOPPING
for t in self._shutdown_handlers:
Client.auto_index_client.safe_invoke(t)
with Client.auto_index_client:
for t in self._shutdown_handlers:
if isinstance(t, Awaitable):
await t
else:
result = t(self) if len(inspect.signature(t).parameters) == 1 else t()
if helpers.is_coroutine_function(t):
await result
self._state = State.STOPPED

def on_connect(self, handler: Union[Callable, Awaitable]) -> None:
Expand Down Expand Up @@ -124,7 +130,7 @@ def handle_exception(self, exception: Exception) -> None:
for handler in self._exception_handlers:
result = handler() if not inspect.signature(handler).parameters else handler(exception)
if helpers.is_coroutine_function(handler):
background_tasks.create(result)
background_tasks.create(result, name=f'exception {handler.__name__}')

def shutdown(self) -> None:
"""Shut down NiceGUI.
Expand Down
4 changes: 2 additions & 2 deletions nicegui/app/range_response.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import hashlib
import mimetypes
from datetime import datetime
from datetime import datetime, timezone
from pathlib import Path
from typing import Generator

Expand All @@ -13,7 +13,7 @@
def get_range_response(file: Path, request: Request, chunk_size: int) -> Response:
"""Get a Response for the given file, supporting range-requests, E-Tag and Last-Modified."""
file_size = file.stat().st_size
last_modified_time = datetime.utcfromtimestamp(file.stat().st_mtime)
last_modified_time = datetime.fromtimestamp(file.stat().st_mtime, timezone.utc)
start = 0
end = file_size - 1
status_code = 200
Expand Down
66 changes: 57 additions & 9 deletions nicegui/background_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
from __future__ import annotations

import asyncio
from typing import Awaitable, Dict, Set
import weakref
from typing import Any, Awaitable, Callable, Coroutine, Dict, Set

from . import core
from . import core, helpers
from .logging import log

running_tasks: Set[asyncio.Task] = set()
lazy_tasks_running: Dict[str, asyncio.Task] = {}
lazy_tasks_waiting: Dict[str, Awaitable] = {}
lazy_coroutines_waiting: Dict[str, Coroutine[Any, Any, Any]] = {}
functions_awaited_on_shutdown: weakref.WeakSet[Callable] = weakref.WeakSet()


def create(coroutine: Awaitable, *, name: str = 'unnamed task') -> asyncio.Task:
Expand All @@ -19,7 +22,7 @@ def create(coroutine: Awaitable, *, name: str = 'unnamed task') -> asyncio.Task:
See https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task.
"""
assert core.loop is not None
coroutine = coroutine if asyncio.iscoroutine(coroutine) else asyncio.wait_for(coroutine, None)
coroutine = coroutine if asyncio.iscoroutine(coroutine) else helpers.wait_for(coroutine, None)
task: asyncio.Task = core.loop.create_task(coroutine, name=name)
task.add_done_callback(_handle_task_result)
running_tasks.add(task)
Expand All @@ -33,24 +36,69 @@ def create_lazy(coroutine: Awaitable, *, name: str) -> None:
If a third task with the same name is created while the first one is still running, the second one is discarded.
"""
if name in lazy_tasks_running:
if name in lazy_tasks_waiting:
asyncio.Task(lazy_tasks_waiting[name]).cancel()
lazy_tasks_waiting[name] = coroutine
if name in lazy_coroutines_waiting:
lazy_coroutines_waiting[name].close()
lazy_coroutines_waiting[name] = _ensure_coroutine(coroutine)
return

def finalize(name: str) -> None:
lazy_tasks_running.pop(name)
if name in lazy_tasks_waiting:
create_lazy(lazy_tasks_waiting.pop(name), name=name)
if name in lazy_coroutines_waiting:
create_lazy(lazy_coroutines_waiting.pop(name), name=name)
task = create(coroutine, name=name)
lazy_tasks_running[name] = task
task.add_done_callback(lambda _: finalize(name))


def await_on_shutdown(fn: Callable) -> Callable:
"""Tag a coroutine function so tasks created from it won't be cancelled during shutdown."""
functions_awaited_on_shutdown.add(fn)
return fn


def _ensure_coroutine(awaitable: Awaitable[Any]) -> Coroutine[Any, Any, Any]:
"""Convert an awaitable to a coroutine if it isn't already one."""
if asyncio.iscoroutine(awaitable):
return awaitable

async def wrapper() -> Any:
return await awaitable
return wrapper()


def _handle_task_result(task: asyncio.Task) -> None:
try:
task.result()
except asyncio.CancelledError:
pass
except Exception as e:
core.app.handle_exception(e)


async def teardown() -> None:
"""Cancel all running tasks and coroutines on shutdown. (For internal use only.)"""
while running_tasks or lazy_tasks_running:
tasks = running_tasks | set(lazy_tasks_running.values())
for task in tasks:
if not task.done() and not task.cancelled() and not _should_await_on_shutdown(task):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if not task.done() and not task.cancelled() and not _should_await_on_shutdown(task):
if not task.done() and not task.cancelled() and task.get_coro() not in functions_awaited_on_shutdown:

Is this possible?

task.cancel()
if tasks:
await asyncio.sleep(0) # NOTE: ensure the loop can cancel the tasks before it shuts down
try:
await helpers.wait_for(asyncio.gather(*tasks, return_exceptions=True), timeout=2.0)
except asyncio.TimeoutError:
log.error('Could not cancel %s tasks within timeout: %s',
len(tasks),
', '.join(t.get_name() for t in tasks if not t.done()))
except Exception:
log.exception('Error while cancelling tasks')
for coro in lazy_coroutines_waiting.values():
coro.close()


def _should_await_on_shutdown(task: asyncio.Task) -> bool:
try:
return any(fn.__code__ is task.get_coro().cr_frame.f_code # type: ignore
for fn in functions_awaited_on_shutdown)
except AttributeError:
return False
5 changes: 4 additions & 1 deletion nicegui/binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ async def refresh_loop() -> None:
"""Refresh all bindings in an endless loop."""
while True:
_refresh_step()
await asyncio.sleep(core.app.config.binding_refresh_interval)
try:
await asyncio.sleep(core.app.config.binding_refresh_interval)
except asyncio.CancelledError:
break


@contextmanager
Expand Down
13 changes: 9 additions & 4 deletions nicegui/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ async def delete_content() -> None:
self._delete_tasks.pop(document_id)
if not self.shared:
self.delete()
self._delete_tasks[document_id] = background_tasks.create(delete_content())
self._delete_tasks[document_id] = \
background_tasks.create(delete_content(), name=f'delete content {document_id}')

def _cancel_delete_task(self, document_id: str) -> None:
if document_id in self._delete_tasks:
Expand All @@ -302,20 +303,21 @@ def handle_javascript_response(self, msg: Dict) -> None:

def safe_invoke(self, func: Union[Callable[..., Any], Awaitable]) -> None:
"""Invoke the potentially async function in the client context and catch any exceptions."""
func_name = func.__name__ if hasattr(func, '__name__') else str(func)
try:
if isinstance(func, Awaitable):
async def func_with_client():
with self:
await func
background_tasks.create(func_with_client())
background_tasks.create(func_with_client(), name=f'func with client {self.id} {func_name}')
else:
with self:
result = func(self) if len(inspect.signature(func).parameters) == 1 else func()
if helpers.is_coroutine_function(func):
async def result_with_client():
with self:
await result
background_tasks.create(result_with_client())
background_tasks.create(result_with_client(), name=f'result with client {self.id} {func_name}')
except Exception as e:
core.app.handle_exception(e)

Expand Down Expand Up @@ -377,4 +379,7 @@ async def prune_instances(cls) -> None:
except Exception:
# NOTE: make sure the loop doesn't crash
log.exception('Error while pruning clients')
await asyncio.sleep(10)
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
break
2 changes: 1 addition & 1 deletion nicegui/elements/mixins/validation_element.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async def await_error():
self.error = await result
if return_result:
raise NotImplementedError('The validate method cannot return results for async validation functions.')
background_tasks.create(await_error())
background_tasks.create(await_error(), name=f'validate {self.id}')
return True

if callable(self._validation):
Expand Down
2 changes: 1 addition & 1 deletion nicegui/functions/refreshable.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def refresh(self, *args: Any, **kwargs: Any) -> None:
if is_coroutine_function(self.func):
assert isinstance(result, Awaitable)
if core.loop and core.loop.is_running():
background_tasks.create(result)
background_tasks.create(result, name=f'refresh {self.func.__name__}')
else:
core.app.on_startup(result)

Expand Down
12 changes: 12 additions & 0 deletions nicegui/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
import threading
import time
import webbrowser
from collections.abc import Awaitable
from pathlib import Path
from typing import Any, Optional, Set, Tuple, Union

import wait_for2

from .logging import log

_shown_warnings: Set[str] = set()
Expand Down Expand Up @@ -106,3 +109,12 @@ def kebab_to_camel_case(string: str) -> str:
def event_type_to_camel_case(string: str) -> str:
"""Convert an event type string to camelCase."""
return '.'.join(kebab_to_camel_case(part) if part != '-' else part for part in string.split('.'))


async def wait_for(fut: Awaitable, timeout: Optional[float] = None) -> None:
"""Wait for a future to complete.

This function is a wrapper around ``wait_for2.wait_for`` which is a drop-in replacement for ``asyncio.wait_for``.
It can be removed once we drop support for older versions than Python 3.13 which fixes ``asyncio.wait_for``.
"""
return await wait_for2.wait_for(fut, timeout)
4 changes: 3 additions & 1 deletion nicegui/javascript_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import asyncio
from typing import Any, ClassVar, Dict

from . import helpers


class JavaScriptRequest:
_instances: ClassVar[Dict[str, JavaScriptRequest]] = {}
Expand All @@ -23,7 +25,7 @@ def resolve(cls, request_id: str, result: Any) -> None:

def __await__(self) -> Any:
try:
yield from asyncio.wait_for(self._event.wait(), self.timeout).__await__()
yield from helpers.wait_for(self._event.wait(), self.timeout).__await__()
except asyncio.TimeoutError as e:
raise TimeoutError(f'JavaScript did not respond within {self.timeout:.1f} s') from e
else:
Expand Down
2 changes: 1 addition & 1 deletion nicegui/nicegui.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ async def _shutdown() -> None:
if app.native.main_window:
app.native.main_window.signal_server_shutdown()
air.disconnect()
app.stop()
await app.stop()
run.tear_down()


Expand Down
7 changes: 4 additions & 3 deletions nicegui/outbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from collections import deque
from typing import TYPE_CHECKING, Any, Deque, Dict, Optional, Tuple

from . import background_tasks, core
from . import background_tasks, core, helpers

if TYPE_CHECKING:
from .client import Client
Expand Down Expand Up @@ -72,7 +72,7 @@ async def loop(self) -> None:
try:
if not self._enqueue_event.is_set():
try:
await asyncio.wait_for(self._enqueue_event.wait(), timeout=1.0)
await helpers.wait_for(self._enqueue_event.wait(), timeout=1.0)
except (TimeoutError, asyncio.TimeoutError):
continue

Expand Down Expand Up @@ -101,7 +101,8 @@ async def loop(self) -> None:
await coro
except Exception as e:
core.app.handle_exception(e)

except asyncio.CancelledError:
break
except Exception as e:
core.app.handle_exception(e)
await asyncio.sleep(0.1)
Expand Down
10 changes: 6 additions & 4 deletions nicegui/persistence/file_persistent_dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ def backup(self) -> None:
return
self.filepath.parent.mkdir(exist_ok=True)

async def backup() -> None:
@background_tasks.await_on_shutdown
async def async_backup() -> None:
async with aiofiles.open(self.filepath, 'w', encoding=self.encoding) as f:
await f.write(json.dumps(self, indent=self.indent))
if core.loop:
background_tasks.create_lazy(backup(), name=self.filepath.stem)

if core.loop and core.loop.is_running():
background_tasks.create_lazy(async_backup(), name=self.filepath.stem)
else:
core.app.on_startup(backup())
self.filepath.write_text(json.dumps(self, indent=self.indent), encoding=self.encoding)

def clear(self) -> None:
super().clear()
Expand Down
5 changes: 4 additions & 1 deletion nicegui/slot.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ async def prune_stacks(cls) -> None:
except Exception:
# NOTE: make sure the loop doesn't crash
log.exception('Error while pruning slot stacks')
await asyncio.sleep(10)
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
break


def get_task_id() -> int:
Expand Down
Loading
Loading