Skip to content

Commit a9404b9

Browse files
committed
feat(data): coalesce position deletes into range inserts
Add ForEachPositionDelete (the C++ equivalent of Java's PositionDeleteRangeConsumer) and route DeleteLoader through it, replacing the per-position PositionDeleteIndex::Delete(pos) call. For inputs of at least 64 positions, the function sniffs a prefix of up to 1024 positions and dispatches to either run coalescing (CRoaring addRange) or bulk addMany grouped by high-32-bit key; smaller inputs always take the coalescing path. Also rework DeleteLoader::LoadPositionDelete to read Arrow batches via nanoarrow's ArrowArrayView directly. When the delete file's referenced_data_file matches the target (V2 writer hint), positions are passed as a zero-copy span; otherwise a per-batch staging vector filters by path. Local microbenchmarks show speedups of 2.2x-10.6x for ForEachPositionDelete and 2.1x-2.5x end-to-end through LoadPositionDeletes. Equivalent of apache/iceberg#16052.
1 parent 97ea870 commit a9404b9

12 files changed

Lines changed: 475 additions & 14 deletions

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ set(ICEBERG_DATA_SOURCES
171171
data/position_delete_writer.cc
172172
data/writer.cc
173173
deletes/position_delete_index.cc
174+
deletes/position_delete_range_consumer.cc
174175
deletes/roaring_position_bitmap.cc
175176
puffin/file_metadata.cc
176177
puffin/json_serde.cc

src/iceberg/data/delete_loader.cc

Lines changed: 80 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,21 @@
1919

2020
#include "iceberg/data/delete_loader.h"
2121

22+
#include <cstring>
23+
#include <span>
2224
#include <string>
2325
#include <vector>
2426

27+
#include <nanoarrow/nanoarrow.h>
28+
29+
#include "iceberg/arrow/nanoarrow_status_internal.h"
2530
#include "iceberg/arrow_c_data_guard_internal.h"
2631
#include "iceberg/deletes/position_delete_index.h"
32+
#include "iceberg/deletes/position_delete_range_consumer.h"
2733
#include "iceberg/file_reader.h"
2834
#include "iceberg/manifest/manifest_entry.h"
2935
#include "iceberg/metadata_columns.h"
36+
#include "iceberg/result.h"
3037
#include "iceberg/row/arrow_array_wrapper.h"
3138
#include "iceberg/schema.h"
3239
#include "iceberg/util/macros.h"
@@ -57,6 +64,25 @@ Result<std::unique_ptr<Reader>> OpenDeleteFile(const DataFile& file,
5764
return ReaderFactoryRegistry::Open(file.file_format, options);
5865
}
5966

67+
/// Raw `int64` values buffer (offset-adjusted). Skips the validity bitmap:
68+
/// `kDeleteFilePos` is required by the V2 spec.
69+
const int64_t* Int64ValuesBuffer(const ArrowArrayView* view) {
70+
return view->buffer_views[1].data.as_int64 + view->offset;
71+
}
72+
73+
/// String-equals at `row_idx` via nanoarrow's unsafe direct-buffer access.
74+
/// Skips the validity bitmap: `kDeleteFilePath` is required by the V2 spec.
75+
bool StringEquals(const ArrowArrayView* view, int64_t row_idx, std::string_view target) {
76+
ArrowStringView sv = ArrowArrayViewGetStringUnsafe(view, row_idx);
77+
if (static_cast<size_t>(sv.size_bytes) != target.size()) {
78+
return false;
79+
}
80+
if (target.empty()) {
81+
return true;
82+
}
83+
return sv.data != nullptr && std::memcmp(sv.data, target.data(), target.size()) == 0;
84+
}
85+
6086
} // namespace
6187

6288
DeleteLoader::DeleteLoader(std::shared_ptr<FileIO> io) : io_(std::move(io)) {}
@@ -71,30 +97,70 @@ Status DeleteLoader::LoadPositionDelete(const DataFile& file, PositionDeleteInde
7197
ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader->Schema());
7298
internal::ArrowSchemaGuard schema_guard(&arrow_schema);
7399

100+
// Reused across batches; reads child buffers directly to avoid the
101+
// per-row `Scalar` dispatch in `ArrowArrayStructLike`.
102+
ArrowArrayView array_view;
103+
internal::ArrowArrayViewGuard view_guard(&array_view);
104+
ArrowError error;
105+
ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
106+
ArrowArrayViewInitFromSchema(&array_view, &arrow_schema, &error), error);
107+
108+
// Fast path when the writer's `referenced_data_file` hint matches our
109+
// target: skip the path column, hand `pos_data` straight to
110+
// `ForEachPositionDelete`. Trusts the hint -- spec-compliant writers
111+
// only set it when all rows share one data file.
112+
const bool use_referenced_data_file_fast_path =
113+
file.referenced_data_file.has_value() &&
114+
file.referenced_data_file.value() == data_file_path;
115+
116+
// Filter-path staging buffer; reused across batches via `clear()`.
117+
std::vector<int64_t> positions;
118+
74119
while (true) {
75120
ICEBERG_ASSIGN_OR_RAISE(auto batch_opt, reader->Next());
76121
if (!batch_opt.has_value()) break;
77122

78123
auto& batch = batch_opt.value();
79124
internal::ArrowArrayGuard batch_guard(&batch);
80125

81-
ICEBERG_ASSIGN_OR_RAISE(
82-
auto row, ArrowArrayStructLike::Make(arrow_schema, batch, /*row_index=*/0));
126+
ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
127+
ArrowArrayViewSetArray(&array_view, &batch, &error), error);
83128

