Skip to content

Commit 9a5dc1d

Browse files
committed
Add stress test for SPSC queue
Introduces a new stress test (stress_test.cpp) for the lock-free single-producer single-consumer queue, exercising concurrent producer and consumer threads with randomized chunk sizes. Updates CMakeLists.txt to build and register the new test.
1 parent 1d56cfd commit 9a5dc1d

2 files changed

Lines changed: 156 additions & 0 deletions

File tree

tests/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,8 @@ add_executable(test_convenience_api test_convenience_api.cpp)
1717
target_link_libraries(test_convenience_api PRIVATE spsc_queue Threads::Threads)
1818
add_test(NAME ConvenienceApi COMMAND test_convenience_api)
1919

20+
# --- Test: Stress Test Consumer - Producer ---
21+
add_executable(stress_test stress_test.cpp)
22+
target_link_libraries(stress_test PRIVATE spsc_queue Threads::Threads)
23+
add_test(NAME StressTest COMMAND stress_test)
24+

tests/stress_test.cpp

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
#include <iostream>
2+
#include <vector>
3+
#include <iterator>
4+
#include <thread>
5+
#include <random>
6+
#include <chrono>
7+
8+
#include "LockFreeSpscQueue.h"
9+
10+
// Helper `assert` which aborts also in the release builds
11+
#define always_assert(condition) \
12+
do { \
13+
if (!(condition)) { \
14+
std::cerr << "Assertion failed: " << #condition \
15+
<< " at " << __FILE__ << ":" << __LINE__ << std::endl; \
16+
std::abort(); \
17+
} \
18+
} while(0)
19+
20+
// Stress test with concurrent producer and consumer
21+
void stress_test()
22+
{
23+
std::cout << "\n=== Stress Test ===\n";
24+
const size_t TOTAL_PRODUCER_WRITES = 100'000;
25+
const size_t MAX_ITEMS_TO_WRITE = 4;
26+
std::vector<int> buffer(4); // Small buffer to force wrap-around
27+
LockFreeSpscQueue<int> queue(buffer);
28+
std::vector<int> produced_data, consumed_data;
29+
produced_data.reserve(TOTAL_PRODUCER_WRITES * MAX_ITEMS_TO_WRITE);
30+
consumed_data.reserve(TOTAL_PRODUCER_WRITES * MAX_ITEMS_TO_WRITE);
31+
std::atomic<bool> producer_done{false}; // Flag to signal producer completion
32+
const auto start_time = std::chrono::steady_clock::now();
33+
34+
// Producer thread: Write random-sized chunks
35+
auto producer = [&]() {
36+
std::random_device rd;
37+
std::mt19937 gen(rd());
38+
std::uniform_int_distribution<size_t> size_dist(1, MAX_ITEMS_TO_WRITE);
39+
std::vector<int> data_buffer(MAX_ITEMS_TO_WRITE);
40+
std::uniform_int_distribution<int> value_dist(1, 100);
41+
size_t total_written = 0;
42+
43+
for (int i = 0; i < TOTAL_PRODUCER_WRITES; ++i) {
44+
size_t to_write = size_dist(gen);
45+
auto write_scope = queue.prepare_write(to_write);
46+
size_t items_to_write = write_scope.get_items_written();
47+
48+
if (items_to_write == 0) {
49+
std::this_thread::yield();
50+
continue;
51+
}
52+
53+
auto data = std::span{data_buffer.begin(), items_to_write};
54+
for (auto& item : data) {
55+
item = value_dist(gen);
56+
}
57+
58+
std::copy_n(data.begin(),
59+
write_scope.get_block1().size(),
60+
write_scope.get_block1().begin());
61+
62+
if (write_scope.get_block2().size() > 0) {
63+
std::copy_n(data.begin() + write_scope.get_block1().size(),
64+
write_scope.get_block2().size(),
65+
write_scope.get_block2().begin());
66+
}
67+
68+
// Collect produced data for verification
69+
produced_data.insert(produced_data.end(), data.begin(), data.end());
70+
total_written += items_to_write;
71+
72+
std::cout << "Producer: Wrote " << items_to_write
73+
<< " items (total: " << total_written << ")\n";
74+
75+
std::this_thread::yield();
76+
}
77+
producer_done.store(true, std::memory_order_release); // Signal completion
78+
std::cout << "Producer: Finished, total written: " << total_written << "\n";
79+
};
80+
81+
// Consumer thread: Read random-sized chunks
82+
auto consumer = [&]() {
83+
std::random_device rd;
84+
std::mt19937 gen(rd());
85+
std::uniform_int_distribution<size_t> size_dist(1, 4);
86+
size_t total_read = 0;
87+
88+
while (true) {
89+
// Check for timeout (e.g., 10 seconds)
90+
if (std::chrono::steady_clock::now() - start_time > std::chrono::seconds(10)) {
91+
std::cout << "Consumer: Timeout reached, exiting\n";
92+
break;
93+
}
94+
95+
// Exit if producer is done and queue is empty
96+
if ( producer_done.load(std::memory_order_acquire)
97+
&& queue.get_num_items_ready() == 0)
98+
{
99+
std::cout << "Consumer: Producer done and queue empty, exiting\n";
100+
break;
101+
}
102+
103+
size_t to_read = size_dist(gen);
104+
auto read_scope = queue.prepare_read(to_read);
105+
size_t items_to_read = read_scope.get_items_read();
106+
107+
if (items_to_read == 0) {
108+
std::this_thread::yield();
109+
continue;
110+
}
111+
112+
// Collect consumed data for verification
113+
std::copy(read_scope.get_block1().begin(),
114+
read_scope.get_block1().end(),
115+
std::inserter(consumed_data, consumed_data.end()));
116+
117+
if (read_scope.get_block2().size() > 0) {
118+
std::copy(read_scope.get_block2().begin(),
119+
read_scope.get_block2().end(),
120+
std::inserter(consumed_data, consumed_data.end()));
121+
}
122+
123+
total_read += items_to_read;
124+
125+
std::cout << "Consumer: Read " << items_to_read
126+
<< " items (total: " << total_read << ")\n";
127+
128+
std::this_thread::yield();
129+
}
130+
131+
std::cout << "Consumer: Finished, total read: " << total_read << "\n";
132+
};
133+
134+
// Run producer and consumer concurrently
135+
std::thread producer_thread(producer);
136+
std::thread consumer_thread(consumer);
137+
producer_thread.join();
138+
consumer_thread.join();
139+
140+
// Verify that consumed data matches produced data
141+
always_assert(produced_data.size() == consumed_data.size());
142+
always_assert(produced_data == consumed_data);
143+
std::cout << "Stress test passed: Produced and consumed data match\n";
144+
}
145+
146+
int main()
147+
{
148+
stress_test();
149+
return 0;
150+
}
151+

0 commit comments

Comments
 (0)