Commit 7908810

fix(transport): remove nursery param from IListener.listen (adopts #1055) (#1308)

* refactor(abc): remove nursery parameter from IListener.listen

  The caller-supplied nursery was a leaky abstraction that forced Swarm to
  manage a listener_nursery on behalf of every transport. Listeners should
  own their own background task lifetimes internally. This commit updates
  the interface only; concrete implementations follow in subsequent commits.

  Part of #726.

  Co-authored-by: Michael Eze <code.maestro64@gmail.com>

* refactor(transport/tcp): move nursery ownership inside TCPListener

  TCPListener now spawns its own internal nursery as a trio system task in
  listen(). The nursery hosts trio.serve_tcp and stays open until close()
  cancels it. listen() drops the nursery parameter and returns once the
  socket is bound (signalled via an internal trio.Event). Startup errors are
  captured and re-raised as OpenConnectionError to preserve the existing
  error contract.

  close() cancels the internal nursery, closes all SocketListeners, and
  waits for the background system task to finish so callers observe a
  fully-quiesced listener on return. Safe to call multiple times.

  Part of #726.

  Co-authored-by: Michael Eze <code.maestro64@gmail.com>

* refactor(transport/websocket): move nursery ownership inside WebsocketListener

  Same pattern as TCPListener: listen() drops the nursery parameter and
  spawns a trio system task that hosts an internal nursery. serve_websocket
  runs under that nursery until close() cancels it. Startup errors are
  captured and re-raised as OpenConnectionError.

  close() now also awaits the background task's completion after cancelling
  the nursery, so callers observe a fully-quiesced listener on return.

  Part of #726.

  Co-authored-by: Michael Eze <code.maestro64@gmail.com>

* refactor(transport/quic): remove nursery param from QUICListener.listen

  QUIC is a special case: the transport already carries a background nursery
  via set_background_nursery(). Prefer that nursery when available (the
  common path when used through a Swarm), and fall back to a self-spawned
  trio system task with its own internal nursery when it isn't. Either way,
  the listen() API no longer requires a caller-supplied nursery.

  close() cancels the self-spawned nursery when it was used and waits for
  the background task to finish.

  Part of #726.

  Co-authored-by: Michael Eze <code.maestro64@gmail.com>

* refactor(relay/circuit_v2): drop unused nursery param from CircuitV2Listener.listen

  The CircuitV2 listener never used the nursery parameter; it just records
  the multiaddr to advertise. Drop the parameter to align with the updated
  IListener interface.

  Part of #726.

  Co-authored-by: Michael Eze <code.maestro64@gmail.com>

* refactor(swarm): drop listener_nursery in favor of listener-owned nurseries

  Listeners now own their internal nursery, so Swarm no longer manages
  listener_nursery / event_listener_nursery_created. The outer nursery in
  run() is still needed for transport-level background tasks (QUIC /
  WebSocket set_background_nursery) and for the auto-connector, so it is
  renamed to background_nursery / event_background_nursery_created to
  reflect its actual purpose.

  run() now also calls listener.close() on every listener during shutdown,
  so their internal system-task nurseries are cancelled and fully awaited.
  listen() drops the nursery argument from its listener.listen() call.

  Part of #726.

  Co-authored-by: Michael Eze <code.maestro64@gmail.com>

* test(transport): drop nursery arg from all listener.listen() call sites

  Updates the full test suite to match the new IListener.listen(maddr)
  signature: TCP, WebSocket, QUIC listener/integration tests, shared trio
  I/O fixtures, and the tests/utils/factories factory.

  Part of #726.

  Co-authored-by: Michael Eze <code.maestro64@gmail.com>

* docs(news): add breaking changelog entry for #726

  Co-authored-by: Michael Eze <code.maestro64@gmail.com>

* fix: resolve CI lint failures (ruff F841, mypy no-redef/union-attr, format)

  - test_listener.py, test_websocket.py: remove now-unused `nursery`
    variable from `async with trio.open_nursery() as nursery:` blocks
    (ruff F841), because listen() no longer takes a nursery parameter
  - quic/listener.py: avoid re-declaring _owned_start_error with a type
    annotation in listen() (mypy no-redef); capture Event objects in local
    vars before entering the system-task closure so the `set()` calls
    resolve against non-Optional types (mypy union-attr)
  - tcp/tcp.py: ruff format reformatting

  Co-authored-by: Michael Eze <code.maestro64@gmail.com>

* fix(tcp): own a single long-lived nursery across repeated listen() calls

  TCPListener used to reset per-call lifecycle state and open a fresh
  nursery on every listen(), orphaning earlier server tasks so close() could
  only cancel the last bind. Now the background system task and nursery are
  created once on first listen() and reused for subsequent binds. close()
  cancels them together, is idempotent, and listen() after close() raises
  OpenConnectionError. A trio.Lock serializes concurrent listen()/close() so
  the lazy spawn can't race.

  Adds test_tcp_listener_close_cancels_all_binds covering multi-bind
  teardown, idempotent close, and post-close rejection. Fixes the multi-bind
  regression acul71 flagged in the PR #1308 review. Renames the newsfragment
  to match issue #1314.

* test: close listener explicitly in direct-listener tests and factory

  Tests that create TCPListener directly (test_trio_real_connection and
  raw_conn_factory) now call await listener.close() before letting the
  enclosing nursery cancel. This exercises the new
  listener-owns-its-lifetime contract and stops leaking listener tasks
  between tests. Addresses acul71's review on PR #1308.

---------

Co-authored-by: Michael Eze <code.maestro64@gmail.com>
1 parent 0e88584 commit 7908810

14 files changed

Lines changed: 355 additions & 136 deletions

libp2p/abc.py

Lines changed: 5 additions & 3 deletions
@@ -1421,16 +1421,18 @@ class IListener(ABC):
     """

     @abstractmethod
-    async def listen(self, maddr: Multiaddr, nursery: trio.Nursery) -> None:
+    async def listen(self, maddr: Multiaddr) -> None:
         """
         Start listening on the specified multiaddress.

+        The listener manages its own background tasks internally and keeps
+        them alive until :meth:`close` is called. Callers do not need to
+        supply a nursery.
+
         Parameters
         ----------
         maddr : Multiaddr
             The multiaddress on which to listen.
-        nursery : trio.Nursery
-            The nursery for spawning listening tasks.

         Raises
         ------
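
The lifetime contract behind the new signature (background work stays alive until close(), close() is idempotent, and listen() after close() is rejected, as the commit message describes for TCPListener) can be illustrated with a toy class. This is a minimal sketch and not py-libp2p code: `OwnedLifetimeListener` and its `bound` list are hypothetical, and it uses `asyncio` from the standard library as a stand-in for trio so that it runs without extra dependencies. The real listeners host their tasks in a trio nursery inside a system task.

```python
import asyncio


class OwnedLifetimeListener:
    """Hypothetical toy listener that owns its background tasks."""

    def __init__(self) -> None:
        self._tasks: list = []
        self._closed = False
        self.bound: list = []  # addresses currently "bound"

    async def listen(self, addr: str) -> None:
        if self._closed:
            raise RuntimeError("listen() called after close()")
        ready = asyncio.Event()
        self._tasks.append(asyncio.create_task(self._serve(addr, ready)))
        # Return only once the background task reports the address as bound.
        await ready.wait()

    async def _serve(self, addr: str, ready: asyncio.Event) -> None:
        self.bound.append(addr)  # stand-in for binding a socket
        ready.set()
        try:
            await asyncio.Event().wait()  # stand-in for the accept loop
        finally:
            self.bound.remove(addr)  # runs when close() cancels the task

    async def close(self) -> None:
        self._closed = True
        tasks, self._tasks = self._tasks, []
        for task in tasks:
            task.cancel()
        if tasks:
            # Wait for every task so the caller sees a quiesced listener.
            await asyncio.gather(*tasks, return_exceptions=True)


async def main() -> tuple:
    listener = OwnedLifetimeListener()
    await listener.listen("/ip4/127.0.0.1/tcp/4001")
    await listener.listen("/ip4/127.0.0.1/tcp/4002")
    during = list(listener.bound)
    await listener.close()
    await listener.close()  # second call is a no-op
    try:
        await listener.listen("/ip4/127.0.0.1/tcp/4003")
        rejected = False
    except RuntimeError:
        rejected = True
    return during, list(listener.bound), rejected


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The caller never passes in a task container: everything the listener starts is tracked and torn down by the listener itself, which is the property the interface change is after.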

libp2p/network/swarm.py

Lines changed: 29 additions & 19 deletions
@@ -98,8 +98,11 @@ class Swarm(Service, INetworkService):
     connections: dict[ID, list[INetConn]]
     listeners: dict[str, IListener]
     common_stream_handler: StreamHandlerFn
-    listener_nursery: trio.Nursery | None
-    event_listener_nursery_created: trio.Event
+    # Background nursery used for transport-level background tasks (QUIC /
+    # WebSocket set_background_nursery, auto-connector). Listeners no longer
+    # need a caller-supplied nursery — they manage their own internally.
+    background_nursery: trio.Nursery | None
+    event_background_nursery_created: trio.Event

     notifees: list[INotifee]

@@ -145,8 +148,8 @@ def __init__(

         self.common_stream_handler = create_default_stream_handler(self)

-        self.listener_nursery = None
-        self.event_listener_nursery_created = trio.Event()
+        self.background_nursery = None
+        self.event_background_nursery_created = trio.Event()

         # Load balancing state
         self._round_robin_index = {}
@@ -205,8 +208,10 @@ def set_resource_manager(

     async def run(self) -> None:
         async with trio.open_nursery() as nursery:
-            # Create a nursery for listener tasks.
-            self.listener_nursery = nursery
+            # This nursery hosts transport-level background tasks (QUIC /
+            # WebSocket) and the auto-connector. Listeners own their own
+            # internal nurseries and no longer use this one.
+            self.background_nursery = nursery

             # Set background nursery BEFORE setting the event
             # This ensures transports have the nursery when they check
@@ -218,9 +223,8 @@ async def run(self) -> None:
                 # for connection management
                 self.transport.set_background_nursery(nursery)  # type: ignore[attr-defined]

-            # Set event after background nursery is configured
-            # This ensures transports have the nursery when they check the event
-            self.event_listener_nursery_created.set()
+            # Signal that the background nursery is available.
+            self.event_background_nursery_created.set()

             # Start connection management components (go-libp2p style)
             try:
@@ -244,10 +248,17 @@ async def run(self) -> None:
                         f"Error stopping connection management components: {e}"
                     )

-                # The service ended. Cancel listener tasks.
+                # Close all listeners so their internal nurseries are
+                # cancelled and system tasks finish cleanly.
+                for listener in list(self.listeners.values()):
+                    try:
+                        await listener.close()
+                    except Exception as e:
+                        logger.debug("Error closing listener during shutdown: %s", e)
+
+                # Cancel the background nursery (transport / auto-connector).
                 nursery.cancel_scope.cancel()
-                # Indicate that the nursery has been cancelled.
-                self.listener_nursery = None
+                self.background_nursery = None

     def get_peer_id(self) -> ID:
         return self.self_id
@@ -1119,9 +1130,11 @@ async def listen(self, *multiaddrs: Multiaddr) -> bool:
         - Map multiaddr to listener
         """
         logger.debug(f"Swarm.listen called with multiaddrs: {multiaddrs}")
-        # We need to wait until `self.listener_nursery` is created.
+        # Wait until the background nursery is available so that transports
+        # which need it (QUIC, WebSocket) can reach it via their transport
+        # reference. Listeners themselves no longer require a nursery.
         logger.debug("Starting to listen")
-        await self.event_listener_nursery_created.wait()
+        await self.event_background_nursery_created.wait()

         success_count = 0
         for maddr in multiaddrs:
@@ -1199,13 +1212,10 @@ async def conn_handler(
                 listener = self.transport.create_listener(conn_handler)
                 logger.debug(f"Swarm.listen: listener created for {maddr}")
                 self.listeners[str(maddr)] = listener
-                # TODO: `listener.listen` is not bounded with nursery. If we want to be
-                # I/O agnostic, we should change the API.
-                if self.listener_nursery is None:
+                if self.background_nursery is None:
                     raise SwarmException("swarm instance hasn't been run")
-                assert self.listener_nursery is not None  # For type checker
                 logger.debug(f"Swarm.listen: calling listener.listen for {maddr}")
-                await listener.listen(maddr, self.listener_nursery)
+                await listener.listen(maddr)
                 logger.debug(f"Swarm.listen: listener.listen completed for {maddr}")

                 # Call notifiers since event occurred
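
The shutdown loop that run() gains in this file closes each listener in turn and logs, rather than propagates, per-listener failures. A minimal sketch of that behaviour follows. `FakeListener` and `close_all` are hypothetical names, and `asyncio` is used only so the sketch runs with the standard library; the loop body itself mirrors the diff.

```python
import asyncio
import logging

logger = logging.getLogger("shutdown_sketch")


class FakeListener:
    """Hypothetical stand-in for an IListener; only close() is modelled."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.closed = False

    async def close(self) -> None:
        if self.fail:
            raise OSError("socket already gone")
        self.closed = True


async def close_all(listeners: dict) -> int:
    """Close every listener; return how many closed without error."""
    ok = 0
    # Iterate over a snapshot so close() may safely mutate the dict.
    for listener in list(listeners.values()):
        try:
            await listener.close()
            ok += 1
        except Exception as e:
            # One failing listener must not stop the others from closing.
            logger.debug("Error closing listener during shutdown: %s", e)
    return ok


def demo() -> tuple:
    listeners = {
        "a": FakeListener(),
        "b": FakeListener(fail=True),
        "c": FakeListener(),
    }
    ok = asyncio.run(close_all(listeners))
    closed = [name for name, item in listeners.items() if item.closed]
    return ok, closed


if __name__ == "__main__":
    print(demo())
```

Catching `Exception` rather than `BaseException` matters here: cancellation (`trio.Cancelled`, like `asyncio.CancelledError`) derives from `BaseException`, so it is not swallowed by the loop.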

libp2p/relay/circuit_v2/transport.py

Lines changed: 1 addition & 3 deletions
@@ -1077,16 +1077,14 @@ async def stream_handler(stream: INetStream) -> None:
         finally:
             logger.debug("CircuitV2Listener stopped")

-    async def listen(self, maddr: multiaddr.Multiaddr, nursery: trio.Nursery) -> None:
+    async def listen(self, maddr: multiaddr.Multiaddr) -> None:
         """
         Start listening on the given multiaddr.

         Parameters
         ----------
         maddr : multiaddr.Multiaddr
             The multiaddr to listen on
-        nursery : trio.Nursery
-            The nursery to run tasks in

         """
         # Convert string to Multiaddr if needed

libp2p/transport/quic/listener.py

Lines changed: 84 additions & 29 deletions
@@ -107,6 +107,12 @@ def __init__(
         self._listening = False
         self._nursery: trio.Nursery | None = None

+        # State for the "owned nursery" fallback path in listen(). Populated
+        # only when no transport background nursery is available.
+        self._owned_started: trio.Event | None = None
+        self._owned_stopped: trio.Event | None = None
+        self._owned_start_error: BaseException | None = None
+
         # Serialize promotion of pending connections to avoid duplicate promotions
         # and repeated connect() calls under concurrent packet processing.
         self._promotion_lock = trio.Lock()
@@ -1197,49 +1203,90 @@ async def _transmit_for_connection(
         except Exception as e:
             logger.error(f"Transmission error: {e}", exc_info=True)

-    async def listen(self, maddr: Multiaddr, nursery: trio.Nursery) -> None:
-        """Start listening on the given multiaddr with enhanced connection handling."""
+    async def listen(self, maddr: Multiaddr) -> None:
+        """
+        Start listening on the given multiaddr with enhanced connection handling.
+
+        The listener uses the transport's background nursery when available
+        (set via :meth:`QUICTransport.set_background_nursery`), and otherwise
+        spawns a private internal nursery as a trio system task. In either
+        case the listener no longer requires the caller to supply a nursery.
+        """
         if self._listening:
             raise QUICListenError("Already listening")

         if not is_quic_multiaddr(maddr):
             raise QUICListenError(f"Invalid QUIC multiaddr: {maddr}")

-        if self._transport._background_nursery:
-            active_nursery = self._transport._background_nursery
-            logger.debug("Using transport background nursery for listener")
-        elif nursery:
-            active_nursery = nursery
-            self._transport._background_nursery = nursery
-            logger.debug("Using provided nursery for listener")
-        else:
-            raise QUICListenError("No nursery available")
+        host, port = quic_multiaddr_to_endpoint(maddr)

-        try:
-            host, port = quic_multiaddr_to_endpoint(maddr)
+        if self._transport._background_nursery is not None:
+            # Preferred path: reuse the transport's already-running nursery.
+            try:
+                self._socket = await self._create_socket(host, port)
+                self._nursery = self._transport._background_nursery

-            # Create and configure socket
-            self._socket = await self._create_socket(host, port)
-            self._nursery = active_nursery
+                bound_host, bound_port = self._socket.getsockname()[:2]
+                quic_version = multiaddr_to_quic_version(maddr)
+                bound_maddr = create_quic_multiaddr(
+                    bound_host, bound_port, quic_version
+                )
+                self._bound_addresses = [bound_maddr]
+                self._listening = True

-            # Get the actual bound address (IPv4: 2-tuple, IPv6: 4-tuple)
-            bound_host, bound_port = self._socket.getsockname()[:2]
-            quic_version = multiaddr_to_quic_version(maddr)
-            bound_maddr = create_quic_multiaddr(bound_host, bound_port, quic_version)
-            self._bound_addresses = [bound_maddr]
+                self._nursery.start_soon(self._handle_incoming_packets)
+                logger.info(
+                    f"QUIC listener started on {bound_maddr} with connection ID support"
+                )
+                return
+            except Exception as e:
+                await self.close()
+                raise QUICListenError(f"Failed to start listening: {e}") from e
+
+        # Fallback: spawn our own internal nursery as a trio system task
+        # so the listener is fully self-contained.
+        started = trio.Event()
+        stopped = trio.Event()
+        self._owned_started = started
+        self._owned_stopped = stopped
+        self._owned_start_error = None
+
+        async def _run_server() -> None:
+            try:
+                async with trio.open_nursery() as inner_nursery:
+                    try:
+                        self._socket = await self._create_socket(host, port)
+                        self._nursery = inner_nursery

-            self._listening = True
+                        bound_host, bound_port = self._socket.getsockname()[:2]
+                        quic_version = multiaddr_to_quic_version(maddr)
+                        bound_maddr = create_quic_multiaddr(
+                            bound_host, bound_port, quic_version
+                        )
+                        self._bound_addresses = [bound_maddr]
+                        self._listening = True

-            # Start packet handling loop
-            active_nursery.start_soon(self._handle_incoming_packets)
+                        inner_nursery.start_soon(self._handle_incoming_packets)
+                        logger.info(
+                            f"QUIC listener started on {bound_maddr} "
+                            "with connection ID support"
+                        )
+                    except BaseException as error:
+                        self._owned_start_error = error
+                    finally:
+                        started.set()
+            finally:
+                stopped.set()
+                self._nursery = None

-            logger.info(
-                f"QUIC listener started on {bound_maddr} with connection ID support"
-            )
+        trio.lowlevel.spawn_system_task(_run_server)
+        await self._owned_started.wait()

-        except Exception as e:
+        if self._owned_start_error is not None:
             await self.close()
-            raise QUICListenError(f"Failed to start listening: {e}") from e
+            raise QUICListenError(
+                f"Failed to start listening: {self._owned_start_error}"
+            ) from self._owned_start_error

     async def _create_socket(self, host: str, port: int) -> trio.socket.SocketType:
         """Create and configure UDP socket."""
@@ -1333,6 +1380,14 @@ async def close(self) -> None:

             self._bound_addresses.clear()

+            # If we spawned our own internal nursery (fallback path),
+            # cancel it and wait for the background system task to finish.
+            if self._owned_started is not None and self._owned_started.is_set():
+                if self._nursery is not None:
+                    self._nursery.cancel_scope.cancel()
+                if self._owned_stopped is not None:
+                    await self._owned_stopped.wait()
+
             logger.info("QUIC listener closed")

         except Exception as e:
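
The fallback path in QUICListener.listen() relies on a small handshake: the background task records any startup error, always sets a "started" event, and the caller re-raises the recorded error after waiting on that event. The sketch below shows the handshake in isolation, under stated assumptions: `ListenError` and `start_in_background` are hypothetical names, `asyncio` replaces trio so that the example needs only the standard library, and it catches `Exception` where the diff catches `BaseException`.

```python
import asyncio


class ListenError(Exception):
    """Hypothetical stand-in for QUICListenError / OpenConnectionError."""


async def start_in_background(setup) -> asyncio.Task:
    """Run setup() in a background task; return once it succeeded or failed."""
    started = asyncio.Event()
    errors: list = []

    async def _run_server() -> None:
        try:
            await setup()  # stand-in for creating and binding the socket
        except Exception as error:
            errors.append(error)  # captured here, re-raised in the caller
            return
        finally:
            started.set()  # always wake the caller, success or failure
        await asyncio.Event().wait()  # stand-in for the serve loop

    task = asyncio.create_task(_run_server())
    await started.wait()
    if errors:
        raise ListenError(f"Failed to start listening: {errors[0]}") from errors[0]
    return task


async def main() -> tuple:
    async def good_setup() -> None:
        return None

    async def bad_setup() -> None:
        raise OSError("address already in use")

    task = await start_in_background(good_setup)
    running = not task.done()
    task.cancel()  # what close() would do
    await asyncio.gather(task, return_exceptions=True)

    try:
        await start_in_background(bad_setup)
    except ListenError as exc:
        return running, str(exc), type(exc.__cause__).__name__
    return running, None, None


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Setting the event in a `finally` clause is the important design choice: without it, a failing setup would leave the caller waiting forever instead of seeing the error.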
