Skip to content

Commit ed41dd3

Browse files
committed
feat: pluggable transport capabilities (Phase 1, 2, 3)
1 parent 5112ca6 commit ed41dd3

13 files changed

Lines changed: 478 additions & 45 deletions

File tree

libp2p/bitswap/cid.py

Lines changed: 61 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,19 @@
2424
import hashlib
2525
from typing import TypeAlias
2626

27-
from cid import CIDv0, CIDv1, V0Builder, V1Builder, from_string, make_cid
28-
from cid.prefix import Prefix
27+
try:
28+
from cid import CIDv0, CIDv1, V0Builder, V1Builder, from_string, make_cid
29+
from cid.prefix import Prefix
30+
_HAS_CID_BUILDERS = True
31+
except Exception:
32+
# Older/newer py-cid variations may not expose builder/prefix helpers.
33+
from cid import CIDv0, CIDv1, from_string, make_cid
34+
Prefix = None # type: ignore
35+
V0Builder = None # type: ignore
36+
V1Builder = None # type: ignore
37+
_HAS_CID_BUILDERS = False
38+
39+
import multihash as _multihash
2940
from multicodec import Code, is_codec
3041
from multicodec.code_table import DAG_PB, RAW, SHA2_256
3142

@@ -61,7 +72,12 @@ def _normalise_codec(codec: Code | str | int) -> Code:
6172

6273
def compute_cid_v0_obj(data: bytes) -> CIDv0:
6374
"""Compute a CIDv0 object for data."""
64-
return V0Builder().sum(data)
75+
if _HAS_CID_BUILDERS and V0Builder is not None:
76+
return V0Builder().sum(data)
77+
78+
# Fallback: compute sha2-256 multihash and construct CIDv0
79+
mh = _multihash.digest(data, "sha2-256")
80+
return CIDv0(mh.encode())
6581

6682

6783
def compute_cid_v0(data: bytes) -> bytes:
@@ -84,7 +100,13 @@ def compute_cid_v0(data: bytes) -> bytes:
84100
def compute_cid_v1_obj(data: bytes, codec: Code | str | int = CODEC_RAW) -> CIDv1:
85101
"""Compute a CIDv1 object for data and codec."""
86102
code_obj = _normalise_codec(codec)
87-
return V1Builder(codec=str(code_obj), mh_type=str(HASH_SHA256)).sum(data)
103+
if _HAS_CID_BUILDERS and V1Builder is not None:
104+
return V1Builder(codec=str(code_obj), mh_type=str(HASH_SHA256)).sum(data)
105+
106+
# Fallback: compute multihash and construct CIDv1 using codec name
107+
codec_name = getattr(code_obj, "name", str(code_obj))
108+
mh = _multihash.digest(data, "sha2-256")
109+
return CIDv1(codec_name, mh.encode())
88110

89111

90112
def compute_cid_v1(data: bytes, codec: Code | str | int = CODEC_RAW) -> bytes:
@@ -132,7 +154,26 @@ def get_cid_prefix(cid: bytes) -> bytes:
132154
if cid_obj.version != CID_V1:
133155
return b""
134156

135-
return cid_obj.prefix().to_bytes()
157+
# Prefer high-level Prefix helper when available
158+
if Prefix is not None and hasattr(cid_obj, "prefix"):
159+
try:
160+
return cid_obj.prefix().to_bytes()
161+
except Exception:
162+
pass
163+
164+
# Fallback: reconstruct prefix by removing the digest bytes from the
165+
# raw CID buffer using py-multihash to determine digest length.
166+
try:
167+
mh_bytes = getattr(cid_obj, "multihash", None)
168+
if mh_bytes is None:
169+
# Some CID implementations expose different attributes
170+
# Fall back to parsing the tail of the buffer.
171+
return b""
172+
mh = _multihash.decode(mh_bytes)
173+
digest_len = mh.length
174+
return cid_obj.buffer[:-digest_len]
175+
except Exception:
176+
return b""
136177

137178

138179
def reconstruct_cid_from_prefix_and_data(prefix: bytes, data: bytes) -> bytes:
@@ -153,12 +194,16 @@ def reconstruct_cid_from_prefix_and_data(prefix: bytes, data: bytes) -> bytes:
153194
# No prefix means CIDv0
154195
return compute_cid_v0(data)
155196

