Skip to content

Commit 28417ec

Browse files
committed
Simplify AIO timeout failure handling
1 parent 417bb7c commit 28417ec

3 files changed

Lines changed: 56 additions & 19 deletions

File tree

ucm/store/detail/template/task_wrapper.h

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#ifndef UNIFIEDCACHE_STORE_DETAIL_TEMPLATE_TASK_WRAPPER_H
2525
#define UNIFIEDCACHE_STORE_DETAIL_TEMPLATE_TASK_WRAPPER_H
2626

27+
#include <memory>
2728
#include <shared_mutex>
2829
#include <unordered_map>
2930
#include "logger/logger.h"
@@ -43,11 +44,11 @@ class TaskWrapper {
4344
using TaskIdSet = HashSet<TaskHandle>;
4445
size_t timeoutMs_;
4546
TaskIdSet failureSet_;
46-
TaskIdSet timeoutSet_;
4747
TaskSet tasks_{};
4848
std::shared_mutex mutex_{};
4949
virtual void Dispatch(TaskPtr t, WaiterPtr w) = 0;
5050
virtual void Cancel(TaskPtr t) {}
51+
virtual bool IsAio() const { return false; }
5152

5253
public:
5354
Expected<TaskHandle> Submit(Task task)
@@ -111,18 +112,13 @@ class TaskWrapper {
111112
" IO.",
112113
taskId);
113114
}
114-
// Late IO callbacks must still see failure and avoid committing success.
115+
// For Aio, Late IO callbacks must still see failure and avoid committing success.
116+
if (!IsAio()) { failureSet_.Remove(taskId); }
115117
return Status::Timeout();
116118
}
117119
auto failure = failureSet_.Contains(taskId);
118-
auto timeout = timeoutSet_.Contains(taskId);
119-
if (timeout) [[unlikely]] {
120-
timeoutSet_.Remove(taskId);
121-
if (failure) { failureSet_.Remove(taskId); }
122-
return Status::Timeout();
123-
}
124120
if (failure) [[unlikely]] {
125-
failureSet_.Remove(taskId);
121+
if (!IsAio()) { failureSet_.Remove(taskId); }
126122
return Status::Error();
127123
}
128124
return Status::OK();

ucm/store/posix/cc/io_engine_aio.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ class IoEngineAio : public Detail::TaskWrapper<TransTask, Detail::TaskHandle> {
7373
}
7474

