Reduce symbol list peak heap usage during load and compaction#2986
Reduce symbol list peak heap usage during load and compaction#2986G-D-Petrov wants to merge 9 commits into
Conversation
2a815f0 to
5844198
Compare
| for (auto i = 0L; i < seg.column(0).row_count(); ++i) { | ||
| auto stream_id = stream_id_from_segment(data_type, seg, i, 0); | ||
| auto reference_id = VersionId{scalar_at<uint64_t>(seg, i, 1)}; | ||
| auto reference_time = timestamp{scalar_at<int64_t>(seg, i, 2)}; | ||
| ARCTICDB_RUNTIME_DEBUG( | ||
| log::symbol(), "Reading added symbol {}: {}@{}", stream_id, reference_id, reference_time | ||
| ); | ||
| output.emplace_back(stream_id, reference_id, reference_time, ActionType::ADD); | ||
| visitor(stream_id_from_segment(data_type, seg, i, 0), | ||
| VersionId{scalar_at<uint64_t>(seg, i, 1)}, | ||
| timestamp{scalar_at<int64_t>(seg, i, 2)}, | ||
| ActionType::ADD); | ||
| } | ||
|
|
||
| if (seg.descriptor().field_count() == 6) { | ||
| util::check( | ||
| seg.column(3).row_count() == seg.column(4).row_count() && | ||
| seg.column(3).row_count() == seg.column(5).row_count(), | ||
| "Column mismatch in symbol segment deletions: {} {} {}", | ||
| seg.column(3).row_count(), | ||
| seg.column(4).row_count(), | ||
| seg.column(5).row_count() | ||
| ); | ||
|
|
||
| for (auto i = 0L; i < seg.column(3).row_count(); ++i) { | ||
| auto stream_id = stream_id_from_segment(data_type, seg, i, 3); | ||
| auto reference_id = VersionId{scalar_at<uint64_t>(seg, i, 4)}; | ||
| auto reference_time = timestamp{scalar_at<int64_t>(seg, i, 5)}; | ||
| ARCTICDB_RUNTIME_DEBUG( | ||
| log::symbol(), "Reading deleted symbol {}: {}@{}", stream_id, reference_id, reference_time | ||
| ); | ||
| output.emplace_back(stream_id, reference_id, reference_time, ActionType::DELETE); | ||
| visitor(stream_id_from_segment(data_type, seg, i, 3), | ||
| VersionId{scalar_at<uint64_t>(seg, i, 4)}, | ||
| timestamp{scalar_at<int64_t>(seg, i, 5)}, | ||
| ActionType::DELETE); | ||
| } |
There was a problem hiding this comment.
The column row-count consistency checks that previously lived in read_new_style_list_from_storage have been dropped here:
util::check(
seg.column(0).row_count() == seg.column(1).row_count() &&
seg.column(0).row_count() == seg.column(2).row_count(),
"Column mismatch in symbol segment additions: {} {} {}", ...);Without them, if a stored segment is corrupt and has column(1).row_count() < column(0).row_count(), the scalar_at<uint64_t>(seg, i, 1) / scalar_at<int64_t>(seg, i, 2) calls on line 156–157 (and 164–165 for deletions) will either read garbage or rely on scalar_at's internal bounds check to throw an opaque error — losing the clear "Column mismatch in symbol segment additions/deletions" diagnostic that pointed directly at the storage corruption.
This is a defensive on-disk-data-integrity check and is cheap (three integer comparisons before the loops). Please restore it inside for_each_segment_entry before the additions and deletions loops.
| load_result.total_key_count_ = journal.total_key_count; | ||
| load_result.symbol_list_keys_ = std::move(journal.all_keys); | ||
|
|
||
| if (journal.compaction_key) { | ||
| ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Loading symbols from symbol list keys"); | ||
| auto existing = read_from_storage(store, *journal.compaction_key); | ||
| load_result.symbols_ = | ||
| merge_existing_with_journal_map(version_map, store, journal.update_map, std::move(existing)); | ||
| } else { | ||
| ARCTICDB_RUNTIME_DEBUG(log::symbol(), "Loading symbols from version keys"); | ||
| auto previous_entries = load_previous_from_version_keys(store, data, will_attempt_compaction); | ||
| load_result.symbols_ = | ||
| merge_existing_with_journal_map(version_map, store, journal.update_map, std::move(previous_entries)); | ||
| } | ||
|
|
There was a problem hiding this comment.
The "Would delete unseen key" sanity check from master has been removed without a replacement:
std::unordered_set<StreamId> keys_in_versions;
for (const auto& entry : load_result.symbols_)
keys_in_versions.emplace(entry.stream_id_);
for (const auto& key : load_result.symbol_list_keys_)
util::check(
keys_in_versions.find(StreamId{std::get<StringIndex>(key.start_index())}) != keys_in_versions.end(),
"Would delete unseen key {}", key);This was a defensive assertion guarding the version-keys path: it ensured that every journal key about to be deleted during compaction corresponded to a symbol present in the merged output. If a bug in the merge logic (or in problematic-symbol resolution) ever drops a symbol that still has live journal entries, delete_keys/remove_keys_sync would now silently destroy those journal entries — exactly the silent-data-loss class of bug this check was designed to catch.
Either restore the equivalent assertion against load_result.symbol_list_keys_ (the VariantKey collection) before remove_keys_sync in compact_internal, or justify in the PR description why the check is no longer needed.
ArcticDB Code Review SummaryAll previously flagged issues have been addressed. No new issues introduced in the latest commit. Correctness
Performance
Testing
|
| // Verify every journal key we'd delete during compaction corresponds to a symbol in the | ||
| // merged output. Guards against silent data loss from merge bugs. | ||
| // O(N*M) but only runs once per library (version-keys path, before any compaction exists). | ||
| if (collect_keys) { | ||
| for (const auto& key : load_result.symbol_list_keys_) { | ||
| auto stream_id = StreamId{std::get<StringIndex>(to_atom(key).start_index())}; | ||
| util::check( | ||
| std::any_of( | ||
| load_result.symbols_.begin(), | ||
| load_result.symbols_.end(), | ||
| [&stream_id](const SymbolListEntry& e) { return e.stream_id_ == stream_id; } | ||
| ), | ||
| "Would delete unseen key {}", | ||
| key | ||
| ); | ||
| } |
There was a problem hiding this comment.
Good to see this check restored, but the implementation is O(N*M) where master was O(N+M).
Master built an unordered_set<StreamId> once from load_result.symbols_ and did a single hashed lookup per journal key:
std::unordered_set<StreamId> keys_in_versions;
for (const auto& entry : load_result.symbols_)
keys_in_versions.emplace(entry.stream_id_);
for (const auto& key : load_result.symbol_list_keys_)
util::check(
keys_in_versions.find(StreamId{std::get<StringIndex>(to_atom(key).start_index())}) != keys_in_versions.end(),
"Would delete unseen key {}", key);The current std::any_of does a full linear scan of symbols_ for every entry in symbol_list_keys_. This path triggers on the first compaction of a library — exactly the scenario the PR is optimising — so for a library reaching first compaction with e.g. 300K uncompacted journal entries (cf. the 300K×1 benchmark in the PR description) this becomes ~9×10¹⁰ comparisons.
Suggest using a hashed lookup like master, or std::binary_search over symbols_ (it's already sorted by stream_id_ when problematic_symbols is non-empty, and merge_existing_entries keeps it sorted otherwise). The comment claiming "only runs once per library" doesn't change that this single run can be very slow at scale.
Reference Issues/PRs
What does this implement or fix?
The symbol list load and compaction path had higher peak heap usage than necessary. The original
implementation collected all SYMBOL_LIST
AtomKeys into a sorted vector in a first pass(
get_all_symbol_list_keys), then iterated that vector a second time to build the journal updatemap (
load_journal_keys→MapType). This meant the full key vector and the full update mapcoexisted on the heap throughout the merge phase. During compaction, a further
vector<VariantKey>was built from the same key vector before deletion — so all threeallocations could overlap.
For a library with N uncompacted journal entries (steady-state path where a compaction key exists):
vector<AtomKey>(N)+MapType(N entries)MapType(N entries)onlyvector<AtomKey>(N)+vector<VariantKey>(N)+CollectionTypeCollectionType+vector<VariantKey>(N)vector<AtomKey>(N)+vector<VariantKey>(N)+CollectionTypevector<VariantKey>(N)onlyChanges
cpp/arcticdb/version/symbol_list.cppStreaming journal load — eliminated the two-pass key vector:
get_all_symbol_list_keys(first-pass collection into a sortedvector<AtomKey>)and
load_journal_keys(second-pass map construction from that vector).load_journal_streaming: a singleiterate_typepass that builds theupdate_map(MapType) directly without materialising an intermediate key vector. Thelatest compaction key is tracked inline. Each symbol's entry list is sorted at the end
of the pass via
sort_update_map_entries.load_journal_streamingaccepts acollect_keysparameter: when true (compactionexpected), it also collects
VariantKeys during the same iterate pass — avoiding aseparate storage iteration for key collection while keeping the non-compaction load
path lean (no key vector allocated at all).
add_update_map_entry,sort_update_map_entries,key_sort_comparator.Compaction path — early free + direct VariantKey collection:
attempt_loadnow passescollect_keys = (will_attempt_compaction == YES), so keys areonly collected when compaction may follow. The compaction path performs the same number
of storage iterations as master (one full iterate for load + key collection, one filtered
iterate for
has_recent_compaction). The non-compaction read path saves one full iterate.compact_internalmovessymbols_intowrite_symbols(which now takesCollectionTypeby value), so the symbol vector is freed when
write_symbolsreturns. Deletion usesin-place erase (
remove_if+erase) to filter out the newly written key, thenpasses the same vector to
remove_keys_syncvia move — no second vector is allocated.This ensures
CollectionTypeand the deletion keys never coexist on the heap.has_recent_compactionnow takesoptional<AtomKey>instead ofoptional<vector<AtomKey>::const_iterator>— simpler interface matching the newLoadResult::compaction_key_.VariantKeys directly during the load iterate, avoidingthe
vector<AtomKey>→vector<VariantKey>conversion that master performed indelete_keys. Keys are sorted before deletion for storage backend performance(e.g.
InMemoryStorage).load_from_symbol_list_keys,load_from_version_keys,collect_all_symbol_list_keys,last_compaction,LoadResult::detach_symbol_list_keys()."Would delete unseen key"defensive assertion from master in theversion-keys path (no compaction key found). Adapted to use
vector<VariantKey>keysvia
to_atom(). Only runs once per library before the first compaction.Segment reading simplification:
read_old_style_list_from_storageandread_new_style_list_from_storage;replaced with a single
for_each_segment_entrytemplate that dispatches based onfield count (1 = old-style all-ADD, 6 = new-style with deletions). Column row-count
consistency checks are preserved inside
for_each_segment_entrybefore the additionsand deletions loops. Used by
read_from_storagevia a visitor lambda.Merge logic refactoring:
merge_existing_entriesfrom the monolithicmerge_existing_with_journal_keys.The new function takes
existing_keys+update_map+min_allowed_intervalandreturns
ExistingKeysMergeResult(symbols + problematic map). Consumed entries areerased from
update_map.merge_existing_with_journal_keys→merge_existing_with_journal_map(nowtakes
MapType&directly instead ofvector<AtomKey>&).Other changes:
delete_keysreturn type changed fromvector<Store::RemoveKeyResultType>tovoid(return value was unused at every call site).
LoadResultsimplified:maybe_previous_compaction(optional<iterator>) →compaction_key_(optional<AtomKey>);timestamp_removed;symbol_list_keys_changed from
vector<AtomKey>tovector<VariantKey>(only populated when compactionis expected);
total_key_count_added.cpp/arcticdb/version/symbol_list.hppCompactionandMaybeCompactiontype aliases.LoadResultupdated as described above.delete_keysdeclaration updated to returnvoid.load()now builds the outputset<StreamId>before callingcompact_internal,since compaction moves
symbols_intowrite_symbols(by value). Uses copy-insertionso that
compact_internalcan still consumesymbols_for the write.cpp/arcticdb/version/test/test_symbol_list.cppAdded two equivalence tests verifying
loadwith and withoutno_compaction=truereturnidentical symbol sets:
DirectPathEquivalence: 50 symbols, compaction, 10 new + 10 remove journal entries.Remove entries resolve as ADD (version map still shows those symbols as existing). 60
symbols expected.
DirectPathEquivalenceWithTrueDeletes: 50 symbols, compaction, 10 new + 10 tombstoneddeletes (version map agrees). Remove entries resolve as DELETE. 50 symbols expected,
deleted symbols verified absent.
cpp/arcticdb/version/test/benchmark_symbol_list.cpp(new)New C++ microbenchmarks measuring wall time and peak heap delta via
PeakHeapTracker(
mallinfo2polled at 100 µs intervals):BM_symbol_list_loadBM_symbol_list_load_many_entriesBM_symbol_list_compactionBM_symbol_list_load_s3BM_symbol_list_compaction_s3cpp/arcticdb/CMakeLists.txtAdded
version/test/benchmark_symbol_list.cppto thebenchmarksexecutable source list.python/benchmarks/list_symbols.pyThree new ASV benchmark classes covering the optimised paths, each with
time_list_symbolsand
peakmem_list_symbolsmeasurements across LMDB and S3 backends:ListSymbolsWithCompactedCacheListSymbolsCompactionlist_symbolstriggering compaction with 1K symbols × 10 uncompacted versionsListSymbolsWithDeletesListSymbolsWithoutCache.num_symbolsreduced from[100, 1000]to[1000]to trim thebenchmark matrix.
python/.asv/results/benchmarks.jsonUpdated ASV benchmark registry to include the three new benchmark classes.
Benchmark Results (C++)
All runs on a 128-core machine, release build. Peak figures are heap delta measured via
mallinfo2polled at 100 µs intervals from a trimmed baseline.Benchmarks marked (local only) are commented out in the source and excluded from CI
because their setup or run time exceeds ~5 s on CI hardware. Run them locally with
make bench-cpp FILTER=<name>.Load path (compacted cache + small journal tail)
BM_symbol_list_load/100KBM_symbol_list_load/300KBM_symbol_list_load/1M(local only)The load path with
no_compaction=truedoes not collect keys, so only the streamingupdate_mapconstruction contributes to peak. The key vector elimination cuts 9–17% peakfor the typical 100K–300K symbol range.
Load path (compacted cache + large uncompacted journal)
BM_symbol_list_load_many_entries/1K×1KBM_symbol_list_load_many_entries/300K×1(local only)The 300K×1 case shows the largest memory gain: with 300K distinct symbols each contributing
one journal entry, master held a
vector<AtomKey>(300K keys) alongside the update mapsimultaneously. The branch builds the update map in a single pass without materialising
the key vector, cutting peak by ~114 MB.
The 1K×1K case is also 2× faster (1521 ms → 731 ms): the streaming approach avoids
a second iteration over the sorted key vector that master needed to build the update map.
Compaction path
BM_symbol_list_compaction/1K×100BM_symbol_list_compaction/1K×1K(local only)BM_symbol_list_compaction/10K×100(local only)BM_symbol_list_compaction/300K×1(local only)Wall-clock times are close to master (same number of storage iterations: one full + one
filtered). The 1K×100 and 300K×1 cases show clear peak improvements (-6% and -11%) from
the in-place erase of the written key (no second
vector<VariantKey>is allocated fordeletion). The 1K×1K case shows a peak regression (+14%) because the pre-collected
VariantKeyvector overlaps with theupdate_mapduring load — at that scale theupdate_map(1K × 1KSymbolEntryDataentries) dominates peak and the savings fromthe deletion path cannot offset it.
S3 mock store
BM_symbol_list_load_s3/10KBM_symbol_list_compaction_s3/1K×100(local only)BM_symbol_list_compaction_s3/10K×10(local only)S3 compaction peak for 1K×100 dropped from 103.8 to 65.1 MB (-37%). Moving
symbols_into
write_symbols(by value) prevents overlap with the deletion key vector, and thein-place erase eliminates the second
vector<VariantKey>allocation during deletion.The 10K×10 case (same 100K total entries, 10× more distinct symbols) peaks at 66.0 MB —
similar to the 1K×100 case since total entry count is the same.