156-
try:
157-
return Prefix.from_bytes(prefix).sum(data).buffer
158-
except ValueError:
159-
# Preserve previous permissive behavior for malformed prefixes.
160-
digest = hashlib.sha256(data).digest()
161-
return prefix + digest
197+
if Prefix is not None:
198+
try:
199+
return Prefix.from_bytes(prefix).sum(data).buffer
200+
except ValueError:
201+
pass
202+
203+
# Fallback: when Prefix helper isn't available, try to conservatively
204+
# append the raw digest to the prefix (preserves prior permissive behavior).
205+
digest = hashlib.sha256(data).digest()
206+
return prefix + digest
162207

163208

164209
def verify_cid(cid: bytes, data: bytes) -> bool:
@@ -187,7 +232,11 @@ def verify_cid(cid: bytes, data: bytes) -> bool:
187232
return False
188233

189234
try:
190-
recomputed = cid_obj.prefix().sum(data).buffer
235+
# Prefer using compute helpers which work across py-cid variants.
236+
if cid_obj.version == CID_V1:
237+
recomputed = compute_cid_v1(data, codec=getattr(cid_obj, "codec", None))
238+
else:
239+
recomputed = compute_cid_v0(data)
191240
except ValueError:
192241
logger.debug(" Failed to recompute CID from parsed prefix")
193242
return False

libp2p/capabilities.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
"""
2+
Transport and connection capability declarations.
3+
4+
Instead of the core checking ``isinstance(transport, QUICTransport)`` to decide
5+
whether to skip security or muxing upgrades, transports and connections *declare*
6+
what they already provide via simple boolean properties.
7+
8+
Any transport or connection that satisfies the structural protocol (duck typing)
9+
is automatically recognised — no base-class inheritance required.
10+
11+
Example — a hypothetical WebRTC transport that bundles its own DTLS + SCTP::
12+
13+
class WebRTCTransport(ITransport):
14+
@property
15+
def provides_security(self) -> bool:
16+
return True
17+
18+
@property
19+
def provides_muxing(self) -> bool:
20+
return True
21+
22+
See Also
23+
--------
24+
:pep:`544` — Structural subtyping (static duck typing) via ``typing.Protocol``.
25+
"""
26+
27+
from typing import Protocol, runtime_checkable
28+
29+
30+
@runtime_checkable
31+
class TransportCapabilities(Protocol):
32+
"""Structural protocol implemented by transports that bundle their own
33+
security and/or multiplexing layers.
34+
35+
A transport that does **not** implement these properties is assumed to
36+
provide neither (``getattr(t, 'provides_security', False)`` → ``False``).
37+
"""
38+
39+
@property
40+
def provides_security(self) -> bool:
41+
"""Return ``True`` if this transport includes built-in encryption /
42+
authentication (e.g. QUIC's integrated TLS 1.3)."""
43+
...
44+
45+
@property
46+
def provides_muxing(self) -> bool:
47+
"""Return ``True`` if this transport includes built-in stream
48+
multiplexing (e.g. QUIC's native streams)."""
49+
...
50+
51+
52+
@runtime_checkable
53+
class ConnectionCapabilities(Protocol):
54+
"""Structural protocol implemented by connections that are already
55+
secured and/or multiplexed at creation time.
56+
57+
A connection that does **not** implement these properties is assumed to
58+
be neither secure nor muxed.
59+
"""
60+
61+
@property
62+
def is_secure(self) -> bool:
63+
"""Return ``True`` if the connection is already encrypted and the
64+
remote peer's identity has been verified."""
65+
...
66+
67+
@property
68+
def is_muxed(self) -> bool:
69+
"""Return ``True`` if the connection already supports opening
70+
multiple independent streams."""
71+
...
72+
73+
74+
@runtime_checkable
75+
class NeedsSetup(Protocol):
76+
"""Structural protocol for transports that require lifecycle hooks
77+
from the swarm (e.g. a background nursery or a back-reference to
78+
the swarm itself).
79+
80+
Transports that do **not** need these hooks simply omit the methods.
81+
"""
82+
83+
def set_background_nursery(self, nursery: object) -> None:
84+
"""Receive the long-lived nursery managed by the swarm."""
85+
...
86+
87+
def set_swarm(self, swarm: object) -> None:
88+
"""Receive a back-reference to the owning swarm."""
89+
...

libp2p/custom_types.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,24 @@
1717
IMuxedStream = cast(type, object)
1818
QUICConnection = cast(type, object)
1919

