Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions barretenberg/cpp/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ if(NOT FUZZING AND NOT WASM AND NOT BB_LITE)
# NOTE: Do not conditionally base this on the AVM flag as it defines a necessary vm2_sim library.
add_subdirectory(barretenberg/vm2)
add_subdirectory(barretenberg/ipc)
add_subdirectory(barretenberg/wsdb)
add_subdirectory(barretenberg/wsdb_client)
add_subdirectory(barretenberg/nodejs_module)
endif()

Expand Down
6 changes: 6 additions & 0 deletions barretenberg/cpp/src/barretenberg/ipc/ipc_client.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "barretenberg/ipc/ipc_client.hpp"
#include "barretenberg/ipc/mpsc_shm_client.hpp"
#include "barretenberg/ipc/shm_client.hpp"
#include "barretenberg/ipc/socket_client.hpp"
#include <cstddef>
Expand All @@ -17,4 +18,9 @@ std::unique_ptr<IpcClient> IpcClient::create_shm(const std::string& base_name)
return std::make_unique<ShmClient>(base_name);
}

std::unique_ptr<IpcClient> IpcClient::create_mpsc_shm(const std::string& base_name, size_t client_id)
{
return std::make_unique<MpscShmClient>(base_name, client_id);
}

} // namespace bb::ipc
1 change: 1 addition & 0 deletions barretenberg/cpp/src/barretenberg/ipc/ipc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class IpcClient {
// Factory methods
static std::unique_ptr<IpcClient> create_socket(const std::string& socket_path);
static std::unique_ptr<IpcClient> create_shm(const std::string& base_name);
static std::unique_ptr<IpcClient> create_mpsc_shm(const std::string& base_name, size_t client_id);
};

} // namespace bb::ipc
9 changes: 9 additions & 0 deletions barretenberg/cpp/src/barretenberg/ipc/ipc_server.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "barretenberg/ipc/ipc_server.hpp"
#include "barretenberg/ipc/mpsc_shm_server.hpp"
#include "barretenberg/ipc/shm_server.hpp"
#include "barretenberg/ipc/socket_server.hpp"
#include <cstddef>
Expand All @@ -19,4 +20,12 @@ std::unique_ptr<IpcServer> IpcServer::create_shm(const std::string& base_name,
return std::make_unique<ShmServer>(base_name, request_ring_size, response_ring_size);
}