84-
for (int64_t i = 0; i < batch.length; ++i) {
85-
if (i > 0) {
86-
ICEBERG_RETURN_UNEXPECTED(row->Reset(i));
87-
}
88-
// Field 0: file_path
89-
ICEBERG_ASSIGN_OR_RAISE(auto path_scalar, row->GetField(0));
90-
auto path = std::get<std::string_view>(path_scalar);
91-
92-
if (path == data_file_path) {
93-
// Field 1: pos
94-
ICEBERG_ASSIGN_OR_RAISE(auto pos_scalar, row->GetField(1));
95-
index.Delete(std::get<int64_t>(pos_scalar));
129+
const int64_t length = batch.length;
130+
if (length <= 0) {
131+
continue;
132+
}
133+
134+
// Child indices must match `PosDeleteSchema()`: 0 = file_path, 1 = pos.
135+
const ArrowArrayView* path_view = array_view.children[0];
136+
const ArrowArrayView* pos_view = array_view.children[1];
137+
138+
// V2 spec marks pos and file_path as required (NOT NULL). The direct
139+
// buffer access below skips the validity bitmap, so a non-compliant
140+
// batch would silently corrupt the index. Fail fast instead.
141+
if (ArrowArrayViewComputeNullCount(pos_view) != 0 ||
142+
ArrowArrayViewComputeNullCount(path_view) != 0) {
143+
return InvalidArrowData(
144+
"position delete file has null values in required pos/file_path columns");
145+
}
146+
147+
const int64_t* pos_data = Int64ValuesBuffer(pos_view);
148+
149+
if (use_referenced_data_file_fast_path) {
150+
ForEachPositionDelete(std::span<const int64_t>(pos_data, length), index);
151+
continue;
152+
}
153+
154+
positions.clear();
155+
if (positions.capacity() < static_cast<size_t>(length)) {
156+
positions.reserve(static_cast<size_t>(length));
157+
}
158+
for (int64_t i = 0; i < length; ++i) {
159+
if (StringEquals(path_view, i, data_file_path)) {
160+
positions.push_back(pos_data[i]);
96161
}
97162
}
163+
ForEachPositionDelete(positions, index);
98164
}
99165

100166
return reader->Close();

src/iceberg/deletes/position_delete_index.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,9 @@ void PositionDeleteIndex::Merge(const PositionDeleteIndex& other) {
3939
bitmap_.Or(other.bitmap_);
4040
}
4141

42+
// Bulk-adds `n` 32-bit values from `positions` under bitmap key `key` by
// forwarding straight to `bitmap_.AddManyForKey`. No validation or grouping
// happens here; the caller is responsible for passing values that all share
// the same high-32-bit `key`.
void PositionDeleteIndex::BulkAddForKey(int32_t key, const uint32_t* positions,
43+
size_t n) {
44+
bitmap_.AddManyForKey(key, positions, n);
45+
}
46+
4247
} // namespace iceberg

src/iceberg/deletes/position_delete_index.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
#include <cstdint>
2626
#include <memory>
27+
#include <span>
2728

