Skip to content

Commit 04a314c

Browse files
authored
Merge pull request #30348 from ballard26/ct-at-bug-investigation
Invalidate LSM iterators when exceptions are thrown
2 parents 2e429e5 + 3b49ebf commit 04a314c

3 files changed

Lines changed: 176 additions & 8 deletions

File tree

src/v/lsm/core/internal/two_level_iterator.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,10 @@ class impl : public iterator {
7676

7777
private:
7878
ss::future<> init_data_block() {
79+
// Clear previous iterator first in case `_data_iter_fn` throws.
80+
_data_iter = nullptr;
81+
7982
if (!_index_iter->valid()) {
80-
_data_iter = nullptr;
8183
co_return;
8284
}
8385
auto handle = _index_iter->value();

src/v/lsm/db/iter.cc

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,18 +56,18 @@ class db_iter : public internal::iterator {
5656
}
5757

5858
ss::future<> seek_to_first() override {
59+
invalidate();
5960
_dir = forward;
6061
_saved_value.clear();
6162
_saved_key = {};
6263
co_await _iter->seek_to_first();
6364
if (_iter->valid()) {
6465
co_await find_next_user_entry(false);
65-
} else {
66-
_valid = false;
6766
}
6867
}
6968

7069
ss::future<> seek_to_last() override {
70+
invalidate();
7171
_dir = reverse;
7272
_saved_value.clear();
7373
_saved_key = {};
@@ -76,20 +76,20 @@ class db_iter : public internal::iterator {
7676
}
7777

7878
ss::future<> seek(internal::key_view target) override {
79+
invalidate();
7980
_dir = forward;
8081
_saved_value.clear();
8182
_saved_key = {};
8283
co_await _iter->seek(target);
8384
if (_iter->valid()) {
8485
_saved_key = internal::key(target);
8586
co_await find_next_user_entry(false);
86-
} else {
87-
_valid = false;
8887
}
8988
}
9089

9190
ss::future<> next() override {
9291
dassert(valid(), "iterator must be valid to call next()");
92+
invalidate();
9393
if (_dir == reverse) {
9494
_dir = forward;
9595
// iter is pointing just before the entries for this->key(),
@@ -101,7 +101,6 @@ class db_iter : public internal::iterator {
101101
co_await _iter->next();
102102
}
103103
if (!_iter->valid()) {
104-
_valid = false;
105104
_saved_key = {};
106105
co_return;
107106
}
@@ -111,7 +110,6 @@ class db_iter : public internal::iterator {
111110
_saved_key = internal::key(_iter->key());
112111
co_await _iter->next();
113112
if (!_iter->valid()) {
114-
_valid = false;
115113
_saved_key = {};
116114
co_return;
117115
}
@@ -121,12 +119,12 @@ class db_iter : public internal::iterator {
121119

122120
ss::future<> prev() override {
123121
dassert(valid(), "iterator must be valid to call prev()");
122+
invalidate();
124123
if (_dir == forward) {
125124
_saved_key = internal::key(_iter->key());
126125
while (true) {
127126
co_await _iter->prev();
128127
if (!_iter->valid()) {
129-
_valid = false;
130128
_saved_key = {};
131129
_saved_value.clear();
132130
co_return;
@@ -141,6 +139,12 @@ class db_iter : public internal::iterator {
141139
}
142140

143141
private:
142+
// Called at the start of every mutating coroutine so that if an
143+
// awaited future throws, valid() reports false instead of a stale
144+
// true. find_next_user_entry / find_prev_user_entry re-establish
145+
// _valid on the success path.
146+
void invalidate() { _valid = false; }
147+
144148
size_t random_compaction_period() {
145149
return random_generators::get_int(2 * _options->read_bytes_period);
146150
}

src/v/lsm/db/tests/iter_test.cc

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414

1515
#include <gtest/gtest.h>
1616

17+
#include <map>
1718
#include <memory>
19+
#include <stdexcept>
1820
#include <string_view>
1921

2022
namespace {
@@ -511,6 +513,166 @@ TEST_F(DBIteratorTest, TombstoneAfterValues) {
511513
EXPECT_FALSE(it->valid());
512514
}
513515

516+
// An iterator over an in-memory map that can be primed to throw on the
517+
// next mutating operation. Used to model the cloud_io failure path where
518+
// a deeper iterator (e.g. a two-level iterator's data block) raises an
519+
// exception mid-operation, as observed in the AT reactor-1 SIGSEGV.
520+
class throwing_iterator : public lsm::internal::iterator {
521+
public:
522+
explicit throwing_iterator(std::map<lsm::internal::key, iobuf> data)
523+
: _data(std::move(data))
524+
, _it(_data.end()) {}
525+
526+
// Arm the iterator to fail the next mutating call exactly once.
527+
void fail_next() { _fail_pending = true; }
528+
529+
bool valid() const override { return _it != _data.end(); }
530+
531+
ss::future<> seek_to_first() override {
532+
if (consume_failure()) {
533+
return make_failure();
534+
}
535+
_it = _data.begin();
536+
return ss::now();
537+
}
538+
539+
ss::future<> seek_to_last() override {
540+
if (consume_failure()) {
541+
return make_failure();
542+
}
543+
_it = _data.empty() ? _data.end() : std::prev(_data.end());
544+
return ss::now();
545+
}
546+
547+
ss::future<> seek(lsm::internal::key_view target) override {
548+
if (consume_failure()) {
549+
return make_failure();
550+
}
551+
_it = _data.lower_bound(lsm::internal::key(target));
552+
return ss::now();
553+
}
554+
555+
ss::future<> next() override {
556+
if (consume_failure()) {
557+
return make_failure();
558+
}
559+
if (_it != _data.end()) {
560+
++_it;
561+
}
562+
return ss::now();
563+
}
564+
565+
ss::future<> prev() override {
566+
if (consume_failure()) {
567+
return make_failure();
568+
}
569+
if (_it == _data.begin()) {
570+
_it = _data.end();
571+
} else if (_it != _data.end()) {
572+
--_it;
573+
} else if (!_data.empty()) {
574+
_it = std::prev(_data.end());
575+
}
576+
return ss::now();
577+
}
578+
579+
lsm::internal::key_view key() override { return _it->first; }
580+
iobuf value() override { return _it->second.copy(); }
581+
582+
private:
583+
bool consume_failure() {
584+
if (_fail_pending) {
585+
_fail_pending = false;
586+
return true;
587+
}
588+
return false;
589+
}
590+
static ss::future<> make_failure() {
591+
return ss::make_exception_future<>(
592+
std::runtime_error("simulated cloud_io download failure"));
593+
}
594+
595+
std::map<lsm::internal::key, iobuf> _data;
596+
std::map<lsm::internal::key, iobuf>::iterator _it;
597+
bool _fail_pending = false;
598+
};
599+
600+
class DBIteratorExceptionSafetyTest : public testing::Test {
601+
public:
602+
void SetUp() override {
603+
_options = ss::make_lw_shared<lsm::internal::options>();
604+
std::map<lsm::internal::key, iobuf> data;
605+
for (auto k : {"a", "b", "c"}) {
606+
auto ikey = lsm::internal::key::encode(
607+
{.key = lsm::user_key_view{k},
608+
.seqno = 100_seqno,
609+
.type = lsm::internal::value_type::value});
610+
data.emplace(std::move(ikey), iobuf::from(k));
611+
}
612+
auto inner = std::make_unique<throwing_iterator>(std::move(data));
613+
_inner = inner.get();
614+
_it = lsm::db::create_db_iterator(
615+
std::move(inner),
616+
lsm::internal::sequence_number::max(),
617+
_options,
618+
[](lsm::internal::key_view) { return ss::now(); });
619+
}
620+
621+
protected:
622+
ss::lw_shared_ptr<lsm::internal::options> _options;
623+
throwing_iterator* _inner = nullptr;
624+
std::unique_ptr<lsm::internal::iterator> _it;
625+
};
626+
627+
// If seek throws partway through, valid() must report false. Previously
628+
// _valid retained whatever it had been before the call, allowing a stale
629+
// "true" to escape and a downstream key() to dereference a half-loaded
630+
// underlying iterator (the AT reactor-1 SIGSEGV).
631+
TEST_F(DBIteratorExceptionSafetyTest, SeekThrowLeavesIteratorInvalid) {
632+
_it->seek("a"_seek_key).get();
633+
ASSERT_TRUE(_it->valid());
634+
635+
_inner->fail_next();
636+
EXPECT_THROW(_it->seek("b"_seek_key).get(), std::runtime_error);
637+
EXPECT_FALSE(_it->valid());
638+
}
639+
640+
TEST_F(DBIteratorExceptionSafetyTest, SeekToFirstThrowLeavesIteratorInvalid) {
641+
_it->seek_to_first().get();
642+
ASSERT_TRUE(_it->valid());
643+
644+
_inner->fail_next();
645+
EXPECT_THROW(_it->seek_to_first().get(), std::runtime_error);
646+
EXPECT_FALSE(_it->valid());
647+
}
648+
649+
TEST_F(DBIteratorExceptionSafetyTest, SeekToLastThrowLeavesIteratorInvalid) {
650+
_it->seek_to_last().get();
651+
ASSERT_TRUE(_it->valid());
652+
653+
_inner->fail_next();
654+
EXPECT_THROW(_it->seek_to_last().get(), std::runtime_error);
655+
EXPECT_FALSE(_it->valid());
656+
}
657+
658+
TEST_F(DBIteratorExceptionSafetyTest, NextThrowLeavesIteratorInvalid) {
659+
_it->seek_to_first().get();
660+
ASSERT_TRUE(_it->valid());
661+
662+
_inner->fail_next();
663+
EXPECT_THROW(_it->next().get(), std::runtime_error);
664+
EXPECT_FALSE(_it->valid());
665+
}
666+
667+
TEST_F(DBIteratorExceptionSafetyTest, PrevThrowLeavesIteratorInvalid) {
668+
_it->seek_to_last().get();
669+
ASSERT_TRUE(_it->valid());
670+
671+
_inner->fail_next();
672+
EXPECT_THROW(_it->prev().get(), std::runtime_error);
673+
EXPECT_FALSE(_it->valid());
674+
}
675+
514676
TEST_F(DBIteratorTest, MultipleVersionsWithSnapshot) {
515677
add_value("key", "v1", 100_seqno);
516678
add_value("key", "v2", 200_seqno);

0 commit comments

Comments
 (0)