20+
from libp2p.io.abc import (
21+
ReadWriteCloser,
22+
)
23+
from collections.abc import (
24+
Awaitable,
25+
Callable,
26+
Mapping,
27+
)
28+
from typing import TYPE_CHECKING, NewType, Union, cast
29+
30+
if TYPE_CHECKING:
31+
from libp2p.abc import IMuxedConn, IMuxedStream, INetStream, ISecureTransport
32+
else:
33+
IMuxedConn = cast(type, object)
34+
INetStream = cast(type, object)
35+
ISecureTransport = cast(type, object)
36+
IMuxedStream = cast(type, object)
37+
2038
from libp2p.io.abc import (
2139
ReadWriteCloser,
2240
)
@@ -37,6 +55,11 @@
3755
AsyncValidatorFn = Callable[[ID, rpc_pb2.Message], Awaitable[bool]]
3856
ValidatorFn = Union[SyncValidatorFn, AsyncValidatorFn]
3957
UnsubscribeFn = Callable[[], Awaitable[None]]
40-
TQUICStreamHandlerFn = Callable[[QUICStream], Awaitable[None]]
41-
TQUICConnHandlerFn = Callable[[QUICConnection], Awaitable[None]]
4258
MessageID = NewType("MessageID", str)
59+
60+
# Re-export QUIC-specific types for backward compatibility.
61+
# New code should import directly from libp2p.transport.quic.types.
62+
from libp2p.transport.quic.types import ( # noqa: E402, F401
63+
TQUICConnHandlerFn as TQUICConnHandlerFn,
64+
TQUICStreamHandlerFn as TQUICStreamHandlerFn,
65+
)

libp2p/host/basic_host.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@
9292
from libp2p.tools.async_service import (
9393
background_trio_service,
9494
)
95-
from libp2p.transport.quic.connection import QUICConnection
9695
import libp2p.utils.paths
9796
from libp2p.utils.varint import (
9897
read_length_prefixed_protobuf,
@@ -954,7 +953,7 @@ async def _on_notifee_connected(self, conn: INetConn) -> None:
954953
if not is_initiator:
955954
# Only the dialer (initiator) needs to actively run identify.
956955
return
957-
if not self._is_quic_muxer(muxed_conn):
956+
if not self._is_native_muxer(muxed_conn):
958957
return
959958
event_started = getattr(conn, "event_started", None)
960959
if event_started is not None and not event_started.is_set():
@@ -976,15 +975,16 @@ def _get_first_connection(self, peer_id: ID) -> INetConn | None:
976975
return connections[0]
977976
return None
978977

979-
def _is_quic_muxer(self, muxed_conn: IMuxedConn | None) -> bool:
980-
return isinstance(muxed_conn, QUICConnection)
978+
def _is_native_muxer(self, muxed_conn: IMuxedConn | None) -> bool:
979+
"""Return True if the muxed connection is natively muxed (e.g. QUIC)."""
980+
return getattr(muxed_conn, 'is_muxed', False)
981981

982982
def _should_identify_peer(self, peer_id: ID) -> bool:
983983
connection = self._get_first_connection(peer_id)
984984
if connection is None:
985985
return False
986986
muxed_conn = getattr(connection, "muxed_conn", None)
987-
return self._is_quic_muxer(muxed_conn)
987+
return self._is_native_muxer(muxed_conn)
988988

989989
# Reference: `BasicHost.newStreamHandler` in Go.
990990
async def _swarm_stream_handler(self, net_stream: INetStream) -> None:
@@ -1061,6 +1061,18 @@ async def _swarm_stream_handler(self, net_stream: INetStream) -> None:
10611061
await net_stream.reset()
10621062
return
10631063

1064+
# Phase 2: Check handler connection requirements (if declared)
1065+
from libp2p.requirements import check_connection_requirements
1066+
1067+
underlying_conn = getattr(net_stream, "muxed_conn", None)
1068+
if not check_connection_requirements(handler, underlying_conn):
1069+
logger.warning(
1070+
"Handler for protocol %s has unmet connection requirements "
1071+
"on stream from peer %s — proceeding anyway",
1072+
protocol,
1073+
net_stream.muxed_conn.peer_id,
1074+
)
1075+
10641076
await handler(net_stream)
10651077

10661078
def get_live_peers(self) -> list[ID]:

libp2p/network/swarm.py

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
)
3939
from libp2p.network.auto_connector import AutoConnector
4040
from libp2p.network.config import ConnectionConfig, RetryConfig
41+
from libp2p.transport.quic.config import QUICTransportConfig
4142
from libp2p.network.connection_gate import ConnectionGate
4243
from libp2p.network.connection_pruner import ConnectionPruner
4344
from libp2p.network.tag_store import TagInfo, TagStore
@@ -57,9 +58,6 @@
5758
OpenConnectionError,
5859
SecurityUpgradeFailure,
5960
)
60-
from libp2p.transport.quic.config import QUICTransportConfig
61-
from libp2p.transport.quic.connection import QUICConnection
62-
from libp2p.transport.quic.transport import QUICTransport
6361
from libp2p.transport.upgrader import (
6462
TransportUpgrader,
6563
)
@@ -200,13 +198,10 @@ async def run(self) -> None:
200198

