|
1 | 1 | import asyncio |
2 | 2 | import os |
3 | 3 | import sys |
4 | | -from contextlib import asynccontextmanager |
| 4 | +from contextlib import asynccontextmanager, suppress |
5 | 5 | from enum import Enum, auto |
6 | 6 | from pathlib import Path |
7 | 7 | from typing import AsyncGenerator, Optional |
|
39 | 39 | if sys.platform == "win32": # pragma: no cover |
40 | 40 | asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) |
41 | 41 |
|
| 42 | + |
| 43 | +def maybe_install_uvloop(config: BasicMemoryConfig) -> bool: |
| 44 | + """Install the uvloop event-loop policy for the Postgres backend. |
| 45 | +
|
| 46 | + Trigger: process entrypoint starting with database_backend == postgres, |
| 47 | + uvloop importable, and a non-Windows platform. |
| 48 | + Why: asyncpg engine teardown (engine.dispose()) races the stdlib asyncio |
| 49 | + loop shutdown and surfaces "IndexError: pop from an empty deque" from |
| 50 | + base_events._run_once (see #831/#877). uvloop's C scheduler has no |
| 51 | + self._ready.popleft() codepath, so that class of crash cannot fire under it. |
| 52 | + Outcome: Postgres deployments run on uvloop; SQLite users keep the default |
| 53 | + loop (no behavior change, smaller blast radius). Must run before the event |
| 54 | + loop is created, i.e. before asyncio.run(). |
| 55 | +
|
| 56 | + Returns: |
| 57 | + True if the uvloop policy was installed, False otherwise. |
| 58 | + """ |
| 59 | + # uvloop is not available on Windows; the default loop already differs there. |
| 60 | + if sys.platform == "win32": # pragma: no cover |
| 61 | + return False |
| 62 | + |
| 63 | + # Limit the change to the backend that actually hits the asyncpg dispose race. |
| 64 | + if config.database_backend != DatabaseBackend.POSTGRES: |
| 65 | + return False |
| 66 | + |
| 67 | + # Deferred import: uvloop is an optional, platform-gated dependency and the |
| 68 | + # default (SQLite) path must not require it to be installed. |
| 69 | + try: |
| 70 | + import uvloop |
| 71 | + except ImportError: # pragma: no cover |
| 72 | + logger.warning("uvloop not available - using default event loop for Postgres backend") |
| 73 | + return False |
| 74 | + |
| 75 | + asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) |
| 76 | + logger.info("Installed uvloop event-loop policy for Postgres backend") |
| 77 | + return True |
| 78 | + |
| 79 | + |
42 | 80 | # Module level state |
43 | 81 | _engine: Optional[AsyncEngine] = None |
44 | 82 | _session_maker: Optional[async_sessionmaker[AsyncSession]] = None |
@@ -320,7 +358,15 @@ async def shutdown_db() -> None: # pragma: no cover |
320 | 358 | global _engine, _session_maker |
321 | 359 |
|
322 | 360 | if _engine: |
323 | | - await _engine.dispose() |
| 361 | + # Trigger: teardown can run while the surrounding task is being cancelled |
| 362 | + # (e.g. lifespan shutdown, unshielded CLI cleanup). |
| 363 | + # Why: a cancellation landing mid-dispose surfaces the asyncpg |
| 364 | + # "IndexError: pop from an empty deque" race (#831/#877); shielding lets |
| 365 | + # dispose finish atomically, and suppressing CancelledError keeps a |
| 366 | + # cancelled shutdown from re-raising the underlying race. |
| 367 | + # Outcome: connections always close cleanly even under cancellation. |
| 368 | + with suppress(asyncio.CancelledError): |
| 369 | + await asyncio.shield(_engine.dispose()) |
324 | 370 | _engine = None |
325 | 371 | _session_maker = None |
326 | 372 |
|
@@ -364,7 +410,14 @@ async def engine_session_factory( |
364 | 410 |
|
365 | 411 | yield created_engine, created_session_maker |
366 | 412 | finally: |
367 | | - await created_engine.dispose() |
| 413 | + # Trigger: context-manager teardown can run while the surrounding task is |
| 414 | + # being cancelled (e.g. a test aborting mid-fixture). |
| 415 | + # Why: on the asyncpg backend a cancellation landing mid-dispose surfaces |
| 416 | + # the "IndexError: pop from an empty deque" race (#831/#877); shield the |
| 417 | + # dispose and suppress CancelledError to match the other dispose seams. |
| 418 | + # Outcome: the per-context engine always disposes cleanly under cancellation. |
| 419 | + with suppress(asyncio.CancelledError): |
| 420 | + await asyncio.shield(created_engine.dispose()) |
368 | 421 |
|
369 | 422 | # Only clear module-level globals if they still point to this context's |
370 | 423 | # engine/session. This avoids clobbering newer globals from other callers. |
@@ -433,7 +486,11 @@ async def run_migrations( |
433 | 486 | # Trigger: run_migrations() created a temporary engine while module-level |
434 | 487 | # session maker was not initialized. |
435 | 488 | # Why: temporary aiosqlite worker threads can outlive CLI command execution |
436 | | - # and block process shutdown if the engine is not disposed. |
437 | | - # Outcome: always dispose temporary engines after migration work completes. |
| 489 | + # and block process shutdown if the engine is not disposed. On the asyncpg |
| 490 | + # backend a cancellation landing mid-dispose surfaces the same "IndexError: |
| 491 | + # pop from an empty deque" race as the other dispose seams (#831/#877), so |
| 492 | + # shield the dispose and suppress CancelledError to match them. |
| 493 | + # Outcome: always dispose temporary engines cleanly, even under cancellation. |
438 | 494 | if temp_engine is not None: |
439 | | - await temp_engine.dispose() |
| 495 | + with suppress(asyncio.CancelledError): |
| 496 | + await asyncio.shield(temp_engine.dispose()) |
0 commit comments