Skip to content

Commit da281d5

Browse files
committed
feat: add roaring-based position bitmap
1 parent 743c318 commit da281d5

File tree

12 files changed

+895
-0
lines changed

12 files changed

+895
-0
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ set(ICEBERG_SOURCES
2525
data/position_delete_writer.cc
2626
data/writer.cc
2727
delete_file_index.cc
28+
deletes/roaring_position_bitmap.cc
2829
expression/aggregate.cc
2930
expression/binder.cc
3031
expression/evaluator.cc
@@ -166,6 +167,7 @@ iceberg_install_all_headers(iceberg)
166167

167168
add_subdirectory(catalog)
168169
add_subdirectory(data)
170+
add_subdirectory(deletes)
169171
add_subdirectory(expression)
170172
add_subdirectory(manifest)
171173
add_subdirectory(puffin)

src/iceberg/deletes/CMakeLists.txt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# Register this directory's headers for installation. The argument is presumably the
# install subpath (iceberg/deletes) -- confirm against the iceberg_install_all_headers helper.
iceberg_install_all_headers(iceberg/deletes)

src/iceberg/deletes/meson.build

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# Install the public position-bitmap header into the 'iceberg/deletes' include subdirectory.
install_headers(['roaring_position_bitmap.h'], subdir: 'iceberg/deletes')
Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/deletes/roaring_position_bitmap.h"
21+
22+
#include <cstring>
23+
#include <exception>
24+
#include <limits>
25+
#include <utility>
26+
#include <vector>
27+
28+
#include <roaring/roaring.hh>
29+
30+
#include "iceberg/util/endian.h"
31+
#include "iceberg/util/macros.h"
32+
33+
namespace iceberg {
34+
35+
namespace {
36+
37+
// Width in bytes of the bitmap-count prefix: Serialize() writes it with WriteLE64 and
// Deserialize() reads it with ReadLE64 before any per-key data.
constexpr size_t kBitmapCountSizeBytes = 8;
38+
// Width in bytes of the key written (WriteLE32) immediately before each serialized bitmap.
constexpr size_t kBitmapKeySizeBytes = 4;
39+
40+
// Extracts high 32 bits from a 64-bit position (the key).
41+
int32_t Key(int64_t pos) {
  // Shift the low word out first, then narrow; Add() and Contains() validate pos
  // before calling this, so the remaining high word is non-negative there.
  const int64_t high_word = pos >> 32;
  return static_cast<int32_t>(high_word);
}
42+
43+
// Extracts low 32 bits from a 64-bit position.
44+
uint32_t Pos32Bits(int64_t pos) {
  // Narrowing to an unsigned 32-bit type keeps exactly the low word of pos.
  const auto low_word = static_cast<uint32_t>(pos);
  return low_word;
}
45+
46+
// Combines key (high 32 bits) and pos32 (low 32 bits) into a 64-bit
47+
// position. The low 32 bits are zero-extended to avoid sign extension.
48+
// Rebuilds a 64-bit position from its two 32-bit halves.
//
// key   - high 32 bits of the position.
// pos32 - low 32 bits of the position.
//
// The halves are joined in unsigned arithmetic so the shift never operates on a
// signed value (shifting a negative signed value left is only well-defined from
// C++20 onwards) and the low word can never be sign-extended into the high word.
int64_t ToPosition(int32_t key, uint32_t pos32) {
  const uint64_t high_word = static_cast<uint64_t>(static_cast<uint32_t>(key)) << 32;
  return static_cast<int64_t>(high_word | pos32);
}
51+
52+
void WriteLE64(char* buf, int64_t value) {
53+
auto le = ToLittleEndian(static_cast<uint64_t>(value));
54+
std::memcpy(buf, &le, sizeof(le));
55+
}
56+
57+
void WriteLE32(char* buf, int32_t value) {
58+
auto le = ToLittleEndian(static_cast<uint32_t>(value));
59+
std::memcpy(buf, &le, sizeof(le));
60+
}
61+
62+
int64_t ReadLE64(const char* buf) {
63+
uint64_t v;
64+
std::memcpy(&v, buf, sizeof(v));
65+
return static_cast<int64_t>(FromLittleEndian(v));
66+
}
67+
68+
int32_t ReadLE32(const char* buf) {
69+
uint32_t v;
70+
std::memcpy(&v, buf, sizeof(v));
71+
return static_cast<int32_t>(FromLittleEndian(v));
72+
}
73+
74+
// Checks that `pos` lies in [0, RoaringPositionBitmap::kMaxPosition].
// Returns a default-constructed (success) Status when it does, otherwise an
// InvalidArgument error naming the permitted maximum and the rejected value.
Status ValidatePosition(int64_t pos) {
  const bool in_range = pos >= 0 && pos <= RoaringPositionBitmap::kMaxPosition;
  if (in_range) {
    return {};
  }
  return InvalidArgument("Bitmap supports positions that are >= 0 and <= {}: {}",
                         RoaringPositionBitmap::kMaxPosition, pos);
}
81+
82+
} // namespace
83+
84+
// Private state of RoaringPositionBitmap.
struct RoaringPositionBitmap::Impl {
  // One 32-bit roaring bitmap per key, indexed by the key itself: bitmaps[k]
  // stores the low 32 bits of every position whose high 32 bits equal k.
  std::vector<roaring::Roaring> bitmaps;

  // Grows `bitmaps` with empty bitmaps until it holds at least `required_length`
  // entries. Never shrinks; a non-positive or already-satisfied length is a no-op.
  void AllocateBitmapsIfNeeded(int32_t required_length) {
    const bool already_large_enough = !std::cmp_less(bitmaps.size(), required_length);
    if (already_large_enough) {
      return;
    }
    bitmaps.resize(static_cast<size_t>(required_length));
  }
};
93+
94+
// Creates an empty bitmap: a fresh Impl is allocated and its `bitmaps` vector starts
// with no entries.
RoaringPositionBitmap::RoaringPositionBitmap() : impl_(std::make_unique<Impl>()) {}
95+
96+
// Defaulted here, in the translation unit that defines Impl, rather than in the header.
RoaringPositionBitmap::~RoaringPositionBitmap() = default;
97+
98+
// Move construction is defaulted (member-wise move of impl_) and declared noexcept.
RoaringPositionBitmap::RoaringPositionBitmap(RoaringPositionBitmap&&) noexcept = default;
99+
100+
// Move assignment is defaulted (member-wise move of impl_) and declared noexcept.
RoaringPositionBitmap& RoaringPositionBitmap::operator=(
    RoaringPositionBitmap&& other) noexcept = default;
102+
103+
RoaringPositionBitmap::RoaringPositionBitmap(std::unique_ptr<Impl> impl)
104+
: impl_(std::move(impl)) {}
105+
106+
// Marks `pos` as present.
//
// Returns the error from ValidatePosition when `pos` is outside the supported
// range; otherwise grows the per-key bitmap vector as needed, records the low
// 32 bits of `pos` in the bitmap selected by its high 32 bits, and returns success.
Status RoaringPositionBitmap::Add(int64_t pos) {
  ICEBERG_RETURN_UNEXPECTED(ValidatePosition(pos));

  const int32_t bucket = Key(pos);
  // The vector is indexed by key, so it must hold at least bucket + 1 entries.
  impl_->AllocateBitmapsIfNeeded(bucket + 1);
  impl_->bitmaps[bucket].add(Pos32Bits(pos));
  return {};
}
114+
115+
// Marks every position in the half-open range [pos_start, pos_end) as present.
//
// An empty range (pos_start >= pos_end) succeeds without touching the bitmap or
// validating its arguments. Otherwise both the first and the last position of the
// range are validated before anything is modified, so a rejected range leaves the
// bitmap unchanged, and the error from ValidatePosition is returned.
//
// The range is inserted with one bulk range insertion per 32-bit key instead of one
// Add() call per position, so the cost depends on the number of keys spanned rather
// than on the number of positions.
Status RoaringPositionBitmap::AddRange(int64_t pos_start, int64_t pos_end) {
  if (pos_start >= pos_end) {
    return {};
  }

  // pos_end > pos_start here, so pos_end - 1 cannot overflow.
  const int64_t pos_last = pos_end - 1;
  ICEBERG_RETURN_UNEXPECTED(ValidatePosition(pos_start));
  ICEBERG_RETURN_UNEXPECTED(ValidatePosition(pos_last));

  const int32_t first_key = Key(pos_start);
  const int32_t last_key = Key(pos_last);
  impl_->AllocateBitmapsIfNeeded(last_key + 1);

  for (int32_t key = first_key; key <= last_key; ++key) {
    // Only the first and last keys are partially covered; every key strictly
    // between them receives its full 32-bit range.
    const uint32_t low = (key == first_key) ? Pos32Bits(pos_start) : 0U;
    const uint32_t high = (key == last_key) ? Pos32Bits(pos_last)
                                            : std::numeric_limits<uint32_t>::max();
    impl_->bitmaps[key].addRangeClosed(low, high);
  }
  return {};
}
121+
122+
// Reports whether `pos` has been added.
//
// Returns the error from ValidatePosition when `pos` is outside the supported
// range. A key for which no bitmap has been allocated yields false.
Result<bool> RoaringPositionBitmap::Contains(int64_t pos) const {
  ICEBERG_RETURN_UNEXPECTED(ValidatePosition(pos));

  const int32_t bucket = Key(pos);
  const bool bucket_allocated = std::cmp_less(bucket, impl_->bitmaps.size());
  if (!bucket_allocated) {
    return false;
  }
  return impl_->bitmaps[bucket].contains(Pos32Bits(pos));
}
128+
129+
bool RoaringPositionBitmap::IsEmpty() const { return Cardinality() == 0; }
130+
131+
size_t RoaringPositionBitmap::Cardinality() const {
132+
size_t total = 0;
133+
for (const auto& bitmap : impl_->bitmaps) {
134+
total += bitmap.cardinality();
135+
}
136+
return total;
137+
}
138+
139+
// In-place union: after the call this bitmap contains every position it held
// before plus every position held by `other`. `other` is not modified.
void RoaringPositionBitmap::Or(const RoaringPositionBitmap& other) {
  const auto& theirs = other.impl_->bitmaps;

  // Make sure this side has a bitmap for every key present on the other side.
  impl_->AllocateBitmapsIfNeeded(static_cast<int32_t>(theirs.size()));

  auto& ours = impl_->bitmaps;
  size_t key = 0;
  for (const auto& their_bitmap : theirs) {
    ours[key] |= their_bitmap;
    ++key;
  }
}
145+
146+
bool RoaringPositionBitmap::Optimize() {
147+
bool changed = false;
148+
for (auto& bitmap : impl_->bitmaps) {
149+
changed |= bitmap.runOptimize();
150+
}
151+
return changed;
152+
}
153+
154+
void RoaringPositionBitmap::ForEach(const std::function<void(int64_t)>& fn) const {
155+
for (size_t key = 0; key < impl_->bitmaps.size(); ++key) {
156+
for (uint32_t pos32 : impl_->bitmaps[key]) {
157+
fn(ToPosition(static_cast<int32_t>(key), pos32));
158+
}
159+
}
160+
}
161+
162+
size_t RoaringPositionBitmap::SerializedSizeInBytes() const {
163+
size_t size = kBitmapCountSizeBytes;
164+
for (const auto& bitmap : impl_->bitmaps) {
165+
size += kBitmapKeySizeBytes + bitmap.getSizeInBytes(/*portable=*/true);
166+
}
167+
return size;
168+
}
169+
170+
// Serializes the bitmap into a newly allocated string.
//
// Layout, in order:
//   - the number of per-key bitmaps (empty ones included), written by WriteLE64;
//   - for every key in ascending order: the key, written by WriteLE32, followed by
//     that key's bitmap in CRoaring's portable format.
// The buffer is sized up-front with SerializedSizeInBytes().
Result<std::string> RoaringPositionBitmap::Serialize() const {
  const auto& buckets = impl_->bitmaps;
  std::string out(SerializedSizeInBytes(), '\0');
  char* cursor = out.data();

  // Header: bitmap count (array length including empties).
  WriteLE64(cursor, static_cast<int64_t>(buckets.size()));
  cursor += kBitmapCountSizeBytes;

  // Body: each bitmap preceded by its key.
  int32_t key = 0;
  for (const auto& bitmap : buckets) {
    WriteLE32(cursor, key);
    cursor += kBitmapKeySizeBytes;

    const size_t payload_size = bitmap.write(cursor, /*portable=*/true);
    cursor += payload_size;
    ++key;
  }

  return out;
}
189+
190+
// Reconstructs a bitmap from the byte layout produced by Serialize().
//
// Expected input: a bitmap count (read with ReadLE64) followed by that many
// (key, portable-format bitmap) pairs with strictly ascending, non-negative keys.
// Keys skipped by the input are materialized as empty bitmaps so the resulting
// vector stays indexable by key.
//
// Fails (via ICEBERG_PRECHECK or InvalidArgument) when the buffer is too short
// for the count, a key or a bitmap; when the count is negative or exceeds
// int32_t; when a key is negative, equals the int32_t maximum, or is not greater
// than the previous key; or when CRoaring cannot decode a bitmap. Bytes left over
// after the last bitmap are not inspected.
Result<RoaringPositionBitmap> RoaringPositionBitmap::Deserialize(std::string_view bytes) {
  const char* cursor = bytes.data();
  size_t bytes_left = bytes.size();

  ICEBERG_PRECHECK(bytes_left >= kBitmapCountSizeBytes,
                   "Buffer too small for bitmap count: {} bytes", bytes_left);

  const int64_t bitmap_count = ReadLE64(cursor);
  cursor += kBitmapCountSizeBytes;
  bytes_left -= kBitmapCountSizeBytes;

  ICEBERG_PRECHECK(
      bitmap_count >= 0 && bitmap_count <= std::numeric_limits<int32_t>::max(),
      "Invalid bitmap count: {}", bitmap_count);

  auto impl = std::make_unique<Impl>();
  int32_t last_key = -1;

  for (auto pending = static_cast<int32_t>(bitmap_count); pending > 0; --pending) {
    ICEBERG_PRECHECK(bytes_left >= kBitmapKeySizeBytes,
                     "Buffer too small for bitmap key: {} bytes", bytes_left);

    const int32_t key = ReadLE32(cursor);
    cursor += kBitmapKeySizeBytes;
    bytes_left -= kBitmapKeySizeBytes;

    ICEBERG_PRECHECK(key >= 0, "Invalid unsigned key: {}", key);
    ICEBERG_PRECHECK(key < std::numeric_limits<int32_t>::max(), "Key is too large: {}",
                     key);
    ICEBERG_PRECHECK(key > last_key,
                     "Keys must be sorted in ascending order, got key {} after {}", key,
                     last_key);

    // Fill gaps with empty bitmaps. The vector holds last_key + 1 entries at this
    // point and key > last_key, so growing it to `key` entries appends exactly the
    // empty bitmaps for the skipped keys.
    impl->bitmaps.resize(static_cast<size_t>(key));

    // Read bitmap using portable safe deserialization.
    // CRoaring's readSafe may throw on corrupted data.
    roaring::Roaring bitmap;
    try {
      bitmap = roaring::Roaring::readSafe(cursor, bytes_left);
    } catch (const std::exception& e) {
      return InvalidArgument("Failed to deserialize bitmap at key {}: {}", key, e.what());
    }

    // Advance by the bitmap's portable serialized size.
    const size_t bitmap_size = bitmap.getSizeInBytes(/*portable=*/true);
    ICEBERG_PRECHECK(
        bitmap_size <= bytes_left,
        "Buffer too small for bitmap key {}: {} bytes needed, {} bytes available", key,
        bitmap_size, bytes_left);
    cursor += bitmap_size;
    bytes_left -= bitmap_size;

    impl->bitmaps.push_back(std::move(bitmap));
    last_key = key;
  }

  return RoaringPositionBitmap(std::move(impl));
}
253+
254+
} // namespace iceberg

0 commit comments

Comments
 (0)