Skip to content

Commit 803887d

Browse files
author
Review
committed
fix(ex_app): bridge sync logger and model-fetch task to async NextcloudApp
After Phase 2 made ``NextcloudApp.log`` and ``NextcloudApp.set_init_status`` async, two ExApp helpers kept calling them as if they were sync — silently producing ``RuntimeWarning: coroutine '...' was never awaited`` and dropping the operation entirely. ``setup_nextcloud_logging`` is supposed to stream every Python log record into ``nextcloud.log``, and ``fetch_models_task`` is the default ``/init`` handler that reports model-download progress back to AppAPI; both were no-ops. logger.py * Capture the running event loop at ``setup_nextcloud_logging`` time and stash it on the handler. ``logging.Handler.emit`` is sync and can be called from any thread, so each record is dispatched to the captured loop with ``asyncio.run_coroutine_threadsafe``. Fire-and-forget, since logging must not block the caller. If the function is invoked outside a running loop (no-async setup), an inert handler is installed. integration_fastapi.py * ``fetch_models_task`` gains an optional ``loop`` parameter and is wrapped by a small ``_ProgressReporter`` that schedules ``set_init_status`` on the captured loop and waits with a 30s timeout for ordered progress updates. * The ``/init`` handler now captures ``asyncio.get_running_loop()`` and forwards it to the BackgroundTasks worker thread, where there is no loop of its own. * Inner ``__fetch_model_as_file`` / ``__fetch_model_as_snapshot`` accept the reporter callable in place of the bare ``NextcloudApp`` and use it for every progress tick. Bumps the version to 0.31.0.dev0 and adds a CHANGELOG entry covering the breaking sync removal, these regressions, and the conftest event-loop reset.
1 parent 65cbca3 commit 803887d

4 files changed

Lines changed: 104 additions & 16 deletions

File tree

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,19 @@
22

33
All notable changes to this project will be documented in this file.
44

5+
## [0.31.0 - Unreleased]
6+
7+
### Removed (BREAKING)
8+
9+
- All synchronous API classes: `Nextcloud`, `NextcloudApp`, `TalkBot`, `nc_app`, `talk_bot_msg`, sync `enabled_handler`/`trigger_handler` in `set_handlers`, and the `FilesAPI`/`_TalkAPI`/etc. sync counterparts. The async classes (`AsyncNextcloud`, `AsyncNextcloudApp`, `AsyncTalkBot`, `anc_app`, `atalk_bot_msg`, …) are now the only implementation and have been renamed to drop the `Async` prefix; their sync namesakes were deprecated in v0.30.0 and have now been deleted. Backward-compat aliases (`AsyncNextcloud = Nextcloud`, etc.) are still exported and will be removed in v1.0.0.
10+
- The `caldav` integration is no longer reachable through `Nextcloud.cal` / `NextcloudApp.cal`; the underlying library is sync-only.
11+
12+
### Fixed
13+
14+
- ExApp logger handler (`setup_nextcloud_logging`) now actually delivers records to Nextcloud: the sync `logging.Handler.emit` schedules the now-async `NextcloudApp.log` on the captured event loop via `asyncio.run_coroutine_threadsafe`. Without this fix the coroutine was never awaited and log records were silently dropped.
15+
- `fetch_models_task` (used by the default `/init` handler) no longer leaves `NextcloudApp.set_init_status` coroutines unawaited; progress updates are dispatched onto the main event loop from the `BackgroundTasks` worker thread.
16+
- Test conftest resets `niquests` session adapters after the import-time capability/version probe so pytest-asyncio's loop populates fresh connection pools, preventing `RuntimeError: got Future <Future pending> attached to a different loop` on the first request.
17+
518
## [0.30.1 - 2026-04-26]
619

720
### Added

nc_py_api/_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
"""Version of nc_py_api."""
22

3-
__version__ = "0.30.1"
3+
__version__ = "0.31.0.dev0"

nc_py_api/ex_app/integration_fastapi.py

