Skip to content

Commit fe8636b

Browse files
committed
feat(data): add DeleteLoader for position and equality delete files
- LoadPositionDeletes dispatches to LoadPositionDelete (file-based) and LoadDV (deletion vectors, not yet implemented). - LoadEqualityDeletes reads equality delete files into a StructLikeSet.
1 parent 36b5887 commit fe8636b

File tree

6 files changed

+538
-1
lines changed

6 files changed

+538
-1
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ set(ICEBERG_SOURCES
2525
data/position_delete_writer.cc
2626
data/writer.cc
2727
delete_file_index.cc
28+
data/delete_loader.cc
2829
deletes/roaring_position_bitmap.cc
2930
deletes/position_delete_index.cc
3031
expression/aggregate.cc

src/iceberg/data/delete_loader.cc

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/data/delete_loader.h"
21+
22+
#include <string>
23+
#include <vector>
24+
25+
#include "iceberg/arrow_c_data_guard_internal.h"
26+
#include "iceberg/deletes/position_delete_index.h"
27+
#include "iceberg/file_reader.h"
28+
#include "iceberg/manifest/manifest_entry.h"
29+
#include "iceberg/metadata_columns.h"
30+
#include "iceberg/row/arrow_array_wrapper.h"
31+
#include "iceberg/schema.h"
32+
#include "iceberg/util/macros.h"
33+
#include "iceberg/util/struct_like_set.h"
34+
35+
namespace iceberg {
36+
37+
namespace {
38+
39+
/// \brief Build the projection schema for reading position delete files.
40+
std::shared_ptr<Schema> PosDeleteSchema() {
41+
return std::make_shared<Schema>(std::vector<SchemaField>{
42+
MetadataColumns::kDeleteFilePath,
43+
MetadataColumns::kDeleteFilePos,
44+
});
45+
}
46+
47+
/// \brief Open a delete file with the given projection schema.
48+
Result<std::unique_ptr<Reader>> OpenDeleteFile(const DataFile& file,
49+
std::shared_ptr<Schema> projection,
50+
const std::shared_ptr<FileIO>& io) {
51+
ReaderOptions options{
52+
.path = file.file_path,
53+
.length = static_cast<size_t>(file.file_size_in_bytes),
54+
.io = io,
55+
.projection = std::move(projection),
56+
};
57+
return ReaderFactoryRegistry::Open(file.file_format, options);
58+
}
59+
60+
} // namespace
61+
62+
DeleteLoader::DeleteLoader(std::shared_ptr<FileIO> io) : io_(std::move(io)) {}
63+
64+
DeleteLoader::~DeleteLoader() = default;
65+
66+
Status DeleteLoader::LoadPositionDelete(const DataFile& file, PositionDeleteIndex& index,
67+
std::string_view data_file_path) const {
68+
// TODO(gangwu): push down path filter to open the file.
69+
ICEBERG_ASSIGN_OR_RAISE(auto reader, OpenDeleteFile(file, PosDeleteSchema(), io_));
70+
71+
ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader->Schema());
72+
internal::ArrowSchemaGuard schema_guard(&arrow_schema);
73+
74+
while (true) {
75+
ICEBERG_ASSIGN_OR_RAISE(auto batch_opt, reader->Next());
76+
if (!batch_opt.has_value()) break;
77+
78+
auto& batch = batch_opt.value();
79+
internal::ArrowArrayGuard batch_guard(&batch);
80+
81+
ICEBERG_ASSIGN_OR_RAISE(
82+
auto row, ArrowArrayStructLike::Make(arrow_schema, batch, /*row_index=*/0));
83+
84+
for (int64_t i = 0; i < batch.length; ++i) {
85+
if (i > 0) {
86+
ICEBERG_RETURN_UNEXPECTED(row->Reset(i));
87+
}
88+
// Field 0: file_path
89+
ICEBERG_ASSIGN_OR_RAISE(auto path_scalar, row->GetField(0));
90+
auto path = std::get<std::string_view>(path_scalar);
91+
92+
if (path == data_file_path) {
93+
// Field 1: pos
94+
ICEBERG_ASSIGN_OR_RAISE(auto pos_scalar, row->GetField(1));
95+
index.Delete(std::get<int64_t>(pos_scalar));
96+
}
97+
}
98+
}
99+
100+
return reader->Close();
101+
}
102+
103+
Status DeleteLoader::LoadDV(const DataFile& file, PositionDeleteIndex& index) const {
104+
return NotSupported("Loading deletion vectors is not yet supported");
105+
}
106+
107+
Result<PositionDeleteIndex> DeleteLoader::LoadPositionDeletes(
108+
std::span<const std::shared_ptr<DataFile>> delete_files,
109+
std::string_view data_file_path) const {
110+
PositionDeleteIndex index;
111+
112+
for (const auto& file : delete_files) {
113+
if (file->IsDeletionVector()) {
114+
ICEBERG_RETURN_UNEXPECTED(LoadDV(*file, index));
115+
}
116+
117+
ICEBERG_PRECHECK(file->content == DataFile::Content::kPositionDeletes,
118+
"Expected position delete file but got content type {}",
119+
static_cast<int>(file->content));
120+
121+
ICEBERG_RETURN_UNEXPECTED(LoadPositionDelete(*file, index, data_file_path));
122+
}
123+
124+
return index;
125+
}
126+
127+
Result<std::unique_ptr<UncheckedStructLikeSet>> DeleteLoader::LoadEqualityDeletes(
128+
std::span<const std::shared_ptr<DataFile>> delete_files,
129+
const StructType& equality_type) const {
130+
auto eq_set = std::make_unique<UncheckedStructLikeSet>(equality_type);
131+
132+
std::shared_ptr<Schema> projection = equality_type.ToSchema();
133+
134+
for (const auto& file : delete_files) {
135+
ICEBERG_PRECHECK(file->content == DataFile::Content::kEqualityDeletes,
136+
"Expected equality delete file but got content type {}",
137+
static_cast<int>(file->content));
138+
139+
ICEBERG_ASSIGN_OR_RAISE(auto reader, OpenDeleteFile(*file, projection, io_));
140+
ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader->Schema());
141+
internal::ArrowSchemaGuard schema_guard(&arrow_schema);
142+
143+
while (true) {
144+
ICEBERG_ASSIGN_OR_RAISE(auto batch_opt, reader->Next());
145+
if (!batch_opt.has_value()) break;
146+
147+
auto& batch = batch_opt.value();
148+
internal::ArrowArrayGuard batch_guard(&batch);
149+
150+
ICEBERG_ASSIGN_OR_RAISE(
151+
auto row, ArrowArrayStructLike::Make(arrow_schema, batch, /*row_index=*/0));
152+
153+
for (int64_t i = 0; i < batch.length; ++i) {
154+
if (i > 0) {
155+
ICEBERG_RETURN_UNEXPECTED(row->Reset(i));
156+
}
157+
ICEBERG_RETURN_UNEXPECTED(eq_set->Insert(*row));
158+
}
159+
}
160+
161+
ICEBERG_RETURN_UNEXPECTED(reader->Close());
162+
}
163+
164+
return eq_set;
165+
}
166+
167+
} // namespace iceberg

