Commit 6e4b9a4

tcp_peer_agent (#105)
* tcp_peer_agent
* update docs
* identity interface of tcp and RDMA
* fix reviewer comment
* fix pypi ci
1 parent ed422cd commit 6e4b9a4

21 files changed

Lines changed: 1193 additions & 144 deletions

.github/workflows/pypi-publish.yml

Lines changed: 5 additions & 3 deletions
@@ -50,11 +50,13 @@ jobs:
 # Build only the matching Python ABI on x86_64 manylinux.
 CIBW_BUILD: "${{ matrix.python }}-manylinux_x86_64"
 CIBW_BEFORE_ALL_LINUX: |
-  yum install -y libibverbs-devel numactl-devel || \
-  (apt-get update && apt-get install -y libibverbs-dev libnuma-dev)
+  (yum install -y epel-release || true) && \
+  yum install -y libibverbs-devel numactl-devel asio-devel || \
+  yum install -y libibverbs-devel numactl-devel boost-devel || \
+  (apt-get update && apt-get install -y libibverbs-dev libnuma-dev libasio-dev)
 CIBW_ENVIRONMENT: >-
   CMAKE_ARGS=-DBUILD_RDMA=ON -DBUILD_PYTHON=ON
-  -DBUILD_NVLINK=OFF -DBUILD_TORCH_PLUGIN=OFF
+  -DBUILD_TCP=ON -DBUILD_NVLINK=OFF -DBUILD_TORCH_PLUGIN=OFF
   -DBUILD_ASCEND_DIRECT=OFF -DBUILD_TEST=OFF
 # Don't try to bundle libibverbs / libnuma into the wheel:
 # they're system-level libraries (provided by the rdma-core / numactl

.pre-commit-config.yaml

Lines changed: 3 additions & 3 deletions
@@ -3,9 +3,9 @@ repos:
 rev: v2.9.0
 hooks:
   - id: ufmt
-    # black 25.x requires Python >= 3.9; pin a recent stable to avoid
-    # pre-commit defaulting to whatever system python happens to be.
-    language_version: python3.12
+    # Use whatever Python pre-commit was launched with (3.9+ required by
+    # black 25.x). Avoids forcing a specific 3.x venv that may not exist
+    # on every machine.
     additional_dependencies:
       - black==25.9.0
       - usort==1.0.8.post1

README.md

Lines changed: 33 additions & 1 deletion
@@ -16,7 +16,7 @@ DLSlime is a PeerAgent-centered communication and microservice toolkit for
 distributed AI systems. PeerAgent is the runtime hub: application services such
 as SlimeRPC and DLSlimeCache build on it, NanoCtrl supplies service governance
 and coordination metadata around it, and endpoint APIs below it drive
-heterogeneous transports such as RDMA, NVLink, and Ascend Direct.
+heterogeneous transports such as RDMA, TCP, NVLink, and Ascend Direct.
 
 DLSlime is designed to be adopted one layer at a time. Applications can start
 with direct endpoints, add PeerAgent coordination, use NanoCtrl for governance,
@@ -96,6 +96,36 @@ python dlslime/examples/python/p2p_ascend_read.py
 Ascend Direct setup details live in
 [docs/huawei_ascend/README.md](docs/huawei_ascend/README.md).
 
+### TCP Fallback Transport
+
+Use TCP when the hosts have no RDMA NICs or when a peer connection has to
+traverse a network without RDMA capability. The TCP transport exposes the same
+primitives — `endpoint_info` / `connect`, two-sided `send` / `recv`, one-sided
+`read` / `write`, and named memory regions — and plugs into PeerAgent through
+the same control plane via `connect_to(transport="tcp")`. Immediate-data ops
+(`write_with_imm`, `imm_recv`) are RDMA-only and raise `NotImplementedError`
+on TCP.
+
+`BUILD_TCP` is `ON` by default.
+
+Raw `TcpEndpoint`, no NanoCtrl required:
+
+```bash
+python dlslime/examples/python/p2p_tcp_rc_send_recv.py
+```
+
+PeerAgent over TCP:
+
+```bash
+nanoctrl start
+python dlslime/examples/python/p2p_tcp_send_recv_peer_agent.py
+python dlslime/examples/python/p2p_tcp_rc_write_peer_agent.py
+python dlslime/examples/python/p2p_tcp_rc_read_peer_agent.py
+```
+
+See [docs/src/guide/tcp-transport.md](docs/src/guide/tcp-transport.md) for the
+full surface, one-sided I/O setup, and the test reference.
+
 ### PeerAgent-to-PeerAgent Access
 
 Use PeerAgent when the application wants peer-to-peer data movement without
@@ -241,6 +271,7 @@ cmake --build build
 | Flag | Default | Description |
 | --------------------- | ---------------------------------------: | ------------------------------------------------------ |
 | `BUILD_RDMA` | `ON` | Build the RDMA transfer engine |
+| `BUILD_TCP` | `ON` | Build the TCP transfer engine (fallback transport) |
 | `BUILD_PYTHON` | `OFF` in CMake, `ON` in `pyproject.toml` | Build Python bindings |
 | `BUILD_NVLINK` | `OFF` | Build the NVLink transfer engine |
 | `BUILD_ASCEND_DIRECT` | `OFF` | Build Ascend Direct transport |
@@ -289,6 +320,7 @@ scripts/ Repo-wide automation (release.sh, ...)
 
 - [Documentation index](docs/README.md)
 - [Roadmap](docs/roadmap.md)
+- [TCP transport guide](docs/src/guide/tcp-transport.md)
 - [DLSlimeCache design](docs/design/dlslime-cache.md)
 - [Endpoint ownership model](docs/endpoint-ownership-model.md)
 - [Endpoint DeviceSignal refactor](docs/endpoint-device-signal-refactor.md)
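
The README text above says immediate-data ops raise `NotImplementedError` on TCP. A caller that wants to stay transport-agnostic could catch that and fall back to a plain `write` followed by an explicit notification. The sketch below is only an illustration of that pattern: `FakeRdmaEndpoint`, `FakeTcpEndpoint`, `write_and_notify`, and the shape of the `assign` tuple are hypothetical stand-ins, not DLSlime classes or signatures.

```python
class FakeRdmaEndpoint:
    """Hypothetical stand-in for an RDMA endpoint; not the DLSlime class."""

    def __init__(self):
        self.log = []

    def write(self, assign):
        self.log.append(("write", assign))

    def write_with_imm(self, assign, imm_data=0):
        self.log.append(("write_with_imm", assign, imm_data))


class FakeTcpEndpoint(FakeRdmaEndpoint):
    """Hypothetical stand-in for a TCP endpoint: immediate data is unsupported."""

    def write_with_imm(self, assign, imm_data=0):
        raise NotImplementedError("TCP transport does not support write_with_imm")


def write_and_notify(endpoint, assign, imm_data, notify):
    """Prefer write_with_imm; otherwise fall back to write plus an explicit notify."""
    try:
        endpoint.write_with_imm(assign, imm_data)
        return "imm"
    except NotImplementedError:
        endpoint.write(assign)
        notify(imm_data)  # e.g. a small two-sided send carrying the tag
        return "write+notify"


ASSIGN = [(0, 0, 0, 0, 64)]  # placeholder tuple; the real layout is not shown here
notified = []
rdma, tcp = FakeRdmaEndpoint(), FakeTcpEndpoint()
rdma_path = write_and_notify(rdma, ASSIGN, 7, notified.append)
tcp_path = write_and_notify(tcp, ASSIGN, 7, notified.append)
print(rdma_path, tcp_path, notified)
```

Since the real `write` returns a future, an actual caller would presumably wait on it before sending the notification; the stand-ins here complete synchronously to keep the sketch short.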

dlslime/bench/python/tcp_bench_spmd.py

Lines changed: 1 addition & 1 deletion
@@ -197,7 +197,7 @@ def rank_0_print(*args):
 def transfer_batch_concurrency_dlslime(
     role, opcode, local_handle, remote_handle, tensor, batch_size, num_concurrency
 ):
-    fn = tcp_endpoint.async_read if opcode == "read" else tcp_endpoint.async_write
+    fn = tcp_endpoint.read if opcode == "read" else tcp_endpoint.write
     if role == "initiator":
         slots = []
         for concurrent_id in range(num_concurrency):
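
The benchmark change above is a pure rename (`async_read` / `async_write` become `read` / `write`). A script that has to run against builds from both before and after this commit could resolve the method by name instead of hard-coding either spelling. This is a hedged sketch: `resolve_op`, `NewEndpoint`, and `OldEndpoint` are hypothetical helpers written for illustration, not part of the benchmark.

```python
def resolve_op(endpoint, opcode):
    """Return the one-sided method for `opcode`, preferring the new name."""
    if opcode not in ("read", "write"):
        raise ValueError(f"unsupported opcode: {opcode!r}")
    for name in (opcode, f"async_{opcode}"):
        fn = getattr(endpoint, name, None)
        if callable(fn):
            return fn
    raise AttributeError(f"endpoint has neither {opcode} nor async_{opcode}")


class NewEndpoint:
    """Stand-in exposing the post-rename names."""

    def read(self, assign):
        return ("read", assign)

    def write(self, assign):
        return ("write", assign)


class OldEndpoint:
    """Stand-in exposing the pre-rename names."""

    def async_read(self, assign):
        return ("async_read", assign)

    def async_write(self, assign):
        return ("async_write", assign)


print(resolve_op(NewEndpoint(), "read")([]))
print(resolve_op(OldEndpoint(), "write")([]))
```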

dlslime/dlslime/csrc/engine/tcp/tcp_endpoint.cpp

Lines changed: 19 additions & 20 deletions
@@ -170,10 +170,10 @@ int32_t TcpEndpoint::register_remote_memory_region(const std::string& name, cons
     return remote_pool_->register_remote_memory_region(mr_info, name);
 }
 
-// ── async_send ──────────────────────────────────────────
+// ── send ──────────────────────────────────────────
 // chunk_tuple_t = (src_ptr, offset, length) — raw pointers, no MR lookup.
 
-std::shared_ptr<TcpSendFuture> TcpEndpoint::async_send(const chunk_tuple_t& chunk, int64_t /*timeout_ms*/)
+std::shared_ptr<TcpSendFuture> TcpEndpoint::send(const chunk_tuple_t& chunk, int64_t /*timeout_ms*/)
 {
     uintptr_t src = std::get<0>(chunk) + std::get<1>(chunk);
     size_t len = std::get<2>(chunk);
@@ -198,7 +198,7 @@ std::shared_ptr<TcpSendFuture> TcpEndpoint::async_send(const chunk_tuple_t& chun
         auto* buf = new char[len];
         auto cu_err = cudaMemcpy(buf, send_ptr, len, cudaMemcpyDeviceToHost);
         if (cu_err != cudaSuccess) {
-            SLIME_LOG_ERROR("async_send cudaMemcpy D2H: ", cudaGetErrorString(cu_err));
+            SLIME_LOG_ERROR("send cudaMemcpy D2H: ", cudaGetErrorString(cu_err));
             delete[] buf;
             op->completion_status.store(TCP_FAILED, std::memory_order_release);
             op->signal->force_complete();
@@ -213,7 +213,7 @@ std::shared_ptr<TcpSendFuture> TcpEndpoint::async_send(const chunk_tuple_t& chun
     auto session = std::make_shared<ClientSession>(
         std::move(conn->socket), [op, conn, &pool, send_ptr, is_cuda](asio::error_code ec) {
             if (ec)
-                SLIME_LOG_WARN("async_send: ", ec.message());
+                SLIME_LOG_WARN("send: ", ec.message());
             op->completion_status.store(ec ? TCP_FAILED : TCP_SUCCESS, std::memory_order_release);
             if (op->signal)
                 op->signal->set_comm_done(0);
@@ -228,10 +228,10 @@ std::shared_ptr<TcpSendFuture> TcpEndpoint::async_send(const chunk_tuple_t& chun
     return std::make_shared<TcpSendFuture>(op);
 }
 
-// ── async_recv ──────────────────────────────────────────
+// ── recv ──────────────────────────────────────────
 // chunk_tuple_t = (dst_ptr, offset, length) — raw pointers, no MR lookup.
 
-std::shared_ptr<TcpRecvFuture> TcpEndpoint::async_recv(const chunk_tuple_t& chunk, bool exact_size)
+std::shared_ptr<TcpRecvFuture> TcpEndpoint::recv(const chunk_tuple_t& chunk, bool exact_size)
 {
     auto op = TcpOpState::create();
     op->signal->reset_all();
@@ -258,15 +258,14 @@ std::shared_ptr<TcpRecvFuture> TcpEndpoint::async_recv(const chunk_tuple_t& chun
     return std::make_shared<TcpRecvFuture>(op);
 }
 
-// ── async_read ──────────────────────────────────────────
+// ── read ──────────────────────────────────────────
 // Each assign creates an independent ClientSession; all share one OpState.
 // Future.wait() blocks until every session has signalled its bit.
 
-std::shared_ptr<TcpReadWriteFuture> TcpEndpoint::async_read(const std::vector<assign_tuple_t>& assign,
-                                                            int64_t /*timeout_ms*/)
+std::shared_ptr<TcpReadWriteFuture> TcpEndpoint::read(const std::vector<assign_tuple_t>& assign, int64_t /*timeout_ms*/)
 {
     if (assign.empty())
-        throw std::runtime_error("TcpEndpoint::async_read: empty assignment");
+        throw std::runtime_error("TcpEndpoint::read: empty assignment");
 
     size_t N = assign.size();
     auto op = TcpOpState::create();
@@ -288,7 +287,7 @@ std::shared_ptr<TcpReadWriteFuture> TcpEndpoint::async_read(const std::vector<as
         auto local = local_pool_->get_mr_fast(local_h);
         auto remote = remote_pool_->get_remote_mr_fast(remote_h);
         if (local.length == 0 || remote.length == 0)
-            throw std::runtime_error("TcpEndpoint::async_read: invalid MR handle");
+            throw std::runtime_error("TcpEndpoint::read: invalid MR handle");
 
         uintptr_t local_dst = local.addr + local_off;
         SessionHeader hdr{length, remote.addr + remote_off, OP_READ};
@@ -313,14 +312,14 @@ std::shared_ptr<TcpReadWriteFuture> TcpEndpoint::async_read(const std::vector<as
             std::move(conn->socket),
             [op, conn, i, &pool, read_dst, is_cuda, real_dst = local_dst, len = length](asio::error_code ec) {
                 if (ec) {
-                    SLIME_LOG_WARN("async_read session ", i, ": ", ec.message());
+                    SLIME_LOG_WARN("read session ", i, ": ", ec.message());
                     op->completion_status.store(TCP_FAILED, std::memory_order_release);
                 }
 #ifdef USE_CUDA
                 if (!ec && is_cuda) {
                     auto cu_err = cudaMemcpy(reinterpret_cast<void*>(real_dst), read_dst, len, cudaMemcpyHostToDevice);
                     if (cu_err != cudaSuccess) {
-                        SLIME_LOG_ERROR("async_read cudaMemcpy H2D: ", cudaGetErrorString(cu_err));
+                        SLIME_LOG_ERROR("read cudaMemcpy H2D: ", cudaGetErrorString(cu_err));
                         op->completion_status.store(TCP_FAILED, std::memory_order_release);
                     }
                 }
@@ -337,14 +336,14 @@ std::shared_ptr<TcpReadWriteFuture> TcpEndpoint::async_read(const std::vector<as
     return std::make_shared<TcpReadWriteFuture>(op);
 }
 
-// ── async_write ─────────────────────────────────────────
+// ── write ─────────────────────────────────────────
 // Each assign creates an independent ClientSession; all share one OpState.
 
-std::shared_ptr<TcpReadWriteFuture> TcpEndpoint::async_write(const std::vector<assign_tuple_t>& assign,
-                                                             int64_t /*timeout_ms*/)
+std::shared_ptr<TcpReadWriteFuture> TcpEndpoint::write(const std::vector<assign_tuple_t>& assign,
+                                                       int64_t /*timeout_ms*/)
 {
     if (assign.empty())
-        throw std::runtime_error("TcpEndpoint::async_write: empty assignment");
+        throw std::runtime_error("TcpEndpoint::write: empty assignment");
 
     size_t N = assign.size();
     auto op = TcpOpState::create();
@@ -366,7 +365,7 @@ std::shared_ptr<TcpReadWriteFuture> TcpEndpoint::async_write(const std::vector<a
         auto local = local_pool_->get_mr_fast(local_h);
         auto remote = remote_pool_->get_remote_mr_fast(remote_h);
         if (local.length == 0 || remote.length == 0)
-            throw std::runtime_error("TcpEndpoint::async_write: invalid MR handle");
+            throw std::runtime_error("TcpEndpoint::write: invalid MR handle");
 
         uintptr_t src = local.addr + local_off;
         SessionHeader hdr{length, remote.addr + remote_off, OP_WRITE};
@@ -385,7 +384,7 @@ std::shared_ptr<TcpReadWriteFuture> TcpEndpoint::async_write(const std::vector<a
             auto* buf = new char[length];
             auto cu_err = cudaMemcpy(buf, send_ptr, length, cudaMemcpyDeviceToHost);
             if (cu_err != cudaSuccess) {
-                SLIME_LOG_ERROR("async_write cudaMemcpy D2H: ", cudaGetErrorString(cu_err));
+                SLIME_LOG_ERROR("write cudaMemcpy D2H: ", cudaGetErrorString(cu_err));
                 delete[] buf;
                 op->completion_status.store(TCP_FAILED, std::memory_order_release);
                 op->signal->force_complete();
@@ -400,7 +399,7 @@ std::shared_ptr<TcpReadWriteFuture> TcpEndpoint::async_write(const std::vector<a
         auto session = std::make_shared<ClientSession>(
             std::move(conn->socket), [op, conn, i, &pool, send_ptr, is_cuda](asio::error_code ec) {
                 if (ec) {
-                    SLIME_LOG_WARN("async_write session ", i, ": ", ec.message());
+                    SLIME_LOG_WARN("write session ", i, ": ", ec.message());
                     op->completion_status.store(TCP_FAILED, std::memory_order_release);
                 }
                 if (op->signal)
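
The comments in `read` and `write` describe one completion record shared by N independent sessions, with the future's wait blocking until every session has signalled its bit. The following is a minimal Python model of that idea, written only to make the control flow concrete. `SharedOpState` and `run_batch` are hypothetical names, and the sticky-failure rule is an assumption drawn from the `TCP_FAILED` stores visible in the hunks above; it is not a port of the C++ `TcpOpState`.

```python
import threading


class SharedOpState:
    """One completion record shared by N sessions (illustrative model only)."""

    def __init__(self, n_sessions):
        self._expected = (1 << n_sessions) - 1  # one bit per session
        self._done = 0
        self._failed = False
        self._cond = threading.Condition()

    def signal(self, index, ok=True):
        """Called by session `index` when its transfer finishes."""
        with self._cond:
            self._done |= 1 << index
            if not ok:
                self._failed = True  # assumed: any failure marks the whole op failed
            if self._done == self._expected:
                self._cond.notify_all()

    def wait(self, timeout=None):
        """Block until every session has signalled; True only if all succeeded."""
        with self._cond:
            finished = self._cond.wait_for(
                lambda: self._done == self._expected, timeout
            )
            return finished and not self._failed


def run_batch(n_sessions, failing=()):
    """Run one thread per session and wait on the shared state."""
    op = SharedOpState(n_sessions)
    workers = [
        threading.Thread(target=op.signal, args=(i, i not in failing))
        for i in range(n_sessions)
    ]
    for worker in workers:
        worker.start()
    result = op.wait(timeout=5.0)
    for worker in workers:
        worker.join()
    return result


print(run_batch(4))
print(run_batch(4, failing={2}))
```

Sharing one record keeps the caller's view to a single future per batch, at the cost of reporting only an aggregate status rather than which session failed.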

dlslime/dlslime/csrc/engine/tcp/tcp_endpoint.h

Lines changed: 29 additions & 6 deletions
@@ -6,6 +6,7 @@
 #include <deque>
 #include <memory>
 #include <mutex>
+#include <stdexcept>
 #include <string>
 #include <vector>
 
@@ -20,6 +21,14 @@
 #include "tcp_session.h"
 
 namespace dlslime {
+
+// Thrown by transport operations that exist in the abstract endpoint
+// surface (for parity with RDMAEndpoint) but have no analogue on this
+// transport. Translated to Python ``NotImplementedError`` by the bindings.
+struct not_implemented: public std::logic_error {
+    using std::logic_error::logic_error;
+};
+
 namespace tcp {
 
 using json = nlohmann::json;
@@ -48,16 +57,30 @@ class TcpEndpoint: public std::enable_shared_from_this<TcpEndpoint> {
     json mr_info() const;
 
     // ── Async I/O (all return Future immediately; I/O runs on io_context thread) ──
+    // Method names match RDMAEndpoint so PeerAgent can hold either type
+    // without an adapter. ``stream`` exists in the bindings for signature
+    // parity with RDMA and is ignored on TCP.
 
-    std::shared_ptr<TcpSendFuture> async_send(const chunk_tuple_t& chunk, int64_t timeout_ms = kDefaultTimeoutMs);
+    std::shared_ptr<TcpSendFuture> send(const chunk_tuple_t& chunk, int64_t timeout_ms = kDefaultTimeoutMs);
 
-    std::shared_ptr<TcpRecvFuture> async_recv(const chunk_tuple_t& chunk, bool exact_size = false);
+    std::shared_ptr<TcpRecvFuture> recv(const chunk_tuple_t& chunk, bool exact_size = false);
 
-    std::shared_ptr<TcpReadWriteFuture> async_read(const std::vector<assign_tuple_t>& assign,
-                                                   int64_t timeout_ms = kDefaultTimeoutMs);
+    std::shared_ptr<TcpReadWriteFuture> read(const std::vector<assign_tuple_t>& assign,
+                                             int64_t timeout_ms = kDefaultTimeoutMs);
 
-    std::shared_ptr<TcpReadWriteFuture> async_write(const std::vector<assign_tuple_t>& assign,
-                                                    int64_t timeout_ms = kDefaultTimeoutMs);
+    std::shared_ptr<TcpReadWriteFuture> write(const std::vector<assign_tuple_t>& assign,
+                                              int64_t timeout_ms = kDefaultTimeoutMs);
+
+    // ── RDMA-only ops; provided for interface parity, never succeed on TCP. ──
+    [[noreturn]] void write_with_imm(const std::vector<assign_tuple_t>& /*assign*/, uint32_t /*imm_data*/ = 0)
+    {
+        throw not_implemented("TCP transport does not support write_with_imm; this is RDMA-only.");
+    }
+
+    [[noreturn]] void imm_recv()
+    {
+        throw not_implemented("TCP transport does not support imm_recv; this is RDMA-only.");
+    }
 
     // ── Accessors ───────────────────────────────────────
     void setId(int64_t id)
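
The header comment says the method names now match `RDMAEndpoint` so that PeerAgent can hold either type without an adapter. On the Python side, that kind of name parity can be checked structurally with `typing.Protocol`. The sketch below is an assumption-heavy illustration: `EndpointLike`, `RenamedTcp`, and `LegacyTcp` are hypothetical, and the position and default of the `stream` parameter are guesses, since the binding signatures are not part of this diff.

```python
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class EndpointLike(Protocol):
    """Hypothetical shared surface; names follow the renamed methods in this diff."""

    def send(self, chunk: Any, stream: Optional[Any] = None) -> Any: ...
    def recv(self, chunk: Any, stream: Optional[Any] = None) -> Any: ...
    def read(self, assign: Any, stream: Optional[Any] = None) -> Any: ...
    def write(self, assign: Any, stream: Optional[Any] = None) -> Any: ...
    def write_with_imm(self, assign: Any, imm_data: int = 0) -> Any: ...
    def imm_recv(self) -> Any: ...


class RenamedTcp:
    """Stand-in with the post-commit names; `stream` is accepted and ignored."""

    def send(self, chunk, stream=None):
        return ("send", chunk)

    def recv(self, chunk, stream=None):
        return ("recv", chunk)

    def read(self, assign, stream=None):
        return ("read", assign)

    def write(self, assign, stream=None):
        return ("write", assign)

    def write_with_imm(self, assign, imm_data=0):
        raise NotImplementedError("RDMA-only")

    def imm_recv(self):
        raise NotImplementedError("RDMA-only")


class LegacyTcp:
    """Stand-in that only has a pre-commit async_* name."""

    def async_send(self, chunk):
        return ("send", chunk)


print(isinstance(RenamedTcp(), EndpointLike))
print(isinstance(LegacyTcp(), EndpointLike))
```

A runtime `isinstance` check against a protocol only verifies that the named methods exist, not their signatures or behaviour, so the RDMA-only methods still pass the check while raising when called.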