Lines changed: 59 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""FastAPI directly related stuff."""
22

3+
import asyncio
34
import builtins
45
import fnmatch
56
import hashlib
@@ -108,7 +109,8 @@ async def heartbeat_callback():
108109

109110
@fast_api_app.post("/init")
110111
async def init_callback(b_tasks: BackgroundTasks, nc: typing.Annotated[NextcloudApp, Depends(nc_app)]):
111-
b_tasks.add_task(fetch_models_task, nc, models_to_fetch if models_to_fetch else {}, 0)
112+
loop = asyncio.get_running_loop()
113+
b_tasks.add_task(fetch_models_task, nc, models_to_fetch if models_to_fetch else {}, 0, loop)
112114
return JSONResponse(content={})
113115

114116
if map_app_static:
@@ -132,7 +134,12 @@ def __map_app_static_folders(fast_api_app: FastAPI):
132134
fast_api_app.mount(f"/{mnt_dir}", staticfiles.StaticFiles(directory=mnt_dir_path), name=mnt_dir)
133135

134136

135-
def fetch_models_task(nc: NextcloudApp, models: dict[str, dict], progress_init_start_value: int) -> None:
137+
def fetch_models_task(
138+
nc: NextcloudApp,
139+
models: dict[str, dict],
140+
progress_init_start_value: int,
141+
loop: asyncio.AbstractEventLoop | None = None,
142+
) -> None:
136143
"""Use for cases when you want to define custom `/init` but still need to easy download models.
137144
138145
:param nc: NextcloudApp instance.
@@ -155,32 +162,69 @@ def fetch_models_task(nc: NextcloudApp, models: dict[str, dict], progress_init_s
155162
All model options are optional and can be left empty.
156163
157164
:param progress_init_start_value: Integer value defining from which percent the progress should start.
165+
:param loop: Optional asyncio event loop used to dispatch progress updates. When this function
166+
runs as a FastAPI ``BackgroundTasks`` task it is invoked from a worker thread that has no
167+
loop of its own, so the caller must pass the main loop in. Defaults to
168+
:func:`asyncio.get_running_loop` if one is active in the current thread.
158169
159170
:raises ModelFetchError: in case of a model download error.
160171
:raises NextcloudException: in case of a network error reaching the Nextcloud server.
161172
"""
173+
if loop is None:
174+
try:
175+
loop = asyncio.get_running_loop()
176+
except RuntimeError:
177+
loop = None
178+
progress = _ProgressReporter(nc, loop)
162179
if models:
163180
current_progress = progress_init_start_value
164181
percent_for_each = min(int((100 - progress_init_start_value) / len(models)), 99)
165182
for model in models:
166183
try:
167184
if model.startswith(("http://", "https://")):
168185
models[model]["path"] = __fetch_model_as_file(
169-
current_progress, percent_for_each, nc, model, models[model]
186+
current_progress, percent_for_each, progress, model, models[model]
170187
)
171188
else:
172189
models[model]["path"] = __fetch_model_as_snapshot(
173-
current_progress, percent_for_each, nc, model, models[model]
190+
current_progress, percent_for_each, progress, model, models[model]
174191
)
175192
current_progress += percent_for_each
176193
except BaseException as e: # noqa pylint: disable=broad-exception-caught
177-
nc.set_init_status(current_progress, f"Downloading of '{model}' failed: {e}: {format_exc()}")
194+
progress(current_progress, f"Downloading of '{model}' failed: {e}: {format_exc()}")
178195
raise ModelFetchError(f"Downloading of '{model}' failed.") from e
179-
nc.set_init_status(100)
196+
progress(100)
197+
198+
199+
class _ProgressReporter:
200+
"""Bridges sync model-fetch code to the async :py:meth:`NextcloudApp.set_init_status`.
201+
202+
``fetch_models_task`` runs in a FastAPI ``BackgroundTasks`` worker thread when the caller
203+
is sync, so we cannot ``await`` here. Each call schedules a coroutine on the captured main
204+
event loop and waits for the result to keep progress reporting linearizable; if no loop
205+
is available the call is silently dropped so model downloads still proceed.
206+
"""
207+
208+
def __init__(self, nc: NextcloudApp, loop: asyncio.AbstractEventLoop | None):
209+
self._nc = nc
210+
self._loop = loop
211+
212+
def __call__(self, progress: int, error: str = "") -> None:
213+
if self._loop is None or self._loop.is_closed():
214+
return
215+
future = asyncio.run_coroutine_threadsafe(self._nc.set_init_status(progress, error), self._loop)
216+
try:
217+
future.result(timeout=30)
218+
except Exception: # noqa pylint: disable=broad-exception-caught
219+
future.cancel()
180220

181221

182222
def __fetch_model_as_file(
183-
current_progress: int, progress_for_task: int, nc: NextcloudApp, model_path: str, download_options: dict
223+
current_progress: int,
224+
progress_for_task: int,
225+
progress: _ProgressReporter,
226+
model_path: str,
227+
download_options: dict,
184228
) -> str:
185229
result_path = download_options.pop("save_path", urlparse(model_path).path.split("/")[-1])
186230
tmp_path = result_path + ".tmp"
@@ -209,7 +253,7 @@ def __fetch_model_as_file(
209253
for byte_block in iter(lambda: file.read(4096), b""):
210254
sha256_hash.update(byte_block)
211255
if f'"{sha256_hash.hexdigest()}"' == linked_etag:
212-
nc.set_init_status(min(current_progress + progress_for_task, 99))
256+
progress(min(current_progress + progress_for_task, 99))
213257
return result_path
214258

215259
try:
@@ -222,7 +266,7 @@ def __fetch_model_as_file(
222266
current_progress + int(progress_for_task * downloaded_size / total_size), 99
223267
)
224268
if new_progress != last_progress:
225-
nc.set_init_status(new_progress)
269+
progress(new_progress)
226270
last_progress = new_progress
227271
os.replace(tmp_path, result_path)
228272
except BaseException:
@@ -238,7 +282,11 @@ def __fetch_model_as_file(
238282

239283

240284
def __fetch_model_as_snapshot(
241-
current_progress: int, progress_for_task, nc: NextcloudApp, model_name: str, download_options: dict
285+
current_progress: int,
286+
progress_for_task,
287+
progress: _ProgressReporter,
288+
model_name: str,
289+
download_options: dict,
242290
) -> str:
243291
from huggingface_hub import snapshot_download # noqa isort:skip pylint: disable=C0415 disable=E0401
244292
from tqdm import tqdm # noqa isort:skip pylint: disable=C0415 disable=E0401
@@ -252,7 +300,7 @@ def __init__(self, *args, **kwargs):
252300

253301
def display(self, msg=None, pos=None):
254302
if self.total:
255-
nc.set_init_status(min(current_progress + int(progress_for_task * self.n / self.total), 99))
303+
progress(min(current_progress + int(progress_for_task * self.n / self.total), 99))
256304
return super().display(msg, pos)
257305

258306
workers = download_options.pop("max_workers", 2)

nc_py_api/ex_app/logger.py

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Transparent logging support to store logs in the nextcloud.log."""
22

3+
import asyncio
34
import logging
45
import threading
56

@@ -19,28 +20,54 @@
1920

2021

2122
class _NextcloudLogsHandler(logging.Handler):
22-
def __init__(self):
23+
"""Forwards Python ``logging`` records to ``nextcloud.log`` via :py:meth:`NextcloudApp.log`.
24+
25+
Python's logging API is synchronous but ``NextcloudApp.log`` is a coroutine, so each
26+
record is dispatched to a captured event loop with :func:`asyncio.run_coroutine_threadsafe`.
27+
The dispatch is fire-and-forget so emitters never block. If no loop was captured (for
28+
example, ``setup_nextcloud_logging`` was called outside of a running loop), records are
29+
silently dropped to avoid deadlocking caller threads.
30+
"""
31+
32+
def __init__(self, loop: asyncio.AbstractEventLoop | None = None):
2333
super().__init__()
34+
self._loop = loop
2435

2536
def emit(self, record):
37+
loop = self._loop
38+
if loop is None or loop.is_closed():
39+
return
2640
if THREAD_LOCAL.__dict__.get("nc_py_api.loghandler", False):
2741
return
2842

2943
try:
3044
THREAD_LOCAL.__dict__["nc_py_api.loghandler"] = True
3145
log_entry = self.format(record)
3246
log_level = record.levelno
33-
NextcloudApp().log(LOGLVL_MAP.get(log_level, LogLvl.FATAL), log_entry, fast_send=True)
47+
asyncio.run_coroutine_threadsafe(
48+
NextcloudApp().log(LOGLVL_MAP.get(log_level, LogLvl.FATAL), log_entry, fast_send=True),
49+
loop,
50+
)
3451
except Exception: # noqa pylint: disable=broad-exception-caught
3552
self.handleError(record)
3653
finally:
3754
THREAD_LOCAL.__dict__["nc_py_api.loghandler"] = False
3855

3956

4057
def setup_nextcloud_logging(logger_name: str | None = None, logging_level: int = logging.DEBUG):
41-
"""Function to easily send all or selected log entries to Nextcloud."""
58+
"""Attach a handler that streams Python logging records to the Nextcloud log file.
59+
60+
Must be called from a context with a running asyncio event loop (typically your
61+
FastAPI ``lifespan`` or any async startup hook). The captured loop is used to
62+
schedule the async :py:meth:`NextcloudApp.log` calls from sync ``emit``. Calling
63+
this from sync code installs an inert handler that drops records.
64+
"""
65+
try:
66+
loop = asyncio.get_running_loop()
67+
except RuntimeError:
68+
loop = None
4269
logger = logging.getLogger(logger_name)
43-
nextcloud_handler = _NextcloudLogsHandler()
70+
nextcloud_handler = _NextcloudLogsHandler(loop)
4471
nextcloud_handler.setLevel(logging_level)
4572
logger.addHandler(nextcloud_handler)
4673
return nextcloud_handler

0 commit comments

Comments
 (0)