Skip to content

Commit 615bde9

Browse files
author
AztecBot
committed
Merge branch 'next' into merge-train/fairies
2 parents af79f53 + e9c8d80 commit 615bde9

35 files changed

Lines changed: 4178 additions & 22 deletions

barretenberg/cpp/src/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ if(NOT FUZZING AND NOT WASM AND NOT BB_LITE)
127127
# NOTE: Do not conditionally base this on the AVM flag as it defines a necessary vm2_sim library.
128128
add_subdirectory(barretenberg/vm2)
129129
add_subdirectory(barretenberg/ipc)
130+
add_subdirectory(barretenberg/wsdb)
131+
add_subdirectory(barretenberg/wsdb_client)
130132
add_subdirectory(barretenberg/nodejs_module)
131133
endif()
132134

barretenberg/cpp/src/barretenberg/ipc/ipc_client.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "barretenberg/ipc/ipc_client.hpp"
2+
#include "barretenberg/ipc/mpsc_shm_client.hpp"
23
#include "barretenberg/ipc/shm_client.hpp"
34
#include "barretenberg/ipc/socket_client.hpp"
45
#include <cstddef>
@@ -17,4 +18,9 @@ std::unique_ptr<IpcClient> IpcClient::create_shm(const std::string& base_name)
1718
return std::make_unique<ShmClient>(base_name);
1819
}
1920

21+
std::unique_ptr<IpcClient> IpcClient::create_mpsc_shm(const std::string& base_name, size_t client_id)
22+
{
23+
return std::make_unique<MpscShmClient>(base_name, client_id);
24+
}
25+
2026
} // namespace bb::ipc

barretenberg/cpp/src/barretenberg/ipc/ipc_client.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ class IpcClient {
7272
// Factory methods
7373
static std::unique_ptr<IpcClient> create_socket(const std::string& socket_path);
7474
static std::unique_ptr<IpcClient> create_shm(const std::string& base_name);
75+
static std::unique_ptr<IpcClient> create_mpsc_shm(const std::string& base_name, size_t client_id);
7576
};
7677

7778
} // namespace bb::ipc

barretenberg/cpp/src/barretenberg/ipc/ipc_server.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "barretenberg/ipc/ipc_server.hpp"
2+
#include "barretenberg/ipc/mpsc_shm_server.hpp"
23
#include "barretenberg/ipc/shm_server.hpp"
34
#include "barretenberg/ipc/socket_server.hpp"
45
#include <cstddef>
@@ -19,4 +20,12 @@ std::unique_ptr<IpcServer> IpcServer::create_shm(const std::string& base_name,
1920
return std::make_unique<ShmServer>(base_name, request_ring_size, response_ring_size);
2021
}
2122

23+
std::unique_ptr<IpcServer> IpcServer::create_mpsc_shm(const std::string& base_name,
24+
size_t max_clients,
25+
size_t request_ring_size,
26+
size_t response_ring_size)
27+
{
28+
return std::make_unique<MpscShmServer>(base_name, max_clients, request_ring_size, response_ring_size);
29+
}
30+
2231
} // namespace bb::ipc

