Skip to content

Commit 72f9eb6

Browse files
committed
refactor(data): hoist bulk scratch buffer to caller, switch AddManyForKey to span
Address review feedback on PR #645: - AddManyForKey / BulkAddForKey now take std::span<const uint32_t> instead of pointer+length, matching the rest of the PR's style. - ForEachPositionDelete takes a caller-owned scratch vector instead of a thread_local, removing the re-entrancy hazard documented on the prior API and giving the caller full control over buffer lifetime.
1 parent 93ade21 commit 72f9eb6

8 files changed

Lines changed: 34 additions & 26 deletions

src/iceberg/data/delete_loader.cc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ Status DeleteLoader::LoadPositionDelete(const DataFile& file, PositionDeleteInde
115115

116116
// Filter-path staging buffer; reused across batches via `clear()`.
117117
std::vector<int64_t> positions;
118+
// Scratch buffer for `ForEachPositionDelete`'s bulk dispatch path;
119+
// reused across batches and across both routing branches.
120+
std::vector<uint32_t> bulk_scratch;
118121

119122
while (true) {
120123
ICEBERG_ASSIGN_OR_RAISE(auto batch_opt, reader->Next());
@@ -147,7 +150,8 @@ Status DeleteLoader::LoadPositionDelete(const DataFile& file, PositionDeleteInde
147150
const int64_t* pos_data = Int64ValuesBuffer(pos_view);
148151

149152
if (use_referenced_data_file_fast_path) {
150-
ForEachPositionDelete(std::span<const int64_t>(pos_data, length), index);
153+
ForEachPositionDelete(std::span<const int64_t>(pos_data, length), index,
154+
bulk_scratch);
151155
continue;
152156
}
153157

@@ -160,7 +164,7 @@ Status DeleteLoader::LoadPositionDelete(const DataFile& file, PositionDeleteInde
160164
positions.push_back(pos_data[i]);
161165
}
162166
}
163-
ForEachPositionDelete(positions, index);
167+
ForEachPositionDelete(positions, index, bulk_scratch);
164168
}
165169

166170
return reader->Close();

src/iceberg/deletes/position_delete_index.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ void PositionDeleteIndex::Merge(const PositionDeleteIndex& other) {
3939
bitmap_.Or(other.bitmap_);
4040
}
4141

42-
void PositionDeleteIndex::BulkAddForKey(int32_t key, const uint32_t* positions,
43-
size_t n) {
44-
bitmap_.AddManyForKey(key, positions, n);
42+
void PositionDeleteIndex::BulkAddForKey(int32_t key,
43+
std::span<const uint32_t> positions) {
44+
bitmap_.AddManyForKey(key, positions);
4545
}
4646

4747
} // namespace iceberg

src/iceberg/deletes/position_delete_index.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <cstdint>
2626
#include <memory>
2727
#include <span>
28+
#include <vector>
2829

2930
#include "iceberg/deletes/roaring_position_bitmap.h"
3031
#include "iceberg/iceberg_data_export.h"
@@ -66,13 +67,14 @@ class ICEBERG_DATA_EXPORT PositionDeleteIndex {
6667
void Merge(const PositionDeleteIndex& other);
6768

6869
private:
69-
// Bulk-add `n` positions sharing high-32-bit `key`. Private hook for
70+
// Bulk-add positions sharing high-32-bit `key`. Private hook for
7071
// `ForEachPositionDelete`'s bulk path; keeps `Delete` the sole public
7172
// mutation surface.
72-
void BulkAddForKey(int32_t key, const uint32_t* positions, size_t n);
73+
void BulkAddForKey(int32_t key, std::span<const uint32_t> positions);
7374

7475
friend void ICEBERG_DATA_EXPORT
75-
ForEachPositionDelete(std::span<const int64_t> positions, PositionDeleteIndex& target);
76+
ForEachPositionDelete(std::span<const int64_t> positions, PositionDeleteIndex& target,
77+
std::vector<uint32_t>& scratch);
7678

7779
RoaringPositionBitmap bitmap_;
7880
};

src/iceberg/deletes/position_delete_range_consumer.cc

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ void CoalesceIntoRanges(std::span<const int64_t> positions, PositionDeleteIndex&
9090
} // namespace
9191

9292
void ForEachPositionDelete(std::span<const int64_t> positions,
93-
PositionDeleteIndex& target) {
93+
PositionDeleteIndex& target, std::vector<uint32_t>& scratch) {
9494
if (positions.empty()) {
9595
return;
9696
}
@@ -116,10 +116,8 @@ void ForEachPositionDelete(std::span<const int64_t> positions,
116116
// boundaries / (sniff - 1) > kBulkThresholdPercent / 100, without FP.
117117
if (boundaries * 100 > (sniff - 1) * kBulkThresholdPercent) {
118118
// Bulk path: group by high-32-bit key, flush each group via CRoaring's
119-
// `addMany` (through `BulkAddForKey`). The thread-local buffer is
120-
// reused across calls; nested invocations on the same thread would
121-
// corrupt it -- see `\warning` on `ForEachPositionDelete`.
122-
thread_local std::vector<uint32_t> bulk_key_positions;
119+
// `addMany` (through `BulkAddForKey`). Reuses the caller-owned `scratch`
120+
// vector across key groups -- cleared between groups, capacity retained.
123121
const size_t n = positions.size();
124122
size_t i = 0;
125123
while (i < n) {
@@ -130,13 +128,13 @@ void ForEachPositionDelete(std::span<const int64_t> positions,
130128
break;
131129
}
132130
const int32_t key = HighKeyFromPosition(positions[i]);
133-
bulk_key_positions.clear();
131+
scratch.clear();
134132
while (i < n && IsValidPosition(positions[i]) &&
135133
HighKeyFromPosition(positions[i]) == key) {
136-
bulk_key_positions.push_back(static_cast<uint32_t>(positions[i] & 0xFFFFFFFFu));
134+
scratch.push_back(static_cast<uint32_t>(positions[i] & 0xFFFFFFFFu));
137135
++i;
138136
}
139-
target.BulkAddForKey(key, bulk_key_positions.data(), bulk_key_positions.size());
137+
target.BulkAddForKey(key, scratch);
140138
}
141139
return;
142140
}

src/iceberg/deletes/position_delete_range_consumer.h

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include <cstdint>
2323
#include <span>
24+
#include <vector>
2425

2526
#include "iceberg/iceberg_data_export.h"
2627

@@ -32,11 +33,12 @@ class PositionDeleteIndex;
3233
/// to calling `target.Delete(pos)` for each entry. Out-of-range positions
3334
/// are silently ignored. Sorted, mostly-contiguous input is fastest.
3435
///
35-
/// \warning Not safe to call recursively or interleaved on the same thread:
36-
/// the bulk dispatch path uses a thread-local staging buffer that a
37-
/// nested invocation would corrupt. Concurrent calls on different
38-
/// threads are safe with disjoint `target`.
36+
/// \param scratch Caller-owned reusable buffer for the bulk dispatch path.
37+
/// Cleared and reused per key group; retain across calls to amortize
38+
/// allocations. Pass a distinct `scratch` per thread when calling
39+
/// concurrently with disjoint `target`.
3940
void ICEBERG_DATA_EXPORT ForEachPositionDelete(std::span<const int64_t> positions,
40-
PositionDeleteIndex& target);
41+
PositionDeleteIndex& target,
42+
std::vector<uint32_t>& scratch);
4143

4244
} // namespace iceberg

src/iceberg/deletes/roaring_position_bitmap.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,10 @@ void RoaringPositionBitmap::Add(int64_t pos) {
105105
impl_->bitmaps[key].add(pos32);
106106
}
107107

108-
void RoaringPositionBitmap::AddManyForKey(int32_t key, const uint32_t* positions,
109-
size_t n) {
108+
void RoaringPositionBitmap::AddManyForKey(int32_t key,
109+
std::span<const uint32_t> positions) {
110110
impl_->AllocateBitmapsIfNeeded(key + 1);
111-
impl_->bitmaps[key].addMany(n, positions);
111+
impl_->bitmaps[key].addMany(positions.size(), positions.data());
112112
}
113113

114114
void RoaringPositionBitmap::AddRange(int64_t pos_start, int64_t pos_end) {

src/iceberg/deletes/roaring_position_bitmap.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <cstdint>
2626
#include <functional>
2727
#include <memory>
28+
#include <span>
2829
#include <string>
2930
#include <string_view>
3031

@@ -116,7 +117,7 @@ class ICEBERG_DATA_EXPORT RoaringPositionBitmap {
116117
// Bulk-add positions sharing high-32-bit `key`. Internal hook for
117118
// `PositionDeleteIndex::BulkAddForKey`; per-key grouping is the caller's
118119
// job, keeping this a thin wrapper around CRoaring's `addMany`.
119-
void AddManyForKey(int32_t key, const uint32_t* positions, size_t n);
120+
void AddManyForKey(int32_t key, std::span<const uint32_t> positions);
120121
friend class PositionDeleteIndex;
121122
};
122123

src/iceberg/test/position_delete_range_consumer_test.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ std::set<int64_t> ExpectedValidSet(const std::vector<int64_t>& positions) {
4949
// Weaker checks would miss divergences at the 32-bit key boundary.
5050
void AssertMatchesBaseline(const std::vector<int64_t>& positions) {
5151
PositionDeleteIndex index;
52-
ForEachPositionDelete(std::span<const int64_t>(positions), index);
52+
std::vector<uint32_t> scratch;
53+
ForEachPositionDelete(std::span<const int64_t>(positions), index, scratch);
5354
const auto expected = ExpectedValidSet(positions);
5455
ASSERT_EQ(index.Cardinality(), static_cast<int64_t>(expected.size()))
5556
<< "input size=" << positions.size();

0 commit comments

Comments
 (0)