7575
private:
76+
bool IsAio() const override { return true; }
7677
template <bool dump>
7778
static void UpdateWaitMetrics(double wait)
7879
{
@@ -154,7 +155,6 @@ class IoEngineAio : public Detail::TaskWrapper<TransTask, Detail::TaskHandle> {
154155
auto status = dump ? aio_.WriteAsync(std::move(io)) : aio_.ReadAsync(std::move(io));
155156
if (status.Failure()) {
156157
if (status == Status::Timeout()) {
157-
timeoutSet_.Insert(tid);
158158
IncrementAioTimeoutMetric();
159159
} else {
160160
IncrementIoErrorMetric();
@@ -277,7 +277,6 @@ class IoEngineAio : public Detail::TaskWrapper<TransTask, Detail::TaskHandle> {
277277
auto pending = w ? w->Pending() : 0;
278278
UC_WARN("AIO task({}) force-completing; pending latch count before abort={}.", id, pending);
279279
failureSet_.Insert(id);
280-
timeoutSet_.Insert(id);
281280
aio_.CancelTask(id);
282281
blockOperator_.CancelQueued(id);
283282
if (w) { w->Abort(); }

ucm/store/test/case/posix/posix_store_test.cc

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,14 @@ constexpr size_t AIO_TEST_DATA_SIZE = 4096;
4242
using BufferPtr = std::unique_ptr<void, decltype(&std::free)>;
4343

4444
UC::Detail::Dictionary MakeAioConfig(const std::string& path, size_t timeoutMs = 50,
45-
size_t openConcurrency = 1)
45+
size_t openConcurrency = 1, size_t shardsPerBlock = 1)
4646
{
4747
UC::Detail::Dictionary config;
4848
config.SetNumber("device_id", 0);
4949
config.Set("storage_backends", std::vector<std::string>{path});
5050
config.SetNumber("tensor_size", AIO_TEST_DATA_SIZE);
5151
config.SetNumber("shard_size", AIO_TEST_DATA_SIZE);
52-
config.SetNumber("block_size", AIO_TEST_DATA_SIZE);
52+
config.SetNumber("block_size", AIO_TEST_DATA_SIZE * shardsPerBlock);
5353
config.Set("posix_io_engine", std::string("aio"));
5454
config.SetNumber("timeout_ms", timeoutMs);
5555
config.SetNumber("posix_open_concurrency", openConcurrency);
@@ -92,10 +92,13 @@ double ReadCounter(const std::string& name)
9292

9393
class StallingOpenHook {
9494
public:
95-
StallingOpenHook()
95+
explicit StallingOpenHook(bool succeedAfterRelease = false)
96+
: succeedAfterRelease_{succeedAfterRelease}
9697
{
9798
UC::PosixStore::TestHooks::SetOpenHook(
98-
[this](const std::string&, int32_t, mode_t) { return Run(); });
99+
[this](const std::string& path, int32_t flags, mode_t mode) {
100+
return Run(path, flags, mode);
101+
});
99102
}
100103
~StallingOpenHook()
101104
{
@@ -116,7 +119,7 @@ class StallingOpenHook {
116119
}
117120

118121
private:
119-
int32_t Run()
122+
int32_t Run(const std::string& path, int32_t flags, mode_t mode)
120123
{
121124
{
122125
std::lock_guard<std::mutex> lock{mutex_};
@@ -126,17 +129,23 @@ class StallingOpenHook {
126129
cv_.notify_all();
127130
std::unique_lock<std::mutex> lock{mutex_};
128131
cv_.wait(lock, [this] { return release_; });
132+
auto succeed = succeedAfterRelease_;
133+
lock.unlock();
134+
auto fd = succeed ? ::open(path.c_str(), flags, mode) : -1;
135+
auto err = succeed ? errno : EIO;
136+
lock.lock();
129137
--active_;
130138
cv_.notify_all();
131-
errno = EIO;
132-
return -1;
139+
errno = err;
140+
return fd;
133141
}
134142

135143
private:
136144
std::mutex mutex_;
137145
std::condition_variable cv_;
138146
bool entered_{false};
139147
bool release_{false};
148+
bool succeedAfterRelease_{false};
140149
size_t active_{0};
141150
};
142151

@@ -344,7 +353,7 @@ TEST_F(UCPosixStoreTest, AioCheckFinishesLostCompletionAfterDeadline)
344353
}
345354

346355
ASSERT_TRUE(finished);
347-
ASSERT_EQ(store.Wait(handle.Value()), UC::Status::Timeout());
356+
ASSERT_EQ(store.Wait(handle.Value()), UC::Status::Error());
348357
}
349358

350359
TEST(UCAioImplTest, SubmitEagainHonorsDeadline)
@@ -406,3 +415,36 @@ TEST_F(UCPosixStoreTest, AioQueuedTasksTimeOutWhileOpenWorkerIsStuck)
406415

407416
ASSERT_LT(std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count(), 1500);
408417
}
418+
419+
TEST_F(UCPosixStoreTest, AioTimedOutMultiShardDumpDoesNotCommitAfterLateOpen)
420+
{
421+
using namespace UC::PosixStore;
422+
PosixStore store;
423+
ASSERT_EQ(store.Setup(MakeAioConfig(Path(), 50, 1, 2)), UC::Status::OK());
424+
auto buffer1 = MakeAlignedBuffer(8);
425+
auto buffer2 = MakeAlignedBuffer(9);
426+
ASSERT_NE(buffer1.get(), nullptr);
427+
ASSERT_NE(buffer2.get(), nullptr);
428+
auto block = UC::Test::Detail::TypesHelper::MakeBlockIdRandomly();
429+
430+
{
431+
StallingOpenHook hook{true};
432+
UC::Detail::TaskDesc desc;
433+
desc.brief = "AioMultiShardLateOpen";
434+
desc.push_back(UC::Detail::Shard{block, 0, {buffer1.get()}});
435+
desc.push_back(UC::Detail::Shard{block, 1, {buffer2.get()}});
436+
auto handle = store.Dump(std::move(desc));
437+
ASSERT_TRUE(handle.HasValue());
438+
ASSERT_TRUE(hook.WaitEntered());
439+
440+
ASSERT_EQ(store.Wait(handle.Value()), UC::Status::Timeout());
441+
}
442+
443+
auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(500);
444+
while (std::chrono::steady_clock::now() < deadline) {
445+
auto founds = store.Lookup(&block, 1);
446+
ASSERT_TRUE(founds.HasValue());
447+
ASSERT_EQ(founds.Value(), std::vector<uint8_t>{false});
448+
std::this_thread::sleep_for(std::chrono::milliseconds(20));
449+
}
450+
}

0 commit comments

Comments
 (0)