Skip to content

Commit 11497ae

Browse files
committed
feat: add roaring-based position bitmap
1 parent 4db7c67 commit 11497ae

File tree

12 files changed

+950
-0
lines changed

12 files changed

+950
-0
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ set(ICEBERG_SOURCES
2525
data/position_delete_writer.cc
2626
data/writer.cc
2727
delete_file_index.cc
28+
deletes/roaring_position_bitmap.cc
2829
expression/aggregate.cc
2930
expression/binder.cc
3031
expression/evaluator.cc
@@ -165,6 +166,7 @@ iceberg_install_all_headers(iceberg)
165166

166167
add_subdirectory(catalog)
167168
add_subdirectory(data)
169+
add_subdirectory(deletes)
168170
add_subdirectory(expression)
169171
add_subdirectory(manifest)
170172
add_subdirectory(row)

src/iceberg/deletes/CMakeLists.txt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# Header installation for the iceberg/deletes directory.
# NOTE(review): iceberg_install_all_headers is a project macro defined outside
# this file; presumably it installs every header found in this directory under
# the given include sub-path -- confirm against its definition.
iceberg_install_all_headers(iceberg/deletes)

src/iceberg/deletes/meson.build

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# Install the public position-bitmap header into the 'iceberg/deletes'
# subdirectory of the include directory.
install_headers(['roaring_position_bitmap.h'], subdir: 'iceberg/deletes')
Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/deletes/roaring_position_bitmap.h"
21+
22+
#include <cstdint>
23+
#include <exception>
24+
#include <limits>
25+
#include <string>
26+
#include <string_view>
27+
#include <utility>
28+
#include <vector>
29+
30+
#include "roaring/roaring.hh"
31+
32+
namespace iceberg {
33+
34+
namespace {
35+
36+
// Width in bytes of the bitmap-count field at the start of the serialized
// form; Serialize() fills it with WriteLE64 and Deserialize() reads it back
// with ReadLE64.
constexpr size_t kBitmapCountSizeBytes = 8;
37+
// Width in bytes of the key field that precedes each serialized bitmap;
// written with WriteLE32 and read back with ReadLE32.
constexpr size_t kBitmapKeySizeBytes = 4;
38+
39+
// Extracts high 32 bits from a 64-bit position (the key).
40+
int32_t Key(int64_t pos) {
  // Shift the upper 32-bit word down, then narrow it to the key type.
  const int64_t upper_word = pos >> 32;
  return static_cast<int32_t>(upper_word);
}
41+
42+
// Extracts low 32 bits from a 64-bit position.
43+
uint32_t Pos32Bits(int64_t pos) {
  // Mask off everything above bit 31 so only the lower 32-bit word remains.
  const uint64_t lower_word = static_cast<uint64_t>(pos) & 0xFFFFFFFFu;
  return static_cast<uint32_t>(lower_word);
}
44+
45+
// Rebuilds a 64-bit position from its two halves: `key` supplies the upper
// 32 bits and `pos32` the lower 32 bits. `pos32` is unsigned, so widening it
// zero-extends and cannot disturb the upper half.
int64_t ToPosition(int32_t key, uint32_t pos32) {
  const int64_t upper = static_cast<int64_t>(key) << 32;
  const int64_t lower = static_cast<int64_t>(pos32);
  return upper | lower;
}
50+
51+
// Stores `value` into buf[0..7] in little-endian order (least significant
// byte first). `buf` must point to at least 8 writable bytes.
void WriteLE64(char* buf, int64_t value) {
  uint64_t bits = static_cast<uint64_t>(value);
  for (char* out = buf; out != buf + 8; ++out) {
    *out = static_cast<char>(bits & 0xFF);
    bits >>= 8;  // expose the next more-significant byte
  }
}
57+
58+
// Stores `value` into buf[0..3] in little-endian order (least significant
// byte first). `buf` must point to at least 4 writable bytes.
void WriteLE32(char* buf, int32_t value) {
  uint32_t bits = static_cast<uint32_t>(value);
  for (char* out = buf; out != buf + 4; ++out) {
    *out = static_cast<char>(bits & 0xFF);
    bits >>= 8;  // expose the next more-significant byte
  }
}
64+
65+
// Decodes the 8 bytes at buf[0..7] as a little-endian 64-bit integer.
// `buf` must point to at least 8 readable bytes.
int64_t ReadLE64(const char* buf) {
  uint64_t bits = 0;
  // Start at the most significant byte (index 7) so each step shifts the
  // accumulated value up and appends the next lower byte.
  for (int i = 7; i >= 0; --i) {
    bits = (bits << 8) | static_cast<uint8_t>(buf[i]);
  }
  return static_cast<int64_t>(bits);
}
73+
74+
// Decodes the 4 bytes at buf[0..3] as a little-endian 32-bit integer.
// `buf` must point to at least 4 readable bytes.
int32_t ReadLE32(const char* buf) {
  uint32_t bits = 0;
  // Start at the most significant byte (index 3) so each step shifts the
  // accumulated value up and appends the next lower byte.
  for (int i = 3; i >= 0; --i) {
    bits = (bits << 8) | static_cast<uint8_t>(buf[i]);
  }
  return static_cast<int32_t>(bits);
}
82+
83+
// Checks that `pos` lies in [0, RoaringPositionBitmap::kMaxPosition].
// Returns a default-constructed (success) Status when it does, and an
// InvalidArgument error naming the limit and the offending value otherwise.
Status ValidatePosition(int64_t pos) {
  const bool in_range = pos >= 0 && pos <= RoaringPositionBitmap::kMaxPosition;
  if (in_range) {
    return {};
  }
  return InvalidArgument("Bitmap supports positions that are >= 0 and <= {}: {}",
                         RoaringPositionBitmap::kMaxPosition, pos);
}
90+
91+
} // namespace
92+
93+
// Private state of RoaringPositionBitmap. The vector is indexed by key (the
// high 32 bits of a position); the bitmap stored at that index holds the low
// 32 bits of each position added under that key.
struct RoaringPositionBitmap::Impl {
  std::vector<roaring::Roaring> bitmaps;

  // Grows `bitmaps` with empty bitmaps until it has `required_length`
  // entries. Does nothing when it is already at least that long.
  void AllocateBitmapsIfNeeded(int32_t required_length) {
    const auto current_length = static_cast<int32_t>(bitmaps.size());
    if (current_length >= required_length) {
      return;
    }
    bitmaps.resize(static_cast<size_t>(required_length));
  }
};
102+
103+
// Creates a bitmap backed by a freshly allocated Impl whose bitmap vector is
// empty, i.e. a bitmap with no positions set.
RoaringPositionBitmap::RoaringPositionBitmap() : impl_(std::make_unique<Impl>()) {}
104+
105+
// Defaulted here, after the definition of Impl, so the Impl owned through
// impl_ is destroyed where it is a complete type.
RoaringPositionBitmap::~RoaringPositionBitmap() = default;
106+
107+
// Defaulted move constructor: member-wise move, declared noexcept.
// NOTE(review): the moved-from object presumably ends up with a null impl_,
// which the member functions in this file dereference unchecked -- confirm
// callers never use a moved-from bitmap.
RoaringPositionBitmap::RoaringPositionBitmap(RoaringPositionBitmap&&) noexcept = default;
108+
109+
// Defaulted move assignment: member-wise move from `other`, declared noexcept.
RoaringPositionBitmap& RoaringPositionBitmap::operator=(
    RoaringPositionBitmap&& other) noexcept = default;
111+
112+
RoaringPositionBitmap::RoaringPositionBitmap(std::unique_ptr<Impl> impl)
113+
: impl_(std::move(impl)) {}
114+
115+
// Marks `pos` as set.
//
// Returns the error from ValidatePosition when `pos` is out of range;
// otherwise grows the per-key bitmap vector as needed, records the low
// 32 bits of `pos` in the bitmap for its key, and returns success.
Status RoaringPositionBitmap::Add(int64_t pos) {
  auto validation = ValidatePosition(pos);
  if (!validation) {
    return validation;
  }
  const int32_t bucket = Key(pos);
  // Make sure index `bucket` exists before it is used below.
  impl_->AllocateBitmapsIfNeeded(bucket + 1);
  impl_->bitmaps[bucket].add(Pos32Bits(pos));
  return {};
}
125+
126+
// Adds every position in the half-open range [pos_start, pos_end).
//
// An empty range (pos_start >= pos_end) succeeds without validating either
// bound and without touching the bitmap. Otherwise the first and the last
// position of the range are validated before anything is modified, so an
// out-of-range request returns the ValidatePosition error and leaves the
// bitmap unchanged.
//
// The range is split at 32-bit key boundaries and each piece is passed to
// roaring::Roaring::addRange, which inserts the half-open interval [lo, hi)
// in one call rather than one position at a time.
Status RoaringPositionBitmap::AddRange(int64_t pos_start, int64_t pos_end) {
  if (pos_start >= pos_end) {
    return {};
  }
  // pos_end > pos_start here, so pos_end - 1 cannot underflow.
  const int64_t last_pos = pos_end - 1;
  if (auto status = ValidatePosition(pos_start); !status) {
    return status;
  }
  if (auto status = ValidatePosition(last_pos); !status) {
    return status;
  }

  const int32_t first_key = Key(pos_start);
  const int32_t last_key = Key(last_pos);
  impl_->AllocateBitmapsIfNeeded(last_key + 1);

  // Exclusive upper bound covering all 2^32 low-word values of one key.
  constexpr uint64_t kKeySpan = uint64_t{1} << 32;
  for (int32_t key = first_key; key <= last_key; ++key) {
    // Only the first and last key are partially covered; keys in between
    // receive their full 32-bit range.
    const uint64_t lo = (key == first_key) ? uint64_t{Pos32Bits(pos_start)} : 0;
    const uint64_t hi =
        (key == last_key) ? uint64_t{Pos32Bits(last_pos)} + 1 : kKeySpan;
    impl_->bitmaps[key].addRange(lo, hi);
  }
  return {};
}
134+
135+
// Reports whether `pos` is set.
//
// Returns the ValidatePosition error (as an unexpected value) when `pos` is
// out of range. A key with no bitmap allocated for it yields false.
Result<bool> RoaringPositionBitmap::Contains(int64_t pos) const {
  auto validation = ValidatePosition(pos);
  if (!validation) {
    return std::unexpected(validation.error());
  }
  const int32_t bucket = Key(pos);
  if (bucket >= static_cast<int32_t>(impl_->bitmaps.size())) {
    return false;
  }
  return impl_->bitmaps[bucket].contains(Pos32Bits(pos));
}
144+
145+
bool RoaringPositionBitmap::IsEmpty() const { return Cardinality() == 0; }
146+
147+
int64_t RoaringPositionBitmap::Cardinality() const {
148+
int64_t total = 0;
149+
for (const auto& bitmap : impl_->bitmaps) {
150+
total += static_cast<int64_t>(bitmap.cardinality());
151+
}
152+
return total;
153+
}
154+
155+
// Unions `other` into this bitmap in place. This bitmap is first grown to
// have at least as many per-key bitmaps as `other`, then each of `other`'s
// bitmaps is OR-ed into the bitmap with the same key.
void RoaringPositionBitmap::Or(const RoaringPositionBitmap& other) {
  const auto& source = other.impl_->bitmaps;
  impl_->AllocateBitmapsIfNeeded(static_cast<int32_t>(source.size()));

  auto& target = impl_->bitmaps;
  size_t bucket = 0;
  for (const auto& bitmap : source) {
    target[bucket] |= bitmap;
    ++bucket;
  }
}
161+
162+
bool RoaringPositionBitmap::RunLengthEncode() {
163+
bool changed = false;
164+
for (auto& bitmap : impl_->bitmaps) {
165+
changed |= bitmap.runOptimize();
166+
}
167+
return changed;
168+
}
169+
170+
void RoaringPositionBitmap::ForEach(const std::function<void(int64_t)>& fn) const {
171+
for (size_t key = 0; key < impl_->bitmaps.size(); ++key) {
172+
for (uint32_t pos32 : impl_->bitmaps[key]) {
173+
fn(ToPosition(static_cast<int32_t>(key), pos32));
174+
}
175+
}
176+
}
177+
178+
int64_t RoaringPositionBitmap::SerializedSizeInBytes() const {
179+
int64_t size = static_cast<int64_t>(kBitmapCountSizeBytes);
180+
for (const auto& bitmap : impl_->bitmaps) {
181+
size += static_cast<int64_t>(kBitmapKeySizeBytes) +
182+
static_cast<int64_t>(bitmap.getSizeInBytes(/*portable=*/true));
183+
}
184+
return size;
185+
}
186+
187+
// Serializes the bitmap into a byte string.
//
// Layout: a little-endian 64-bit count of per-key bitmaps (empty ones
// included), followed for each key in ascending order by a little-endian
// 32-bit key and that bitmap written in CRoaring's portable format.
Result<std::string> RoaringPositionBitmap::Serialize() const {
  const auto& buckets = impl_->bitmaps;
  std::string out(static_cast<size_t>(SerializedSizeInBytes()), '\0');
  char* cursor = out.data();

  // Header: number of per-key bitmaps.
  WriteLE64(cursor, static_cast<int64_t>(buckets.size()));
  cursor += kBitmapCountSizeBytes;

  // Body: (key, bitmap) pairs.
  int32_t key = 0;
  for (const auto& bitmap : buckets) {
    WriteLE32(cursor, key);
    cursor += kBitmapKeySizeBytes;
    const size_t payload = bitmap.write(cursor, /*portable=*/true);
    cursor += payload;
    ++key;
  }

  return out;
}
206+
207+
// Reconstructs a bitmap from the byte layout produced by Serialize().
//
// Expects a little-endian 64-bit entry count followed by that many
// (little-endian 32-bit key, portable roaring bitmap) entries with strictly
// ascending, non-negative keys. Keys skipped between entries are filled with
// empty bitmaps so the vector index always equals the key.
//
// Returns InvalidArgument when the buffer is truncated, the count or a key is
// out of range, keys are not strictly ascending, or CRoaring rejects a
// bitmap payload.
//
// NOTE(review): gap filling allocates one empty bitmap per skipped key, so a
// large key in the input drives a correspondingly large allocation.
Result<RoaringPositionBitmap> RoaringPositionBitmap::Deserialize(std::string_view bytes) {
  const char* cursor = bytes.data();
  size_t bytes_left = bytes.size();

  if (bytes_left < kBitmapCountSizeBytes) {
    return InvalidArgument("Buffer too small for bitmap count");
  }
  const int64_t bitmap_count = ReadLE64(cursor);
  cursor += kBitmapCountSizeBytes;
  bytes_left -= kBitmapCountSizeBytes;

  if (bitmap_count < 0 || bitmap_count > std::numeric_limits<int32_t>::max()) {
    return InvalidArgument("Invalid bitmap count: {}", bitmap_count);
  }

  auto impl = std::make_unique<Impl>();
  auto& buckets = impl->bitmaps;
  int32_t previous_key = -1;

  for (auto entries_left = static_cast<int32_t>(bitmap_count); entries_left > 0;
       --entries_left) {
    if (bytes_left < kBitmapKeySizeBytes) {
      return InvalidArgument("Buffer too small for bitmap key");
    }
    const int32_t key = ReadLE32(cursor);
    cursor += kBitmapKeySizeBytes;
    bytes_left -= kBitmapKeySizeBytes;

    // Key validation (matches Java's readKey).
    if (key < 0) {
      return InvalidArgument("Invalid unsigned key: {}", key);
    }
    if (key > std::numeric_limits<int32_t>::max() - 1) {
      return InvalidArgument("Key is too large: {}", key);
    }
    if (key <= previous_key) {
      return InvalidArgument("Keys must be sorted in ascending order");
    }

    // `buckets` holds previous_key + 1 entries at this point; extending it
    // to `key` entries inserts an empty bitmap for every skipped key.
    buckets.resize(static_cast<size_t>(key));

    // Portable, bounds-checked read; readSafe may throw on corrupted data.
    roaring::Roaring bitmap;
    try {
      bitmap = roaring::Roaring::readSafe(cursor, bytes_left);
    } catch (const std::exception& e) {
      return InvalidArgument("Failed to deserialize bitmap at key {}: {}", key, e.what());
    }

    // Advance past the payload by the bitmap's portable serialized size.
    const size_t consumed = bitmap.getSizeInBytes(/*portable=*/true);
    if (consumed > bytes_left) {
      return InvalidArgument("Buffer too small for bitmap data at key {}", key);
    }
    cursor += consumed;
    bytes_left -= consumed;

    buckets.push_back(std::move(bitmap));
    previous_key = key;
  }

  return RoaringPositionBitmap(std::move(impl));
}
275+
276+
} // namespace iceberg

0 commit comments

Comments
 (0)