Skip to content

Commit a1dc2f8

Browse files
authored
feat(data): add DeleteLoader for position and equality delete files (#610)
- LoadPositionDeletes dispatches to LoadPositionDelete (file-based) and LoadDV (deletion vectors, not yet implemented). - LoadEqualityDeletes reads equality delete files into a StructLikeSet.
1 parent 9190c0a commit a1dc2f8

File tree

8 files changed

+565
-5
lines changed

8 files changed

+565
-5
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ set(ICEBERG_SOURCES
2121
arrow_c_data_guard_internal.cc
2222
catalog/memory/in_memory_catalog.cc
2323
data/data_writer.cc
24+
data/delete_loader.cc
2425
data/equality_delete_writer.cc
2526
data/position_delete_writer.cc
2627
data/writer.cc

src/iceberg/data/delete_loader.cc

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/data/delete_loader.h"
21+
22+
#include <string>
23+
#include <vector>
24+
25+
#include "iceberg/arrow_c_data_guard_internal.h"
26+
#include "iceberg/deletes/position_delete_index.h"
27+
#include "iceberg/file_reader.h"
28+
#include "iceberg/manifest/manifest_entry.h"
29+
#include "iceberg/metadata_columns.h"
30+
#include "iceberg/row/arrow_array_wrapper.h"
31+
#include "iceberg/schema.h"
32+
#include "iceberg/util/macros.h"
33+
#include "iceberg/util/struct_like_set.h"
34+
35+
namespace iceberg {
36+
37+
namespace {
38+
39+
/// \brief Build the projection schema for reading position delete files.
40+
std::shared_ptr<Schema> PosDeleteSchema() {
41+
return std::make_shared<Schema>(std::vector<SchemaField>{
42+
MetadataColumns::kDeleteFilePath,
43+
MetadataColumns::kDeleteFilePos,
44+
});
45+
}
46+
47+
/// \brief Open a delete file with the given projection schema.
48+
Result<std::unique_ptr<Reader>> OpenDeleteFile(const DataFile& file,
49+
std::shared_ptr<Schema> projection,
50+
const std::shared_ptr<FileIO>& io) {
51+
ReaderOptions options{
52+
.path = file.file_path,
53+
.length = static_cast<size_t>(file.file_size_in_bytes),
54+
.io = io,
55+
.projection = std::move(projection),
56+
};
57+
return ReaderFactoryRegistry::Open(file.file_format, options);
58+
}
59+
60+
} // namespace
61+
62+
DeleteLoader::DeleteLoader(std::shared_ptr<FileIO> io) : io_(std::move(io)) {}
63+
64+
DeleteLoader::~DeleteLoader() = default;
65+
66+
Status DeleteLoader::LoadPositionDelete(const DataFile& file, PositionDeleteIndex& index,
67+
std::string_view data_file_path) const {
68+
// TODO(gangwu): push down path filter to open the file.
69+
ICEBERG_ASSIGN_OR_RAISE(auto reader, OpenDeleteFile(file, PosDeleteSchema(), io_));
70+
71+
ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader->Schema());
72+
internal::ArrowSchemaGuard schema_guard(&arrow_schema);
73+
74+
while (true) {
75+
ICEBERG_ASSIGN_OR_RAISE(auto batch_opt, reader->Next());
76+
if (!batch_opt.has_value()) break;
77+
78+
auto& batch = batch_opt.value();
79+
internal::ArrowArrayGuard batch_guard(&batch);
80+
81+
ICEBERG_ASSIGN_OR_RAISE(
82+
auto row, ArrowArrayStructLike::Make(arrow_schema, batch, /*row_index=*/0));
83+
84+
for (int64_t i = 0; i < batch.length; ++i) {
85+
if (i > 0) {
86+
ICEBERG_RETURN_UNEXPECTED(row->Reset(i));
87+
}
88+
// Field 0: file_path
89+
ICEBERG_ASSIGN_OR_RAISE(auto path_scalar, row->GetField(0));
90+
auto path = std::get<std::string_view>(path_scalar);
91+
92+
if (path == data_file_path) {
93+
// Field 1: pos
94+
ICEBERG_ASSIGN_OR_RAISE(auto pos_scalar, row->GetField(1));
95+
index.Delete(std::get<int64_t>(pos_scalar));
96+
}
97+
}
98+
}
99+
100+
return reader->Close();
101+
}
102+
103+
Status DeleteLoader::LoadDV(const DataFile& file, PositionDeleteIndex& index) const {
104+
return NotSupported("Loading deletion vectors is not yet supported");
105+
}
106+
107+
Result<PositionDeleteIndex> DeleteLoader::LoadPositionDeletes(
108+
std::span<const std::shared_ptr<DataFile>> delete_files,
109+
std::string_view data_file_path) const {
110+
PositionDeleteIndex index;
111+
112+
for (const auto& file : delete_files) {
113+
if (file->referenced_data_file.has_value() &&
114+
file->referenced_data_file.value() != data_file_path) {
115+
continue;
116+
}
117+
118+
if (file->IsDeletionVector()) {
119+
ICEBERG_RETURN_UNEXPECTED(LoadDV(*file, index));
120+
continue;
121+
}
122+
123+
ICEBERG_PRECHECK(file->content == DataFile::Content::kPositionDeletes,
124+
"Expected position delete file but got content type {}",
125+
ToString(file->content));
126+
127+
ICEBERG_RETURN_UNEXPECTED(LoadPositionDelete(*file, index, data_file_path));
128+
}
129+
130+
return index;
131+
}
132+
133+
Result<std::unique_ptr<UncheckedStructLikeSet>> DeleteLoader::LoadEqualityDeletes(
134+
std::span<const std::shared_ptr<DataFile>> delete_files,
135+
const StructType& equality_type) const {
136+
auto eq_set = std::make_unique<UncheckedStructLikeSet>(equality_type);
137+
138+
std::shared_ptr<Schema> projection = equality_type.ToSchema();
139+
140+
for (const auto& file : delete_files) {
141+
ICEBERG_PRECHECK(file->content == DataFile::Content::kEqualityDeletes,
142+
"Expected equality delete file but got content type {}",
143+
static_cast<int>(file->content));
144+
145+
ICEBERG_ASSIGN_OR_RAISE(auto reader, OpenDeleteFile(*file, projection, io_));
146+
ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader->Schema());
147+
internal::ArrowSchemaGuard schema_guard(&arrow_schema);
148+
149+
while (true) {
150+
ICEBERG_ASSIGN_OR_RAISE(auto batch_opt, reader->Next());
151+
if (!batch_opt.has_value()) break;
152+
153+
auto& batch = batch_opt.value();
154+
internal::ArrowArrayGuard batch_guard(&batch);
155+
156+
ICEBERG_ASSIGN_OR_RAISE(
157+
auto row, ArrowArrayStructLike::Make(arrow_schema, batch, /*row_index=*/0));
158+
159+
for (int64_t i = 0; i < batch.length; ++i) {
160+
if (i > 0) {
161+
ICEBERG_RETURN_UNEXPECTED(row->Reset(i));
162+
}
163+
ICEBERG_RETURN_UNEXPECTED(eq_set->Insert(*row));
164+
}
165+
}
166+
167+
ICEBERG_RETURN_UNEXPECTED(reader->Close());
168+
}
169+
170+
return eq_set;
171+
}
172+
173+
} // namespace iceberg

