Skip to content

Commit 0a0d79d

Browse files
committed
cc tests: make userver tests friendly to thread-pinning scheduler
commit_hash:179b671cc60f2b270fadd79dad3b5d18cac253b7
1 parent 119e01a commit 0a0d79d

7 files changed

Lines changed: 50 additions & 70 deletions

File tree

core/src/concurrent/impl/striped_read_indicator_test.cpp

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
#include <userver/concurrent/impl/striped_read_indicator.hpp>
22

3+
#include <chrono>
34
#include <thread>
5+
#include <vector>
46

5-
#include <userver/compiler/impl/tsan.hpp>
67
#include <userver/engine/async.hpp>
8+
#include <userver/engine/deadline.hpp>
79
#include <userver/engine/mutex.hpp>
8-
#include <userver/engine/sleep.hpp>
10+
#include <userver/engine/task/task_with_result.hpp>
911
#include <userver/utest/utest.hpp>
1012

1113
USERVER_NAMESPACE_BEGIN
1214

13-
#if !USERVER_IMPL_HAS_TSAN
1415
namespace {
1516
constexpr std::size_t kReadersCount = 3;
1617
constexpr std::size_t kCheckersCount = 1;
@@ -21,13 +22,13 @@ UTEST_MT(StripedReadIndicator, LockPassingStress, kReadersCount + kCheckersCount
2122
concurrent::impl::StripedReadIndicatorLock indicator_lock = indicator.GetLock();
2223
engine::Mutex ping_pong_mutex;
2324

24-
std::atomic<bool> keep_running{true};
25+
const auto test_deadline = engine::Deadline::FromDuration(std::chrono::milliseconds{100});
2526
std::vector<engine::TaskWithResult<void>> tasks;
2627
tasks.reserve(kReadersCount + kCheckersCount);
2728

2829
for (std::size_t i = 0; i < kReadersCount; ++i) {
2930
tasks.push_back(engine::AsyncNoTracing([&] {
30-
while (keep_running) {
31+
while (!test_deadline.IsReached()) {
3132
const std::lock_guard ping_pong_mutex_lock{ping_pong_mutex};
3233
auto lock_copy = indicator_lock;
3334
indicator_lock = std::move(lock_copy);
@@ -39,22 +40,19 @@ UTEST_MT(StripedReadIndicator, LockPassingStress, kReadersCount + kCheckersCount
3940

4041
for (std::size_t i = 0; i < kCheckersCount; ++i) {
4142
tasks.push_back(engine::AsyncNoTracing([&] {
42-
while (keep_running) {
43+
while (!test_deadline.IsReached()) {
4344
ASSERT_FALSE(indicator.IsFree());
4445
}
4546
}));
4647
}
4748

48-
engine::SleepFor(std::chrono::milliseconds{100});
49-
keep_running = false;
5049
for (auto& task : tasks) {
5150
task.Get();
5251
}
5352

5453
indicator_lock = concurrent::impl::StripedReadIndicatorLock{};
5554
EXPECT_TRUE(indicator.IsFree());
5655
}
57-
#endif
5856

5957
UTEST(StripedReadIndicator, Metrics) {
6058
concurrent::impl::StripedReadIndicator indicator;

core/src/concurrent/intrusive_walkable_pool_test.cpp

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
#include <concurrent/intrusive_walkable_pool.hpp>
22

3-
#include <atomic>
43
#include <cstddef>
54
#include <thread>
65
#include <unordered_set>
76
#include <vector>
87

98
#include <userver/engine/async.hpp>
10-
#include <userver/engine/sleep.hpp>
119
#include <userver/engine/task/task_with_result.hpp>
1210
#include <userver/utest/utest.hpp>
1311
#include <userver/utils/assert.hpp>
@@ -79,15 +77,17 @@ UTEST_MT(IntrusiveWalkablePool, TortureTest, 4) {
7977
constexpr std::size_t kNodesPerTask = 3;
8078
CheckedIntPool pool;
8179

82-
std::atomic<bool> keep_running{true};
80+
// Let worker tasks stop by their own deadline: with a pinning queue the
81+
// main test task may not resume promptly enough to flip a shared stop flag.
82+
const auto test_deadline = engine::Deadline::FromDuration(50ms);
8383
std::vector<engine::TaskWithResult<void>> tasks;
8484
tasks.reserve(GetThreadCount() - 1);
8585

8686
for (std::size_t i = 0; i < GetThreadCount() - 2; ++i) {
8787
tasks.push_back(engine::AsyncNoTracing([&] {
8888
CheckedInt* nodes[kNodesPerTask]{};
8989

90-
while (keep_running) {
90+
while (!test_deadline.IsReached()) {
9191
for (auto*& node : nodes) {
9292
node = &pool.Acquire();
9393
node->CheckAlive();
@@ -101,7 +101,7 @@ UTEST_MT(IntrusiveWalkablePool, TortureTest, 4) {
101101
}
102102

103103
tasks.push_back(engine::AsyncNoTracing([&] {
104-
while (keep_running) {
104+
while (!test_deadline.IsReached()) {
105105
std::size_t node_count = 0;
106106
pool.Walk([&](CheckedInt& node) {
107107
node.CheckAlive();
@@ -111,8 +111,6 @@ UTEST_MT(IntrusiveWalkablePool, TortureTest, 4) {
111111
}
112112
}));
113113

114-
engine::SleepFor(50ms);
115-
keep_running = false;
116114
for (auto& task : tasks) {
117115
task.Get();
118116
}

core/src/engine/errno_test.cpp

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
#include <userver/utest/utest.hpp>
22

3-
#include <atomic>
43
#include <cerrno>
54
#include <chrono>
65
#include <condition_variable>
76
#include <mutex>
87
#include <thread>
98
#include <vector>
109

11-
#include <userver/compiler/impl/tsan.hpp>
1210
#include <userver/engine/async.hpp>
1311
#include <userver/engine/deadline.hpp>
1412
#include <userver/engine/sleep.hpp>
@@ -17,14 +15,14 @@
1715

1816
USERVER_NAMESPACE_BEGIN
1917

20-
// Test is not ready to TSan non-migrating scheduler
21-
#if !USERVER_IMPL_HAS_TSAN
2218
namespace {
23-
constexpr std::size_t kNumThreads = 2;
19+
constexpr std::size_t kNumThreads = 3;
2420
} // namespace
2521

2622
UTEST_MT(Errno, IsCoroLocal, kNumThreads) {
27-
const auto deadline = engine::Deadline::FromDuration(utest::kMaxTestWaitTime);
23+
// A pinning queue is allowed to keep a coroutine on its original worker.
24+
// The test should verify errno after an actual switch, not require one.
25+
const auto deadline = engine::Deadline::FromDuration(std::chrono::milliseconds{100});
2826

2927
std::size_t threads_started{0};
3028
std::size_t threads_switched{0};
@@ -61,14 +59,13 @@ UTEST_MT(Errno, IsCoroLocal, kNumThreads) {
6159
engine::Yield();
6260
}
6361
}
64-
return false;
62+
return true;
6563
}));
6664
}
6765

6866
for (auto& task : tasks) {
6967
EXPECT_TRUE(task.Get());
7068
}
7169
}
72-
#endif
7370

7471
USERVER_NAMESPACE_END

core/src/engine/single_use_event_test.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ UTEST(SingleUseEvent, Sample) {
7575
sender.Get();
7676
}
7777

78-
#if !USERVER_IMPL_HAS_TSAN
7978
UTEST_MT(SingleUseEvent, SimpleTaskQueue, 5) {
8079
struct SimpleTask final {
8180
std::uint64_t request;
@@ -85,17 +84,19 @@ UTEST_MT(SingleUseEvent, SimpleTaskQueue, 5) {
8584

8685
boost::lockfree::queue<SimpleTask*> task_queue{1};
8786

88-
std::atomic<bool> keep_running_clients{true};
8987
std::atomic<bool> keep_running_server{true};
9088

89+
// Clients must not rely on the main test task resuming to flip a stop flag:
90+
// pinning queues can keep the main task behind a busy worker-local queue.
91+
const auto test_deadline = engine::Deadline::FromDuration(50ms);
9192
std::vector<engine::TaskWithResult<void>> client_tasks;
9293
client_tasks.reserve(GetThreadCount() - 1);
9394

9495
for (std::size_t i = 0; i < GetThreadCount() - 1; ++i) {
9596
client_tasks.push_back(utils::Async("client", [&, i] {
9697
std::uint64_t request = i;
9798

98-
while (keep_running_clients) {
99+
while (!test_deadline.IsReached()) {
99100
SimpleTask task{request, {}, {}};
100101
task_queue.push(&task);
101102
task.completion.WaitNonCancellable();
@@ -110,6 +111,9 @@ UTEST_MT(SingleUseEvent, SimpleTaskQueue, 5) {
110111
while (keep_running_server) {
111112
SimpleTask* task{};
112113
if (!task_queue.pop(task)) {
114+
// Avoid burning the worker forever while pinned client tasks are
115+
// waiting for their own worker-local queues to make progress.
116+
engine::Yield();
113117
continue;
114118
}
115119

@@ -118,16 +122,12 @@ UTEST_MT(SingleUseEvent, SimpleTaskQueue, 5) {
118122
}
119123
});
120124

121-
engine::SleepFor(50ms);
122-
123-
keep_running_clients = false;
124125
for (auto& task : client_tasks) {
125126
task.Get();
126127
}
127128
keep_running_server = false;
128129
server_task.Get();
129130
}
130-
#endif
131131

132132
UTEST(SingleUseEvent, Cancellation) {
133133
engine::SingleUseEvent event;

core/src/rcu/rcu_map_test.cpp

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
#include <mutex>
99
#include <thread>
1010

11-
#include <userver/compiler/impl/tsan.hpp>
11+
#include <userver/engine/deadline.hpp>
1212
#include <userver/engine/sleep.hpp>
1313
#include <userver/utest/utest.hpp>
1414
#include <userver/utils/algo.hpp>
@@ -161,16 +161,15 @@ UTEST(RcuMap, Snapshot) {
161161
EXPECT_EQ(2, *second_snap.at("b"));
162162
}
163163

164-
#if !USERVER_IMPL_HAS_TSAN
165164
UTEST_MT(RcuMap, ConcurrentUpdates, 4) {
166165
rcu::RcuMap<int, std::atomic<uint32_t>> map;
167166
std::array<engine::TaskWithResult<void>, 4> workers;
168-
std::atomic<bool> stop_flag{false};
167+
const auto test_deadline = engine::Deadline::FromDuration(std::chrono::milliseconds{100});
169168

170169
for (size_t i = 0; i < workers.size(); ++i) {
171-
workers[i] = utils::Async("writer", [i, &map, &stop_flag] {
170+
workers[i] = utils::Async("writer", [i, &map, &test_deadline] {
172171
const uint32_t mask = 0xFFu << (i * 8);
173-
while (!stop_flag) {
172+
while (!test_deadline.IsReached()) {
174173
*map[i << 8] = -1;
175174
for (uint8_t v = 1; v != 0; ++v) {
176175
ASSERT_TRUE(map.Get((i << 8) + v - 1));
@@ -191,8 +190,6 @@ UTEST_MT(RcuMap, ConcurrentUpdates, 4) {
191190
});
192191
}
193192

194-
engine::SleepFor(std::chrono::milliseconds(100));
195-
stop_flag = true;
196193
for (auto& w : workers) {
197194
w.Get();
198195
}
@@ -214,12 +211,12 @@ UTEST_MT(RcuMap, ConcurrentTryEmplace, 16) {
214211
tasks.reserve(kTaskCount);
215212
for (std::size_t i = 0; i < kTaskCount; ++i) {
216213
tasks.push_back(engine::AsyncNoTracing([&map, &insertions, i] {
217-
auto key = std::string(20 + i / 2, 'x');
214+
auto key = std::string(20 + (i / 2), 'x');
218215
auto res = map.TryEmplace(key, i);
219216
if (res.inserted) {
220217
++insertions;
221218
}
222-
EXPECT_EQ(*res.value / 2, i / 2);
219+
EXPECT_EQ((*res.value) / 2, i / 2);
223220
}));
224221
}
225222
for (auto& task : tasks) {
@@ -228,7 +225,6 @@ UTEST_MT(RcuMap, ConcurrentTryEmplace, 16) {
228225
EXPECT_EQ(insertions, kTaskCount / 2);
229226
}
230227
}
231-
#endif
232228

233229
UTEST(RcuMap, IterStability) {
234230
rcu::RcuMap<int, int> map;

core/src/rcu/rcu_test.cpp

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -418,17 +418,19 @@ constexpr std::size_t kTotalTasks = kReadablePtrPingPongTasks + kReadingTasks +
418418

419419
UTEST_MT(Rcu, TortureTest, kTotalTasks) {
420420
rcu::Variable<CleaningUpInt> data{1};
421-
std::atomic<bool> keep_running{true};
422421

423422
engine::Mutex ping_pong_mutex;
424423
rcu::ReadablePtr<CleaningUpInt> ptr = data.Read();
425424

425+
// Pinning queues may delay the main test task while worker-local queues are
426+
// busy, so the stress tasks must be able to finish without an external flag.
427+
const auto test_deadline = engine::Deadline::FromDuration(std::chrono::milliseconds{100});
426428
std::vector<engine::TaskWithResult<void>> tasks;
427429
tasks.reserve(kTotalTasks - 1);
428430

429431
for (std::size_t i = 0; i < kReadablePtrPingPongTasks; ++i) {
430432
tasks.push_back(engine::AsyncNoTracing([&] {
431-
while (keep_running) {
433+
while (!test_deadline.IsReached()) {
432434
{
433435
const std::lock_guard lock(ping_pong_mutex);
434436
// copy a ptr created by another thread
@@ -442,7 +444,7 @@ UTEST_MT(Rcu, TortureTest, kTotalTasks) {
442444

443445
for (std::size_t i = 0; i < kReadingTasks; ++i) {
444446
tasks.push_back(engine::AsyncNoTracing([&] {
445-
while (keep_running) {
447+
while (!test_deadline.IsReached()) {
446448
const auto local_ptr = data.Read();
447449
ASSERT_GT(local_ptr->value, 0);
448450
}
@@ -451,15 +453,16 @@ UTEST_MT(Rcu, TortureTest, kTotalTasks) {
451453

452454
for (std::size_t i = 0; i < kWritingTasks; ++i) {
453455
tasks.push_back(engine::AsyncNoTracing([&] {
454-
while (keep_running) {
456+
while (!test_deadline.IsReached()) {
455457
const auto old = data.Read();
456458
data.Assign(CleaningUpInt{old->value + 1});
457459
}
458460
}));
459461
}
460462

461-
engine::SleepFor(std::chrono::milliseconds{100});
462-
keep_running = false;
463+
for (auto& task : tasks) {
464+
task.Get();
465+
}
463466
}
464467

465468
UTEST(Rcu, WritablePtrUnlocksInCommit) {

0 commit comments

Comments
 (0)