2829
#include "iceberg/deletes/roaring_position_bitmap.h"
2930
#include "iceberg/iceberg_data_export.h"
@@ -65,6 +66,14 @@ class ICEBERG_DATA_EXPORT PositionDeleteIndex {
6566
void Merge(const PositionDeleteIndex& other);
6667

6768
private:
69+
// Bulk-add `n` positions sharing high-32-bit `key`. Private hook for
70+
// `ForEachPositionDelete`'s bulk path; keeps `Delete` the sole public
71+
// mutation surface.
72+
void BulkAddForKey(int32_t key, const uint32_t* positions, size_t n);
73+
74+
friend void ICEBERG_DATA_EXPORT
75+
ForEachPositionDelete(std::span<const int64_t> positions, PositionDeleteIndex& target);
76+
6877
RoaringPositionBitmap bitmap_;
6978
};
7079

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/deletes/position_delete_range_consumer.h"
21+
22+
#include <algorithm>
23+
#include <cstdint>
24+
#include <span>
25+
#include <vector>
26+
27+
#include "iceberg/deletes/position_delete_index.h"
28+
#include "iceberg/deletes/roaring_position_bitmap.h"
29+
30+
namespace iceberg {
31+
32+
namespace {
33+
34+
bool IsValidPosition(int64_t pos) {
35+
return pos >= 0 && pos <= RoaringPositionBitmap::kMaxPosition;
36+
}
37+
38+
// Returns true iff `next` is exactly `prev + 1`.
//
// `prev == INT64_MAX` has no successor, so it is rejected up front. This
// keeps `prev + 1` free of signed overflow (undefined behaviour) and also
// stops the wrap-around pair (INT64_MAX, INT64_MIN) from being reported as
// adjacent, which a plain modular/unsigned difference would do. Inputs may
// be arbitrary (negative or out of range); no pre-filtering is required.
bool IsAdjacent(int64_t prev, int64_t next) {
  return prev != INT64_MAX && prev + 1 == next;
}
43+
44+
// `RoaringPositionBitmap` shards positions by their high 32 bits; the
45+
// bulk path groups by this key before flushing via `BulkAddForKey`.
46+
// Extracts the upper 32 bits of `pos` as a signed 32-bit key.
int32_t HighKeyFromPosition(int64_t pos) {
  const int64_t upper_half = pos >> 32;
  return static_cast<int32_t>(upper_half);
}
47+
48+
// Emit `[range_start, last_position]`, collapsing singletons. Callers
49+
// pre-filter via `IsValidPosition`, so `last_position + 1` cannot overflow.
50+
void EmitRange(PositionDeleteIndex& target, int64_t range_start, int64_t last_position) {
51+
if (range_start == last_position) {
52+
target.Delete(range_start);
53+
} else {
54+
target.Delete(range_start, last_position + 1);
55+
}
56+
}
57+
58+
// Emit closed-interval runs; out-of-range positions are silently skipped
59+
// to match `Delete(pos)`.
60+
void CoalesceIntoRanges(std::span<const int64_t> positions, PositionDeleteIndex& target) {
61+
const size_t n = positions.size();
62+
63+
size_t i = 0;
64+
while (i < n && !IsValidPosition(positions[i])) {
65+
++i;
66+
}
67+
if (i == n) {
68+
return;
69+
}
70+
71+
int64_t range_start = positions[i];
72+
int64_t last_position = range_start;
73+
++i;
74+
75+
for (; i < n; ++i) {
76+
const int64_t pos = positions[i];
77+
if (!IsValidPosition(pos)) {
78+
continue;
79+
}
80+
if (!IsAdjacent(last_position, pos)) {
81+
EmitRange(target, range_start, last_position);
82+
range_start = pos;
83+
}
84+
last_position = pos;
85+
}
86+
87+
EmitRange(target, range_start, last_position);
88+
}
89+
90+
} // namespace
91+
92+
void ForEachPositionDelete(std::span<const int64_t> positions,
                           PositionDeleteIndex& target) {
  // Inputs shorter than this always coalesce: the bulk path's fixed overhead
  // outweighs anything it could save.
  constexpr size_t kMinSniffSize = 64;
  // Upper bound on how many leading entries are inspected for the estimate.
  constexpr size_t kSniffSize = 1024;
  // Boundary density (in percent) above which the bulk path is chosen.
  constexpr size_t kBulkThresholdPercent = 10;

  const size_t total = positions.size();
  if (total == 0) {
    return;
  }

  // Estimate how fragmented the input is by counting non-adjacent neighbours
  // in a bounded prefix. Integer cross-multiplication replaces the
  // floating-point test breaks / (window - 1) > kBulkThresholdPercent / 100.
  bool take_bulk_path = false;
  if (total >= kMinSniffSize) {
    const size_t window = std::min(total, kSniffSize);
    size_t breaks = 0;
    for (size_t idx = 1; idx < window; ++idx) {
      if (!IsAdjacent(positions[idx - 1], positions[idx])) {
        ++breaks;
      }
    }
    take_bulk_path = breaks * 100 > (window - 1) * kBulkThresholdPercent;
  }

  if (!take_bulk_path) {
    CoalesceIntoRanges(positions, target);
    return;
  }

  // Bulk path: collect each maximal stretch of valid positions that share a
  // high-32-bit key and flush it through `BulkAddForKey` (CRoaring `addMany`).
  // The staging buffer is thread-local and reused across calls, so a nested
  // invocation on the same thread would clobber it.
  thread_local std::vector<uint32_t> bulk_key_positions;
  size_t cursor = 0;
  while (cursor < total) {
    if (!IsValidPosition(positions[cursor])) {
      ++cursor;
      continue;
    }
    const int32_t group_key = HighKeyFromPosition(positions[cursor]);
    bulk_key_positions.clear();
    do {
      // Keep only the low 32 bits; the high bits are carried by `group_key`.
      bulk_key_positions.push_back(
          static_cast<uint32_t>(positions[cursor] & 0xFFFFFFFFu));
      ++cursor;
    } while (cursor < total && IsValidPosition(positions[cursor]) &&
             HighKeyFromPosition(positions[cursor]) == group_key);
    target.BulkAddForKey(group_key, bulk_key_positions.data(),
                         bulk_key_positions.size());
  }
}
146+
147+
} // namespace iceberg
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <cstdint>
23+
#include <span>
24+
25+
#include "iceberg/iceberg_data_export.h"
26+
27+
namespace iceberg {
28+
29+
class PositionDeleteIndex;
30+
31+
/// \brief Apply `positions` to `target` as deletes; semantically equivalent
32+
/// to calling `target.Delete(pos)` for each entry. Out-of-range positions
33+
/// are silently ignored. Sorted, mostly-contiguous input is fastest.
34+
///
35+
/// \warning Not safe to call recursively or interleaved on the same thread:
36+
/// the bulk dispatch path uses a thread-local staging buffer that a
37+
/// nested invocation would corrupt. Concurrent calls on different
38+
/// threads are safe with disjoint `target`.
39+
void ICEBERG_DATA_EXPORT ForEachPositionDelete(std::span<const int64_t> positions,
40+
PositionDeleteIndex& target);
41+
42+
} // namespace iceberg