201199
# Set background nursery BEFORE setting the event
202200
# This ensures transports have the nursery when they check
203-
if isinstance(self.transport, QUICTransport):
204-
self.transport.set_background_nursery(nursery)
205-
self.transport.set_swarm(self)
206-
elif hasattr(self.transport, "set_background_nursery"):
207-
# WebSocket transport also needs background nursery
208-
# for connection management
201+
if hasattr(self.transport, "set_background_nursery"):
209202
self.transport.set_background_nursery(nursery) # type: ignore[attr-defined]
203+
if hasattr(self.transport, "set_swarm"):
204+
self.transport.set_swarm(self) # type: ignore[attr-defined]
210205

211206
# Set event after background nursery is configured
212207
# This ensures transports have the nursery when they check the event
@@ -647,11 +642,11 @@ async def _dial_addr_single_attempt(self, addr: Multiaddr, peer_id: ID) -> INetC
647642
pass
648643
raise SwarmException(f"Unexpected error dialing peer {peer_id}") from e
649644

650-
if isinstance(self.transport, QUICTransport) and isinstance(
651-
raw_conn, IMuxedConn
645+
if getattr(self.transport, 'provides_muxing', False) and getattr(
646+
raw_conn, 'is_muxed', False
652647
):
653648
logger.info(
654-
"Skipping upgrade for QUIC, QUIC connections are already multiplexed"
649+
"Skipping upgrade — transport already provides security + muxing"
655650
)
656651
try:
657652
swarm_conn = await self.add_conn(raw_conn, direction="outbound")
@@ -886,7 +881,7 @@ async def new_stream(self, peer_id: ID) -> INetStream:
886881
f"Failed to get a valid connection for peer {peer_id}"
887882
)
888883

889-
if isinstance(self.transport, QUICTransport) and connection is not None:
884+
if getattr(self.transport, 'provides_muxing', False) and connection is not None:
890885
conn = cast("SwarmConn", connection)
891886
try:
892887
stream = await conn.new_stream()
@@ -1115,14 +1110,16 @@ async def conn_handler(
11151110
pass
11161111
return
11171112

1118-
# No need to upgrade QUIC Connection
1119-
if isinstance(self.transport, QUICTransport):
1113+
# No need to upgrade connections from transports with built-in muxing
1114+
if getattr(self.transport, 'provides_muxing', False):
11201115
try:
1121-
quic_conn = cast(QUICConnection, read_write_closer)
1122-
await self.add_conn(quic_conn, direction="inbound")
1123-
peer_id = quic_conn.peer_id
1116+
# The connection is already muxed; add it directly.
1117+
muxed_conn = cast(IMuxedConn, read_write_closer)
1118+
await self.add_conn(muxed_conn, direction="inbound")
1119+
peer_id = muxed_conn.peer_id
11241120
logger.debug(
1125-
f"successfully opened quic connection to peer {peer_id}"
1121+
"successfully opened native-muxed connection "
1122+
f"to peer {peer_id}"
11261123
)
11271124
# NOTE: This is a intentional barrier to prevent from the
11281125
# handler exiting and closing the connection.
@@ -1511,10 +1508,13 @@ async def add_conn(
15111508
logger.debug("Swarm::add_conn | starting muxed connection")
15121509
self.manager.run_task(muxed_conn.start)
15131510
await muxed_conn.event_started.wait()
1514-
# For QUIC connections, also verify connection is established
1515-
if isinstance(muxed_conn, QUICConnection):
1516-
if not muxed_conn.is_established:
1517-
await muxed_conn._connected_event.wait()
1511+
# For connections that need an explicit handshake-completion step,
1512+
# wait until the connection reports itself as established.
1513+
if hasattr(muxed_conn, 'is_established') and hasattr(
1514+
muxed_conn, '_connected_event'
1515+
):
1516+
if not muxed_conn.is_established: # type: ignore[attr-defined]
1517+
await muxed_conn._connected_event.wait() # type: ignore[attr-defined]
15181518
logger.debug("Swarm::add_conn | starting swarm connection")
15191519
self.manager.run_task(swarm_conn.start)
15201520
await swarm_conn.event_started.wait()

0 commit comments

Comments
 (0)