barretenberg/cpp/src/barretenberg/ipc/ipc_server.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,10 @@ class IpcServer {
194194
static std::unique_ptr<IpcServer> create_shm(const std::string& base_name,
195195
size_t request_ring_size = static_cast<size_t>(1024 * 1024),
196196
size_t response_ring_size = static_cast<size_t>(1024 * 1024));
197+
static std::unique_ptr<IpcServer> create_mpsc_shm(const std::string& base_name,
198+
size_t max_clients,
199+
size_t request_ring_size = static_cast<size_t>(1024 * 1024),
200+
size_t response_ring_size = static_cast<size_t>(1024 * 1024));
197201

198202
protected:
199203
std::atomic<bool> shutdown_requested_{ false };
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
#pragma once
2+
3+
#include "ipc_client.hpp"
4+
#include "shm/mpsc_shm.hpp"
5+
#include "shm/spsc_shm.hpp"
6+
#include "shm_common.hpp"
7+
#include <cassert>
8+
#include <cstdint>
9+
#include <cstring>
10+
#include <optional>
11+
#include <string>
12+
#include <utility>
13+
14+
namespace bb::ipc {
15+
16+
/**
17+
* @brief IPC client for multi-client shared memory server
18+
*
19+
* Uses MpscProducer for sending requests and a dedicated SPSC ring for
20+
* receiving responses. Each client is assigned a unique client_id.
21+
*/
22+
class MpscShmClient : public IpcClient {
23+
public:
24+
MpscShmClient(std::string base_name, size_t client_id)
25+
: base_name_(std::move(base_name))
26+
, client_id_(client_id)
27+
{}
28+
29+
~MpscShmClient() override = default;
30+
31+
// Non-copyable, non-movable
32+
MpscShmClient(const MpscShmClient&) = delete;
33+
MpscShmClient& operator=(const MpscShmClient&) = delete;
34+
MpscShmClient(MpscShmClient&&) = delete;
35+
MpscShmClient& operator=(MpscShmClient&&) = delete;
36+
37+
bool connect() override
38+
{
39+
if (producer_.has_value()) {
40+
return true; // Already connected
41+
}
42+
43+
try {
44+
// Connect as producer to the MPSC request system
45+
producer_ = MpscProducer::connect(base_name_ + "_req", client_id_);
46+
47+
// Connect to our dedicated SPSC response ring
48+
std::string resp_name = base_name_ + "_resp_" + std::to_string(client_id_);
49+
response_ring_ = SpscShm::connect(resp_name);
50+
51+
return true;
52+
} catch (...) {
53+
producer_.reset();
54+
response_ring_.reset();
55+
return false;
56+
}
57+
}
58+
59+
bool send(const void* data, size_t len, uint64_t timeout_ns) override
60+
{
61+
if (!producer_.has_value()) {
62+
return false;
63+
}
64+
65+
// Claim space for length prefix + data
66+
size_t total_size = sizeof(uint32_t) + len;
67+
void* buf = producer_->claim(total_size, static_cast<uint32_t>(timeout_ns));
68+
if (buf == nullptr) {
69+
return false;
70+
}
71+
72+
// Write length prefix + data
73+
auto len_u32 = static_cast<uint32_t>(len);
74+
std::memcpy(buf, &len_u32, sizeof(uint32_t));
75+
std::memcpy(static_cast<uint8_t*>(buf) + sizeof(uint32_t), data, len);
76+
77+
// Publish (rings doorbell to wake server)
78+
producer_->publish(total_size);
79+
return true;
80+
}
81+
82+
std::span<const uint8_t> receive(uint64_t timeout_ns) override
83+
{
84+
if (!response_ring_.has_value()) {
85+
return {};
86+
}
87+
return ring_receive_msg(response_ring_.value(), timeout_ns);
88+
}
89+
90+
void release(size_t message_size) override
91+
{
92+
if (!response_ring_.has_value()) {
93+
return;
94+
}
95+
response_ring_->release(sizeof(uint32_t) + message_size);
96+
}
97+
98+
void close() override
99+
{
100+
producer_.reset();
101+
response_ring_.reset();
102+
}
103+
104+
private:
105+
std::string base_name_;
106+
size_t client_id_;
107+
std::optional<MpscProducer> producer_;
108+
std::optional<SpscShm> response_ring_;
109+
};
110+
111+
} // namespace bb::ipc
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
#pragma once
2+
3+
#include "ipc_server.hpp"
4+
#include "shm/mpsc_shm.hpp"
5+
#include "shm/spsc_shm.hpp"
6+
#include "shm_common.hpp"
7+
#include <cstdint>
8+
#include <iostream>
9+
#include <optional>
10+
#include <string>
11+
#include <utility>
12+
#include <vector>
13+
14+
namespace bb::ipc {
15+
16+
/**
17+
* @brief IPC server implementation using shared memory with multi-client support
18+
*
19+
* Uses MPSC (multi-producer single-consumer) for requests and per-client SPSC
20+
* rings for responses. Supports up to max_clients concurrent clients.
21+
*
22+
* Shared memory layout:
23+
* - Request: MPSC consumer with one SPSC ring per client (client writes, server reads)
24+
* - Response: Separate SPSC ring per client (server writes, client reads)
25+
*/
26+
class MpscShmServer : public IpcServer {
27+
public:
28+
static constexpr size_t DEFAULT_RING_SIZE = 1 << 20; // 1MB
29+
30+
MpscShmServer(std::string base_name,
31+
size_t max_clients,
32+
size_t request_ring_size = DEFAULT_RING_SIZE,
33+
size_t response_ring_size = DEFAULT_RING_SIZE)
34+
: base_name_(std::move(base_name))
35+
, max_clients_(max_clients)
36+
, request_ring_size_(request_ring_size)
37+
, response_ring_size_(response_ring_size)
38+
{}
39+
40+
~MpscShmServer() override { close(); }
41+
42+
// Non-copyable, non-movable
43+
MpscShmServer(const MpscShmServer&) = delete;
44+
MpscShmServer& operator=(const MpscShmServer&) = delete;
45+
MpscShmServer(MpscShmServer&&) = delete;
46+
MpscShmServer& operator=(MpscShmServer&&) = delete;
47+
48+
bool listen() override
49+
{
50+
if (request_consumer_.has_value()) {
51+
return true; // Already listening
52+
}
53+
54+
// Clean up any leftover shared memory
55+
MpscConsumer::unlink(base_name_ + "_req", max_clients_);
56+
for (size_t i = 0; i < max_clients_; i++) {
57+
SpscShm::unlink(base_name_ + "_resp_" + std::to_string(i));
58+
}
59+
60+
try {
61+
// Create MPSC consumer for requests (one ring per client)
62+
request_consumer_ = MpscConsumer::create(base_name_ + "_req", max_clients_, request_ring_size_);
63+
64+
// Create per-client SPSC response rings
65+
response_rings_.reserve(max_clients_);
66+
for (size_t i = 0; i < max_clients_; i++) {
67+
std::string resp_name = base_name_ + "_resp_" + std::to_string(i);
68+
response_rings_.push_back(SpscShm::create(resp_name, response_ring_size_));
69+
}
70+
71+
return true;
72+
} catch (...) {
73+
close();
74+
return false;
75+
}
76+
}
77+
78+
int wait_for_data(uint64_t timeout_ns) override
79+
{
80+
if (!request_consumer_.has_value()) {
81+
return -1;
82+
}
83+
// MpscConsumer::wait_for_data returns ring index = client_id
84+
return request_consumer_->wait_for_data(static_cast<uint32_t>(timeout_ns));
85+
}
86+
87+
std::span<const uint8_t> receive(int client_id) override
88+
{
89+
if (!request_consumer_.has_value() || client_id < 0 || static_cast<size_t>(client_id) >= max_clients_) {
90+
return {};
91+
}
92+
// Peek on the specific client's request ring via MpscConsumer
93+
void* len_ptr = request_consumer_->peek(static_cast<size_t>(client_id), sizeof(uint32_t), 100000000);
94+
if (len_ptr == nullptr) {
95+
return {};
96+
}
97+
uint32_t msg_len = 0;
98+
std::memcpy(&msg_len, len_ptr, sizeof(uint32_t));
99+
100+
void* msg_ptr = request_consumer_->peek(static_cast<size_t>(client_id), sizeof(uint32_t) + msg_len, 100000000);
101+
if (msg_ptr == nullptr) {
102+
return {};
103+
}
104+
return std::span<const uint8_t>(static_cast<const uint8_t*>(msg_ptr) + sizeof(uint32_t), msg_len);
105+
}
106+
107+
void release(int client_id, size_t message_size) override
108+
{
109+
if (!request_consumer_.has_value() || client_id < 0 || static_cast<size_t>(client_id) >= max_clients_) {
110+
return;
111+
}
112+
request_consumer_->release(static_cast<size_t>(client_id), sizeof(uint32_t) + message_size);
113+
}
114+
115+
bool send(int client_id, const void* data, size_t len) override
116+
{
117+
if (client_id < 0 || static_cast<size_t>(client_id) >= response_rings_.size()) {
118+
return false;
119+
}
120+
return ring_send_msg(response_rings_[static_cast<size_t>(client_id)], data, len, 100000000);
121+
}
122+
123+
void close() override
124+
{
125+
request_consumer_.reset();
126+
response_rings_.clear();
127+
128+
// Clean up shared memory
129+
MpscConsumer::unlink(base_name_ + "_req", max_clients_);
130+
for (size_t i = 0; i < max_clients_; i++) {
131+
SpscShm::unlink(base_name_ + "_resp_" + std::to_string(i));
132+
}
133+
}
134+
135+
void wakeup_all() override
136+
{
137+
if (request_consumer_.has_value()) {
138+
request_consumer_->wakeup_all();
139+
}
140+
for (auto& ring : response_rings_) {
141+
ring.wakeup_all();
142+
}
143+
}
144+
145+
private:
146+
std::string base_name_;
147+
size_t max_clients_;
148+
size_t request_ring_size_;
149+
size_t response_ring_size_;
150+
std::optional<MpscConsumer> request_consumer_;
151+
std::vector<SpscShm> response_rings_;
152+
};
153+
154+
} // namespace bb::ipc

0 commit comments

Comments
 (0)