Merged
8 changes: 5 additions & 3 deletions .github/workflows/pypi-publish.yml
@@ -50,11 +50,13 @@ jobs:
# Build only the matching Python ABI on x86_64 manylinux.
CIBW_BUILD: "${{ matrix.python }}-manylinux_x86_64"
CIBW_BEFORE_ALL_LINUX: |
- yum install -y libibverbs-devel numactl-devel || \
- (apt-get update && apt-get install -y libibverbs-dev libnuma-dev)
+ (yum install -y epel-release || true) && \
+ yum install -y libibverbs-devel numactl-devel asio-devel || \
+ yum install -y libibverbs-devel numactl-devel boost-devel || \
+ (apt-get update && apt-get install -y libibverbs-dev libnuma-dev libasio-dev)
CIBW_ENVIRONMENT: >-
CMAKE_ARGS=-DBUILD_RDMA=ON -DBUILD_PYTHON=ON
- -DBUILD_NVLINK=OFF -DBUILD_TORCH_PLUGIN=OFF
+ -DBUILD_TCP=ON -DBUILD_NVLINK=OFF -DBUILD_TORCH_PLUGIN=OFF
-DBUILD_ASCEND_DIRECT=OFF -DBUILD_TEST=OFF
# Don't try to bundle libibverbs / libnuma into the wheel:
# they're system-level libraries (provided by the rdma-core / numactl
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
@@ -3,9 +3,9 @@ repos:
rev: v2.9.0
hooks:
- id: ufmt
- # black 25.x requires Python >= 3.9; pin a recent stable to avoid
- # pre-commit defaulting to whatever system python happens to be.
- language_version: python3.12
+ # Use whatever Python pre-commit was launched with (3.9+ required by
+ # black 25.x). Avoids forcing a specific 3.x venv that may not exist
+ # on every machine.
additional_dependencies:
- black==25.9.0
- usort==1.0.8.post1
34 changes: 33 additions & 1 deletion README.md
@@ -16,7 +16,7 @@ DLSlime is a PeerAgent-centered communication and microservice toolkit for
distributed AI systems. PeerAgent is the runtime hub: application services such
as SlimeRPC and DLSlimeCache build on it, NanoCtrl supplies service governance
and coordination metadata around it, and endpoint APIs below it drive
- heterogeneous transports such as RDMA, NVLink, and Ascend Direct.
+ heterogeneous transports such as RDMA, TCP, NVLink, and Ascend Direct.

DLSlime is designed to be adopted one layer at a time. Applications can start
with direct endpoints, add PeerAgent coordination, use NanoCtrl for governance,
@@ -96,6 +96,36 @@ python dlslime/examples/python/p2p_ascend_read.py
Ascend Direct setup details live in
[docs/huawei_ascend/README.md](docs/huawei_ascend/README.md).

### TCP Fallback Transport

Use TCP when the hosts have no RDMA NICs or when a peer connection has to
traverse a network without RDMA capability. The TCP transport exposes the same
primitives — `endpoint_info` / `connect`, two-sided `send` / `recv`, one-sided
`read` / `write`, and named memory regions — and plugs into PeerAgent through
the same control plane via `connect_to(transport="tcp")`. Immediate-data ops
(`write_with_imm`, `imm_recv`) are RDMA-only and raise `NotImplementedError`
on TCP.

`BUILD_TCP` is `ON` by default.

Raw `TcpEndpoint`, no NanoCtrl required:

```bash
python dlslime/examples/python/p2p_tcp_rc_send_recv.py
```

PeerAgent over TCP:

```bash
nanoctrl start
python dlslime/examples/python/p2p_tcp_send_recv_peer_agent.py
python dlslime/examples/python/p2p_tcp_rc_write_peer_agent.py
python dlslime/examples/python/p2p_tcp_rc_read_peer_agent.py
```

See [docs/src/guide/tcp-transport.md](docs/src/guide/tcp-transport.md) for the
full surface, one-sided I/O setup, and the test reference.
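
As a rough illustration of the RDMA-only caveat above, the sketch below shows how calling code might cope with `write_with_imm` raising `NotImplementedError` on TCP. `StubTcpEndpoint` is a hypothetical stand-in written for this example, not the DLSlime class; the helper `write_maybe_with_imm`, the placeholder assignment tuple, and the idea of falling back to a plain `write` are likewise assumptions.

```python
class StubTcpEndpoint:
    """Hypothetical stand-in for a TCP endpoint; NOT the real DLSlime class."""

    def __init__(self):
        self.calls = []

    def write(self, assign):
        # Record the one-sided write instead of touching the network.
        self.calls.append(("write", list(assign)))
        return "write-future"

    def write_with_imm(self, assign, imm_data=0):
        # Mirrors the documented behaviour: immediate-data ops are RDMA-only.
        raise NotImplementedError("TCP transport does not support write_with_imm")


def write_maybe_with_imm(endpoint, assign, imm_data):
    """Try the immediate-data path first; fall back to a plain write.

    Returns (future, used_imm). When used_imm is False the caller still has to
    deliver imm_data some other way (for example with a two-sided send),
    because a plain write carries no immediate value.
    """
    try:
        return endpoint.write_with_imm(assign, imm_data), True
    except NotImplementedError:
        return endpoint.write(assign), False


endpoint = StubTcpEndpoint()
# Placeholder assignment tuple; the real tuple layout is not shown here.
assign = [("local_mr", "remote_mr", 0, 0, 64)]
future, used_imm = write_maybe_with_imm(endpoint, assign, imm_data=7)
print(future, used_imm)
```

Catching `NotImplementedError` at the call site keeps transport selection out of the data path: the same helper would work unchanged with an endpoint that does support immediate data.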

### PeerAgent-to-PeerAgent Access

Use PeerAgent when the application wants peer-to-peer data movement without
@@ -241,6 +271,7 @@ cmake --build build
| Flag | Default | Description |
| --------------------- | ---------------------------------------: | ------------------------------------------------------ |
| `BUILD_RDMA` | `ON` | Build the RDMA transfer engine |
| `BUILD_TCP` | `ON` | Build the TCP transfer engine (fallback transport) |
| `BUILD_PYTHON` | `OFF` in CMake, `ON` in `pyproject.toml` | Build Python bindings |
| `BUILD_NVLINK` | `OFF` | Build the NVLink transfer engine |
| `BUILD_ASCEND_DIRECT` | `OFF` | Build Ascend Direct transport |
@@ -289,6 +320,7 @@ scripts/ Repo-wide automation (release.sh, ...)

- [Documentation index](docs/README.md)
- [Roadmap](docs/roadmap.md)
- [TCP transport guide](docs/src/guide/tcp-transport.md)
- [DLSlimeCache design](docs/design/dlslime-cache.md)
- [Endpoint ownership model](docs/endpoint-ownership-model.md)
- [Endpoint DeviceSignal refactor](docs/endpoint-device-signal-refactor.md)
2 changes: 1 addition & 1 deletion dlslime/bench/python/tcp_bench_spmd.py
@@ -197,7 +197,7 @@ def rank_0_print(*args):
def transfer_batch_concurrency_dlslime(
role, opcode, local_handle, remote_handle, tensor, batch_size, num_concurrency
):
- fn = tcp_endpoint.async_read if opcode == "read" else tcp_endpoint.async_write
+ fn = tcp_endpoint.read if opcode == "read" else tcp_endpoint.write
if role == "initiator":
slots = []
for concurrent_id in range(num_concurrency):
39 changes: 19 additions & 20 deletions dlslime/dlslime/csrc/engine/tcp/tcp_endpoint.cpp
@@ -170,10 +170,10 @@ int32_t TcpEndpoint::register_remote_memory_region(const std::string& name, cons
return remote_pool_->register_remote_memory_region(mr_info, name);
}

- // ── async_send ──────────────────────────────────────────
+ // ── send ──────────────────────────────────────────
// chunk_tuple_t = (src_ptr, offset, length) — raw pointers, no MR lookup.

- std::shared_ptr<TcpSendFuture> TcpEndpoint::async_send(const chunk_tuple_t& chunk, int64_t /*timeout_ms*/)
+ std::shared_ptr<TcpSendFuture> TcpEndpoint::send(const chunk_tuple_t& chunk, int64_t /*timeout_ms*/)
{
uintptr_t src = std::get<0>(chunk) + std::get<1>(chunk);
size_t len = std::get<2>(chunk);
@@ -198,7 +198,7 @@ std::shared_ptr<TcpSendFuture> TcpEndpoint::async_send(const chunk_tuple_t& chun
auto* buf = new char[len];
auto cu_err = cudaMemcpy(buf, send_ptr, len, cudaMemcpyDeviceToHost);
if (cu_err != cudaSuccess) {
- SLIME_LOG_ERROR("async_send cudaMemcpy D2H: ", cudaGetErrorString(cu_err));
+ SLIME_LOG_ERROR("send cudaMemcpy D2H: ", cudaGetErrorString(cu_err));
delete[] buf;
op->completion_status.store(TCP_FAILED, std::memory_order_release);
op->signal->force_complete();
@@ -213,7 +213,7 @@ std::shared_ptr<TcpSendFuture> TcpEndpoint::async_send(const chunk_tuple_t& chun
auto session = std::make_shared<ClientSession>(
std::move(conn->socket), [op, conn, &pool, send_ptr, is_cuda](asio::error_code ec) {
if (ec)
- SLIME_LOG_WARN("async_send: ", ec.message());
+ SLIME_LOG_WARN("send: ", ec.message());
op->completion_status.store(ec ? TCP_FAILED : TCP_SUCCESS, std::memory_order_release);
if (op->signal)
op->signal->set_comm_done(0);
@@ -228,10 +228,10 @@ std::shared_ptr<TcpSendFuture> TcpEndpoint::async_send(const chunk_tuple_t& chun
return std::make_shared<TcpSendFuture>(op);
}

- // ── async_recv ──────────────────────────────────────────
+ // ── recv ──────────────────────────────────────────
// chunk_tuple_t = (dst_ptr, offset, length) — raw pointers, no MR lookup.

- std::shared_ptr<TcpRecvFuture> TcpEndpoint::async_recv(const chunk_tuple_t& chunk, bool exact_size)
+ std::shared_ptr<TcpRecvFuture> TcpEndpoint::recv(const chunk_tuple_t& chunk, bool exact_size)
{
auto op = TcpOpState::create();
op->signal->reset_all();
@@ -258,15 +258,14 @@ std::shared_ptr<TcpRecvFuture> TcpEndpoint::async_recv(const chunk_tuple_t& chun
return std::make_shared<TcpRecvFuture>(op);
}

- // ── async_read ──────────────────────────────────────────
+ // ── read ──────────────────────────────────────────
// Each assign creates an independent ClientSession; all share one OpState.
// Future.wait() blocks until every session has signalled its bit.

- std::shared_ptr<TcpReadWriteFuture> TcpEndpoint::async_read(const std::vector<assign_tuple_t>& assign,
- int64_t /*timeout_ms*/)
+ std::shared_ptr<TcpReadWriteFuture> TcpEndpoint::read(const std::vector<assign_tuple_t>& assign, int64_t /*timeout_ms*/)
{
if (assign.empty())
- throw std::runtime_error("TcpEndpoint::async_read: empty assignment");
+ throw std::runtime_error("TcpEndpoint::read: empty assignment");

size_t N = assign.size();
auto op = TcpOpState::create();
@@ -288,7 +287,7 @@ std::shared_ptr<TcpReadWriteFuture> TcpEndpoint::async_read(const std::vector<as
auto local = local_pool_->get_mr_fast(local_h);
auto remote = remote_pool_->get_remote_mr_fast(remote_h);
if (local.length == 0 || remote.length == 0)
- throw std::runtime_error("TcpEndpoint::async_read: invalid MR handle");
+ throw std::runtime_error("TcpEndpoint::read: invalid MR handle");

uintptr_t local_dst = local.addr + local_off;
SessionHeader hdr{length, remote.addr + remote_off, OP_READ};
@@ -313,14 +312,14 @@ std::shared_ptr<TcpReadWriteFuture> TcpEndpoint::async_read(const std::vector<as
std::move(conn->socket),
[op, conn, i, &pool, read_dst, is_cuda, real_dst = local_dst, len = length](asio::error_code ec) {
if (ec) {
- SLIME_LOG_WARN("async_read session ", i, ": ", ec.message());
+ SLIME_LOG_WARN("read session ", i, ": ", ec.message());
op->completion_status.store(TCP_FAILED, std::memory_order_release);
}
#ifdef USE_CUDA
if (!ec && is_cuda) {
auto cu_err = cudaMemcpy(reinterpret_cast<void*>(real_dst), read_dst, len, cudaMemcpyHostToDevice);
if (cu_err != cudaSuccess) {
- SLIME_LOG_ERROR("async_read cudaMemcpy H2D: ", cudaGetErrorString(cu_err));
+ SLIME_LOG_ERROR("read cudaMemcpy H2D: ", cudaGetErrorString(cu_err));
op->completion_status.store(TCP_FAILED, std::memory_order_release);
}
}
@@ -337,14 +336,14 @@ std::shared_ptr<TcpReadWriteFuture> TcpEndpoint::async_read(const std::vector<as
return std::make_shared<TcpReadWriteFuture>(op);
}
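
The comment on `read` says that every assignment gets its own session, that all sessions share one op state, and that `wait()` returns only once each session has signalled its bit. A minimal Python sketch of that idea follows. It is an analogy, not the C++ implementation: the `OpState` class, the status constants, and the use of `threading.Condition` are assumptions chosen for the example.

```python
import threading

TCP_SUCCESS, TCP_FAILED = 0, 1  # assumed status codes for this sketch


class OpState:
    """One operation shared by n sessions; each session owns one completion bit."""

    def __init__(self, n):
        self._expected = (1 << n) - 1  # mask with one bit per session
        self._done = 0
        self._cond = threading.Condition()
        self.status = TCP_SUCCESS

    def set_comm_done(self, bit, failed=False):
        with self._cond:
            if failed:
                self.status = TCP_FAILED  # a single failure marks the whole op
            self._done |= 1 << bit
            self._cond.notify_all()

    def wait(self, timeout=None):
        # Block until every session has signalled its bit, or the timeout expires.
        with self._cond:
            finished = self._cond.wait_for(
                lambda: self._done == self._expected, timeout
            )
        return self.status if finished else None


def run_sessions(n, failing=()):
    """Simulate n sessions completing on worker threads; return the final status."""
    op = OpState(n)
    threads = [
        threading.Thread(target=op.set_comm_done, args=(i, i in failing))
        for i in range(n)
    ]
    for t in threads:
        t.start()
    status = op.wait(timeout=5.0)
    for t in threads:
        t.join()
    return status


print(run_sessions(4))               # every session succeeds
print(run_sessions(4, failing={2}))  # session 2 reports an error
```

Note that a failed session still sets its bit, so the waiter is released and observes the failure status instead of blocking; the diff appears to follow the same pattern by storing `TCP_FAILED` before signalling.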

- // ── async_write ─────────────────────────────────────────
+ // ── write ─────────────────────────────────────────
// Each assign creates an independent ClientSession; all share one OpState.

- std::shared_ptr<TcpReadWriteFuture> TcpEndpoint::async_write(const std::vector<assign_tuple_t>& assign,
- int64_t /*timeout_ms*/)
+ std::shared_ptr<TcpReadWriteFuture> TcpEndpoint::write(const std::vector<assign_tuple_t>& assign,
+ int64_t /*timeout_ms*/)
{
if (assign.empty())
- throw std::runtime_error("TcpEndpoint::async_write: empty assignment");
+ throw std::runtime_error("TcpEndpoint::write: empty assignment");

size_t N = assign.size();
auto op = TcpOpState::create();
@@ -366,7 +365,7 @@ std::shared_ptr<TcpReadWriteFuture> TcpEndpoint::async_write(const std::vector<a
auto local = local_pool_->get_mr_fast(local_h);
auto remote = remote_pool_->get_remote_mr_fast(remote_h);
if (local.length == 0 || remote.length == 0)
- throw std::runtime_error("TcpEndpoint::async_write: invalid MR handle");
+ throw std::runtime_error("TcpEndpoint::write: invalid MR handle");

uintptr_t src = local.addr + local_off;
SessionHeader hdr{length, remote.addr + remote_off, OP_WRITE};
@@ -385,7 +384,7 @@ std::shared_ptr<TcpReadWriteFuture> TcpEndpoint::async_write(const std::vector<a
auto* buf = new char[length];
auto cu_err = cudaMemcpy(buf, send_ptr, length, cudaMemcpyDeviceToHost);
if (cu_err != cudaSuccess) {
- SLIME_LOG_ERROR("async_write cudaMemcpy D2H: ", cudaGetErrorString(cu_err));
+ SLIME_LOG_ERROR("write cudaMemcpy D2H: ", cudaGetErrorString(cu_err));
delete[] buf;
op->completion_status.store(TCP_FAILED, std::memory_order_release);
op->signal->force_complete();
@@ -400,7 +399,7 @@ std::shared_ptr<TcpReadWriteFuture> TcpEndpoint::async_write(const std::vector<a
auto session = std::make_shared<ClientSession>(
std::move(conn->socket), [op, conn, i, &pool, send_ptr, is_cuda](asio::error_code ec) {
if (ec) {
- SLIME_LOG_WARN("async_write session ", i, ": ", ec.message());
+ SLIME_LOG_WARN("write session ", i, ": ", ec.message());
op->completion_status.store(TCP_FAILED, std::memory_order_release);
}
if (op->signal)
35 changes: 29 additions & 6 deletions dlslime/dlslime/csrc/engine/tcp/tcp_endpoint.h
@@ -6,6 +6,7 @@
#include <deque>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <vector>

@@ -20,6 +21,14 @@
#include "tcp_session.h"

namespace dlslime {

// Thrown by transport operations that exist in the abstract endpoint
// surface (for parity with RDMAEndpoint) but have no analogue on this
// transport. Translated to Python ``NotImplementedError`` by the bindings.
struct not_implemented: public std::logic_error {
using std::logic_error::logic_error;
};

namespace tcp {

using json = nlohmann::json;
@@ -48,16 +57,30 @@ class TcpEndpoint: public std::enable_shared_from_this<TcpEndpoint> {
json mr_info() const;

// ── Async I/O (all return Future immediately; I/O runs on io_context thread) ──
// Method names match RDMAEndpoint so PeerAgent can hold either type
// without an adapter. ``stream`` exists in the bindings for signature
// parity with RDMA and is ignored on TCP.

- std::shared_ptr<TcpSendFuture> async_send(const chunk_tuple_t& chunk, int64_t timeout_ms = kDefaultTimeoutMs);
+ std::shared_ptr<TcpSendFuture> send(const chunk_tuple_t& chunk, int64_t timeout_ms = kDefaultTimeoutMs);

- std::shared_ptr<TcpRecvFuture> async_recv(const chunk_tuple_t& chunk, bool exact_size = false);
+ std::shared_ptr<TcpRecvFuture> recv(const chunk_tuple_t& chunk, bool exact_size = false);

- std::shared_ptr<TcpReadWriteFuture> async_read(const std::vector<assign_tuple_t>& assign,
- int64_t timeout_ms = kDefaultTimeoutMs);
+ std::shared_ptr<TcpReadWriteFuture> read(const std::vector<assign_tuple_t>& assign,
+ int64_t timeout_ms = kDefaultTimeoutMs);

- std::shared_ptr<TcpReadWriteFuture> async_write(const std::vector<assign_tuple_t>& assign,
- int64_t timeout_ms = kDefaultTimeoutMs);
+ std::shared_ptr<TcpReadWriteFuture> write(const std::vector<assign_tuple_t>& assign,
+ int64_t timeout_ms = kDefaultTimeoutMs);

// ── RDMA-only ops; provided for interface parity, never succeed on TCP. ──
[[noreturn]] void write_with_imm(const std::vector<assign_tuple_t>& /*assign*/, uint32_t /*imm_data*/ = 0)
{
throw not_implemented("TCP transport does not support write_with_imm; this is RDMA-only.");
}

[[noreturn]] void imm_recv()
{
throw not_implemented("TCP transport does not support imm_recv; this is RDMA-only.");
}

// ── Accessors ───────────────────────────────────────
void setId(int64_t id)