From ef03485ff55be3e9ccc4f9b9a8df691f1d6fb52b Mon Sep 17 00:00:00 2001 From: JimyMa Date: Wed, 27 May 2026 18:01:06 +0000 Subject: [PATCH 1/5] tcp_peer_agent --- .pre-commit-config.yaml | 6 +- dlslime/dlslime/peer_agent/_agent.py | 307 +++++++++++++++--- dlslime/dlslime/peer_agent/_mailbox.py | 6 + dlslime/dlslime/peer_agent/_tcp_endpoint.py | 89 +++++ .../python/p2p_tcp_rc_read_peer_agent.py | 68 ++++ .../examples/python/p2p_tcp_rc_send_recv.py | 91 ++++++ .../python/p2p_tcp_rc_write_peer_agent.py | 78 +++++ .../python/p2p_tcp_send_recv_peer_agent.py | 95 ++++++ dlslime/tests/python/test_peer_agent_tcp.py | 220 +++++++++++++ 9 files changed, 905 insertions(+), 55 deletions(-) create mode 100644 dlslime/dlslime/peer_agent/_tcp_endpoint.py create mode 100644 dlslime/examples/python/p2p_tcp_rc_read_peer_agent.py create mode 100644 dlslime/examples/python/p2p_tcp_rc_send_recv.py create mode 100644 dlslime/examples/python/p2p_tcp_rc_write_peer_agent.py create mode 100644 dlslime/examples/python/p2p_tcp_send_recv_peer_agent.py create mode 100644 dlslime/tests/python/test_peer_agent_tcp.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 8b79cbd1..147948a1 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -3,9 +3,9 @@ repos: rev: v2.9.0 hooks: - id: ufmt - # black 25.x requires Python >= 3.9; pin a recent stable to avoid - # pre-commit defaulting to whatever system python happens to be. - language_version: python3.12 + # Use whatever Python pre-commit was launched with (3.9+ required by + # black 25.x). Avoids forcing a specific 3.x venv that may not exist + # on every machine. additional_dependencies: - black==25.9.0 - usort==1.0.8.post1 diff --git a/dlslime/dlslime/peer_agent/_agent.py b/dlslime/dlslime/peer_agent/_agent.py index d63792a6..43a7405e 100644 --- a/dlslime/dlslime/peer_agent/_agent.py +++ b/dlslime/dlslime/peer_agent/_agent.py @@ -14,7 +14,7 @@ import time from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass -from typing import Any, Dict, List, Optional, Set, Tuple +from typing import Any, Dict, List, Optional, Set, Tuple, Union try: import httpx @@ -26,10 +26,17 @@ ) from e from dlslime import discover_topology, RDMAContext, RDMAEndpoint, RDMAMemoryPool + +try: # TCP support is a build-time option (BUILD_TCP). Tolerate its absence. + from dlslime import TcpEndpoint as _TcpEndpoint +except ImportError: # pragma: no cover - exercised only on builds without TCP + _TcpEndpoint = None # type: ignore[assignment] + from dlslime.ctrl import NanoCtrlClient from dlslime.logging import get_logger from ._mailbox import StreamMailbox from ._obs import _tlog +from ._tcp_endpoint import TcpEndpointAdapter logger = get_logger("peer_agent") @@ -46,9 +53,58 @@ class RdmaResourceKey: ib_port: int link_type: str + @property + def transport(self) -> str: + return "rdma" + def redis_suffix(self) -> str: return f"{self.device}:{self.ib_port}:{self.link_type}" + def conn_id_segment(self) -> str: + return f"{self.device}:port{self.ib_port}" + + +@dataclass(frozen=True) +class TcpResourceKey: + """Resource key for the TCP transport. + + Carries the local bind ``host``/``port``. The peer's host/port is not + needed at the resource-key level — the rendezvous handshake passes the + peer's ``endpoint_info`` JSON straight to ``TcpEndpoint.connect``. + + The ``device``/``ib_port``/``link_type`` properties keep this key + interchangeable with ``RdmaResourceKey`` in code paths that read those + fields (Redis MR keys, connection metadata, ``PeerConnection.local_nic``). + """ + + host: str = "0.0.0.0" + port: int = 0 + + @property + def transport(self) -> str: + return "tcp" + + @property + def device(self) -> str: + return f"tcp:{self.host}:{self.port}" + + @property + def ib_port(self) -> int: + return int(self.port) + + @property + def link_type(self) -> str: + return "TCP" + + def redis_suffix(self) -> str: + return f"tcp:{self.host}:{self.port}" + + def conn_id_segment(self) -> str: + return f"tcp:{self.host}:{self.port}" + + +ResourceKey = Union[RdmaResourceKey, TcpResourceKey] + @dataclass class LogicalMemoryRegion: @@ -61,7 +117,7 @@ class LogicalMemoryRegion: @dataclass class MaterializedMemoryRegion: name: str - resource_key: RdmaResourceKey + resource_key: ResourceKey handler: int info: Dict[str, Any] @@ -73,8 +129,8 @@ def __init__( self, agent: "PeerAgent", peer_alias: str, - local_key: RdmaResourceKey, - peer_key: RdmaResourceKey, + local_key: ResourceKey, + peer_key: ResourceKey, qp_num: int, profile: str = "default", ) -> None: @@ -84,20 +140,28 @@ def __init__( self.peer_key = peer_key self.qp_num = qp_num self.profile = profile - self.endpoint: Optional[RDMAEndpoint] = None + self.endpoint: Optional[Any] = None self.memory_pool: Optional[RDMAMemoryPool] = None self.state = "connecting" + # Populated by the mailbox after a successful handshake. Lets the + # caller resolve peer MR handles for one-sided ops (TCP path) without + # going through Redis MR records (which are RDMA-specific). + self.peer_endpoint_info: Optional[Dict[str, Any]] = None + + @property + def transport(self) -> str: + return self.local_key.transport @property def conn_id(self) -> str: return ( - f"{self._agent.alias}:{self.local_key.device}:port{self.local_key.ib_port}" - f"->{self.peer_alias}:{self.peer_key.device}:port{self.peer_key.ib_port}" + f"{self._agent.alias}:{self.local_key.conn_id_segment()}" + f"->{self.peer_alias}:{self.peer_key.conn_id_segment()}" f"#qp{self.qp_num}" ) def attach_endpoint( - self, endpoint: RDMAEndpoint, memory_pool: RDMAMemoryPool + self, endpoint: Any, memory_pool: Optional[RDMAMemoryPool] ) -> None: self.endpoint = endpoint self.memory_pool = memory_pool @@ -161,7 +225,18 @@ def state(self) -> str: return self._connection().state @property - def endpoint(self) -> Optional[RDMAEndpoint]: + def peer_endpoint_info(self) -> Optional[Dict[str, Any]]: + """Return the peer's endpoint info as exchanged during the handshake. + + Populated only after the connection is established. For the TCP + transport this is the dict returned by the peer's + ``TcpEndpoint.endpoint_info()`` and includes ``mr_info`` — useful + for resolving remote MR handles for one-sided ``read``/``write``. + """ + return self._connection().peer_endpoint_info + + @property + def endpoint(self) -> Optional[Any]: """Return the selected endpoint once created, otherwise None.""" conn = self._connection() if conn.endpoint is not None: @@ -215,7 +290,7 @@ def __init__( self._resource_cache: Dict[str, Dict[str, Any]] = {} self._resource_cache_lock = threading.Lock() - self._endpoints: Dict[str, RDMAEndpoint] = {} + self._endpoints: Dict[str, Any] = {} self._endpoints_lock = ( threading.Lock() ) # Protects _endpoints for concurrent reconcile @@ -241,13 +316,13 @@ def __init__( self._connections: Dict[str, DirectedConnection] = {} self._connections_lock = threading.Lock() - self._contexts: Dict[RdmaResourceKey, RDMAContext] = {} - self._pools: Dict[RdmaResourceKey, RDMAMemoryPool] = {} + self._contexts: Dict[ResourceKey, RDMAContext] = {} + self._pools: Dict[ResourceKey, RDMAMemoryPool] = {} self._resource_lock = threading.Lock() self._logical_regions: Dict[str, LogicalMemoryRegion] = {} self._materialized_regions: Dict[ - Tuple[str, RdmaResourceKey], MaterializedMemoryRegion + Tuple[str, ResourceKey], MaterializedMemoryRegion ] = {} self._regions_lock = threading.Lock() @@ -382,8 +457,12 @@ def _first_usable_resource_key( raise RuntimeError(f"No usable RDMA resource found ({detail})") def _get_context_and_pool( - self, key: RdmaResourceKey - ) -> Tuple[RDMAContext, RDMAMemoryPool]: + self, key: ResourceKey + ) -> Tuple[Optional[RDMAContext], Optional[RDMAMemoryPool]]: + # TCP endpoints own their memory map internally; no shared + # context/pool object exists for them. + if isinstance(key, TcpResourceKey): + return None, None with self._resource_lock: if key not in self._contexts: ctx = RDMAContext() @@ -764,8 +843,8 @@ def _get_or_create_connection( self, peer_alias: str, *, - local_key: Optional[RdmaResourceKey] = None, - peer_key: Optional[RdmaResourceKey] = None, + local_key: Optional[ResourceKey] = None, + peer_key: Optional[ResourceKey] = None, qp_num: Optional[int] = None, profile: str = "default", ) -> DirectedConnection: @@ -792,11 +871,14 @@ def _get_or_create_connection( link_type=None, ) if peer_key is None: - peer_key = RdmaResourceKey( - local_key.device, - local_key.ib_port, - local_key.link_type, - ) + if isinstance(local_key, TcpResourceKey): + peer_key = TcpResourceKey() + else: + peer_key = RdmaResourceKey( + local_key.device, + local_key.ib_port, + local_key.link_type, + ) for conn in peer_connections: if conn.local_key == local_key and conn.peer_key == peer_key: if qp_num is not None and conn.qp_num != qp_num: @@ -829,12 +911,24 @@ def _connection_meta(self, conn_id: str) -> Dict[str, Any]: "qp_num": conn.qp_num, "link_type": conn.local_key.link_type, "profile": conn.profile, + "transport": conn.local_key.transport, } def _ensure_connection_from_meta( self, peer_alias: str, meta: Dict[str, Any] ) -> DirectedConnection: """Create local connection state from a peer's directed request.""" + if str(meta.get("transport") or "rdma").lower() == "tcp": + local_key: ResourceKey = TcpResourceKey() + peer_key: ResourceKey = TcpResourceKey() + return self._get_or_create_connection( + peer_alias, + local_key=local_key, + peer_key=peer_key, + qp_num=int(meta.get("qp_num") or 1), + profile=str(meta.get("profile") or "default"), + ) + local_key = self._first_usable_resource_key( self._local_resource, device=meta.get("dst_device"), @@ -860,7 +954,7 @@ def _ensure_connection_from_meta( profile=str(meta.get("profile") or "default"), ) - def _ensure_local_endpoint_created(self, conn_id: str) -> RDMAEndpoint: + def _ensure_local_endpoint_created(self, conn_id: str): """ Idempotent: create endpoint for peer if not exists. Returns the endpoint (existing or newly created). Thread-safe. @@ -884,6 +978,10 @@ def _ensure_local_endpoint_created(self, conn_id: str) -> RDMAEndpoint: with self._connections_lock: conn = self._connections[conn_id] + + if isinstance(conn.local_key, TcpResourceKey): + return self._ensure_local_tcp_endpoint(conn_id, conn) + _, pool = self._get_context_and_pool(conn.local_key) self._materialize_all_regions_for_key(conn.local_key) @@ -905,6 +1003,52 @@ def _ensure_local_endpoint_created(self, conn_id: str) -> RDMAEndpoint: conn.attach_endpoint(new_ep, pool) return new_ep + def _ensure_local_tcp_endpoint( + self, conn_id: str, conn: DirectedConnection + ) -> TcpEndpointAdapter: + """Construct (or reuse) a TCP endpoint adapter for a connection.""" + if _TcpEndpoint is None: + raise RuntimeError( + "TCP transport requested but dlslime was built without BUILD_TCP." + ) + local_key: TcpResourceKey = conn.local_key # type: ignore[assignment] + new_ep = TcpEndpointAdapter( + _TcpEndpoint(ip=local_key.host, port=local_key.port) + ) + + # Replay any previously-registered logical regions onto the new + # endpoint so the handshake's endpoint_info() carries them. Done + # before publishing the endpoint so concurrent MR registrations + # see the endpoint via the same lock ordering as the RDMA path. + with self._regions_lock: + regions = list(self._logical_regions.values()) + for region in regions: + try: + new_ep.register_memory_region( + region.name, region.ptr, region.offset, region.length + ) + except Exception as e: + logger.warning( + "PeerAgent %s: TCP MR replay for %s failed during " + "endpoint creation: %s", + self.alias, + region.name, + e, + ) + + with self._endpoints_lock: + existing = self._endpoints.get(conn_id) + if existing is not None: + # Race: another thread won. Drop the spare adapter. + try: + new_ep.shutdown() + except Exception: + pass + return existing + self._endpoints[conn_id] = new_ep + conn.attach_endpoint(new_ep, None) + return new_ep + def get_connections(self) -> Dict[str, Dict[str, PeerConnection]]: """Return local connections grouped by peer and connection id.""" result: Dict[str, Dict[str, PeerConnection]] = {} @@ -977,8 +1121,18 @@ def connect_to( ib_port: Optional[int] = 1, qp_num: Optional[int] = 1, min_bw: Optional[str] = None, + transport: str = "rdma", + local_host: str = "0.0.0.0", + local_port: int = 0, ) -> PeerConnection: - """Start connecting to a peer and return a connection handle.""" + """Start connecting to a peer and return a connection handle. + + ``transport='rdma'`` (default) selects the RDMA path, picking a NIC + from the discovered local topology. ``transport='tcp'`` selects the + TCP path: ``local_host``/``local_port`` configure the local bind + (port 0 = OS-assigned), and ``ib_port``/``qp_num``/``local_device``/ + ``peer_device`` are ignored. + """ if not isinstance(peer_alias, str) or not peer_alias: raise TypeError("connect_to() requires a non-empty peer alias string") if peer_alias == self.alias: @@ -987,28 +1141,43 @@ def connect_to( "Start peer agents with distinct aliases." ) + transport_norm = (transport or "rdma").lower() + if transport_norm not in ("rdma", "tcp"): + raise ValueError( + f"connect_to: unsupported transport {transport!r} " + "(expected 'rdma' or 'tcp')" + ) + _tlog(f"{self.alias}: connect_to({peer_alias}) ENTER") t0 = time.perf_counter() - local_key = self._first_usable_resource_key( - self._local_resource, - device=local_device, - ib_port=ib_port, - link_type=None, - ) - peer_key = self._resolve_peer_resource_key( - peer_alias, - peer_device=peer_device, - ib_port=ib_port if ib_port is not None else local_key.ib_port, - link_type=local_key.link_type, - fallback_device=local_key.device, - ) - if local_key.link_type != peer_key.link_type: - raise RuntimeError( - f"Cannot connect {self.alias}:{local_key.device} to " - f"{peer_alias}:{peer_key.device}: link_type mismatch " - f"{local_key.link_type} != {peer_key.link_type}" + if transport_norm == "tcp": + local_key: ResourceKey = TcpResourceKey( + host=local_host, port=int(local_port) ) + # Peer host/port don't matter for the resource key — the rendezvous + # passes the peer's TcpEndpoint.endpoint_info() JSON to connect(). + peer_key: ResourceKey = TcpResourceKey() + else: + local_key = self._first_usable_resource_key( + self._local_resource, + device=local_device, + ib_port=ib_port, + link_type=None, + ) + peer_key = self._resolve_peer_resource_key( + peer_alias, + peer_device=peer_device, + ib_port=ib_port if ib_port is not None else local_key.ib_port, + link_type=local_key.link_type, + fallback_device=local_key.device, + ) + if local_key.link_type != peer_key.link_type: + raise RuntimeError( + f"Cannot connect {self.alias}:{local_key.device} to " + f"{peer_alias}:{peer_key.device}: link_type mismatch " + f"{local_key.link_type} != {peer_key.link_type}" + ) conn = self._get_or_create_connection( peer_alias, @@ -1094,28 +1263,62 @@ def register_memory_region( offset: int, length: int, ) -> int: - """Register a logical memory region and publish materialized keys.""" + """Register a logical memory region and publish materialized keys. + + For RDMA, this materializes onto the default RDMA resource and + returns its handler (preserving the historical return contract). + For TCP-only agents (no RDMA NICs), this records the logical + region and registers it on every existing TCP endpoint, returning + ``0`` — TCP handles are per-endpoint and must be obtained via + ``conn.endpoint.register_memory_region`` for one-sided ops. + """ region = LogicalMemoryRegion(mr_name, ptr, offset, length) with self._regions_lock: self._logical_regions[mr_name] = region - # Preserve the old return contract by materializing immediately on a - # deterministic default resource. Connection-level read/write will - # materialize again for a different resource key if needed. + # Eagerly register on any existing TCP endpoints so handshake-time + # endpoint_info() carries the MR. Idempotent on the C++ side. + with self._endpoints_lock: + tcp_endpoints = [ + ep + for ep in self._endpoints.values() + if isinstance(ep, TcpEndpointAdapter) + ] + for ep in tcp_endpoints: + try: + ep.register_memory_region(mr_name, ptr, offset, length) + except Exception as e: + logger.warning( + "PeerAgent %s: TCP MR replay for %s failed: %s", + self.alias, + mr_name, + e, + ) + + # If the agent has no RDMA resources to materialize against (TCP-only + # build or TCP-only deployment), skip materialization. Callers using + # one-sided ops should fetch handles from the connection endpoint. key = self._default_local_resource_key() + if isinstance(key, TcpResourceKey): + return 0 + materialized = self._materialize_region(mr_name, key) self._publish_memory_keys() return materialized.handler - def _default_local_resource_key(self) -> RdmaResourceKey: + def _default_local_resource_key(self) -> ResourceKey: with self._connections_lock: if self._connections: return next(iter(self._connections.values())).local_key - return self._first_usable_resource_key( - self._local_resource, - ib_port=1, - link_type=None, - ) + try: + return self._first_usable_resource_key( + self._local_resource, + ib_port=1, + link_type=None, + ) + except RuntimeError: + # No RDMA resources discovered — TCP-only environment. + return TcpResourceKey() def _publish_memory_keys(self) -> None: if self._redis_client is None or not self.alias: diff --git a/dlslime/dlslime/peer_agent/_mailbox.py b/dlslime/dlslime/peer_agent/_mailbox.py index 63caa9d1..ab4a399e 100644 --- a/dlslime/dlslime/peer_agent/_mailbox.py +++ b/dlslime/dlslime/peer_agent/_mailbox.py @@ -374,6 +374,12 @@ def _try_connect_peer_inner( # D. Complete RDMA handshake t_d = time.perf_counter() endpoint.connect(peer_qp_info) + # Stash for one-sided ops on transports (TCP) where remote MR info + # rides on the endpoint_info JSON instead of a separate Redis record. + try: + conn.peer_endpoint_info = peer_qp_info + except Exception: + pass _tlog( f"{self._agent.alias}: [D] endpoint.connect({peer}) " f"+{(time.perf_counter() - t_d) * 1000:.3f}ms" diff --git a/dlslime/dlslime/peer_agent/_tcp_endpoint.py b/dlslime/dlslime/peer_agent/_tcp_endpoint.py new file mode 100644 index 00000000..483d75a6 --- /dev/null +++ b/dlslime/dlslime/peer_agent/_tcp_endpoint.py @@ -0,0 +1,89 @@ +"""Adapter that exposes a ``TcpEndpoint`` with the same surface PeerAgent +already calls on ``RDMAEndpoint`` (``send/recv/read/write``, plus +``endpoint_info``/``connect``/``register_memory_region``). The C++ +``TcpEndpoint`` uses ``async_*`` names and accepts an extra ``timeout_ms`` +kwarg; this thin shim collapses those differences without copying the +underlying object. + +``write_with_imm`` and ``imm_recv`` are RDMA-specific (write-with-immediate- +data) and have no TCP analogue; they raise ``NotImplementedError`` so the +peer agent's I/O facade can keep its uniform call shape and still surface a +clear error if a caller tries to use them on a TCP connection. +""" + +from __future__ import annotations + +from typing import Any, Dict, Optional + +from dlslime import TcpEndpoint + + +class TcpEndpointAdapter: + """Wrap a ``TcpEndpoint`` to match the ``RDMAEndpoint`` method surface.""" + + def __init__(self, endpoint: TcpEndpoint) -> None: + self._endpoint = endpoint + + @property + def raw(self) -> TcpEndpoint: + return self._endpoint + + def endpoint_info(self) -> Dict[str, Any]: + return self._endpoint.endpoint_info() + + def connect(self, peer_info: Dict[str, Any]) -> None: + self._endpoint.connect(peer_info) + + def is_connected(self) -> bool: + return self._endpoint.is_connected() + + def shutdown(self) -> None: + self._endpoint.shutdown() + + def mr_info(self) -> Dict[str, Any]: + return self._endpoint.mr_info() + + def register_memory_region( + self, name: str, ptr: int, offset: int, length: int + ) -> int: + return self._endpoint.register_memory_region(name, ptr, offset, length) + + def register_remote_memory_region(self, name: str, mr_info: Dict[str, Any]) -> int: + return self._endpoint.register_remote_memory_region(name, mr_info) + + # ------------------------------------------------------------------ + # Two-sided primitives. ``stream`` is an RDMA/CUDA concept and is + # accepted only for signature parity; TCP ignores it. + # ------------------------------------------------------------------ + def send(self, chunk, stream: Optional[Any] = None): + return self._endpoint.async_send(chunk) + + def recv(self, chunk, stream: Optional[Any] = None): + return self._endpoint.async_recv(chunk) + + # ------------------------------------------------------------------ + # One-sided primitives. + # ------------------------------------------------------------------ + def read(self, assign, stream: Optional[Any] = None): + return self._endpoint.async_read(self._normalize_assign(assign)) + + def write(self, assign, stream: Optional[Any] = None): + return self._endpoint.async_write(self._normalize_assign(assign)) + + def write_with_imm(self, assign, imm_data: int = 0, stream: Optional[Any] = None): + raise NotImplementedError( + "TCP transport does not support write_with_imm; this is RDMA-only." + ) + + def imm_recv(self, stream: Optional[Any] = None): + raise NotImplementedError( + "TCP transport does not support imm_recv; this is RDMA-only." + ) + + @staticmethod + def _normalize_assign(assign): + # The C++ async_read/async_write bindings expect a list of tuples. + # PeerAgent's RDMA path passes either a single tuple or a list. + if isinstance(assign, tuple): + return [assign] + return assign diff --git a/dlslime/examples/python/p2p_tcp_rc_read_peer_agent.py b/dlslime/examples/python/p2p_tcp_rc_read_peer_agent.py new file mode 100644 index 00000000..b89599ac --- /dev/null +++ b/dlslime/examples/python/p2p_tcp_rc_read_peer_agent.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 +"""TCP one-sided read through PeerAgent — mirror of p2p_rdma_rc_read.py. + +Two PeerAgents in one process. Agent A reads bytes directly out of a +memory region published by agent B, no send() on the source side. + +Prerequisites: + 1. Start NanoCtrl: cd NanoCtrl && cargo run --release + 2. Redis must be reachable. + 3. dlslime built with BUILD_TCP=ON (default). + +Usage: + python p2p_tcp_rc_read_peer_agent.py + python p2p_tcp_rc_read_peer_agent.py --ctrl http://host:4479 +""" + +import argparse +import ctypes + +from dlslime import PeerAgent + + +def main(ctrl_url: str) -> None: + agent_a = PeerAgent(ctrl_url=ctrl_url, alias="tcp_read_a") + agent_b = PeerAgent(ctrl_url=ctrl_url, alias="tcp_read_b") + + buf_a = ctypes.create_string_buffer(64) + buf_b = ctypes.create_string_buffer(64) + addr_a = ctypes.addressof(buf_a) + addr_b = ctypes.addressof(buf_b) + + # Pre-fill B with the payload A is about to read out. + payload = b"one-sided-read-via-peer-agent" + ctypes.memmove(addr_b, payload, len(payload)) + + # Register MRs before connect_to so endpoint_info() carries them when + # the rendezvous fires — required for one-sided ops on TCP. + agent_a.register_memory_region("buf_a", addr_a, 0, 64) + agent_b.register_memory_region("buf_b", addr_b, 0, 64) + + conn_a = agent_a.connect_to("tcp_read_b", transport="tcp") + agent_b.connect_to("tcp_read_a", transport="tcp") + conn_a.wait() + print("Connected over TCP.") + + try: + ep_a = conn_a.endpoint + peer_info = conn_a.peer_endpoint_info + assert peer_info is not None + + h_local = ep_a.register_memory_region("buf_a_loc", addr_a, 0, 64) + h_remote = ep_a.register_remote_memory_region( + "buf_b_rem", peer_info["mr_info"]["buf_b"] + ) + + st = ep_a.read([(h_local, h_remote, 0, 0, len(payload))]).wait() + assert st == 0, f"read failed: {st}" + assert bytes(buf_a[: len(payload)]) == payload, "A did not receive the bytes" + print(f"A<-B one-sided read = {bytes(buf_a[: len(payload)])!r} ok") + finally: + agent_a.shutdown() + agent_b.shutdown() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="TCP one-sided read through PeerAgent") + parser.add_argument("--ctrl", default="http://127.0.0.1:4479", help="NanoCtrl URL") + main(parser.parse_args().ctrl) diff --git a/dlslime/examples/python/p2p_tcp_rc_send_recv.py b/dlslime/examples/python/p2p_tcp_rc_send_recv.py new file mode 100644 index 00000000..d1a375a0 --- /dev/null +++ b/dlslime/examples/python/p2p_tcp_rc_send_recv.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 +"""Raw TcpEndpoint send/recv — no PeerAgent, no NanoCtrl, no Redis. + +The TCP transport is symmetric: both sides bind a port, exchange the +endpoint_info JSON out-of-band (here: just a Python dict in the same +process), and call connect() in their own thread before posting I/O. + +Usage: + python p2p_tcp_rc_send_recv.py +""" + +import ctypes +import threading +import time + +from dlslime import TcpEndpoint + + +PAYLOAD_AB = b"hello-from-a" +PAYLOAD_BA = b"hello-from-b" + + +def main() -> None: + buf_a = ctypes.create_string_buffer(64) + buf_b = ctypes.create_string_buffer(64) + addr_a = ctypes.addressof(buf_a) + addr_b = ctypes.addressof(buf_b) + + ep_a = TcpEndpoint(ip="0.0.0.0", port=0) + ep_b = TcpEndpoint(ip="0.0.0.0", port=0) + info_a = ep_a.endpoint_info() + info_b = ep_b.endpoint_info() + print(f"endpoint A bound at {info_a['host']}:{info_a['port']}") + print(f"endpoint B bound at {info_b['host']}:{info_b['port']}") + + err = [] + barrier = threading.Barrier(2) + + def run_a() -> None: + try: + barrier.wait(5) + ep_a.connect(info_b) + ctypes.memmove(addr_a, PAYLOAD_AB, len(PAYLOAD_AB)) + # Small stagger so B has its async_recv posted before our send. + time.sleep(0.5) + assert ep_a.async_send((addr_a, 0, len(PAYLOAD_AB))).wait() == 0 + assert ( + ep_a.async_recv((addr_a, len(PAYLOAD_AB), len(PAYLOAD_BA))).wait() == 0 + ) + received = bytes(buf_a[len(PAYLOAD_AB) : len(PAYLOAD_AB) + len(PAYLOAD_BA)]) + assert received == PAYLOAD_BA, received + except Exception as e: # noqa: BLE001 + err.append(("a", e)) + + def run_b() -> None: + try: + barrier.wait(5) + ep_b.connect(info_a) + assert ep_b.async_recv((addr_b, 0, len(PAYLOAD_AB))).wait() == 0 + received = bytes(buf_b[: len(PAYLOAD_AB)]) + assert received == PAYLOAD_AB, received + ctypes.memmove(addr_b, PAYLOAD_BA, len(PAYLOAD_BA)) + time.sleep(0.5) + assert ep_b.async_send((addr_b, 0, len(PAYLOAD_BA))).wait() == 0 + except Exception as e: # noqa: BLE001 + err.append(("b", e)) + + ta = threading.Thread(target=run_a, daemon=False) + tb = threading.Thread(target=run_b, daemon=False) + ta.start() + tb.start() + ta.join(timeout=30) + tb.join(timeout=30) + + try: + ep_a.shutdown() + except Exception: + pass + try: + ep_b.shutdown() + except Exception: + pass + + if err: + raise RuntimeError(f"raw TCP example failed: {err}") + print(f"A->B = {PAYLOAD_AB!r} ok") + print(f"B->A = {PAYLOAD_BA!r} ok") + + +if __name__ == "__main__": + main() diff --git a/dlslime/examples/python/p2p_tcp_rc_write_peer_agent.py b/dlslime/examples/python/p2p_tcp_rc_write_peer_agent.py new file mode 100644 index 00000000..2d6aa3f8 --- /dev/null +++ b/dlslime/examples/python/p2p_tcp_rc_write_peer_agent.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python3 +"""TCP one-sided write through PeerAgent — mirror of p2p_rdma_rc_write.py. + +Two PeerAgents in one process. Agent A writes a payload directly into a +memory region published by agent B, no recv() on the receiver side. + +Prerequisites: + 1. Start NanoCtrl: cd NanoCtrl && cargo run --release + 2. Redis must be reachable. + 3. dlslime built with BUILD_TCP=ON (default). + +Usage: + python p2p_tcp_rc_write_peer_agent.py + python p2p_tcp_rc_write_peer_agent.py --ctrl http://host:4479 +""" + +import argparse +import ctypes +import time + +from dlslime import PeerAgent + + +def main(ctrl_url: str) -> None: + agent_a = PeerAgent(ctrl_url=ctrl_url, alias="tcp_write_a") + agent_b = PeerAgent(ctrl_url=ctrl_url, alias="tcp_write_b") + + buf_a = ctypes.create_string_buffer(64) + buf_b = ctypes.create_string_buffer(64) + addr_a = ctypes.addressof(buf_a) + addr_b = ctypes.addressof(buf_b) + + # Register MRs before connect_to so endpoint_info() carries them when + # the rendezvous fires — required for one-sided ops on TCP. + agent_a.register_memory_region("buf_a", addr_a, 0, 64) + agent_b.register_memory_region("buf_b", addr_b, 0, 64) + + conn_a = agent_a.connect_to("tcp_write_b", transport="tcp") + agent_b.connect_to("tcp_write_a", transport="tcp") + conn_a.wait() + print("Connected over TCP.") + + try: + ep_a = conn_a.endpoint + peer_info = conn_a.peer_endpoint_info + assert peer_info is not None + + h_local = ep_a.register_memory_region("buf_a_loc", addr_a, 0, 64) + h_remote = ep_a.register_remote_memory_region( + "buf_b_rem", peer_info["mr_info"]["buf_b"] + ) + + payload = b"one-sided-write-via-peer-agent" + ctypes.memmove(addr_a, payload, len(payload)) + + st = ep_a.write([(h_local, h_remote, 0, 0, len(payload))]).wait() + assert st == 0, f"write failed: {st}" + + # TCP write completes locally before the bytes land remotely; spin + # briefly until B observes them. + deadline = time.monotonic() + 5 + while time.monotonic() < deadline: + if bytes(buf_b[: len(payload)]) == payload: + break + time.sleep(0.05) + assert bytes(buf_b[: len(payload)]) == payload, "B did not see the write" + print(f"A->B one-sided write = {payload!r} ok") + finally: + agent_a.shutdown() + agent_b.shutdown() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="TCP one-sided write through PeerAgent" + ) + parser.add_argument("--ctrl", default="http://127.0.0.1:4479", help="NanoCtrl URL") + main(parser.parse_args().ctrl) diff --git a/dlslime/examples/python/p2p_tcp_send_recv_peer_agent.py b/dlslime/examples/python/p2p_tcp_send_recv_peer_agent.py new file mode 100644 index 00000000..6d62345a --- /dev/null +++ b/dlslime/examples/python/p2p_tcp_send_recv_peer_agent.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python3 +"""TCP peer-agent loopback — two PeerAgents in one process over TCP. + +Prerequisites: + 1. Start NanoCtrl: cd NanoCtrl && cargo run --release + 2. Redis must be reachable (NanoCtrl returns its address automatically). + 3. dlslime built with BUILD_TCP=ON (the default). + +Usage: + python p2p_tcp_send_recv_peer_agent.py # default NanoCtrl + python p2p_tcp_send_recv_peer_agent.py --ctrl http://host:4479 +""" + +import argparse +import ctypes +import threading +import time + +from dlslime import PeerAgent + + +PAYLOAD_AB = b"hello-from-a" +PAYLOAD_BA = b"hello-from-b" + + +def main(ctrl_url: str) -> None: + agent_a = PeerAgent(ctrl_url=ctrl_url, alias="tcp_a") + agent_b = PeerAgent(ctrl_url=ctrl_url, alias="tcp_b") + + buf_a = ctypes.create_string_buffer(64) + buf_b = ctypes.create_string_buffer(64) + addr_a = ctypes.addressof(buf_a) + addr_b = ctypes.addressof(buf_b) + + # Register MRs before connect_to so endpoint_info() carries them when the + # rendezvous fires. Required for one-sided write/read below. + agent_a.register_memory_region("buf_a", addr_a, 0, 64) + agent_b.register_memory_region("buf_b", addr_b, 0, 64) + + conn_a = agent_a.connect_to("tcp_b", transport="tcp") + conn_b = agent_b.connect_to("tcp_a", transport="tcp") + conn_a.wait() + conn_b.wait() + print("Connected over TCP.") + + try: + # ── Two-sided send/recv: A sends, B receives. ───────────────── + ctypes.memmove(addr_a, PAYLOAD_AB, len(PAYLOAD_AB)) + + def post_recv() -> None: + fut = agent_b.recv("tcp_a", (addr_b, 0, len(PAYLOAD_AB))) + assert fut.wait() == 0 + assert bytes(buf_b[: len(PAYLOAD_AB)]) == PAYLOAD_AB + + t = threading.Thread(target=post_recv, daemon=True) + t.start() + send_fut = agent_a.send("tcp_b", (addr_a, 0, len(PAYLOAD_AB))) + assert send_fut.wait() == 0 + t.join(timeout=10) + print(f"A->B send/recv = {PAYLOAD_AB!r} ok") + + # ── One-sided write: A writes into B's MR via the TCP endpoint + # adapter. TCP MR handles are per-endpoint, so we go through + # conn_a.endpoint (a TcpEndpointAdapter) directly. + ep_a = conn_a.endpoint + peer_info = conn_a.peer_endpoint_info # set by mailbox post-handshake + assert peer_info is not None, "peer_endpoint_info missing after handshake" + h_local_a = ep_a.register_memory_region("buf_a_loc", addr_a, 0, 64) + h_remote_b = ep_a.register_remote_memory_region( + "buf_b_remote", peer_info["mr_info"]["buf_b"] + ) + + ctypes.memmove(buf_b, b"\x00" * 64, 64) # clear B + ctypes.memmove(addr_a, PAYLOAD_BA, len(PAYLOAD_BA)) + write_fut = ep_a.write([(h_local_a, h_remote_b, 0, 0, len(PAYLOAD_BA))]) + assert write_fut.wait() == 0 + for _ in range(40): + if bytes(buf_b[: len(PAYLOAD_BA)]) == PAYLOAD_BA: + break + time.sleep(0.05) + assert bytes(buf_b[: len(PAYLOAD_BA)]) == PAYLOAD_BA + print(f"A->B one-sided write = {PAYLOAD_BA!r} ok") + + print("\nAll TCP peer-agent ops passed.") + finally: + agent_a.shutdown() + agent_b.shutdown() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="TCP peer-agent send/recv + one-sided write loopback" + ) + parser.add_argument("--ctrl", default="http://127.0.0.1:4479", help="NanoCtrl URL") + main(parser.parse_args().ctrl) diff --git a/dlslime/tests/python/test_peer_agent_tcp.py b/dlslime/tests/python/test_peer_agent_tcp.py new file mode 100644 index 00000000..50e737a9 --- /dev/null +++ b/dlslime/tests/python/test_peer_agent_tcp.py @@ -0,0 +1,220 @@ +"""Integration tests for PeerAgent over the TCP transport. + +Skipped when NanoCtrl is unreachable so the suite stays green in +RDMA-only or no-control-plane CI environments. +""" + +from __future__ import annotations + +import ctypes +import os +import threading +import time +from typing import Tuple + +import dlslime + +import httpx +import pytest +from dlslime import PeerAgent + +CTRL_URL = os.environ.get("DLSLIME_TEST_CTRL_URL", "http://127.0.0.1:4479") + + +def _ctrl_alive() -> bool: + try: + with httpx.Client(timeout=1.0) as cli: + cli.get(CTRL_URL) + return True + except Exception: + return False + + +pytestmark = [ + pytest.mark.skipif( + not hasattr(dlslime, "TcpEndpoint"), + reason="dlslime built without BUILD_TCP", + ), + pytest.mark.skipif( + not _ctrl_alive(), + reason=f"NanoCtrl not reachable at {CTRL_URL}", + ), +] + + +# ── two-thread barrier harness (adapted from tests/python/test_tcp.py) ── + + +def _sync_run(name: str, fn_a, fn_b, timeout: float = 60.0) -> None: + err = [] + barrier = threading.Barrier(2) + + def wrap(fn): + try: + barrier.wait(10) + fn() + except Exception as e: # noqa: BLE001 + err.append(e) + + ta = threading.Thread(target=wrap, args=(fn_a,), daemon=False) + tb = threading.Thread(target=wrap, args=(fn_b,), daemon=False) + ta.start() + tb.start() + ta.join(timeout) + tb.join(timeout) + if ta.is_alive() or tb.is_alive(): + raise RuntimeError(f"{name}: {timeout}s timeout") + if err: + raise err[0] + + +# ── fixture: two PeerAgents connected over TCP ── + + +@pytest.fixture() +def tcp_pair(request) -> Tuple[PeerAgent, PeerAgent]: + # Unique aliases per test so a stale agent record from an earlier flaky + # run doesn't collide. + suffix = request.node.name.replace("[", "_").replace("]", "") + alias_a = f"tcp_a_{suffix}" + alias_b = f"tcp_b_{suffix}" + agent_a = PeerAgent(ctrl_url=CTRL_URL, alias=alias_a) + agent_b = PeerAgent(ctrl_url=CTRL_URL, alias=alias_b) + + yield agent_a, agent_b + + try: + agent_a.shutdown() + except Exception: + pass + try: + agent_b.shutdown() + except Exception: + pass + + +def _connect_pair(agent_a: PeerAgent, agent_b: PeerAgent) -> Tuple[object, object]: + holder = {} + + def run_a(): + conn = agent_a.connect_to(agent_b.alias, transport="tcp") + conn.wait(timeout=30) + holder["a"] = conn + + def run_b(): + conn = agent_b.connect_to(agent_a.alias, transport="tcp") + conn.wait(timeout=30) + holder["b"] = conn + + _sync_run("connect_pair", run_a, run_b, timeout=60) + return holder["a"], holder["b"] + + +# ── tests ── + + +def test_tcp_send_recv_roundtrip(tcp_pair): + agent_a, agent_b = tcp_pair + conn_a, conn_b = _connect_pair(agent_a, agent_b) + + buf_a = ctypes.create_string_buffer(32) + buf_b = ctypes.create_string_buffer(32) + addr_a = ctypes.addressof(buf_a) + addr_b = ctypes.addressof(buf_b) + + ctypes.memmove(addr_a, b"hello", 5) + + def run_a(): + st = agent_a.send(agent_b.alias, (addr_a, 0, 5)).wait() + assert st == 0 + st = agent_a.recv(agent_b.alias, (addr_a, 5, 5)).wait() + assert st == 0 + assert bytes(buf_a[5:10]) == b"world" + + def run_b(): + st = agent_b.recv(agent_a.alias, (addr_b, 0, 5)).wait() + assert st == 0 + assert bytes(buf_b[:5]) == b"hello" + ctypes.memmove(addr_b, b"world", 5) + st = agent_b.send(agent_a.alias, (addr_b, 0, 5)).wait() + assert st == 0 + + _sync_run("send_recv_roundtrip", run_a, run_b, timeout=60) + + +def test_tcp_one_sided_write(tcp_pair): + agent_a, agent_b = tcp_pair + + buf_a = ctypes.create_string_buffer(64) + buf_b = ctypes.create_string_buffer(64) + addr_a = ctypes.addressof(buf_a) + addr_b = ctypes.addressof(buf_b) + + # Register before connect so endpoint_info()'s mr_info carries the MRs. + agent_a.register_memory_region("buf_a", addr_a, 0, 64) + agent_b.register_memory_region("buf_b", addr_b, 0, 64) + + conn_a, _conn_b = _connect_pair(agent_a, agent_b) + + payload = b"one-sided-write!" + ctypes.memmove(addr_a, payload, len(payload)) + + ep_a = conn_a.endpoint + peer_info = conn_a.peer_endpoint_info + assert peer_info is not None + assert "buf_b" in peer_info.get("mr_info", {}) + + h_loc = ep_a.register_memory_region("buf_a_loc", addr_a, 0, 64) + h_rem = ep_a.register_remote_memory_region( + "buf_b_rem", peer_info["mr_info"]["buf_b"] + ) + + st = ep_a.write([(h_loc, h_rem, 0, 0, len(payload))]).wait() + assert st == 0 + + deadline = time.monotonic() + 5 + while time.monotonic() < deadline: + if bytes(buf_b[: len(payload)]) == payload: + break + time.sleep(0.05) + assert bytes(buf_b[: len(payload)]) == payload + + +def test_tcp_one_sided_read(tcp_pair): + agent_a, agent_b = tcp_pair + + buf_a = ctypes.create_string_buffer(64) + buf_b = ctypes.create_string_buffer(64) + addr_a = ctypes.addressof(buf_a) + addr_b = ctypes.addressof(buf_b) + + payload = b"one-sided-read" + ctypes.memmove(addr_b, payload, len(payload)) + + agent_a.register_memory_region("buf_a", addr_a, 0, 64) + agent_b.register_memory_region("buf_b", addr_b, 0, 64) + + conn_a, _conn_b = _connect_pair(agent_a, agent_b) + + ep_a = conn_a.endpoint + peer_info = conn_a.peer_endpoint_info + assert peer_info is not None + h_loc = ep_a.register_memory_region("buf_a_loc", addr_a, 0, 64) + h_rem = ep_a.register_remote_memory_region( + "buf_b_rem", peer_info["mr_info"]["buf_b"] + ) + + st = ep_a.read([(h_loc, h_rem, 0, 0, len(payload))]).wait() + assert st == 0 + assert bytes(buf_a[: len(payload)]) == payload + + +def test_tcp_imm_ops_raise(tcp_pair): + agent_a, agent_b = tcp_pair + conn_a, _conn_b = _connect_pair(agent_a, agent_b) + + ep_a = conn_a.endpoint + with pytest.raises(NotImplementedError): + ep_a.write_with_imm([(0, 0, 0, 0, 0)], imm_data=42) + with pytest.raises(NotImplementedError): + ep_a.imm_recv() From a485b086e233a69e57032b44c945a8511a688f48 Mon Sep 17 00:00:00 2001 From: JimyMa Date: Wed, 27 May 2026 18:07:27 +0000 Subject: [PATCH 2/5] update docs --- README.md | 34 ++++++- docs/mkdocs.yml | 2 + docs/src/api_reference.md | 1 + docs/src/guide/endpoint-api.md | 5 ++ docs/src/guide/index.md | 1 + docs/src/guide/peeragent-api.md | 31 +++++-- docs/src/guide/tcp-transport.md | 152 ++++++++++++++++++++++++++++++++ 7 files changed, 216 insertions(+), 10 deletions(-) create mode 100644 docs/src/guide/tcp-transport.md diff --git a/README.md b/README.md index b6ba912e..b6f9d590 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ DLSlime is a PeerAgent-centered communication and microservice toolkit for distributed AI systems. PeerAgent is the runtime hub: application services such as SlimeRPC and DLSlimeCache build on it, NanoCtrl supplies service governance and coordination metadata around it, and endpoint APIs below it drive -heterogeneous transports such as RDMA, NVLink, and Ascend Direct. +heterogeneous transports such as RDMA, TCP, NVLink, and Ascend Direct. DLSlime is designed to be adopted one layer at a time. Applications can start with direct endpoints, add PeerAgent coordination, use NanoCtrl for governance, @@ -96,6 +96,36 @@ python dlslime/examples/python/p2p_ascend_read.py Ascend Direct setup details live in [docs/huawei_ascend/README.md](docs/huawei_ascend/README.md). +### TCP Fallback Transport + +Use TCP when the hosts have no RDMA NICs or when a peer connection has to +traverse a network without RDMA capability. The TCP transport exposes the same +primitives — `endpoint_info` / `connect`, two-sided `send` / `recv`, one-sided +`read` / `write`, and named memory regions — and plugs into PeerAgent through +the same control plane via `connect_to(transport="tcp")`. Immediate-data ops +(`write_with_imm`, `imm_recv`) are RDMA-only and raise `NotImplementedError` +on TCP. + +`BUILD_TCP` is `ON` by default. + +Raw `TcpEndpoint`, no NanoCtrl required: + +```bash +python dlslime/examples/python/p2p_tcp_rc_send_recv.py +``` + +PeerAgent over TCP: + +```bash +nanoctrl start +python dlslime/examples/python/p2p_tcp_send_recv_peer_agent.py +python dlslime/examples/python/p2p_tcp_rc_write_peer_agent.py +python dlslime/examples/python/p2p_tcp_rc_read_peer_agent.py +``` + +See [docs/src/guide/tcp-transport.md](docs/src/guide/tcp-transport.md) for the +full surface, one-sided I/O setup, and the test reference. + ### PeerAgent-to-PeerAgent Access Use PeerAgent when the application wants peer-to-peer data movement without @@ -241,6 +271,7 @@ cmake --build build | Flag | Default | Description | | --------------------- | ---------------------------------------: | ------------------------------------------------------ | | `BUILD_RDMA` | `ON` | Build the RDMA transfer engine | +| `BUILD_TCP` | `ON` | Build the TCP transfer engine (fallback transport) | | `BUILD_PYTHON` | `OFF` in CMake, `ON` in `pyproject.toml` | Build Python bindings | | `BUILD_NVLINK` | `OFF` | Build the NVLink transfer engine | | `BUILD_ASCEND_DIRECT` | `OFF` | Build Ascend Direct transport | @@ -289,6 +320,7 @@ scripts/ Repo-wide automation (release.sh, ...) - [Documentation index](docs/README.md) - [Roadmap](docs/roadmap.md) +- [TCP transport guide](docs/src/guide/tcp-transport.md) - [DLSlimeCache design](docs/design/dlslime-cache.md) - [Endpoint ownership model](docs/endpoint-ownership-model.md) - [Endpoint DeviceSignal refactor](docs/endpoint-device-signal-refactor.md) diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index a3f4a441..2b62fb86 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -84,6 +84,7 @@ plugins: Overview: 总览 Endpoint API: Endpoint API PeerAgent API: PeerAgent API + TCP Transport: TCP 传输 SlimeRPC: SlimeRPC Deployment: 部署 DLSlimeCache Service: DLSlimeCache 服务 @@ -119,6 +120,7 @@ nav: - Overview: guide/index.md - Endpoint API: guide/endpoint-api.md - PeerAgent API: guide/peeragent-api.md + - TCP Transport: guide/tcp-transport.md - SlimeRPC: guide/slimerpc.md - Deployment: guide/deployment.md - DLSlimeCache Service: guide/dlslime-cache.md diff --git a/docs/src/api_reference.md b/docs/src/api_reference.md index 6866f5bc..292a8e48 100644 --- a/docs/src/api_reference.md +++ b/docs/src/api_reference.md @@ -32,6 +32,7 @@ from dlslime.logging import get_logger, set_log_level - [Endpoint API](guide/endpoint-api.md) - [PeerAgent API](guide/peeragent-api.md) +- [TCP Transport](guide/tcp-transport.md) - [SlimeRPC](guide/slimerpc.md) - [DLSlimeCache Service](guide/dlslime-cache.md) - [Versions](versions.md) diff --git a/docs/src/guide/endpoint-api.md b/docs/src/guide/endpoint-api.md index d3cee6c1..b0dac935 100644 --- a/docs/src/guide/endpoint-api.md +++ b/docs/src/guide/endpoint-api.md @@ -8,6 +8,11 @@ For most service deployments, prefer [PeerAgent API](peeragent-api.md). Endpoint API is best for transport bring-up, microbenchmarks, explicit two-process tests, and systems that already have their own control plane. +DLSlime ships an `RDMAEndpoint` (covered below) and a `TcpEndpoint` for hosts +without RDMA. Both expose `endpoint_info` / `connect`, `register_memory_region`, +and the same async send/recv/read/write futures. See [TCP Transport](tcp-transport.md) +for the TCP-specific surface. + ## Main Types | Type | Purpose | diff --git a/docs/src/guide/index.md b/docs/src/guide/index.md index 900dcc44..eae6d03e 100644 --- a/docs/src/guide/index.md +++ b/docs/src/guide/index.md @@ -8,6 +8,7 @@ than a one-off transfer library. - [Deployment](deployment.md): run NanoCtrl, Redis, DLSlimeCache, and examples in a predictable layout. - [Endpoint API](endpoint-api.md): use the low-level RDMA endpoint surface directly. - [PeerAgent API](peeragent-api.md): use control-plane discovery, named memory regions, and service-friendly I/O. +- [TCP Transport](tcp-transport.md): use TCP as a fallback transport, with raw `TcpEndpoint` or PeerAgent's `transport="tcp"`. - [SlimeRPC](slimerpc.md): define Python services and call them through PeerAgent-backed RDMA RPC. - [DLSlimeCache Service](dlslime-cache.md): service lifecycle and client flow. - [SlimeRPC Benchmark](benchmark-rpc.md): benchmark SlimeRPC against Ray. diff --git a/docs/src/guide/peeragent-api.md b/docs/src/guide/peeragent-api.md index 48f8aa49..31dbc960 100644 --- a/docs/src/guide/peeragent-api.md +++ b/docs/src/guide/peeragent-api.md @@ -66,15 +66,28 @@ conn.wait(timeout=60) `connect_to` returns a `PeerConnection` handle. -| `PeerConnection` member | Meaning | -| -------------------------- | ---------------------------------------------------------------- | -| `wait(timeout=60)` | Block until the directed connection is ready. | -| `is_connected()` | Return whether the local connection is established. | -| `conn_id` | Stable directed connection id. | -| `peer_alias` | Remote PeerAgent alias. | -| `local_nic` / `remote_nic` | Selected local and remote NICs. | -| `state` | Connection state such as `connecting`, `connected`, or `failed`. | -| `endpoint` | Underlying `RDMAEndpoint` once created. | +| `PeerConnection` member | Meaning | +| -------------------------- | -------------------------------------------------------------------------- | +| `wait(timeout=60)` | Block until the directed connection is ready. | +| `is_connected()` | Return whether the local connection is established. | +| `conn_id` | Stable directed connection id. | +| `peer_alias` | Remote PeerAgent alias. | +| `local_nic` / `remote_nic` | Selected local and remote NICs. | +| `state` | Connection state such as `connecting`, `connected`, or `failed`. | +| `endpoint` | Underlying `RDMAEndpoint` (or `TcpEndpointAdapter`) once created. | +| `peer_endpoint_info` | Peer's `endpoint_info` dict captured during handshake (TCP one-sided ops). | + +### Selecting the transport + +`connect_to(transport=...)` picks the underlying transport. RDMA is the +default; pass `transport="tcp"` to bind a `TcpEndpoint` instead. See +[TCP Transport](tcp-transport.md) for the full TCP flow, the +`local_host`/`local_port` kwargs, and one-sided I/O over TCP. + +```python +conn = agent.connect_to("worker-b", transport="tcp") +conn.wait() +``` For bidirectional flows, both agents normally call `connect_to` and wait on their local handle: diff --git a/docs/src/guide/tcp-transport.md b/docs/src/guide/tcp-transport.md new file mode 100644 index 00000000..86577a6e --- /dev/null +++ b/docs/src/guide/tcp-transport.md @@ -0,0 +1,152 @@ +# TCP Transport + +DLSlime ships a TCP transfer engine alongside RDMA, NVLink, and Ascend Direct. +TCP is the right transport when: + +- The hosts have no RDMA NICs (laptops, dev boxes, mixed clusters). +- You want a quick smoke test before a real RDMA deployment. +- A connection has to traverse a network without RDMA capability. + +It exposes the same primitives as RDMA — `endpoint_info` / `connect`, two-sided +`async_send` / `async_recv`, one-sided `async_read` / `async_write`, and named +memory regions — and plugs into PeerAgent through the same control plane. +Immediate-data ops (`write_with_imm`, `imm_recv`) are RDMA-only and raise +`NotImplementedError` on TCP. + +TCP is enabled by default at build time (`BUILD_TCP=ON`). + +## Raw `TcpEndpoint` + +Use `TcpEndpoint` when the application already owns connection setup, metadata +exchange, and memory lifetime — the same scope as raw `RDMAEndpoint`. No +NanoCtrl or Redis needed. + +```python +import ctypes +from dlslime import TcpEndpoint + +ep_a = TcpEndpoint(ip="0.0.0.0", port=0) # 0 = OS-assigned +ep_b = TcpEndpoint(ip="0.0.0.0", port=0) +info_a = ep_a.endpoint_info() # {"host", "port", "mr_info"} +info_b = ep_b.endpoint_info() + +# Both sides bind a port and connect; the handshake is symmetric. +ep_a.connect(info_b) +ep_b.connect(info_a) + +buf = ctypes.create_string_buffer(64) +fut = ep_a.async_send((ctypes.addressof(buf), 0, 5)) +assert fut.wait() == 0 +``` + +`async_send`, `async_recv`, `async_read`, and `async_write` return future +objects with a `wait()` method. `wait_for(seconds)` returns `None` on timeout +and the status code otherwise. + +For one-sided ops, register both sides' MRs **before** calling `connect()` so +`endpoint_info()` carries them across the handshake: + +```python +h_local = ep_a.register_memory_region("a", addr_a, 0, 256) +h_local_b = ep_b.register_memory_region("b", addr_b, 0, 256) +info_b = ep_b.endpoint_info() # carries mr_info["b"] +h_remote = ep_a.register_remote_memory_region("rb", info_b["mr_info"]["b"]) +ep_a.connect(info_b) +ep_b.connect(ep_a.endpoint_info()) +ep_a.async_write([(h_local, h_remote, 0, 0, 12)]).wait() +``` + +Examples: + +- `dlslime/examples/python/p2p_tcp_rc_send_recv.py` — raw two-sided send/recv. +- `dlslime/tests/python/test_tcp.py` — exhaustive raw-endpoint reference. + +## TCP through PeerAgent + +PeerAgent supports TCP via a `transport="tcp"` kwarg on `connect_to`. RDMA +remains the default; existing RDMA callers are unchanged. + +```python +from dlslime import PeerAgent + +agent_a = PeerAgent(ctrl_url="http://127.0.0.1:4479", alias="tcp_a") +agent_b = PeerAgent(ctrl_url="http://127.0.0.1:4479", alias="tcp_b") + +conn_a = agent_a.connect_to("tcp_b", transport="tcp") +conn_b = agent_b.connect_to("tcp_a", transport="tcp") +conn_a.wait() +conn_b.wait() + +# Two-sided I/O works through the same agent facade as RDMA: +agent_a.send("tcp_b", (addr_a, 0, 5)).wait() +agent_b.recv("tcp_a", (addr_b, 0, 5)).wait() +``` + +`connect_to(transport="tcp")` accepts two extra kwargs: + +| Kwarg | Default | Meaning | +| ------------ | --------- | ----------------------------------- | +| `local_host` | `0.0.0.0` | Local bind IP for the TCP endpoint. | +| `local_port` | `0` | Local bind port (0 = OS-assigned). | + +`ib_port`, `qp_num`, `local_device`, and `peer_device` are accepted for +signature parity with the RDMA path but are ignored. + +### One-sided read / write + +Register memory regions **before** `connect_to`, so the rendezvous picks them +up when it captures `endpoint_info()`: + +```python +agent_a.register_memory_region("buf_a", addr_a, 0, 64) +agent_b.register_memory_region("buf_b", addr_b, 0, 64) + +conn_a = agent_a.connect_to("tcp_b", transport="tcp") +agent_b.connect_to("tcp_a", transport="tcp") +conn_a.wait() + +ep_a = conn_a.endpoint # TcpEndpointAdapter +peer_info = conn_a.peer_endpoint_info # set by mailbox post-handshake +h_local = ep_a.register_memory_region("buf_a_loc", addr_a, 0, 64) +h_remote = ep_a.register_remote_memory_region( + "buf_b_rem", peer_info["mr_info"]["buf_b"] +) +ep_a.write([(h_local, h_remote, 0, 0, 32)]).wait() +``` + +`PeerConnection.peer_endpoint_info` carries the peer's `endpoint_info` dict +once the handshake completes — including its `mr_info` — so callers can +resolve remote MR handles for one-sided ops without going through Redis. + +### What does *not* work on TCP + +| Op | Behavior on TCP | +| ------------------------------------------------------------------- | ---------------------------------------------------------------------- | +| `agent.write_with_imm(...)` | Raises `NotImplementedError`. | +| `agent.imm_recv(...)` | Raises `NotImplementedError`. | +| Named-MR auto-resolution (`agent.read("peer", [("mr_name", ...)])`) | Use the `(local_handle, remote_handle, ...)` form via `conn.endpoint`. | + +The named-MR auto-resolution path uses Redis MR records that are RDMA-specific +(`addr` / `rkey` fields). For TCP, register on `conn.endpoint` and pass +explicit local and remote handles — see the example above. + +## Examples + +PeerAgent + TCP: + +- `dlslime/examples/python/p2p_tcp_send_recv_peer_agent.py` — combined demo. +- `dlslime/examples/python/p2p_tcp_rc_write_peer_agent.py` — one-sided write. +- `dlslime/examples/python/p2p_tcp_rc_read_peer_agent.py` — one-sided read. + +Raw `TcpEndpoint`: + +- `dlslime/examples/python/p2p_tcp_rc_send_recv.py` +- `dlslime/tests/python/test_tcp.py` — also exercises read/write/timeout. + +## Tests + +- `dlslime/tests/python/test_peer_agent_tcp.py` — PeerAgent send/recv, + one-sided read, one-sided write, and the `NotImplementedError` contract for + immediate-data ops. Skipped when NanoCtrl is not reachable at + `DLSLIME_TEST_CTRL_URL` (default `http://127.0.0.1:4479`) or the binary was + built without `BUILD_TCP`. From dcc3cf9f25d8eb01f8d5bbebe1036d6652c46442 Mon Sep 17 00:00:00 2001 From: JimyMa Date: Wed, 27 May 2026 18:24:48 +0000 Subject: [PATCH 3/5] identity interface of tcp and RDMA --- dlslime/bench/python/tcp_bench_spmd.py | 2 +- .../dlslime/csrc/engine/tcp/tcp_endpoint.cpp | 39 ++++---- .../dlslime/csrc/engine/tcp/tcp_endpoint.h | 35 ++++++-- dlslime/dlslime/csrc/python/bind.cpp | 88 +++++++++++++----- dlslime/dlslime/peer_agent/_agent.py | 15 ++-- dlslime/dlslime/peer_agent/_tcp_endpoint.py | 89 ------------------- .../examples/python/p2p_tcp_rc_send_recv.py | 10 +-- .../python/p2p_tcp_send_recv_peer_agent.py | 6 +- dlslime/tests/python/test_tcp.py | 54 +++++------ docs/src/guide/peeragent-api.md | 2 +- docs/src/guide/tcp-transport.md | 20 ++--- 11 files changed, 165 insertions(+), 195 deletions(-) delete mode 100644 dlslime/dlslime/peer_agent/_tcp_endpoint.py diff --git a/dlslime/bench/python/tcp_bench_spmd.py b/dlslime/bench/python/tcp_bench_spmd.py index 63c695ff..76ae0568 100755 --- a/dlslime/bench/python/tcp_bench_spmd.py +++ b/dlslime/bench/python/tcp_bench_spmd.py @@ -197,7 +197,7 @@ def rank_0_print(*args): def transfer_batch_concurrency_dlslime( role, opcode, local_handle, remote_handle, tensor, batch_size, num_concurrency ): - fn = tcp_endpoint.async_read if opcode == "read" else tcp_endpoint.async_write + fn = tcp_endpoint.read if opcode == "read" else tcp_endpoint.write if role == "initiator": slots = [] for concurrent_id in range(num_concurrency): diff --git a/dlslime/dlslime/csrc/engine/tcp/tcp_endpoint.cpp b/dlslime/dlslime/csrc/engine/tcp/tcp_endpoint.cpp index b91136f5..df18ea4e 100644 --- a/dlslime/dlslime/csrc/engine/tcp/tcp_endpoint.cpp +++ b/dlslime/dlslime/csrc/engine/tcp/tcp_endpoint.cpp @@ -170,10 +170,10 @@ int32_t TcpEndpoint::register_remote_memory_region(const std::string& name, cons return remote_pool_->register_remote_memory_region(mr_info, name); } -// ── async_send ────────────────────────────────────────── +// ── send ────────────────────────────────────────── // chunk_tuple_t = (src_ptr, offset, length) — raw pointers, no MR lookup. -std::shared_ptr TcpEndpoint::async_send(const chunk_tuple_t& chunk, int64_t /*timeout_ms*/) +std::shared_ptr TcpEndpoint::send(const chunk_tuple_t& chunk, int64_t /*timeout_ms*/) { uintptr_t src = std::get<0>(chunk) + std::get<1>(chunk); size_t len = std::get<2>(chunk); @@ -198,7 +198,7 @@ std::shared_ptr TcpEndpoint::async_send(const chunk_tuple_t& chun auto* buf = new char[len]; auto cu_err = cudaMemcpy(buf, send_ptr, len, cudaMemcpyDeviceToHost); if (cu_err != cudaSuccess) { - SLIME_LOG_ERROR("async_send cudaMemcpy D2H: ", cudaGetErrorString(cu_err)); + SLIME_LOG_ERROR("send cudaMemcpy D2H: ", cudaGetErrorString(cu_err)); delete[] buf; op->completion_status.store(TCP_FAILED, std::memory_order_release); op->signal->force_complete(); @@ -213,7 +213,7 @@ std::shared_ptr TcpEndpoint::async_send(const chunk_tuple_t& chun auto session = std::make_shared( std::move(conn->socket), [op, conn, &pool, send_ptr, is_cuda](asio::error_code ec) { if (ec) - SLIME_LOG_WARN("async_send: ", ec.message()); + SLIME_LOG_WARN("send: ", ec.message()); op->completion_status.store(ec ? TCP_FAILED : TCP_SUCCESS, std::memory_order_release); if (op->signal) op->signal->set_comm_done(0); @@ -228,10 +228,10 @@ std::shared_ptr TcpEndpoint::async_send(const chunk_tuple_t& chun return std::make_shared(op); } -// ── async_recv ────────────────────────────────────────── +// ── recv ────────────────────────────────────────── // chunk_tuple_t = (dst_ptr, offset, length) — raw pointers, no MR lookup. -std::shared_ptr TcpEndpoint::async_recv(const chunk_tuple_t& chunk, bool exact_size) +std::shared_ptr TcpEndpoint::recv(const chunk_tuple_t& chunk, bool exact_size) { auto op = TcpOpState::create(); op->signal->reset_all(); @@ -258,15 +258,14 @@ std::shared_ptr TcpEndpoint::async_recv(const chunk_tuple_t& chun return std::make_shared(op); } -// ── async_read ────────────────────────────────────────── +// ── read ────────────────────────────────────────── // Each assign creates an independent ClientSession; all share one OpState. // Future.wait() blocks until every session has signalled its bit. -std::shared_ptr TcpEndpoint::async_read(const std::vector& assign, - int64_t /*timeout_ms*/) +std::shared_ptr TcpEndpoint::read(const std::vector& assign, int64_t /*timeout_ms*/) { if (assign.empty()) - throw std::runtime_error("TcpEndpoint::async_read: empty assignment"); + throw std::runtime_error("TcpEndpoint::read: empty assignment"); size_t N = assign.size(); auto op = TcpOpState::create(); @@ -288,7 +287,7 @@ std::shared_ptr TcpEndpoint::async_read(const std::vectorget_mr_fast(local_h); auto remote = remote_pool_->get_remote_mr_fast(remote_h); if (local.length == 0 || remote.length == 0) - throw std::runtime_error("TcpEndpoint::async_read: invalid MR handle"); + throw std::runtime_error("TcpEndpoint::read: invalid MR handle"); uintptr_t local_dst = local.addr + local_off; SessionHeader hdr{length, remote.addr + remote_off, OP_READ}; @@ -313,14 +312,14 @@ std::shared_ptr TcpEndpoint::async_read(const std::vectorsocket), [op, conn, i, &pool, read_dst, is_cuda, real_dst = local_dst, len = length](asio::error_code ec) { if (ec) { - SLIME_LOG_WARN("async_read session ", i, ": ", ec.message()); + SLIME_LOG_WARN("read session ", i, ": ", ec.message()); op->completion_status.store(TCP_FAILED, std::memory_order_release); } #ifdef USE_CUDA if (!ec && is_cuda) { auto cu_err = cudaMemcpy(reinterpret_cast(real_dst), read_dst, len, cudaMemcpyHostToDevice); if (cu_err != cudaSuccess) { - SLIME_LOG_ERROR("async_read cudaMemcpy H2D: ", cudaGetErrorString(cu_err)); + SLIME_LOG_ERROR("read cudaMemcpy H2D: ", cudaGetErrorString(cu_err)); op->completion_status.store(TCP_FAILED, std::memory_order_release); } } @@ -337,14 +336,14 @@ std::shared_ptr TcpEndpoint::async_read(const std::vector(op); } -// ── async_write ───────────────────────────────────────── +// ── write ───────────────────────────────────────── // Each assign creates an independent ClientSession; all share one OpState. -std::shared_ptr TcpEndpoint::async_write(const std::vector& assign, - int64_t /*timeout_ms*/) +std::shared_ptr TcpEndpoint::write(const std::vector& assign, + int64_t /*timeout_ms*/) { if (assign.empty()) - throw std::runtime_error("TcpEndpoint::async_write: empty assignment"); + throw std::runtime_error("TcpEndpoint::write: empty assignment"); size_t N = assign.size(); auto op = TcpOpState::create(); @@ -366,7 +365,7 @@ std::shared_ptr TcpEndpoint::async_write(const std::vectorget_mr_fast(local_h); auto remote = remote_pool_->get_remote_mr_fast(remote_h); if (local.length == 0 || remote.length == 0) - throw std::runtime_error("TcpEndpoint::async_write: invalid MR handle"); + throw std::runtime_error("TcpEndpoint::write: invalid MR handle"); uintptr_t src = local.addr + local_off; SessionHeader hdr{length, remote.addr + remote_off, OP_WRITE}; @@ -385,7 +384,7 @@ std::shared_ptr TcpEndpoint::async_write(const std::vectorcompletion_status.store(TCP_FAILED, std::memory_order_release); op->signal->force_complete(); @@ -400,7 +399,7 @@ std::shared_ptr TcpEndpoint::async_write(const std::vector( std::move(conn->socket), [op, conn, i, &pool, send_ptr, is_cuda](asio::error_code ec) { if (ec) { - SLIME_LOG_WARN("async_write session ", i, ": ", ec.message()); + SLIME_LOG_WARN("write session ", i, ": ", ec.message()); op->completion_status.store(TCP_FAILED, std::memory_order_release); } if (op->signal) diff --git a/dlslime/dlslime/csrc/engine/tcp/tcp_endpoint.h b/dlslime/dlslime/csrc/engine/tcp/tcp_endpoint.h index 928e5601..200a8bd7 100644 --- a/dlslime/dlslime/csrc/engine/tcp/tcp_endpoint.h +++ b/dlslime/dlslime/csrc/engine/tcp/tcp_endpoint.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -20,6 +21,14 @@ #include "tcp_session.h" namespace dlslime { + +// Thrown by transport operations that exist in the abstract endpoint +// surface (for parity with RDMAEndpoint) but have no analogue on this +// transport. Translated to Python ``NotImplementedError`` by the bindings. +struct not_implemented: public std::logic_error { + using std::logic_error::logic_error; +}; + namespace tcp { using json = nlohmann::json; @@ -48,16 +57,30 @@ class TcpEndpoint: public std::enable_shared_from_this { json mr_info() const; // ── Async I/O (all return Future immediately; I/O runs on io_context thread) ── + // Method names match RDMAEndpoint so PeerAgent can hold either type + // without an adapter. ``stream`` exists in the bindings for signature + // parity with RDMA and is ignored on TCP. - std::shared_ptr async_send(const chunk_tuple_t& chunk, int64_t timeout_ms = kDefaultTimeoutMs); + std::shared_ptr send(const chunk_tuple_t& chunk, int64_t timeout_ms = kDefaultTimeoutMs); - std::shared_ptr async_recv(const chunk_tuple_t& chunk, bool exact_size = false); + std::shared_ptr recv(const chunk_tuple_t& chunk, bool exact_size = false); - std::shared_ptr async_read(const std::vector& assign, - int64_t timeout_ms = kDefaultTimeoutMs); + std::shared_ptr read(const std::vector& assign, + int64_t timeout_ms = kDefaultTimeoutMs); - std::shared_ptr async_write(const std::vector& assign, - int64_t timeout_ms = kDefaultTimeoutMs); + std::shared_ptr write(const std::vector& assign, + int64_t timeout_ms = kDefaultTimeoutMs); + + // ── RDMA-only ops; provided for interface parity, never succeed on TCP. ── + [[noreturn]] void write_with_imm(const std::vector& /*assign*/, uint32_t /*imm_data*/ = 0) + { + throw not_implemented("TCP transport does not support write_with_imm; this is RDMA-only."); + } + + [[noreturn]] void imm_recv() + { + throw not_implemented("TCP transport does not support imm_recv; this is RDMA-only."); + } // ── Accessors ─────────────────────────────────────── void setId(int64_t id) diff --git a/dlslime/dlslime/csrc/python/bind.cpp b/dlslime/dlslime/csrc/python/bind.cpp index 9c52988f..d4b1d160 100644 --- a/dlslime/dlslime/csrc/python/bind.cpp +++ b/dlslime/dlslime/csrc/python/bind.cpp @@ -617,28 +617,72 @@ PYBIND11_MODULE(_slime_c, m) py::arg("name"), py::arg("mr_info"), py::call_guard()) - .def("async_send", - py::overload_cast(&dlslime::tcp::TcpEndpoint::async_send), - py::arg("chunk"), - py::arg("timeout_ms") = dlslime::tcp::TcpEndpoint::kDefaultTimeoutMs, - py::call_guard()) - .def("async_recv", - &dlslime::tcp::TcpEndpoint::async_recv, - py::arg("chunk"), - py::arg("exact_size") = false, - py::call_guard()) - .def("async_read", - py::overload_cast&, int64_t>( - &dlslime::tcp::TcpEndpoint::async_read), - py::arg("assign"), - py::arg("timeout_ms") = dlslime::tcp::TcpEndpoint::kDefaultTimeoutMs, - py::call_guard()) - .def("async_write", - py::overload_cast&, int64_t>( - &dlslime::tcp::TcpEndpoint::async_write), - py::arg("assign"), - py::arg("timeout_ms") = dlslime::tcp::TcpEndpoint::kDefaultTimeoutMs, - py::call_guard()); + .def( + "send", + [](dlslime::tcp::TcpEndpoint& self, + const dlslime::chunk_tuple_t& chunk, + py::object /*stream*/, + int64_t timeout_ms) { return self.send(chunk, timeout_ms); }, + py::arg("chunk"), + py::arg("stream") = py::none(), + py::arg("timeout_ms") = dlslime::tcp::TcpEndpoint::kDefaultTimeoutMs, + py::call_guard()) + .def( + "recv", + [](dlslime::tcp::TcpEndpoint& self, + const dlslime::chunk_tuple_t& chunk, + py::object /*stream*/, + bool exact_size) { return self.recv(chunk, exact_size); }, + py::arg("chunk"), + py::arg("stream") = py::none(), + py::arg("exact_size") = false, + py::call_guard()) + .def( + "read", + [](dlslime::tcp::TcpEndpoint& self, + const std::vector& assign, + py::object /*stream*/, + int64_t timeout_ms) { return self.read(assign, timeout_ms); }, + py::arg("assign"), + py::arg("stream") = py::none(), + py::arg("timeout_ms") = dlslime::tcp::TcpEndpoint::kDefaultTimeoutMs, + py::call_guard()) + .def( + "write", + [](dlslime::tcp::TcpEndpoint& self, + const std::vector& assign, + py::object /*stream*/, + int64_t timeout_ms) { return self.write(assign, timeout_ms); }, + py::arg("assign"), + py::arg("stream") = py::none(), + py::arg("timeout_ms") = dlslime::tcp::TcpEndpoint::kDefaultTimeoutMs, + py::call_guard()) + .def( + "write_with_imm", + [](dlslime::tcp::TcpEndpoint& self, + const std::vector& assign, + uint32_t imm_data, + py::object /*stream*/) { self.write_with_imm(assign, imm_data); }, + py::arg("assign"), + py::arg("imm_data") = 0u, + py::arg("stream") = py::none()) + .def( + "imm_recv", + [](dlslime::tcp::TcpEndpoint& self, py::object /*stream*/) { self.imm_recv(); }, + py::arg("stream") = py::none()); + + // Translate dlslime::not_implemented thrown by transports without a + // given op (e.g. TcpEndpoint::write_with_imm) to Python's + // NotImplementedError so callers see the right exception type. + py::register_exception_translator([](std::exception_ptr p) { + try { + if (p) + std::rethrow_exception(p); + } + catch (const dlslime::not_implemented& e) { + PyErr_SetString(PyExc_NotImplementedError, e.what()); + } + }); #endif // BUILD_TCP // ========================================================================= diff --git a/dlslime/dlslime/peer_agent/_agent.py b/dlslime/dlslime/peer_agent/_agent.py index 43a7405e..98a62455 100644 --- a/dlslime/dlslime/peer_agent/_agent.py +++ b/dlslime/dlslime/peer_agent/_agent.py @@ -36,7 +36,6 @@ from dlslime.logging import get_logger from ._mailbox import StreamMailbox from ._obs import _tlog -from ._tcp_endpoint import TcpEndpointAdapter logger = get_logger("peer_agent") @@ -1003,18 +1002,14 @@ def _ensure_local_endpoint_created(self, conn_id: str): conn.attach_endpoint(new_ep, pool) return new_ep - def _ensure_local_tcp_endpoint( - self, conn_id: str, conn: DirectedConnection - ) -> TcpEndpointAdapter: - """Construct (or reuse) a TCP endpoint adapter for a connection.""" + def _ensure_local_tcp_endpoint(self, conn_id: str, conn: DirectedConnection): + """Construct (or reuse) a TCP endpoint for a connection.""" if _TcpEndpoint is None: raise RuntimeError( "TCP transport requested but dlslime was built without BUILD_TCP." ) local_key: TcpResourceKey = conn.local_key # type: ignore[assignment] - new_ep = TcpEndpointAdapter( - _TcpEndpoint(ip=local_key.host, port=local_key.port) - ) + new_ep = _TcpEndpoint(ip=local_key.host, port=local_key.port) # Replay any previously-registered logical regions onto the new # endpoint so the handshake's endpoint_info() carries them. Done @@ -1039,7 +1034,7 @@ def _ensure_local_tcp_endpoint( with self._endpoints_lock: existing = self._endpoints.get(conn_id) if existing is not None: - # Race: another thread won. Drop the spare adapter. + # Race: another thread won. Drop the spare endpoint. try: new_ep.shutdown() except Exception: @@ -1282,7 +1277,7 @@ def register_memory_region( tcp_endpoints = [ ep for ep in self._endpoints.values() - if isinstance(ep, TcpEndpointAdapter) + if _TcpEndpoint is not None and isinstance(ep, _TcpEndpoint) ] for ep in tcp_endpoints: try: diff --git a/dlslime/dlslime/peer_agent/_tcp_endpoint.py b/dlslime/dlslime/peer_agent/_tcp_endpoint.py deleted file mode 100644 index 483d75a6..00000000 --- a/dlslime/dlslime/peer_agent/_tcp_endpoint.py +++ /dev/null @@ -1,89 +0,0 @@ -"""Adapter that exposes a ``TcpEndpoint`` with the same surface PeerAgent -already calls on ``RDMAEndpoint`` (``send/recv/read/write``, plus -``endpoint_info``/``connect``/``register_memory_region``). The C++ -``TcpEndpoint`` uses ``async_*`` names and accepts an extra ``timeout_ms`` -kwarg; this thin shim collapses those differences without copying the -underlying object. - -``write_with_imm`` and ``imm_recv`` are RDMA-specific (write-with-immediate- -data) and have no TCP analogue; they raise ``NotImplementedError`` so the -peer agent's I/O facade can keep its uniform call shape and still surface a -clear error if a caller tries to use them on a TCP connection. -""" - -from __future__ import annotations - -from typing import Any, Dict, Optional - -from dlslime import TcpEndpoint - - -class TcpEndpointAdapter: - """Wrap a ``TcpEndpoint`` to match the ``RDMAEndpoint`` method surface.""" - - def __init__(self, endpoint: TcpEndpoint) -> None: - self._endpoint = endpoint - - @property - def raw(self) -> TcpEndpoint: - return self._endpoint - - def endpoint_info(self) -> Dict[str, Any]: - return self._endpoint.endpoint_info() - - def connect(self, peer_info: Dict[str, Any]) -> None: - self._endpoint.connect(peer_info) - - def is_connected(self) -> bool: - return self._endpoint.is_connected() - - def shutdown(self) -> None: - self._endpoint.shutdown() - - def mr_info(self) -> Dict[str, Any]: - return self._endpoint.mr_info() - - def register_memory_region( - self, name: str, ptr: int, offset: int, length: int - ) -> int: - return self._endpoint.register_memory_region(name, ptr, offset, length) - - def register_remote_memory_region(self, name: str, mr_info: Dict[str, Any]) -> int: - return self._endpoint.register_remote_memory_region(name, mr_info) - - # ------------------------------------------------------------------ - # Two-sided primitives. ``stream`` is an RDMA/CUDA concept and is - # accepted only for signature parity; TCP ignores it. - # ------------------------------------------------------------------ - def send(self, chunk, stream: Optional[Any] = None): - return self._endpoint.async_send(chunk) - - def recv(self, chunk, stream: Optional[Any] = None): - return self._endpoint.async_recv(chunk) - - # ------------------------------------------------------------------ - # One-sided primitives. - # ------------------------------------------------------------------ - def read(self, assign, stream: Optional[Any] = None): - return self._endpoint.async_read(self._normalize_assign(assign)) - - def write(self, assign, stream: Optional[Any] = None): - return self._endpoint.async_write(self._normalize_assign(assign)) - - def write_with_imm(self, assign, imm_data: int = 0, stream: Optional[Any] = None): - raise NotImplementedError( - "TCP transport does not support write_with_imm; this is RDMA-only." - ) - - def imm_recv(self, stream: Optional[Any] = None): - raise NotImplementedError( - "TCP transport does not support imm_recv; this is RDMA-only." - ) - - @staticmethod - def _normalize_assign(assign): - # The C++ async_read/async_write bindings expect a list of tuples. - # PeerAgent's RDMA path passes either a single tuple or a list. - if isinstance(assign, tuple): - return [assign] - return assign diff --git a/dlslime/examples/python/p2p_tcp_rc_send_recv.py b/dlslime/examples/python/p2p_tcp_rc_send_recv.py index d1a375a0..5a581371 100644 --- a/dlslime/examples/python/p2p_tcp_rc_send_recv.py +++ b/dlslime/examples/python/p2p_tcp_rc_send_recv.py @@ -43,10 +43,8 @@ def run_a() -> None: ctypes.memmove(addr_a, PAYLOAD_AB, len(PAYLOAD_AB)) # Small stagger so B has its async_recv posted before our send. time.sleep(0.5) - assert ep_a.async_send((addr_a, 0, len(PAYLOAD_AB))).wait() == 0 - assert ( - ep_a.async_recv((addr_a, len(PAYLOAD_AB), len(PAYLOAD_BA))).wait() == 0 - ) + assert ep_a.send((addr_a, 0, len(PAYLOAD_AB))).wait() == 0 + assert ep_a.recv((addr_a, len(PAYLOAD_AB), len(PAYLOAD_BA))).wait() == 0 received = bytes(buf_a[len(PAYLOAD_AB) : len(PAYLOAD_AB) + len(PAYLOAD_BA)]) assert received == PAYLOAD_BA, received except Exception as e: # noqa: BLE001 @@ -56,12 +54,12 @@ def run_b() -> None: try: barrier.wait(5) ep_b.connect(info_a) - assert ep_b.async_recv((addr_b, 0, len(PAYLOAD_AB))).wait() == 0 + assert ep_b.recv((addr_b, 0, len(PAYLOAD_AB))).wait() == 0 received = bytes(buf_b[: len(PAYLOAD_AB)]) assert received == PAYLOAD_AB, received ctypes.memmove(addr_b, PAYLOAD_BA, len(PAYLOAD_BA)) time.sleep(0.5) - assert ep_b.async_send((addr_b, 0, len(PAYLOAD_BA))).wait() == 0 + assert ep_b.send((addr_b, 0, len(PAYLOAD_BA))).wait() == 0 except Exception as e: # noqa: BLE001 err.append(("b", e)) diff --git a/dlslime/examples/python/p2p_tcp_send_recv_peer_agent.py b/dlslime/examples/python/p2p_tcp_send_recv_peer_agent.py index 6d62345a..2234c76e 100644 --- a/dlslime/examples/python/p2p_tcp_send_recv_peer_agent.py +++ b/dlslime/examples/python/p2p_tcp_send_recv_peer_agent.py @@ -59,9 +59,9 @@ def post_recv() -> None: t.join(timeout=10) print(f"A->B send/recv = {PAYLOAD_AB!r} ok") - # ── One-sided write: A writes into B's MR via the TCP endpoint - # adapter. TCP MR handles are per-endpoint, so we go through - # conn_a.endpoint (a TcpEndpointAdapter) directly. + # ── One-sided write: A writes into B's MR via the TCP endpoint. + # TCP MR handles are per-endpoint, so we go through conn_a.endpoint + # (a TcpEndpoint) directly. ep_a = conn_a.endpoint peer_info = conn_a.peer_endpoint_info # set by mailbox post-handshake assert peer_info is not None, "peer_endpoint_info missing after handshake" diff --git a/dlslime/tests/python/test_tcp.py b/dlslime/tests/python/test_tcp.py index 7d6874d2..cac0e836 100755 --- a/dlslime/tests/python/test_tcp.py +++ b/dlslime/tests/python/test_tcp.py @@ -86,10 +86,10 @@ def run_a(): ep_a.connect(info_b) ctypes.memmove(ctypes.addressof(buf_a), b"hello", 5) time.sleep(5) - st = ep_a.async_send((ctypes.addressof(buf_a), 0, 5)).wait() + st = ep_a.send((ctypes.addressof(buf_a), 0, 5)).wait() if st != 0: raise RuntimeError(f"send: {st}") - st = ep_a.async_recv((ctypes.addressof(buf_a), 5, 5)).wait() + st = ep_a.recv((ctypes.addressof(buf_a), 5, 5)).wait() if st != 0: raise RuntimeError(f"recv: {st}") if bytes(buf_a[5:10]) != b"world": @@ -98,14 +98,14 @@ def run_a(): def run_b(): ep_b.connect(info_a) - st = ep_b.async_recv((ctypes.addressof(buf_b), 0, 5)).wait() + st = ep_b.recv((ctypes.addressof(buf_b), 0, 5)).wait() if st != 0: raise RuntimeError(f"recv: {st}") if bytes(buf_b[:5]) != b"hello": raise RuntimeError(f"data: {bytes(buf_b[:5])}") ctypes.memmove(ctypes.addressof(buf_b), b"world", 5) time.sleep(5) - st = ep_b.async_send((ctypes.addressof(buf_b), 0, 5)).wait() + st = ep_b.send((ctypes.addressof(buf_b), 0, 5)).wait() if st != 0: raise RuntimeError(f"send: {st}") ep_b.shutdown() @@ -128,14 +128,14 @@ def run_a(): ep_a.connect(info_b) ctypes.memmove(ctypes.addressof(buf_a), b"one", 3) time.sleep(5) - st = ep_a.async_send((ctypes.addressof(buf_a), 0, 3)).wait() + st = ep_a.send((ctypes.addressof(buf_a), 0, 3)).wait() if st != 0: raise RuntimeError(f"send: {st}") ep_a.shutdown() def run_b(): ep_b.connect(info_a) - st = ep_b.async_recv((ctypes.addressof(buf_b), 0, 3)).wait() + st = ep_b.recv((ctypes.addressof(buf_b), 0, 3)).wait() if st != 0: raise RuntimeError(f"recv: {st}") if bytes(buf_b[:3]) != b"one": @@ -165,7 +165,7 @@ def test_async_write( def run_a(): ep_a.connect(info_b) ctypes.memmove(addr_a, test_data, len(test_data)) - st = ep_a.async_write([(h_a, h_br, 0, 0, len(test_data))]).wait() + st = ep_a.write([(h_a, h_br, 0, 0, len(test_data))]).wait() if st != 0: raise RuntimeError(f"write: {st}") ep_a.shutdown() @@ -203,7 +203,7 @@ def test_async_read( def run_a(): ep_a.connect(info_b) - st = ep_a.async_read([(h_a, h_br, 0, 0, len(test_data))]).wait() + st = ep_a.read([(h_a, h_br, 0, 0, len(test_data))]).wait() if st != 0: raise RuntimeError(f"read: {st}") if bytes(buf_a[: len(test_data)]) != test_data: @@ -236,7 +236,7 @@ def run_b(): def run_a(): ep_a.connect(ep_b.endpoint_info()) - fut = ep_a.async_recv((ctypes.addressof(buf_a), 0, 5)) + fut = ep_a.recv((ctypes.addressof(buf_a), 0, 5)) result = fut.wait_for(0.3) if result is not None: raise RuntimeError(f"expected None, got {result}") @@ -256,7 +256,7 @@ def test_send_timeout_ms( def run_b(): ep_b.connect(ep_a.endpoint_info()) - st = ep_b.async_recv((ctypes.addressof(buf_b), 0, 5)).wait() + st = ep_b.recv((ctypes.addressof(buf_b), 0, 5)).wait() if st != 0: raise RuntimeError(f"recv: {st}") ep_b.shutdown() @@ -264,7 +264,7 @@ def run_b(): def run_a(): ep_a.connect(ep_b.endpoint_info()) ctypes.memmove(ctypes.addressof(buf_a), b"world", 5) - st = ep_a.async_send((ctypes.addressof(buf_a), 0, 5), timeout_ms=10000).wait() + st = ep_a.send((ctypes.addressof(buf_a), 0, 5), timeout_ms=10000).wait() if st != 0: raise RuntimeError(f"send: {st}") ep_a.shutdown() @@ -283,7 +283,7 @@ def test_default_timeout( def run_b(): ep_b.connect(ep_a.endpoint_info()) - st = ep_b.async_recv((ctypes.addressof(buf_b), 0, 5)).wait() + st = ep_b.recv((ctypes.addressof(buf_b), 0, 5)).wait() if st != 0: raise RuntimeError(f"recv: {st}") ep_b.shutdown() @@ -291,7 +291,7 @@ def run_b(): def run_a(): ep_a.connect(ep_b.endpoint_info()) ctypes.memmove(ctypes.addressof(buf_a), b"test!", 5) - st = ep_a.async_send((ctypes.addressof(buf_a), 0, 5)).wait() + st = ep_a.send((ctypes.addressof(buf_a), 0, 5)).wait() if st != 0: raise RuntimeError(f"send: {st}") ep_a.shutdown() @@ -310,7 +310,7 @@ def test_exact_size_mismatch( def run_b(): ep_b.connect(ep_a.endpoint_info()) - st = ep_b.async_recv((ctypes.addressof(buf_b), 0, 4), exact_size=True).wait() + st = ep_b.recv((ctypes.addressof(buf_b), 0, 4), exact_size=True).wait() if st != -1: raise RuntimeError(f"expected TCP_FAILED(-1), got {st}") ep_b.shutdown() @@ -318,7 +318,7 @@ def run_b(): def run_a(): ep_a.connect(ep_b.endpoint_info()) ctypes.memmove(ctypes.addressof(buf_a), b"overflow", 8) - st = ep_a.async_send((ctypes.addressof(buf_a), 0, 8)).wait() + st = ep_a.send((ctypes.addressof(buf_a), 0, 8)).wait() if st != 0: raise RuntimeError(f"send: {st}") ep_a.shutdown() @@ -337,12 +337,12 @@ def test_overflow_truncate( def run_b(): ep_b.connect(ep_a.endpoint_info()) - st = ep_b.async_recv((ctypes.addressof(buf_b), 0, 4)).wait() + st = ep_b.recv((ctypes.addressof(buf_b), 0, 4)).wait() if st != 0: raise RuntimeError(f"recv1: {st}") if bytes(buf_b[:4]) != b"LONG": raise RuntimeError(f"truncated: {bytes(buf_b[:4])}") - st = ep_b.async_recv((ctypes.addressof(buf_b), 4, 5)).wait() + st = ep_b.recv((ctypes.addressof(buf_b), 4, 5)).wait() if st != 0: raise RuntimeError(f"recv2: {st}") if bytes(buf_b[4:9]) != b"HELLO": @@ -352,11 +352,11 @@ def run_b(): def run_a(): ep_a.connect(ep_b.endpoint_info()) ctypes.memmove(ctypes.addressof(buf_a), b"LONGDATA", 8) - st = ep_a.async_send((ctypes.addressof(buf_a), 0, 8)).wait() + st = ep_a.send((ctypes.addressof(buf_a), 0, 8)).wait() if st != 0: raise RuntimeError(f"send1: {st}") ctypes.memmove(ctypes.addressof(buf_a), b"HELLO", 5) - st = ep_a.async_send((ctypes.addressof(buf_a), 0, 5)).wait() + st = ep_a.send((ctypes.addressof(buf_a), 0, 5)).wait() if st != 0: raise RuntimeError(f"send2: {st}") ep_a.shutdown() @@ -430,10 +430,10 @@ def test_torch_send_recv( def run_a(): ep_a.connect(info_b) time.sleep(5) - st = ep_a.async_send((t_a.data_ptr(), 0, n_bytes)).wait() + st = ep_a.send((t_a.data_ptr(), 0, n_bytes)).wait() if st != 0: raise RuntimeError(f"send: {st}") - st = ep_a.async_recv((t_a.data_ptr(), 10 * 4, sl_bytes)).wait() + st = ep_a.recv((t_a.data_ptr(), 10 * 4, sl_bytes)).wait() if st != 0: raise RuntimeError(f"recv: {st}") if not torch.equal(t_a[10:15], t_b[20:25]): @@ -442,13 +442,13 @@ def run_a(): def run_b(): ep_b.connect(info_a) - st = ep_b.async_recv((t_b.data_ptr(), 0, n_bytes)).wait() + st = ep_b.recv((t_b.data_ptr(), 0, n_bytes)).wait() if st != 0: raise RuntimeError(f"recv: {st}") if not torch.equal(expected, t_b): raise RuntimeError("full tensor mismatch") time.sleep(5) - st = ep_b.async_send((t_b.data_ptr(), 20 * 4, sl_bytes)).wait() + st = ep_b.send((t_b.data_ptr(), 20 * 4, sl_bytes)).wait() if st != 0: raise RuntimeError(f"send: {st}") ep_b.shutdown() @@ -482,7 +482,7 @@ def test_torch_write( def run_a(): ep_a.connect(info_b) - st = ep_a.async_write([(h_a, h_br, 0, 0, n_bytes)]).wait() + st = ep_a.write([(h_a, h_br, 0, 0, n_bytes)]).wait() if st != 0: raise RuntimeError(f"write: {st}") ep_a.shutdown() @@ -527,7 +527,7 @@ def test_torch_read( def run_a(): ep_a.connect(info_b) - st = ep_a.async_read([(h_a, h_br, 0, 0, n_bytes)]).wait() + st = ep_a.read([(h_a, h_br, 0, 0, n_bytes)]).wait() if st != 0: raise RuntimeError(f"read: {st}") if not torch.equal(t_a, expected): @@ -583,7 +583,7 @@ def run_a(): (h_a_batch[i], h_br_batch[i], i * dsize, i * dsize, dsize) for i in range(n_batch) ] - st = ep_a.async_write(assigns).wait() + st = ep_a.write(assigns).wait() if st != 0: raise RuntimeError(f"write batch: {st}") ep_a.shutdown() @@ -641,7 +641,7 @@ def run_a(): (h_a_batch[i], h_br_batch[i], i * dsize, i * dsize, dsize) for i in range(n_batch) ] - st = ep_a.async_read(assigns).wait() + st = ep_a.read(assigns).wait() if st != 0: raise RuntimeError(f"read batch: {st}") ep_a.shutdown() diff --git a/docs/src/guide/peeragent-api.md b/docs/src/guide/peeragent-api.md index 31dbc960..13bd549b 100644 --- a/docs/src/guide/peeragent-api.md +++ b/docs/src/guide/peeragent-api.md @@ -74,7 +74,7 @@ conn.wait(timeout=60) | `peer_alias` | Remote PeerAgent alias. | | `local_nic` / `remote_nic` | Selected local and remote NICs. | | `state` | Connection state such as `connecting`, `connected`, or `failed`. | -| `endpoint` | Underlying `RDMAEndpoint` (or `TcpEndpointAdapter`) once created. | +| `endpoint` | Underlying `RDMAEndpoint` or `TcpEndpoint` once created. | | `peer_endpoint_info` | Peer's `endpoint_info` dict captured during handshake (TCP one-sided ops). | ### Selecting the transport diff --git a/docs/src/guide/tcp-transport.md b/docs/src/guide/tcp-transport.md index 86577a6e..ccacad54 100644 --- a/docs/src/guide/tcp-transport.md +++ b/docs/src/guide/tcp-transport.md @@ -8,10 +8,10 @@ TCP is the right transport when: - A connection has to traverse a network without RDMA capability. It exposes the same primitives as RDMA — `endpoint_info` / `connect`, two-sided -`async_send` / `async_recv`, one-sided `async_read` / `async_write`, and named -memory regions — and plugs into PeerAgent through the same control plane. -Immediate-data ops (`write_with_imm`, `imm_recv`) are RDMA-only and raise -`NotImplementedError` on TCP. +`send` / `recv`, one-sided `read` / `write`, and named memory regions — and +plugs into PeerAgent through the same control plane. Immediate-data ops +(`write_with_imm`, `imm_recv`) are RDMA-only; on TCP they raise +`NotImplementedError` (translated from a C++ `not_implemented` exception). TCP is enabled by default at build time (`BUILD_TCP=ON`). @@ -35,13 +35,13 @@ ep_a.connect(info_b) ep_b.connect(info_a) buf = ctypes.create_string_buffer(64) -fut = ep_a.async_send((ctypes.addressof(buf), 0, 5)) +fut = ep_a.send((ctypes.addressof(buf), 0, 5)) assert fut.wait() == 0 ``` -`async_send`, `async_recv`, `async_read`, and `async_write` return future -objects with a `wait()` method. `wait_for(seconds)` returns `None` on timeout -and the status code otherwise. +`send`, `recv`, `read`, and `write` return future objects with a `wait()` +method. `wait_for(seconds)` returns `None` on timeout and the status code +otherwise. For one-sided ops, register both sides' MRs **before** calling `connect()` so `endpoint_info()` carries them across the handshake: @@ -53,7 +53,7 @@ info_b = ep_b.endpoint_info() # carries mr_info["b"] h_remote = ep_a.register_remote_memory_region("rb", info_b["mr_info"]["b"]) ep_a.connect(info_b) ep_b.connect(ep_a.endpoint_info()) -ep_a.async_write([(h_local, h_remote, 0, 0, 12)]).wait() +ep_a.write([(h_local, h_remote, 0, 0, 12)]).wait() ``` Examples: @@ -105,7 +105,7 @@ conn_a = agent_a.connect_to("tcp_b", transport="tcp") agent_b.connect_to("tcp_a", transport="tcp") conn_a.wait() -ep_a = conn_a.endpoint # TcpEndpointAdapter +ep_a = conn_a.endpoint # TcpEndpoint peer_info = conn_a.peer_endpoint_info # set by mailbox post-handshake h_local = ep_a.register_memory_region("buf_a_loc", addr_a, 0, 64) h_remote = ep_a.register_remote_memory_region( From 8854e10c39d153eaf414516f6b916bd5e29de4f8 Mon Sep 17 00:00:00 2001 From: JimyMa Date: Wed, 27 May 2026 18:30:29 +0000 Subject: [PATCH 4/5] fix reviewer comment --- dlslime/dlslime/peer_agent/_agent.py | 36 +++++++++++++++++++++----- dlslime/dlslime/peer_agent/_mailbox.py | 5 +--- 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/dlslime/dlslime/peer_agent/_agent.py b/dlslime/dlslime/peer_agent/_agent.py index 98a62455..c260d20c 100644 --- a/dlslime/dlslime/peer_agent/_agent.py +++ b/dlslime/dlslime/peer_agent/_agent.py @@ -1012,16 +1012,16 @@ def _ensure_local_tcp_endpoint(self, conn_id: str, conn: DirectedConnection): new_ep = _TcpEndpoint(ip=local_key.host, port=local_key.port) # Replay any previously-registered logical regions onto the new - # endpoint so the handshake's endpoint_info() carries them. Done - # before publishing the endpoint so concurrent MR registrations - # see the endpoint via the same lock ordering as the RDMA path. + # endpoint so the handshake's endpoint_info() carries them. with self._regions_lock: - regions = list(self._logical_regions.values()) - for region in regions: + initial_regions = list(self._logical_regions.values()) + replayed: Set[str] = set() + for region in initial_regions: try: new_ep.register_memory_region( region.name, region.ptr, region.offset, region.length ) + replayed.add(region.name) except Exception as e: logger.warning( "PeerAgent %s: TCP MR replay for %s failed during " @@ -1042,7 +1042,31 @@ def _ensure_local_tcp_endpoint(self, conn_id: str, conn: DirectedConnection): return existing self._endpoints[conn_id] = new_ep conn.attach_endpoint(new_ep, None) - return new_ep + + # Post-publish sweep: a concurrent ``register_memory_region`` may + # have added a new logical region after our initial snapshot but + # before we published ``new_ep`` — in which case its endpoint + # snapshot missed us. Re-read ``_logical_regions`` now that we are + # discoverable; ``register_memory_region`` calls landing after the + # publish will see ``new_ep`` and register on it directly, so this + # only catches the "added in the gap" set. + with self._regions_lock: + post_regions = list(self._logical_regions.values()) + for region in post_regions: + if region.name in replayed: + continue + try: + new_ep.register_memory_region( + region.name, region.ptr, region.offset, region.length + ) + except Exception as e: + logger.warning( + "PeerAgent %s: TCP MR post-replay for %s failed: %s", + self.alias, + region.name, + e, + ) + return new_ep def get_connections(self) -> Dict[str, Dict[str, PeerConnection]]: """Return local connections grouped by peer and connection id.""" diff --git a/dlslime/dlslime/peer_agent/_mailbox.py b/dlslime/dlslime/peer_agent/_mailbox.py index ab4a399e..e61cc11d 100644 --- a/dlslime/dlslime/peer_agent/_mailbox.py +++ b/dlslime/dlslime/peer_agent/_mailbox.py @@ -376,10 +376,7 @@ def _try_connect_peer_inner( endpoint.connect(peer_qp_info) # Stash for one-sided ops on transports (TCP) where remote MR info # rides on the endpoint_info JSON instead of a separate Redis record. - try: - conn.peer_endpoint_info = peer_qp_info - except Exception: - pass + conn.peer_endpoint_info = peer_qp_info _tlog( f"{self._agent.alias}: [D] endpoint.connect({peer}) " f"+{(time.perf_counter() - t_d) * 1000:.3f}ms" From b8a87f7e1868d12571959f6c60204d776b724e61 Mon Sep 17 00:00:00 2001 From: JimyMa Date: Wed, 27 May 2026 18:35:20 +0000 Subject: [PATCH 5/5] fix pypi ci --- .github/workflows/pypi-publish.yml | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/.github/workflows/pypi-publish.yml b/.github/workflows/pypi-publish.yml index d536eb98..b3bb7b2e 100644 --- a/.github/workflows/pypi-publish.yml +++ b/.github/workflows/pypi-publish.yml @@ -50,11 +50,13 @@ jobs: # Build only the matching Python ABI on x86_64 manylinux. CIBW_BUILD: "${{ matrix.python }}-manylinux_x86_64" CIBW_BEFORE_ALL_LINUX: | - yum install -y libibverbs-devel numactl-devel || \ - (apt-get update && apt-get install -y libibverbs-dev libnuma-dev) + (yum install -y epel-release || true) && \ + yum install -y libibverbs-devel numactl-devel asio-devel || \ + yum install -y libibverbs-devel numactl-devel boost-devel || \ + (apt-get update && apt-get install -y libibverbs-dev libnuma-dev libasio-dev) CIBW_ENVIRONMENT: >- CMAKE_ARGS=-DBUILD_RDMA=ON -DBUILD_PYTHON=ON - -DBUILD_NVLINK=OFF -DBUILD_TORCH_PLUGIN=OFF + -DBUILD_TCP=ON -DBUILD_NVLINK=OFF -DBUILD_TORCH_PLUGIN=OFF -DBUILD_ASCEND_DIRECT=OFF -DBUILD_TEST=OFF # Don't try to bundle libibverbs / libnuma into the wheel: # they're system-level libraries (provided by the rdma-core / numactl