src/iceberg/deletes/roaring_position_bitmap.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,12 @@ void RoaringPositionBitmap::Add(int64_t pos) {
105105
impl_->bitmaps[key].add(pos32);
106106
}
107107

108+
// Adds `n` 32-bit values from `positions` to the bitmap stored under `key`.
// The bitmap array is grown to hold `key + 1` entries first, so the lookup
// below refers to an allocated slot. Grouping values by key is left to the
// caller.
void RoaringPositionBitmap::AddManyForKey(int32_t key, const uint32_t* positions,
                                          size_t n) {
  impl_->AllocateBitmapsIfNeeded(key + 1);
  auto& bucket = impl_->bitmaps[key];
  bucket.addMany(n, positions);
}
113+
108114
void RoaringPositionBitmap::AddRange(int64_t pos_start, int64_t pos_end) {
109115
pos_start = std::max(pos_start, int64_t{0});
110116
pos_end = std::min(pos_end, kMaxPosition + 1);

src/iceberg/deletes/roaring_position_bitmap.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333

3434
namespace iceberg {
3535

36+
class PositionDeleteIndex;
37+
3638
/// \brief A bitmap that supports positive 64-bit positions, optimized
3739
/// for cases where most positions fit in 32 bits.
3840
///
@@ -110,6 +112,12 @@ class ICEBERG_DATA_EXPORT RoaringPositionBitmap {
110112
std::unique_ptr<Impl> impl_;
111113

112114
explicit RoaringPositionBitmap(std::unique_ptr<Impl> impl);
115+
116+
// Bulk-add positions sharing high-32-bit `key`. Internal hook for
117+
// `PositionDeleteIndex::BulkAddForKey`; per-key grouping is the caller's
118+
// job, keeping this a thin wrapper around CRoaring's `addMany`.
119+
void AddManyForKey(int32_t key, const uint32_t* positions, size_t n);
120+
friend class PositionDeleteIndex;
113121
};
114122

115123
} // namespace iceberg

0 commit comments

Comments
 (0)