1212#include " lsm/core/internal/options.h"
1313#include " lsm/db/impl.h"
1414#include " lsm/io/memory_persistence.h"
15+ #include " lsm/lsm.h"
1516#include " random/generators.h"
1617#include " ssx/clock.h"
1718#include " test_utils/async.h"
@@ -154,6 +155,7 @@ class ImplTest : public testing::Test {
154155 _underlying_data_persistence->close ().get ();
155156 _meta_persistence->close ().get ();
156157 _shadow.clear ();
158+ _deleted_keys.clear ();
157159 }
158160
159161 void write_at_least (size_t size, lsm::db::impl* db = nullptr ) {
@@ -179,6 +181,7 @@ class ImplTest : public testing::Test {
179181 db->apply (std::move (batch)).get ();
180182 // Only apply writes if db write was a success
181183 for (auto & [k, v] : shadow_batch) {
184+ _deleted_keys.erase (k);
182185 _shadow.insert_or_assign (k, std::move (v));
183186 }
184187 }
@@ -210,6 +213,7 @@ class ImplTest : public testing::Test {
210213 db->apply (std::move (batch)).get ();
211214 for (auto & k : keys_to_delete) {
212215 _shadow.erase (k);
216+ _deleted_keys.insert (k);
213217 }
214218 }
215219
@@ -232,12 +236,14 @@ class ImplTest : public testing::Test {
232236 if (!db) {
233237 db = _db.get ();
234238 }
239+ std::vector<std::string> errors;
240+
241+ // Validate via iterator (range scan path).
235242 auto iter = db->create_iterator ({.snapshot = snapshot}).get ();
236243 auto it = shadow.begin ();
237- std::vector<std::string> errors;
238244 for (iter->seek_to_first ().get (); iter->valid (); iter->next ().get ()) {
239245 if (it == shadow.end ()) {
240- errors.emplace_back (" extra elements" );
246+ errors.emplace_back (" iter: extra elements" );
241247 break ;
242248 }
243249 bool key_eq = it->first == iter->key ().user_key ();
@@ -258,8 +264,44 @@ class ImplTest : public testing::Test {
258264 ++it;
259265 }
260266 if (it != shadow.end ()) {
261- errors.emplace_back (" missing elements" );
267+ errors.emplace_back (" iter: missing elements" );
268+ }
269+
270+ // Validate via point lookups (get() path). Construct the lookup
271+ // key the same way snapshot::get() / database::get() does.
272+ auto get_seqno = snapshot ? snapshot->seqno ()
273+ : lsm::internal::sequence_number::max ();
274+ for (const auto & [user_key, entry] : shadow) {
275+ auto lookup_key = lsm::internal::key::encode ({
276+ .key = lsm::user_key_view (user_key),
277+ .seqno = get_seqno,
278+ .type = lsm::internal::value_type::tombstone,
279+ });
280+ auto result = db->get (lookup_key).get ();
281+ if (result.is_missing ()) {
282+ errors.push_back (fmt::format (" get: key {} missing" , user_key));
283+ } else if (result.is_tombstone ()) {
284+ errors.push_back (
285+ fmt::format (" get: key {} unexpectedly tombstoned" , user_key));
286+ }
262287 }
288+ for (const auto & user_key : _deleted_keys) {
289+ if (shadow.contains (user_key)) {
290+ continue ;
291+ }
292+ auto lookup_key = lsm::internal::key::encode ({
293+ .key = lsm::user_key_view (user_key),
294+ .seqno = get_seqno,
295+ .type = lsm::internal::value_type::tombstone,
296+ });
297+ auto result = db->get (lookup_key).get ();
298+ if (!result.is_missing () && !result.is_tombstone ()) {
299+ errors.push_back (
300+ fmt::format (
301+ " get: deleted key {} returned a value" , user_key));
302+ }
303+ }
304+
263305 if (errors.empty ()) {
264306 return testing::AssertionSuccess ();
265307 }
@@ -313,6 +355,7 @@ class ImplTest : public testing::Test {
313355
314356protected:
315357 shadow_map _shadow;
358+ std::set<ss::sstring> _deleted_keys;
316359 ss::lw_shared_ptr<lsm::internal::options> _options;
317360 std::unique_ptr<lsm::io::data_persistence> _underlying_data_persistence;
318361 lsm::io::memory_persistence_controller _meta_persistence_controller;
@@ -385,6 +428,12 @@ TEST_F(ImplTest, RandomizedWithDeletes) {
385428 EXPECT_TRUE (matches_shadow ());
386429 delete_random_keys ();
387430 EXPECT_TRUE (matches_shadow ());
431+
432+ // Also test after flushes, with and without a snapshot.
433+ _db->flush ().get ();
434+ EXPECT_TRUE (matches_shadow ());
435+ auto snap = _db->create_snapshot ();
436+ EXPECT_TRUE (matches_shadow (_shadow, snap->get ()));
388437 }
389438}
390439
@@ -593,6 +642,53 @@ TEST_F(ImplTest, GetFindsKeysWithDifferentSeqno) {
593642 EXPECT_FALSE (result.is_missing ());
594643}
595644
645+ // Regression test: snapshot::get() must detect tombstones in SST files.
646+ TEST_F (ImplTest, SnapshotGetDetectsTombstoneInSST) {
647+ using lsm::sequence_number;
648+ auto data_persistence = lsm::io::make_memory_data_persistence ();
649+ lsm::io::memory_persistence_controller meta_ctl;
650+ auto meta_persistence = lsm::io::make_memory_metadata_persistence (
651+ &meta_ctl);
652+ auto db = lsm::database::open (
653+ lsm::options{},
654+ {
655+ .data = std::move (data_persistence),
656+ .metadata = std::move (meta_persistence),
657+ })
658+ .get ();
659+
660+ // Write a value and then tombstone it.
661+ {
662+ auto wb = db.create_write_batch ();
663+ wb.put (" foo" , iobuf::from (" bar" ), sequence_number (1 ));
664+ db.apply (std::move (wb)).get ();
665+ }
666+ {
667+ auto wb = db.create_write_batch ();
668+ wb.remove (" foo" , sequence_number (2 ));
669+ db.apply (std::move (wb)).get ();
670+ }
671+
672+ // Flush so both the value and tombstone move to an SST file.
673+ db.flush (ssx::instant::infinite_future ()).get ();
674+ ASSERT_EQ (db.max_persisted_seqno (), sequence_number (2 ));
675+
676+ auto snap = db.create_snapshot ();
677+
678+ // Iterator correctly sees no live keys (extents/terms path).
679+ {
680+ auto iter = snap.create_iterator ().get ();
681+ iter.seek_to_first ().get ();
682+ EXPECT_FALSE (iter.valid ()) << " iterator should see no live keys" ;
683+ }
684+
685+ // snapshot::get() must also return nullopt for the deleted key.
686+ auto result = snap.get (" foo" ).get ();
687+ EXPECT_FALSE (result.has_value ())
688+ << " snapshot::get() should not find a deleted key" ;
689+ db.close ().get ();
690+ }
691+
596692TEST_F (ImplTest, RefreshOnWritableDbThrows) {
597693 EXPECT_THROW (_db->refresh ().get (), lsm::invalid_argument_exception);
598694}
0 commit comments