Skip to content

Commit 5112ca6

Browse files
gercebossNilavacul71cursoragent
authored
replace asyncio with trio (#1240)
* migrate to trio * test(yamux): serialize writes in TrioStreamAdapter to fix Windows BusyResourceError Trio allows only one task to use a stream at a time. In test_yamux_fin_on_window_update (and test_yamux_rst_on_window_update) the test injects a raw frame via secured_conn.write() while the server Yamux task also writes to the same stream, causing trio.BusyResourceError on Windows where scheduling exposes the race. Add a write lock so all writes are serialized and the tests pass on all platforms. Co-authored-by: Cursor <cursoragent@cursor.com> * add newsfragment for #174 --------- Co-authored-by: Nilav <nilav@Nilav-MacBook-Air.local> Co-authored-by: acul71 <34693171+acul71@users.noreply.github.com> Co-authored-by: acul71 <34693171+acul71@users.noreply.github.com> Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 55ca5a5 commit 5112ca6

9 files changed

Lines changed: 132 additions & 81 deletions

File tree

examples/autotls_browser/certificate_manager.py

Lines changed: 57 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
validation, and lifecycle management.
77
"""
88

9-
import asyncio
109
from datetime import datetime, timedelta
1110
import logging
1211
from pathlib import Path
@@ -17,6 +16,7 @@
1716
from cryptography.hazmat.primitives import hashes, serialization
1817
from cryptography.hazmat.primitives.asymmetric import rsa
1918
from cryptography.x509.oid import NameOID
19+
import trio
2020

2121
from libp2p.peer.id import ID
2222

@@ -51,7 +51,18 @@ def __init__(
5151
self.renewal_threshold_hours = renewal_threshold_hours
5252

5353
self._certificates: dict[tuple[ID, str], dict[Any, Any]] = {}
54-
self._renewal_tasks: dict[tuple[ID, str], asyncio.Task[Any]] = {}
54+
self._renewal_scopes: dict[tuple[ID, str], trio.CancelScope] = {}
55+
self._nursery: trio.Nursery | None = None
56+
57+
async def start(self, nursery: trio.Nursery) -> None:
58+
"""Attach manager to a long-lived nursery."""
59+
self._nursery = nursery
60+
61+
async def shutdown(self) -> None:
62+
"""Cancel all renewal jobs."""
63+
for scope in self._renewal_scopes.values():
64+
scope.cancel()
65+
logger.info("Certificate manager shutdown complete")
5566

5667
async def get_certificate(
5768
self,
@@ -183,13 +194,15 @@ async def _schedule_renewal(
183194
peer_id: ID,
184195
domain: str,
185196
cert_data: dict[Any, Any],
197+
_current_scope: trio.CancelScope | None = None,
186198
) -> None:
187199
"""Schedule certificate renewal."""
188200
key = (peer_id, domain)
189201

190202
# Cancel existing renewal task
191-
if key in self._renewal_tasks:
192-
self._renewal_tasks[key].cancel()
203+
existing_scope = self._renewal_scopes.get(key)
204+
if existing_scope is not None and existing_scope is not _current_scope:
205+
existing_scope.cancel()
193206

194207
# Calculate renewal time
195208
expires_at = datetime.fromisoformat(cert_data["expires_at"])
@@ -204,24 +217,41 @@ async def _schedule_renewal(
204217
f"Scheduling certificate renewal for {peer_id} in {delay:.0f} seconds"
205218
)
206219

207-
async def renew_certificate() -> None:
208-
try:
209-
await asyncio.sleep(delay)
210-
211-
logger.info(f"Renewing certificate for {peer_id} on {domain}")
212-
new_cert_data = await self.get_certificate(
213-
peer_id, domain, force_renew=True
214-
)
215-
216-
# Update cached certificate
217-
self._certificates[key] = new_cert_data # type: ignore
220+
scope = trio.CancelScope()
221+
self._renewal_scopes[key] = scope
218222

219-
except asyncio.CancelledError:
220-
logger.debug(f"Certificate renewal cancelled for {peer_id}")
221-
except Exception as e:
222-
logger.error(f"Certificate renewal failed for {peer_id}: {e}")
223-
224-
self._renewal_tasks[key] = asyncio.create_task(renew_certificate())
223+
async def renew_certificate() -> None:
224+
with scope:
225+
try:
226+
await trio.sleep(delay)
227+
228+
logger.info(f"Renewing certificate for {peer_id} on {domain}")
229+
new_cert_data = await self._generate_certificate(peer_id, domain)
230+
await self._store_certificate_to_storage(
231+
peer_id, domain, new_cert_data
232+
)
233+
self._certificates[key] = new_cert_data
234+
235+
await self._schedule_renewal(
236+
peer_id,
237+
domain,
238+
new_cert_data,
239+
_current_scope=scope,
240+
)
241+
242+
except trio.Cancelled:
243+
logger.debug(f"Certificate renewal cancelled for {peer_id}")
244+
raise
245+
except Exception as e:
246+
logger.error(f"Certificate renewal failed for {peer_id}: {e}")
247+
finally:
248+
if self._renewal_scopes.get(key) is scope:
249+
self._renewal_scopes.pop(key, None)
250+
251+
if self._nursery is not None:
252+
self._nursery.start_soon(renew_certificate)
253+
else:
254+
trio.lowlevel.spawn_system_task(renew_certificate)
225255

226256
def _get_cert_path(self, peer_id: ID, domain: str) -> Path:
227257
"""Get certificate file path."""
@@ -323,9 +353,9 @@ async def cleanup_expired_certificates(self) -> None:
323353
del self._certificates[key]
324354

325355
# Cancel renewal task
326-
if key in self._renewal_tasks:
327-
self._renewal_tasks[key].cancel()
328-
del self._renewal_tasks[key]
356+
scope = self._renewal_scopes.pop(key, None)
357+
if scope is not None:
358+
scope.cancel()
329359

330360
if expired_keys:
331361
logger.info(f"Cleaned up {len(expired_keys)} expired certificates")
@@ -375,26 +405,13 @@ async def revoke_certificate(
375405
del self._certificates[key]
376406

377407
# Cancel renewal task
378-
if key in self._renewal_tasks:
379-
self._renewal_tasks[key].cancel()
380-
del self._renewal_tasks[key]
408+
scope = self._renewal_scopes.pop(key, None)
409+
if scope is not None:
410+
scope.cancel()
381411

382412
# Remove from storage
383413
cert_path = self._get_cert_path(peer_id, domain)
384414
if cert_path.exists():
385415
cert_path.unlink()
386416

387417
logger.info(f"Revoked certificate for {peer_id} on {domain}")
388-
389-
async def shutdown(self) -> None:
390-
"""Shutdown certificate manager."""
391-
# Cancel all renewal tasks
392-
for task in self._renewal_tasks.values():
393-
if not task.done():
394-
task.cancel()
395-
396-
# Wait for tasks to complete
397-
if self._renewal_tasks:
398-
await asyncio.gather(*self._renewal_tasks.values(), return_exceptions=True)
399-
400-
logger.info("Certificate manager shutdown complete")

examples/persistent_peerstore/example_usage.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66
with Protocol Buffer serialization instead of unsafe pickle.
77
"""
88

9-
import asyncio
109
from pathlib import Path
1110
import tempfile
1211

1312
from multiaddr import Multiaddr
13+
import trio
1414

1515
from libp2p.peer.id import ID
1616
from libp2p.peer.persistent import (
@@ -111,7 +111,7 @@ def security_example():
111111

112112
if __name__ == "__main__":
113113
# Run async example
114-
asyncio.run(async_example())
114+
trio.run(async_example)
115115

116116
# Run sync example
117117
sync_example()

examples/transport_integration_demo.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@
99
4. How the new system automatically selects the right transport
1010
"""
1111

12-
import asyncio
1312
import logging
1413
from pathlib import Path
1514
import sys
1615

16+
import trio
17+
1718
# Add the libp2p directory to the path so we can import it
1819
sys.path.insert(0, str(Path(__file__).parent.parent))
1920

@@ -200,7 +201,7 @@ async def main():
200201

201202
if __name__ == "__main__":
202203
try:
203-
asyncio.run(main())
204+
trio.run(main)
204205
except KeyboardInterrupt:
205206
print("\n👋 Demo interrupted by user")
206207
except Exception as e:

libp2p/tools/async_service/base.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from abc import (
44
abstractmethod,
55
)
6-
import asyncio
76
from collections import (
87
Counter,
98
)
@@ -22,6 +21,8 @@
2221
)
2322
import uuid
2423

24+
import trio
25+
2526
from ._utils import (
2627
is_verbose_logging_enabled,
2728
)
@@ -350,7 +351,7 @@ async def _run_and_manage_task(self, task: TaskAPI) -> None:
350351
child,
351352
new_parent or "root",
352353
)
353-
except asyncio.CancelledError:
354+
except trio.Cancelled:
354355
logger.debug("%s: task %s raised CancelledError.", self, task)
355356
raise
356357
except Exception as err:

libp2p/transport/websocket/autotls.py

Lines changed: 53 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
Based on patterns from JavaScript and Go libp2p implementations.
99
"""
1010

11-
import asyncio
1211
from collections.abc import Callable
1312
from datetime import datetime, timedelta, timezone
1413
import logging
@@ -21,6 +20,7 @@
2120
from cryptography.hazmat.primitives import hashes, serialization
2221
from cryptography.hazmat.primitives.asymmetric import rsa
2322
from cryptography.x509.oid import NameOID
23+
import trio
2424

2525
from libp2p.peer.id import ID
2626

@@ -218,28 +218,26 @@ def __init__(
218218
self.on_certificate_renew = on_certificate_renew
219219

220220
self._active_certificates: dict[tuple[ID, str], TLSCertificate] = {}
221-
self._renewal_tasks: dict[tuple[ID, str], asyncio.Task[None]] = {}
222-
self._shutdown_event = asyncio.Event()
221+
self._renewal_scopes: dict[tuple[ID, str], trio.CancelScope] = {}
222+
self._nursery: trio.Nursery | None = None
223223

224-
async def start(self) -> None:
224+
async def start(self, nursery: trio.Nursery | None = None) -> None:
225225
"""Start the AutoTLS manager."""
226226
logger.info("Starting AutoTLS manager")
227+
if nursery is not None:
228+
self._nursery = nursery
227229
# Manager is ready to handle certificate requests
228230
pass
229231

230232
async def stop(self) -> None:
231233
"""Stop the AutoTLS manager."""
232234
logger.info("Stopping AutoTLS manager")
233-
self._shutdown_event.set()
234235

235236
# Cancel all renewal tasks
236-
for task in self._renewal_tasks.values():
237-
if not task.done():
238-
task.cancel()
237+
for scope in self._renewal_scopes.values():
238+
scope.cancel()
239239

240-
# Wait for tasks to complete
241-
if self._renewal_tasks:
242-
await asyncio.gather(*self._renewal_tasks.values(), return_exceptions=True)
240+
logger.info("AutoTLS manager stopped")
243241

244242
async def get_certificate(
245243
self,
@@ -358,13 +356,15 @@ async def _schedule_renewal(
358356
peer_id: ID,
359357
domain: str,
360358
cert: TLSCertificate,
359+
_current_scope: trio.CancelScope | None = None,
361360
) -> None:
362361
"""Schedule certificate renewal."""
363362
key = (peer_id, domain)
364363

365364
# Cancel existing renewal task
366-
if key in self._renewal_tasks:
367-
self._renewal_tasks[key].cancel()
365+
existing_scope = self._renewal_scopes.get(key)
366+
if existing_scope is not None and existing_scope is not _current_scope:
367+
existing_scope.cancel()
368368

369369
# Calculate renewal time
370370
renewal_time = cert.expires_at - timedelta(hours=self.renewal_threshold_hours)
@@ -378,26 +378,47 @@ async def _schedule_renewal(
378378
f"Scheduling certificate renewal for {peer_id} in {delay:.0f} seconds"
379379
)
380380

381-
async def renew_certificate() -> None:
382-
try:
383-
await asyncio.sleep(delay)
384-
385-
if self._shutdown_event.is_set():
386-
return
387-
388-
logger.info(f"Renewing certificate for {peer_id} on {domain}")
389-
new_cert = await self.get_certificate(peer_id, domain, force_renew=True)
390-
391-
# Notify renewal
392-
if self.on_certificate_renew:
393-
self.on_certificate_renew(new_cert)
394-
395-
except asyncio.CancelledError:
396-
logger.debug(f"Certificate renewal cancelled for {peer_id}")
397-
except Exception as e:
398-
logger.error(f"Certificate renewal failed for {peer_id}: {e}")
381+
scope = trio.CancelScope()
382+
self._renewal_scopes[key] = scope
399383

400-
self._renewal_tasks[key] = asyncio.create_task(renew_certificate())
384+
async def renew_certificate() -> None:
385+
with scope:
386+
try:
387+
await trio.sleep(delay)
388+
389+
logger.info(f"Renewing certificate for {peer_id} on {domain}")
390+
new_cert = await self._renew_certificate_now(peer_id, domain)
391+
392+
# Notify renewal
393+
if self.on_certificate_renew:
394+
self.on_certificate_renew(new_cert)
395+
396+
await self._schedule_renewal(
397+
peer_id,
398+
domain,
399+
new_cert,
400+
_current_scope=scope,
401+
)
402+
except trio.Cancelled:
403+
logger.debug(f"Certificate renewal cancelled for {peer_id}")
404+
raise
405+
except Exception as e:
406+
logger.error(f"Certificate renewal failed for {peer_id}: {e}")
407+
finally:
408+
if self._renewal_scopes.get(key) is scope:
409+
self._renewal_scopes.pop(key, None)
410+
411+
if self._nursery is not None:
412+
self._nursery.start_soon(renew_certificate)
413+
else:
414+
trio.lowlevel.spawn_system_task(renew_certificate)
415+
416+
async def _renew_certificate_now(self, peer_id: ID, domain: str) -> TLSCertificate:
417+
key = (peer_id, domain)
418+
cert = await self._generate_certificate(peer_id, domain)
419+
await self.storage.store_certificate(cert)
420+
self._active_certificates[key] = cert
421+
return cert
401422

402423
def get_ssl_context(self, peer_id: ID | None, domain: str) -> ssl.SSLContext | None:
403424
"""Get SSL context for peer ID and domain."""

newsfragments/174.feature.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
py-libp2p now uses trio exclusively for async operations, removing remaining asyncio usage.

newsfragments/301.feature.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Migrated remaining asyncio usage with trio, so the codebase uses a single async runtime.

pyproject.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ test = [
9999
"factory-boy>=2.12.0,<3.0.0",
100100
"p2pclient>=0.2.1",
101101
"pytest>=7.0.0",
102-
"pytest-asyncio>=0.21.0",
103102
"pytest-timeout>=2.4.0",
104103
"pytest-trio>=0.5.2",
105104
"pytest-xdist>=2.4.0",

0 commit comments

Comments
 (0)