std::unique_ptr<IpcServer> IpcServer::create_mpsc_shm(const std::string& base_name,
size_t max_clients,
size_t request_ring_size,
size_t response_ring_size)
{
return std::make_unique<MpscShmServer>(base_name, max_clients, request_ring_size, response_ring_size);
}

} // namespace bb::ipc
4 changes: 4 additions & 0 deletions barretenberg/cpp/src/barretenberg/ipc/ipc_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ class IpcServer {
static std::unique_ptr<IpcServer> create_shm(const std::string& base_name,
size_t request_ring_size = static_cast<size_t>(1024 * 1024),
size_t response_ring_size = static_cast<size_t>(1024 * 1024));
static std::unique_ptr<IpcServer> create_mpsc_shm(const std::string& base_name,
size_t max_clients,
size_t request_ring_size = static_cast<size_t>(1024 * 1024),
size_t response_ring_size = static_cast<size_t>(1024 * 1024));

protected:
std::atomic<bool> shutdown_requested_{ false };
Expand Down
111 changes: 111 additions & 0 deletions barretenberg/cpp/src/barretenberg/ipc/mpsc_shm_client.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#pragma once

#include "ipc_client.hpp"
#include "shm/mpsc_shm.hpp"
#include "shm/spsc_shm.hpp"
#include "shm_common.hpp"
#include <cassert>
#include <cstdint>
#include <cstring>
#include <optional>
#include <string>
#include <utility>

namespace bb::ipc {

/**
* @brief IPC client for multi-client shared memory server
*
* Uses MpscProducer for sending requests and a dedicated SPSC ring for
* receiving responses. Each client is assigned a unique client_id.
*/
class MpscShmClient : public IpcClient {
public:
MpscShmClient(std::string base_name, size_t client_id)
: base_name_(std::move(base_name))
, client_id_(client_id)
{}

~MpscShmClient() override = default;

// Non-copyable, non-movable
MpscShmClient(const MpscShmClient&) = delete;
MpscShmClient& operator=(const MpscShmClient&) = delete;
MpscShmClient(MpscShmClient&&) = delete;
MpscShmClient& operator=(MpscShmClient&&) = delete;

bool connect() override
{
if (producer_.has_value()) {
return true; // Already connected
}

try {
// Connect as producer to the MPSC request system
producer_ = MpscProducer::connect(base_name_ + "_req", client_id_);

// Connect to our dedicated SPSC response ring
std::string resp_name = base_name_ + "_resp_" + std::to_string(client_id_);
response_ring_ = SpscShm::connect(resp_name);

return true;
} catch (...) {
producer_.reset();
response_ring_.reset();
return false;
}
}

bool send(const void* data, size_t len, uint64_t timeout_ns) override
{
if (!producer_.has_value()) {
return false;
}

// Claim space for length prefix + data
size_t total_size = sizeof(uint32_t) + len;
void* buf = producer_->claim(total_size, static_cast<uint32_t>(timeout_ns));
if (buf == nullptr) {
return false;
}

// Write length prefix + data
auto len_u32 = static_cast<uint32_t>(len);
std::memcpy(buf, &len_u32, sizeof(uint32_t));
std::memcpy(static_cast<uint8_t*>(buf) + sizeof(uint32_t), data, len);

// Publish (rings doorbell to wake server)
producer_->publish(total_size);
return true;
}

std::span<const uint8_t> receive(uint64_t timeout_ns) override
{
if (!response_ring_.has_value()) {
return {};
}
return ring_receive_msg(response_ring_.value(), timeout_ns);
}

void release(size_t message_size) override
{
if (!response_ring_.has_value()) {
return;
}
response_ring_->release(sizeof(uint32_t) + message_size);
}

void close() override
{
producer_.reset();
response_ring_.reset();
}

private:
std::string base_name_;
size_t client_id_;
std::optional<MpscProducer> producer_;
std::optional<SpscShm> response_ring_;
};

} // namespace bb::ipc
154 changes: 154 additions & 0 deletions barretenberg/cpp/src/barretenberg/ipc/mpsc_shm_server.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
#pragma once

#include "ipc_server.hpp"
#include "shm/mpsc_shm.hpp"
#include "shm/spsc_shm.hpp"
#include "shm_common.hpp"
#include <cstdint>
#include <iostream>
#include <optional>
#include <string>
#include <utility>
#include <vector>

namespace bb::ipc {

/**
* @brief IPC server implementation using shared memory with multi-client support
*
* Uses MPSC (multi-producer single-consumer) for requests and per-client SPSC
* rings for responses. Supports up to max_clients concurrent clients.
*
* Shared memory layout:
* - Request: MPSC consumer with one SPSC ring per client (client writes, server reads)
* - Response: Separate SPSC ring per client (server writes, client reads)
*/
class MpscShmServer : public IpcServer {
public:
static constexpr size_t DEFAULT_RING_SIZE = 1 << 20; // 1MB

MpscShmServer(std::string base_name,
size_t max_clients,
size_t request_ring_size = DEFAULT_RING_SIZE,
size_t response_ring_size = DEFAULT_RING_SIZE)
: base_name_(std::move(base_name))
, max_clients_(max_clients)
, request_ring_size_(request_ring_size)
, response_ring_size_(response_ring_size)
{}

~MpscShmServer() override { close(); }

// Non-copyable, non-movable
MpscShmServer(const MpscShmServer&) = delete;
MpscShmServer& operator=(const MpscShmServer&) = delete;
MpscShmServer(MpscShmServer&&) = delete;
MpscShmServer& operator=(MpscShmServer&&) = delete;

bool listen() override
{
if (request_consumer_.has_value()) {
return true; // Already listening
}

// Clean up any leftover shared memory
MpscConsumer::unlink(base_name_ + "_req", max_clients_);
for (size_t i = 0; i < max_clients_; i++) {
SpscShm::unlink(base_name_ + "_resp_" + std::to_string(i));
}

try {
// Create MPSC consumer for requests (one ring per client)
request_consumer_ = MpscConsumer::create(base_name_ + "_req", max_clients_, request_ring_size_);

// Create per-client SPSC response rings
response_rings_.reserve(max_clients_);
for (size_t i = 0; i < max_clients_; i++) {
std::string resp_name = base_name_ + "_resp_" + std::to_string(i);
response_rings_.push_back(SpscShm::create(resp_name, response_ring_size_));
}

return true;
} catch (...) {
close();
return false;
}
}

int wait_for_data(uint64_t timeout_ns) override
{
if (!request_consumer_.has_value()) {
return -1;
}
// MpscConsumer::wait_for_data returns ring index = client_id
return request_consumer_->wait_for_data(static_cast<uint32_t>(timeout_ns));
}

std::span<const uint8_t> receive(int client_id) override
{
if (!request_consumer_.has_value() || client_id < 0 || static_cast<size_t>(client_id) >= max_clients_) {
return {};
}
// Peek on the specific client's request ring via MpscConsumer
void* len_ptr = request_consumer_->peek(static_cast<size_t>(client_id), sizeof(uint32_t), 100000000);
if (len_ptr == nullptr) {
return {};
}
uint32_t msg_len = 0;
std::memcpy(&msg_len, len_ptr, sizeof(uint32_t));

void* msg_ptr = request_consumer_->peek(static_cast<size_t>(client_id), sizeof(uint32_t) + msg_len, 100000000);
if (msg_ptr == nullptr) {
return {};
}
return std::span<const uint8_t>(static_cast<const uint8_t*>(msg_ptr) + sizeof(uint32_t), msg_len);
}

void release(int client_id, size_t message_size) override
{
if (!request_consumer_.has_value() || client_id < 0 || static_cast<size_t>(client_id) >= max_clients_) {
return;
}
request_consumer_->release(static_cast<size_t>(client_id), sizeof(uint32_t) + message_size);
}

bool send(int client_id, const void* data, size_t len) override
{
if (client_id < 0 || static_cast<size_t>(client_id) >= response_rings_.size()) {
return false;
}
return ring_send_msg(response_rings_[static_cast<size_t>(client_id)], data, len, 100000000);
}

void close() override
{
request_consumer_.reset();
response_rings_.clear();

// Clean up shared memory
MpscConsumer::unlink(base_name_ + "_req", max_clients_);
for (size_t i = 0; i < max_clients_; i++) {
SpscShm::unlink(base_name_ + "_resp_" + std::to_string(i));
}
}

void wakeup_all() override
{
if (request_consumer_.has_value()) {
request_consumer_->wakeup_all();
}
for (auto& ring : response_rings_) {
ring.wakeup_all();
}
}

private:
std::string base_name_;
size_t max_clients_;
size_t request_ring_size_;
size_t response_ring_size_;
std::optional<MpscConsumer> request_consumer_;
std::vector<SpscShm> response_rings_;
};

} // namespace bb::ipc
Loading
Loading