src/iceberg/data/delete_loader.h

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/data/delete_loader.h
23+
/// Loads position and equality delete files into in-memory indexes.
24+
25+
#include <memory>
26+
#include <span>
27+
#include <string_view>
28+
29+
#include "iceberg/iceberg_export.h"
30+
#include "iceberg/result.h"
31+
#include "iceberg/type_fwd.h"
32+
#include "iceberg/util/struct_like_set.h"
33+
34+
namespace iceberg {
35+
36+
/// \brief Loads delete files and constructs in-memory delete indexes.
37+
class ICEBERG_EXPORT DeleteLoader {
38+
public:
39+
/// \brief Create a DeleteLoader.
40+
/// \param io FileIO instance for reading delete files
41+
explicit DeleteLoader(std::shared_ptr<FileIO> io);
42+
43+
~DeleteLoader();
44+
45+
/// \brief Load position deletes for a specific data file.
46+
///
47+
/// Reads the given position delete files and returns a PositionDeleteIndex
48+
/// containing only the positions that apply to the specified data file path.
49+
/// Supports both regular position delete files and deletion vectors.
50+
///
51+
/// \param delete_files Position delete files to load (must have
52+
/// content == Content::kPositionDeletes)
53+
/// \param data_file_path Path of the data file to filter positions for
54+
/// \return A PositionDeleteIndex with deleted positions for the data file
55+
Result<PositionDeleteIndex> LoadPositionDeletes(
56+
std::span<const std::shared_ptr<DataFile>> delete_files,
57+
std::string_view data_file_path) const;
58+
59+
/// \brief Load equality deletes into an in-memory set.
60+
///
61+
/// \param delete_files Equality delete files to load (must have
62+
/// content == Content::kEqualityDeletes)
63+
/// \param equality_type The struct type describing the equality columns
64+
/// \return A StructLikeSet containing the deleted rows
65+
Result<std::unique_ptr<UncheckedStructLikeSet>> LoadEqualityDeletes(
66+
std::span<const std::shared_ptr<DataFile>> delete_files,
67+
const StructType& equality_type) const;
68+
69+
private:
70+
/// \brief Load a single position delete file into the index.
71+
Status LoadPositionDelete(const DataFile& file, PositionDeleteIndex& index,
72+
std::string_view data_file_path) const;
73+
74+
/// \brief Load a single deletion vector file into the index.
75+
Status LoadDV(const DataFile& file, PositionDeleteIndex& index) const;
76+
77+
std::shared_ptr<FileIO> io_;
78+
};
79+
80+
} // namespace iceberg

src/iceberg/test/CMakeLists.txt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,11 @@ if(ICEBERG_BUILD_BUNDLE)
194194
update_sort_order_test.cc
195195
update_statistics_test.cc)
196196

197-
add_iceberg_test(data_writer_test USE_BUNDLE SOURCES data_writer_test.cc)
197+
add_iceberg_test(data_test
198+
USE_BUNDLE
199+
SOURCES
200+
data_writer_test.cc
201+
delete_loader_test.cc)
198202

199203
endif()
200204

0 commit comments

Comments
 (0)