Skip to content

Commit 17b93f4

Browse files
committed
feat(ipc): MPSC shared-memory transport with multi-client echo test
Adds a multi-producer, single-consumer shared-memory IPC transport. Internally this is **N SPSC rings (one per producer) + a shared doorbell futex** — there is no shared producer ring. Each client writes only to its own SPSC ring; producers ring the doorbell to wake the consumer, which then checks which ring has data and reads from it. Each client also has its own SPSC response ring. This is the same pattern that existed in earlier IPC iterations and is what \`aztec-wsdb\` will could use when serving both the TS layer and a C++ AVM worker pool from the same world state if we need the performance. Right now though we'll use UDS. This is just if needed in future.
1 parent 8b1067f commit 17b93f4

35 files changed

Lines changed: 4178 additions & 22 deletions

barretenberg/cpp/src/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ if(NOT FUZZING AND NOT WASM AND NOT BB_LITE)
127127
# NOTE: Do not conditionally base this on the AVM flag as it defines a necessary vm2_sim library.
128128
add_subdirectory(barretenberg/vm2)
129129
add_subdirectory(barretenberg/ipc)
130+
add_subdirectory(barretenberg/wsdb)
131+
add_subdirectory(barretenberg/wsdb_client)
130132
add_subdirectory(barretenberg/nodejs_module)
131133
endif()
132134

barretenberg/cpp/src/barretenberg/ipc/ipc_client.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "barretenberg/ipc/ipc_client.hpp"
2+
#include "barretenberg/ipc/mpsc_shm_client.hpp"
23
#include "barretenberg/ipc/shm_client.hpp"
34
#include "barretenberg/ipc/socket_client.hpp"
45
#include <cstddef>
@@ -17,4 +18,9 @@ std::unique_ptr<IpcClient> IpcClient::create_shm(const std::string& base_name)
1718
return std::make_unique<ShmClient>(base_name);
1819
}
1920

21+
std::unique_ptr<IpcClient> IpcClient::create_mpsc_shm(const std::string& base_name, size_t client_id)
22+
{
23+
return std::make_unique<MpscShmClient>(base_name, client_id);
24+
}
25+
2026
} // namespace bb::ipc

barretenberg/cpp/src/barretenberg/ipc/ipc_client.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ class IpcClient {
7272
// Factory methods
7373
static std::unique_ptr<IpcClient> create_socket(const std::string& socket_path);
7474
static std::unique_ptr<IpcClient> create_shm(const std::string& base_name);
75+
static std::unique_ptr<IpcClient> create_mpsc_shm(const std::string& base_name, size_t client_id);
7576
};
7677

7778
} // namespace bb::ipc

barretenberg/cpp/src/barretenberg/ipc/ipc_server.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "barretenberg/ipc/ipc_server.hpp"
2+
#include "barretenberg/ipc/mpsc_shm_server.hpp"
23
#include "barretenberg/ipc/shm_server.hpp"
34
#include "barretenberg/ipc/socket_server.hpp"
45
#include <cstddef>
@@ -19,4 +20,12 @@ std::unique_ptr<IpcServer> IpcServer::create_shm(const std::string& base_name,
1920
return std::make_unique<ShmServer>(base_name, request_ring_size, response_ring_size);
2021
}
2122

23+
std::unique_ptr<IpcServer> IpcServer::create_mpsc_shm(const std::string& base_name,
24+
size_t max_clients,
25+
size_t request_ring_size,
26+
size_t response_ring_size)
27+
{
28+
return std::make_unique<MpscShmServer>(base_name, max_clients, request_ring_size, response_ring_size);
29+
}
30+
2231
} // namespace bb::ipc

barretenberg/cpp/src/barretenberg/ipc/ipc_server.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,10 @@ class IpcServer {
194194
static std::unique_ptr<IpcServer> create_shm(const std::string& base_name,
195195
size_t request_ring_size = static_cast<size_t>(1024 * 1024),
196196
size_t response_ring_size = static_cast<size_t>(1024 * 1024));
197+
static std::unique_ptr<IpcServer> create_mpsc_shm(const std::string& base_name,
198+
size_t max_clients,
199+
size_t request_ring_size = static_cast<size_t>(1024 * 1024),
200+
size_t response_ring_size = static_cast<size_t>(1024 * 1024));
197201

198202
protected:
199203
std::atomic<bool> shutdown_requested_{ false };
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
#pragma once
2+
3+
#include "ipc_client.hpp"
4+
#include "shm/mpsc_shm.hpp"
5+
#include "shm/spsc_shm.hpp"
6+
#include "shm_common.hpp"
7+
#include <cassert>
8+
#include <cstdint>
9+
#include <cstring>
10+
#include <optional>
11+
#include <string>
12+
#include <utility>
13+
14+
namespace bb::ipc {
15+
16+
/**
17+
* @brief IPC client for multi-client shared memory server
18+
*
19+
* Uses MpscProducer for sending requests and a dedicated SPSC ring for
20+
* receiving responses. Each client is assigned a unique client_id.
21+
*/
22+
class MpscShmClient : public IpcClient {
23+
public:
24+
MpscShmClient(std::string base_name, size_t client_id)
25+
: base_name_(std::move(base_name))
26+
, client_id_(client_id)
27+
{}
28+
29+
~MpscShmClient() override = default;
30+
31+
// Non-copyable, non-movable
32+
MpscShmClient(const MpscShmClient&) = delete;
33+
MpscShmClient& operator=(const MpscShmClient&) = delete;
34+
MpscShmClient(MpscShmClient&&) = delete;
35+
MpscShmClient& operator=(MpscShmClient&&) = delete;
36+
37+
bool connect() override
38+
{
39+
if (producer_.has_value()) {
40+
return true; // Already connected
41+
}
42+
43+
try {
44+
// Connect as producer to the MPSC request system
45+
producer_ = MpscProducer::connect(base_name_ + "_req", client_id_);
46+
47+
// Connect to our dedicated SPSC response ring
48+
std::string resp_name = base_name_ + "_resp_" + std::to_string(client_id_);
49+
response_ring_ = SpscShm::connect(resp_name);
50+
51+
return true;
52+
} catch (...) {
53+
producer_.reset();
54+
response_ring_.reset();
55+
return false;
56+
}
57+
}
58+
59+
bool send(const void* data, size_t len, uint64_t timeout_ns) override
60+
{
61+
if (!producer_.has_value()) {
62+
return false;
63+
}
64+
65+
// Claim space for length prefix + data
66+
size_t total_size = sizeof(uint32_t) + len;
67+
void* buf = producer_->claim(total_size, static_cast<uint32_t>(timeout_ns));
68+
if (buf == nullptr) {
69+
return false;
70+
}
71+
72+
// Write length prefix + data
73+
auto len_u32 = static_cast<uint32_t>(len);
74+
std::memcpy(buf, &len_u32, sizeof(uint32_t));
75+
std::memcpy(static_cast<uint8_t*>(buf) + sizeof(uint32_t), data, len);
76+
77+
// Publish (rings doorbell to wake server)
78+
producer_->publish(total_size);
79+
return true;
80+
}
81+
82+
std::span<const uint8_t> receive(uint64_t timeout_ns) override
83+
{
84+
if (!response_ring_.has_value()) {
85+
return {};
86+
}
87+
return ring_receive_msg(response_ring_.value(), timeout_ns);
88+
}
89+
90+
void release(size_t message_size) override
91+
{
92+
if (!response_ring_.has_value()) {
93+
return;
94+
}
95+
response_ring_->release(sizeof(uint32_t) + message_size);
96+
}
97+
98+
void close() override
99+
{
100+
producer_.reset();
101+
response_ring_.reset();
102+
}
103+
104+
private:
105+
std::string base_name_;
106+
size_t client_id_;
107+
std::optional<MpscProducer> producer_;
108+
std::optional<SpscShm> response_ring_;
109+
};
110+
111+
} // namespace bb::ipc
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
#pragma once
2+
3+
#include "ipc_server.hpp"
4+
#include "shm/mpsc_shm.hpp"
5+
#include "shm/spsc_shm.hpp"
6+
#include "shm_common.hpp"
7+
#include <cstdint>
8+
#include <iostream>
9+
#include <optional>
10+
#include <string>
11+
#include <utility>
12+
#include <vector>
13+
14+
namespace bb::ipc {
15+
16+
/**
17+
* @brief IPC server implementation using shared memory with multi-client support
18+
*
19+
* Uses MPSC (multi-producer single-consumer) for requests and per-client SPSC
20+
* rings for responses. Supports up to max_clients concurrent clients.
21+
*
22+
* Shared memory layout:
23+
* - Request: MPSC consumer with one SPSC ring per client (client writes, server reads)
24+
* - Response: Separate SPSC ring per client (server writes, client reads)
25+
*/
26+
class MpscShmServer : public IpcServer {
27+
public:
28+
static constexpr size_t DEFAULT_RING_SIZE = 1 << 20; // 1MB
29+
30+
MpscShmServer(std::string base_name,
31+
size_t max_clients,
32+
size_t request_ring_size = DEFAULT_RING_SIZE,
33+
size_t response_ring_size = DEFAULT_RING_SIZE)
34+
: base_name_(std::move(base_name))
35+
, max_clients_(max_clients)
36+
, request_ring_size_(request_ring_size)
37+
, response_ring_size_(response_ring_size)
38+
{}
39+
40+
~MpscShmServer() override { close(); }
41+
42+
// Non-copyable, non-movable
43+
MpscShmServer(const MpscShmServer&) = delete;
44+
MpscShmServer& operator=(const MpscShmServer&) = delete;
45+
MpscShmServer(MpscShmServer&&) = delete;
46+
MpscShmServer& operator=(MpscShmServer&&) = delete;
47+
48+
bool listen() override
49+
{
50+
if (request_consumer_.has_value()) {
51+
return true; // Already listening
52+
}
53+
54+
// Clean up any leftover shared memory
55+
MpscConsumer::unlink(base_name_ + "_req", max_clients_);
56+
for (size_t i = 0; i < max_clients_; i++) {
57+
SpscShm::unlink(base_name_ + "_resp_" + std::to_string(i));
58+
}
59+
60+
try {
61+
// Create MPSC consumer for requests (one ring per client)
62+
request_consumer_ = MpscConsumer::create(base_name_ + "_req", max_clients_, request_ring_size_);
63+
64+
// Create per-client SPSC response rings
65+
response_rings_.reserve(max_clients_);
66+
for (size_t i = 0; i < max_clients_; i++) {
67+
std::string resp_name = base_name_ + "_resp_" + std::to_string(i);
68+
response_rings_.push_back(SpscShm::create(resp_name, response_ring_size_));
69+
}
70+
71+
return true;
72+
} catch (...) {
73+
close();
74+
return false;
75+
}
76+
}
77+
78+
int wait_for_data(uint64_t timeout_ns) override
79+
{
80+
if (!request_consumer_.has_value()) {
81+
return -1;
82+
}
83+
// MpscConsumer::wait_for_data returns ring index = client_id
84+
return request_consumer_->wait_for_data(static_cast<uint32_t>(timeout_ns));
85+
}
86+
87+
std::span<const uint8_t> receive(int client_id) override
88+
{
89+
if (!request_consumer_.has_value() || client_id < 0 || static_cast<size_t>(client_id) >= max_clients_) {
90+
return {};
91+
}
92+
// Peek on the specific client's request ring via MpscConsumer
93+
void* len_ptr = request_consumer_->peek(static_cast<size_t>(client_id), sizeof(uint32_t), 100000000);
94+
if (len_ptr == nullptr) {
95+
return {};
96+
}
97+
uint32_t msg_len = 0;
98+
std::memcpy(&msg_len, len_ptr, sizeof(uint32_t));
99+
100+
void* msg_ptr = request_consumer_->peek(static_cast<size_t>(client_id), sizeof(uint32_t) + msg_len, 100000000);
101+
if (msg_ptr == nullptr) {
102+
return {};
103+
}
104+
return std::span<const uint8_t>(static_cast<const uint8_t*>(msg_ptr) + sizeof(uint32_t), msg_len);
105+
}
106+
107+
void release(int client_id, size_t message_size) override
108+
{
109+
if (!request_consumer_.has_value() || client_id < 0 || static_cast<size_t>(client_id) >= max_clients_) {
110+
return;
111+
}
112+
request_consumer_->release(static_cast<size_t>(client_id), sizeof(uint32_t) + message_size);
113+
}
114+
115+
bool send(int client_id, const void* data, size_t len) override
116+
{
117+
if (client_id < 0 || static_cast<size_t>(client_id) >= response_rings_.size()) {
118+
return false;
119+
}
120+
return ring_send_msg(response_rings_[static_cast<size_t>(client_id)], data, len, 100000000);
121+
}
122+
123+
void close() override
124+
{
125+
request_consumer_.reset();
126+
response_rings_.clear();
127+
128+
// Clean up shared memory
129+
MpscConsumer::unlink(base_name_ + "_req", max_clients_);
130+
for (size_t i = 0; i < max_clients_; i++) {
131+
SpscShm::unlink(base_name_ + "_resp_" + std::to_string(i));
132+
}
133+
}
134+
135+
void wakeup_all() override
136+
{
137+
if (request_consumer_.has_value()) {
138+
request_consumer_->wakeup_all();
139+
}
140+
for (auto& ring : response_rings_) {
141+
ring.wakeup_all();
142+
}
143+
}
144+
145+
private:
146+
std::string base_name_;
147+
size_t max_clients_;
148+
size_t request_ring_size_;
149+
size_t response_ring_size_;
150+
std::optional<MpscConsumer> request_consumer_;
151+
std::vector<SpscShm> response_rings_;
152+
};
153+
154+
} // namespace bb::ipc

0 commit comments

Comments
 (0)