
Examples

This document provides a "cookbook" of common usage patterns for the LockFreeSpscQueue.

1. Initialization

The LockFreeSpscQueue is an index manager that operates on a user-provided memory buffer. The user is responsible for creating and owning the buffer and providing a std::span of it to the queue's constructor.

The buffer's capacity must be a power of two.

#include "LockFreeSpscQueue.h"
#include <vector>
#include <string>

// Define a data type to be used in the queue.
struct Message {
    int id;
    std::string payload;
};

// Define the capacity.
const size_t QUEUE_CAPACITY = 128; // 2^7

// Create the memory buffer that will be shared between threads.
std::vector<Message> shared_data_buffer(QUEUE_CAPACITY);

// Create the queue, giving it a non-owning view of our buffer.
LockFreeSpscQueue<Message> queue(shared_data_buffer);

Lifetime Note: The user must ensure that shared_data_buffer outlives the queue object.


2. Writing to the Queue (Producer)

There are three primary ways to write to the queue, each suited for a different use case.

High-Frequency Writes: WriteTransaction

This is the highest-performance API for producers that generate many individual items in a tight loop. It amortizes the high cost of atomic synchronization over many fast, non-atomic pushes and emplaces.

The process involves three steps:

  1. Start a transaction for a batch of items using try_start_write().
  2. If successful, use the transaction object's ultra-fast try_emplace() or try_push() methods to add items to the reserved space.
  3. The transaction tracks the number of successful pushes/emplaces. When the WriteTransaction object is destroyed, it automatically commits exactly that many items. Any unused portion of the initial reservation is released and made available for future writes.

The try_emplace Method (Recommended for Complex Objects)

try_emplace is the most efficient way to add a complex object to the queue. It constructs the object in-place directly in the queue's memory, avoiding any temporary objects or expensive move operations.

#include <string>
#include <vector>

struct MyData {
    std::string s;
    std::vector<int> v;
    MyData(const std::string& str, size_t n) : s(str), v(n) {}
};

void emplace_producer(LockFreeSpscQueue<MyData>& queue)
{
    // Try to start a transaction for up to 16 items.
    // This returns an optional; it will be empty if the queue is too full.
    if (auto transaction = queue.try_start_write(16))
    {
        // We have a reservation. Its actual capacity might be less than 16.
        // We should always check the return value of try_emplace in robust code.

        // Attempt to emplace the first item.
        bool success1 = transaction->try_emplace("hello", 100);

        // We can chain calls, but the second will only succeed if the first did
        // and the transaction's capacity was at least 2.
        bool success2 = false;
        if (success1) {
            success2 = transaction->try_emplace("world", 200);
        }

        // The transaction now knows whether 0, 1, or 2 items were added.
        // When it is destroyed here, it will commit the correct amount automatically.
        // If the reservation was for only 1 slot, success2 will be false, and only
        // the first item ("hello") will be committed.
    }
}

The try_push Method (For Simple Types or Pre-existing Objects)

try_push is ideal for trivial types (like int or float) or when you already have an existing object that you want to move into the queue.

#include <thread> // for std::this_thread::yield

void high_frequency_producer(LockFreeSpscQueue<int>& queue)
{
    int next_item = 0;

    // Keep writing until we've sent 100 items.
    while (next_item < 100)
    {
        // 1. Try to start a transaction for a batch of up to 32 items.
        //    This returns an optional; it will be empty if the queue is too full.
        if (auto transaction = queue.try_start_write(32))
        {
            // 2. We got a reservation! Push items into it.
            //    Use try_push for this simple integer type.
            while (next_item < 100 && transaction->try_push(next_item)) {
                // Item was successfully pushed into the transaction's reserved space.
                next_item++;
            }
            // 3. The transaction automatically commits the items that were
            //    pushed when it goes out of scope here.
        } else {
            // The queue was too full to start a transaction; yield to the consumer.
            std::this_thread::yield();
        }
    }
}

Move semantics are also supported: transaction->try_push(std::move(my_message));

Batch Writes (Convenience): try_write

This is the recommended method for transferring a pre-prepared batch of data. It's safe, convenient, and highly efficient. It accepts a lambda that is only called if space is available.

#include <algorithm> // for std::copy_n
#include <span>
#include <vector>

void batch_producer(LockFreeSpscQueue<Message>& queue)
{
    // Prepare a local batch of data to send.
    std::vector<Message> local_batch = { {0, "a"}, {1, "b"}, {2, "c"} };

    size_t items_written = 0;
    while (items_written < local_batch.size()) {
        // Create a view of the remaining items we need to write.
        std::span<const Message> sub_batch(local_batch.data() + items_written,
                                           local_batch.size() - items_written);

        // Ask `try_write` to write the sub-batch. It will write as many items as
        // it can and return the count. The lambda handles the actual copy.
        items_written += queue.try_write(sub_batch.size(), [&](auto block1, auto block2) {
            // --- Copy Semantics ---
            // std::copy_n performs a copy-assignment for each element.
            std::copy_n(sub_batch.begin(), block1.size(), block1.begin());
            if (!block2.empty()) {
                std::copy_n(sub_batch.begin() + block1.size(), block2.size(), block2.begin());
            }

            // --- Move Semantics ---
            // To perform a move, adapt the source iterator with `std::make_move_iterator`.
            // The std::copy_n algorithm will then invoke the move-assignment operator.
            // Note: the source container (`sub_batch` here) must be mutable.
            //
            // auto move_iter = std::make_move_iterator(sub_batch.begin());
            // std::copy_n(move_iter, block1.size(), block1.begin());
            // if (!block2.empty()) {
            //     std::copy_n(move_iter + block1.size(), block2.size(), block2.begin());
            // }
        });

        // If the queue was full, items_written won't increase, and we'll loop.
    }
}
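The move-iterator technique from the comments above can be seen in isolation, with no queue involved: wrapping the source iterator makes std::copy_n invoke move assignment for each element.

```cpp
#include <algorithm>  // std::copy_n
#include <iterator>   // std::make_move_iterator
#include <string>
#include <vector>

// Moves every element of `src` into a new vector. The source strings
// are left in a valid but unspecified state.
std::vector<std::string> move_all(std::vector<std::string>& src)
{
    std::vector<std::string> dest(src.size());
    std::copy_n(std::make_move_iterator(src.begin()), src.size(), dest.begin());
    return dest;
}
```

The same wrapping applies unchanged when the destination is one of the queue's blocks instead of a local vector.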

Batch Writes (Low-Level): prepare_write

This is the low-level "engine" that the other write methods are built upon. It provides maximum control by returning a WriteScope object that grants direct std::span access to the underlying buffer.

void low_level_batch_producer(LockFreeSpscQueue<Message>& queue,
                              const std::vector<Message>& data)
{
    // Ask to reserve up to `data.size()` items in the queue for writing.
    auto write_scope = queue.prepare_write(data.size());

    // `get_items_written()` returns the actual number of slots reserved,
    // which may be less than what we asked for if the queue was partially full.
    size_t items_to_write = write_scope.get_items_written();

    if (items_to_write > 0) {
        auto block1 = write_scope.get_block1();
        auto block2 = write_scope.get_block2();

        // Copy data into the contiguous memory blocks.
        // Note: To move data, wrap the source iterator with `std::make_move_iterator`
        // as in the example above.
        std::copy_n(data.begin(), block1.size(), block1.begin());
        if (!block2.empty()) {
            std::copy_n(data.begin() + block1.size(), block2.size(), block2.begin());
        }

    }
    // The write is automatically committed when `write_scope` is destroyed.
}

Note: This low-level example performs a single write attempt. In a real-world scenario, you would typically place this logic inside a loop (similar to the try_write example) to handle cases where the queue is initially full and to ensure all data is eventually sent.


3. Reading from the Queue (Consumer)

There are two primary ways to read from the queue.

Batch Reads (Convenience): try_read

This is the recommended and most efficient method for consuming data. It accepts a lambda which is given direct std::span access to the largest available contiguous blocks of readable data.

#include <iostream>
#include <thread>

void batch_consumer(LockFreeSpscQueue<Message>& queue)
{
    // Ask to read up to 16 items at a time.
    const size_t items_read = queue.try_read(16, [&](auto block1, auto block2) {
        // Process all items in the first contiguous block.
        for (const auto& msg : block1) {
            std::cout << "Consumer: Got ID " << msg.id << "\n";
        }
        // Process all items in the second (wrapped-around) block.
        for (const auto& msg : block2) {
            std::cout << "Consumer: Got ID " << msg.id << "\n";
        }
    });

    if (items_read == 0) {
        // The queue was empty.
        std::this_thread::yield();
    }
}

Batch Reads (Low-Level): prepare_read

This is the low-level counterpart to prepare_write. It gives you a ReadScope object that provides direct access to the readable memory blocks.

void low_level_batch_consumer(LockFreeSpscQueue<Message>& queue)
{
    std::vector<Message> consumed_data;

    // Ask to read up to 16 items from the queue.
    auto read_scope = queue.prepare_read(16);

    if (read_scope.get_items_read() > 0)
    {
        // Copy the data from the queue's buffer into our local vector.
        // A non-const ReadScope returns mutable std::span<T> blocks.
        auto block1 = read_scope.get_block1();
        auto block2 = read_scope.get_block2();

        consumed_data.reserve(read_scope.get_items_read());

        // Copy block1
        consumed_data.insert(consumed_data.end(), block1.begin(), block1.end());

        // Copy block2 if needed
        if (!block2.empty()) {
            consumed_data.insert(consumed_data.end(), block2.begin(), block2.end());
        }

        // --- Move Semantics ---
        // To move data out of the queue, use std::make_move_iterator. This is highly
        // efficient for objects that are expensive to copy.
        //
        // std::copy(std::make_move_iterator(block1.begin()),
        //           std::make_move_iterator(block1.end()),
        //           std::back_inserter(consumed_data));
        //
        // if (!block2.empty()) {
        //     std::copy(std::make_move_iterator(block2.begin()),
        //               std::make_move_iterator(block2.end()),
        //               std::back_inserter(consumed_data));
        // }
    }
    // The read is automatically committed when `read_scope` is destroyed.
}

4. Range-Based API (std::ranges)

For maximum convenience and compatibility with modern C++ algorithms, both the WriteScope and ReadScope objects are fully compliant with the C++20 std::ranges library. This allows for direct, elegant iteration, completely abstracting away the two-block nature of the circular buffer.
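Where do the two blocks come from? When a request wraps past the end of the ring buffer, the contiguous region splits in two. The arithmetic below is only an illustration of how a power-of-two ring typically computes that split (`split_blocks` is our helper, not the library's internals):

```cpp
#include <algorithm>  // std::min
#include <cstddef>
#include <utility>

// For a ring of power-of-two `capacity`, a transfer of `count` items
// starting at logical position `index` splits into at most two
// contiguous blocks: {block1_size, block2_size}.
std::pair<std::size_t, std::size_t>
split_blocks(std::size_t index, std::size_t count, std::size_t capacity)
{
    const std::size_t start  = index & (capacity - 1);         // masked position
    const std::size_t block1 = std::min(count, capacity - start);
    return {block1, count - block1};                           // block2 is the wrap
}
```

The range-based API below hides exactly this split: its iterator steps through block1 and then block2 as one sequence.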

Writing with a Range-Based for Loop

This is a clear and idiomatic way to fill the reserved slots in a write transaction.

void range_based_producer(LockFreeSpscQueue<int>& queue)
{
    // Ask to reserve space for 10 items.
    auto write_scope = queue.prepare_write(10);

    // Only proceed if we were actually granted space.
    if (write_scope.get_items_written() > 0) {
        // The WriteScope object is directly iterable.
        // We can iterate over the empty slots and write to them.
        int i = 0;
        for (int& slot : write_scope) {
            slot = i++;
        }
    }
    // The write is automatically committed when `write_scope` is destroyed.
}

Reading with a Range-Based for Loop

This is the simplest way to consume data from the queue. The custom iterator handles the jump between the two memory blocks transparently.

void range_based_consumer(LockFreeSpscQueue<int>& queue)
{
    // Ask to read up to 16 items at a time.
    auto read_scope = queue.prepare_read(16);

    // Only proceed if there are items to read.
    if (read_scope.get_items_read() > 0)
    {
        // The ReadScope object is a C++20 range.
        // The for loop will seamlessly iterate over block1 and then block2.
        for (const int& item : read_scope)
        {
            std::cout << "Consumer: Got " << item << "\n";
        }
    }
    // The read is automatically committed when `read_scope` is destroyed.
}

Using with std::ranges Algorithms

Because the Scope objects are proper ranges, you can use them with the powerful algorithms from the <algorithm> and <numeric> headers.

#include <numeric>   // For std::accumulate
#include <algorithm> // For std::ranges::copy
#include <ranges>    // For std::views
#include <iostream>  // For std::cout
#include <thread>    // For std::this_thread::yield

void algorithm_example(LockFreeSpscQueue<int>& queue)
{
    // Use std::ranges::copy to fill a write scope from another container.
    auto write_scope_1 = queue.prepare_write(10);
    if (write_scope_1.get_items_written() > 0)
    {
        std::vector<int> source_data(write_scope_1.get_items_written(), 42); // Fill with 42s
        std::ranges::copy(source_data, write_scope_1.begin());
    }

    // Use std::accumulate to sum all the items in a read scope.
    auto read_scope = queue.prepare_read(10);
    if (read_scope.get_items_read() > 0)
    {
        long long sum = std::accumulate(read_scope.begin(), read_scope.end(), 0LL);
        std::cout << "Sum of items in queue: " << sum << "\n";
    }

    // -- More Examples --
    std::vector<int> source_data(16, 42); // A vector with 16 items of value 42.

    size_t total_written = 0;
    while (total_written < source_data.size())
    {
        // Ask the queue for space for the remaining items.
        auto write_scope_2 = queue.prepare_write(source_data.size() - total_written);
        // The number of slots we were actually provided.
        const size_t can_write_count = write_scope_2.get_items_written();

        if (can_write_count > 0)
        {
            // --- Copy Semantics ---
            // Create a sub-span of our source data that is _exactly_ the size of the
            // space we were granted.
            std::span source_sub_batch(source_data.data() + total_written,
                                       can_write_count);

            // Copy the sub-batch into the write_scope's range.
            std::ranges::copy(source_sub_batch, write_scope_2.begin());

            // --- Move Semantics ---
            // To move instead of copy, adapt the source range. Note: the source container
            // (`source_sub_batch` here) must be mutable.
            //
            // The idiomatic C++20 approach is `std::views::transform` with a lambda
            // that yields rvalue references, so each element is moved exactly once.
            //
            // auto move_view = source_sub_batch
            //     | std::views::transform([](auto& i) -> auto&& { return std::move(i); });
            // std::ranges::copy(move_view, write_scope_2.begin());
            //
            // Note: C++23 introduces `std::views::as_rvalue`, a more direct tool.
            //
            // std::ranges::copy(source_sub_batch | std::views::as_rvalue, write_scope_2.begin());

            total_written += can_write_count;
        }
        else
        {
            // The queue was full, wait a moment and try again.
            std::this_thread::yield();
        }
    }
}