src/iceberg/data/delete_loader.h

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/data/delete_loader.h
23+
/// Loads position and equality delete files into in-memory indexes.
24+
25+
#include <memory>
26+
#include <span>
27+
#include <string_view>
28+
29+
#include "iceberg/iceberg_export.h"
30+
#include "iceberg/result.h"
31+
#include "iceberg/type_fwd.h"
32+
33+
namespace iceberg {
34+
35+
/// \brief Loads delete files and constructs in-memory delete indexes.
36+
class ICEBERG_EXPORT DeleteLoader {
37+
public:
38+
/// \brief Create a DeleteLoader.
39+
/// \param io FileIO instance for reading delete files
40+
explicit DeleteLoader(std::shared_ptr<FileIO> io);
41+
42+
~DeleteLoader();
43+
44+
/// \brief Load position deletes for a specific data file.
45+
///
46+
/// Reads the given position delete files and returns a PositionDeleteIndex
47+
/// containing only the positions that apply to the specified data file path.
48+
/// Supports both regular position delete files and deletion vectors.
49+
///
50+
/// \param delete_files Position delete files to load (must have
51+
/// content == Content::kPositionDeletes)
52+
/// \param data_file_path Path of the data file to filter positions for
53+
/// \return A PositionDeleteIndex with deleted positions for the data file
54+
Result<PositionDeleteIndex> LoadPositionDeletes(
55+
std::span<const std::shared_ptr<DataFile>> delete_files,
56+
std::string_view data_file_path) const;
57+
58+
/// \brief Load equality deletes into an in-memory set.
59+
///
60+
/// \param delete_files Equality delete files to load (must have
61+
/// content == Content::kEqualityDeletes)
62+
/// \param equality_type The struct type describing the equality columns
63+
/// \return A StructLikeSet containing the deleted rows
64+
Result<std::unique_ptr<UncheckedStructLikeSet>> LoadEqualityDeletes(
65+
std::span<const std::shared_ptr<DataFile>> delete_files,
66+
const StructType& equality_type) const;
67+
68+
private:
69+
/// \brief Load a single position delete file into the index.
70+
Status LoadPositionDelete(const DataFile& file, PositionDeleteIndex& index,
71+
std::string_view data_file_path) const;
72+
73+
/// \brief Load a single deletion vector file into the index.
74+
Status LoadDV(const DataFile& file, PositionDeleteIndex& index) const;
75+
76+
std::shared_ptr<FileIO> io_;
77+
};
78+
79+
} // namespace iceberg

src/iceberg/meson.build

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ iceberg_include_dir = include_directories('..')
4242
iceberg_sources = files(
4343
'arrow_c_data_guard_internal.cc',
4444
'catalog/memory/in_memory_catalog.cc',
45+
'data/data_writer.cc',
46+
'data/delete_loader.cc',
47+
'data/equality_delete_writer.cc',
48+
'data/position_delete_writer.cc',
49+
'data/writer.cc',
4550
'delete_file_index.cc',
4651
'deletes/position_delete_index.cc',
4752
'deletes/roaring_position_bitmap.cc',

src/iceberg/test/CMakeLists.txt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,11 @@ if(ICEBERG_BUILD_BUNDLE)
194194
update_sort_order_test.cc
195195
update_statistics_test.cc)
196196

197-
add_iceberg_test(data_writer_test USE_BUNDLE SOURCES data_writer_test.cc)
197+
add_iceberg_test(data_test
198+
USE_BUNDLE
199+
SOURCES
200+
data_writer_test.cc
201+
delete_loader_test.cc)
198202

199203
endif()
200204

0 commit comments

Comments
 (0)