Skip to content

Commit de3849c

Browse files
committed
Address Ivo's comments
1 parent 56cc7a5 commit de3849c

4 files changed

Lines changed: 70 additions & 15 deletions

File tree

cpp/arcticdb/entity/atom_key.hpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,12 @@ class AtomKeyImpl {
8989
friend bool operator!=(const AtomKeyImpl& l, const AtomKeyImpl& r) { return !(l == r); }
9090

9191
friend bool operator<(const AtomKeyImpl& l, const AtomKeyImpl& r) {
92-
const auto lt = std::tie(l.id_, l.version_id_, l.index_start_, l.index_end_, l.creation_ts_);
93-
const auto rt = std::tie(r.id_, r.version_id_, r.index_start_, r.index_end_, r.creation_ts_);
92+
const auto lt = std::tie(
93+
l.id_, l.version_id_, l.index_start_, l.index_end_, l.creation_ts_, l.key_type_, l.content_hash_
94+
);
95+
const auto rt = std::tie(
96+
r.id_, r.version_id_, r.index_start_, r.index_end_, r.creation_ts_, r.key_type_, r.content_hash_
97+
);
9498
return lt < rt;
9599
}
96100

@@ -268,8 +272,10 @@ struct AtomKeyPacked {
268272
friend bool operator!=(const AtomKeyPacked& l, const AtomKeyPacked& r) { return !(l == r); }
269273

270274
friend bool operator<(const AtomKeyPacked& l, const AtomKeyPacked& r) {
271-
const auto lt = std::tie(l.version_id_, l.index_start_, l.index_end_, l.creation_ts_);
272-
const auto rt = std::tie(r.version_id_, r.index_start_, r.index_end_, r.creation_ts_);
275+
const auto lt =
276+
std::tie(l.version_id_, l.index_start_, l.index_end_, l.creation_ts_, l.key_type_, l.content_hash_);
277+
const auto rt =
278+
std::tie(r.version_id_, r.index_start_, r.index_end_, r.creation_ts_, r.key_type_, r.content_hash_);
273279
return lt < rt;
274280
}
275281

cpp/arcticdb/version/version_map.hpp

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include <arcticdb/util/configs_map.hpp>
3131
#include <arcticdb/storage/store.hpp>
3232
#include <arcticdb/storage/storage.hpp>
33+
#include <arcticdb/storage/storage_utils.hpp>
3334
#include <arcticdb/util/constants.hpp>
3435
#include <arcticdb/util/key_utils.hpp>
3536
#include <arcticdb/version/version_map_entry.hpp>
@@ -38,8 +39,6 @@
3839
#include <arcticdb/version/version_utils.hpp>
3940
#include <arcticdb/util/lock_table.hpp>
4041

41-
#include "storage/storage_utils.hpp"
42-
4342
namespace arcticdb {
4443

4544
template<class Clock = util::SysClock>
@@ -137,6 +136,12 @@ class VersionMapImpl {
137136
const std::shared_ptr<Store>& store, const VersionMapEntry& ref_entry,
138137
const std::shared_ptr<VersionMapEntry>& entry, const LoadStrategy& load_strategy
139138
) const {
139+
util::check(
140+
ref_entry.stream_id_ == entry->stream_id_,
141+
"follow_version_chain called with mismatching stream ids {} != {}",
142+
ref_entry.stream_id_,
143+
entry->stream_id_
144+
);
140145
auto next_key = ref_entry.head_;
141146
entry->head_ = ref_entry.head_;
142147

@@ -380,8 +385,15 @@ class VersionMapImpl {
380385
else {
381386
if (tombstone->type() == KeyType::TOMBSTONE_ALL)
382387
new_entry->try_set_tombstone_all(*tombstone);
383-
else
384-
new_entry->tombstones_.insert(std::make_pair(key.version_id(), *tombstone));
388+
else {
389+
util::check(
390+
key.id() == new_entry->stream_id_,
391+
"compact_and_remove_deleted_indexes found mismatching stream ids {} != {}",
392+
key.id(),
393+
new_entry->stream_id_
394+
);
395+
new_entry->try_set_tombstone(*tombstone);
396+
}
385397

386398
new_entry->keys_.push_back(key);
387399
}
@@ -671,7 +683,7 @@ class VersionMapImpl {
671683
auto tombstone_version_id = tombstones.front().version_id();
672684
do_write(store, tombstone_version_id, stream_id, std::span{tombstones}, entry);
673685
for (const auto& key : tombstones) {
674-
entry->tombstones_.try_emplace(key.version_id(), key);
686+
entry->try_set_tombstone(key);
675687
}
676688
maybe_invalidate_cached_undeleted(*entry);
677689

cpp/arcticdb/version/version_map_entry.hpp

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,15 @@ struct VersionMapEntry {
314314
return strm.str();
315315
}
316316

317-
void unshift_key(const AtomKey& key) { keys_.push_front(key); }
317+
void unshift_key(const AtomKey& key) {
318+
util::check(
319+
key.id() == stream_id_,
320+
"VersionMapEntry::unshift_key called with key with stream id {} != {}",
321+
key.id(),
322+
stream_id_
323+
);
324+
keys_.push_front(key);
325+
}
318326

319327
std::vector<AtomKeyPacked> get_indexes(bool include_deleted) const {
320328
std::vector<AtomKeyPacked> output;
@@ -418,7 +426,23 @@ struct VersionMapEntry {
418426
util::check_rte(head_->id() == stream_id_, "Multiple symbols in keys: {} != {}", head_->id(), stream_id_);
419427
}
420428

429+
void try_set_tombstone(const AtomKey& key) {
430+
util::check(
431+
key.id() == stream_id_,
432+
"VersionMapEntry::try_set_tombstone called with key with stream id {} != {}",
433+
key.id(),
434+
stream_id_
435+
);
436+
tombstones_.try_emplace(key.version_id(), key);
437+
}
438+
421439
void try_set_tombstone_all(const AtomKey& key) {
440+
util::check(
441+
key.id() == stream_id_,
442+
"VersionMapEntry::try_set_tombstone_all called with key with stream id {} != {}",
443+
key.id(),
444+
stream_id_
445+
);
422446
util::check(key.type() == KeyType::TOMBSTONE_ALL, "Can't set tombstone all key with key {}", key);
423447
if (!tombstone_all_ || tombstone_all_->version_id() < key.version_id())
424448
tombstone_all_ = key;
@@ -453,8 +477,10 @@ struct VersionMapEntry {
453477
timestamp last_reload_time_ = 0;
454478
LoadProgress load_progress_;
455479
std::deque<AtomKeyPacked> keys_;
456-
ankerl::unordered_dense::map<VersionId, AtomKeyPacked> tombstones_;
457480
std::optional<AtomKey> tombstone_all_;
481+
482+
private:
483+
ankerl::unordered_dense::map<VersionId, AtomKeyPacked> tombstones_;
458484
};
459485

460486
inline bool is_live_index_type_key(const AtomKeyPacked& key, const std::shared_ptr<VersionMapEntry>& entry) {

cpp/arcticdb/version/version_utils.hpp

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,12 @@ inline std::optional<AtomKey> read_segment_with_keys(
3939
for (; row < ssize_t(seg.row_count()); ++row) {
4040
auto key = read_key_row(seg, row);
4141
ARCTICDB_TRACE(log::version(), "Reading key {}", key);
42-
42+
util::check(
43+
key.id() == entry.stream_id_,
44+
"read_segment_with_keys found mismatching stream ids {} != {}",
45+
key.id(),
46+
entry.stream_id_
47+
);
4348
if (is_index_key_type(key.type())) {
4449
entry.keys_.push_back(key);
4550
oldest_loaded_index = std::min(oldest_loaded_index, key.version_id());
@@ -51,7 +56,7 @@ inline std::optional<AtomKey> read_segment_with_keys(
5156
}
5257

5358
} else if (key.type() == KeyType::TOMBSTONE) {
54-
entry.tombstones_.try_emplace(key.version_id(), key);
59+
entry.try_set_tombstone(key);
5560
entry.keys_.push_back(key);
5661
} else if (key.type() == KeyType::TOMBSTONE_ALL) {
5762
entry.try_set_tombstone_all(key);
@@ -96,11 +101,17 @@ std::shared_ptr<VersionMapEntry> build_version_map_entry_with_predicate_iteratio
96101
for (auto key_type : key_types) {
97102
store->iterate_type(
98103
key_type,
99-
[&predicate, &read_keys, &store, &output, &perform_read_segment_with_keys](VariantKey&& vk) {
104+
[&stream_id, &predicate, &read_keys, &store, &output, &perform_read_segment_with_keys](VariantKey&& vk
105+
) {
100106
const auto& key = to_atom(std::move(vk));
101107
if (!predicate(key))
102108
return;
103-
109+
util::check(
110+
key.id() == stream_id,
111+
"build_version_map_entry_with_predicate_iteration found mismatching stream ids {} != {}",
112+
key.id(),
113+
stream_id
114+
);
104115
read_keys.push_back(key);
105116
ARCTICDB_DEBUG(log::storage(), "Version map iterating key {}", key);
106117
if (perform_read_segment_with_keys) {

0 commit comments

Comments
 (0)