Skip to content

Commit 75204b1

Browse files
authored
chore: refactor DataFileSet and make WriteManifests to accept span (#519)
- Move DataFileSet out of content_file_util.h to data_file_set.h to reduce header dependencies
- Refactor WriteDataManifests and WriteDeleteManifests to accept span instead of vector
1 parent 3994b5d commit 75204b1

File tree

10 files changed

+411
-90
lines changed

10 files changed

+411
-90
lines changed

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ add_iceberg_test(util_test
109109
SOURCES
110110
bucket_util_test.cc
111111
config_test.cc
112+
data_file_set_test.cc
112113
decimal_test.cc
113114
endian_test.cc
114115
formatter_test.cc
Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/util/data_file_set.h"
21+
22+
#include <gtest/gtest.h>
23+
24+
#include "iceberg/file_format.h"
25+
#include "iceberg/manifest/manifest_entry.h"
26+
#include "iceberg/row/partition_values.h"
27+
28+
namespace iceberg {
29+
30+
/// Fixture shared by the DataFileSet tests; it only supplies a DataFile factory.
class DataFileSetTest : public ::testing::Test {
 protected:
  /// Creates a Parquet data file (content kData, record_count 10) whose
  /// file_path is `path` and whose file_size_in_bytes is `size`.
  std::shared_ptr<DataFile> CreateDataFile(const std::string& path, int64_t size = 100) {
    auto data_file = std::make_shared<DataFile>();
    data_file->content = DataFile::Content::kData;
    data_file->file_format = FileFormatType::kParquet;
    data_file->file_path = path;
    data_file->record_count = 10;
    data_file->file_size_in_bytes = size;
    return data_file;
  }
};
42+
43+
// A default-constructed set is empty according to every accessor checked here.
TEST_F(DataFileSetTest, EmptySet) {
  DataFileSet files;

  EXPECT_TRUE(files.empty());
  EXPECT_EQ(files.size(), 0);
  EXPECT_EQ(files.begin(), files.end());  // no elements to iterate
  EXPECT_TRUE(files.as_span().empty());
}
50+
51+
// Inserting one file reports success and returns an iterator to that file.
TEST_F(DataFileSetTest, InsertSingleFile) {
  DataFileSet files;
  auto data_file = CreateDataFile("/path/to/file.parquet");

  auto [pos, was_inserted] = files.insert(data_file);

  EXPECT_TRUE(was_inserted);
  EXPECT_EQ(*pos, data_file);  // the stored pointer is the one that was passed in
  EXPECT_FALSE(files.empty());
  EXPECT_EQ(files.size(), 1);
}
61+
62+
// A second DataFile object carrying an already-present path is not inserted.
TEST_F(DataFileSetTest, InsertDuplicateFile) {
  DataFileSet files;
  auto original = CreateDataFile("/path/to/file.parquet");
  auto same_path = CreateDataFile("/path/to/file.parquet");  // distinct object, same path

  auto [first_pos, first_inserted] = files.insert(original);
  EXPECT_TRUE(first_inserted);

  auto [second_pos, second_inserted] = files.insert(same_path);
  EXPECT_FALSE(second_inserted);
  EXPECT_EQ(first_pos, second_pos);  // both iterators refer to the same element
  EXPECT_EQ(files.size(), 1);        // the rejected insert leaves the size at 1
}
75+
76+
// Three files with distinct paths are all retained.
TEST_F(DataFileSetTest, InsertDifferentFiles) {
  DataFileSet files;
  auto first = CreateDataFile("/path/to/file1.parquet");
  auto second = CreateDataFile("/path/to/file2.parquet");
  auto third = CreateDataFile("/path/to/file3.parquet");

  files.insert(first);
  files.insert(second);
  files.insert(third);

  EXPECT_EQ(files.size(), 3);
  EXPECT_FALSE(files.empty());
}
89+
90+
// Iterating the set yields the files in the order they were inserted.
TEST_F(DataFileSetTest, InsertionOrderPreserved) {
  DataFileSet set;
  auto file1 = CreateDataFile("/path/to/file1.parquet");
  auto file2 = CreateDataFile("/path/to/file2.parquet");
  auto file3 = CreateDataFile("/path/to/file3.parquet");

  set.insert(file1);
  set.insert(file2);
  set.insert(file3);

  // Collect the paths in iteration order.
  std::vector<std::string> paths;
  for (const auto& file : set) {
    paths.push_back(file->file_path);
  }

  // Compare the whole sequence at once: this checks size and order together
  // and never indexes `paths` out of bounds when the size is wrong.
  const std::vector<std::string> expected = {
      "/path/to/file1.parquet",
      "/path/to/file2.parquet",
      "/path/to/file3.parquet",
  };
  EXPECT_EQ(paths, expected);
}
111+
112+
// as_span() exposes the stored files as a view, in set iteration order.
TEST_F(DataFileSetTest, AsSpan) {
  DataFileSet set;
  EXPECT_TRUE(set.as_span().empty());

  // Single element
  auto file0 = CreateDataFile("/path/to/file0.parquet");
  set.insert(file0);
  {
    auto span = set.as_span();
    // Fatal check: indexing below would be out of bounds if the size is wrong.
    ASSERT_EQ(span.size(), 1);
    EXPECT_EQ(span[0]->file_path, "/path/to/file0.parquet");
    EXPECT_EQ(span[0], file0);  // Same pointer, span is a view
  }

  // Multiple elements
  auto file1 = CreateDataFile("/path/to/file1.parquet");
  auto file2 = CreateDataFile("/path/to/file2.parquet");
  set.insert(file1);
  set.insert(file2);

  auto span = set.as_span();
  // Fatal check for the same reason as above.
  ASSERT_EQ(span.size(), 3);
  EXPECT_EQ(span[0]->file_path, "/path/to/file0.parquet");
  EXPECT_EQ(span[1]->file_path, "/path/to/file1.parquet");
  EXPECT_EQ(span[2]->file_path, "/path/to/file2.parquet");

  // Span matches set iteration order and identity
  size_t i = 0;
  for (const auto& file : set) {
    // Stop before reading past the span if the set yields more elements.
    ASSERT_LT(i, span.size());
    EXPECT_EQ(span[i], file) << "Span element " << i << " should match set iterator";
    ++i;
  }
  EXPECT_EQ(i, span.size());

  // Span works with range-for
  i = 0;
  for (const auto& file : span) {
    EXPECT_EQ(file->file_path, span[i]->file_path);
    ++i;
  }
  EXPECT_EQ(i, 3);

  // `span` is not used after clear(); a fresh view is requested instead.
  set.clear();
  EXPECT_TRUE(set.as_span().empty());
}
157+
158+
// Re-inserting an already-present path neither adds an element nor reorders
// the existing ones.
TEST_F(DataFileSetTest, InsertDuplicatePreservesOrder) {
  DataFileSet set;
  auto file1 = CreateDataFile("/path/to/file1.parquet");
  auto file2 = CreateDataFile("/path/to/file2.parquet");
  auto file3 = CreateDataFile("/path/to/file1.parquet");  // Duplicate of file1

  set.insert(file1);
  set.insert(file2);
  set.insert(file3);  // Should not insert, but order should be preserved

  EXPECT_EQ(set.size(), 2);

  std::vector<std::string> paths;
  for (const auto& file : set) {
    paths.push_back(file->file_path);
  }

  // Whole-sequence comparison avoids indexing `paths` out of bounds if the
  // set holds fewer than two elements.
  const std::vector<std::string> expected = {
      "/path/to/file1.parquet",
      "/path/to/file2.parquet",
  };
  EXPECT_EQ(paths, expected);
}
178+
179+
// Inserting a null pointer is rejected: end() is returned and the set stays empty.
TEST_F(DataFileSetTest, InsertNullFile) {
  DataFileSet files;
  std::shared_ptr<DataFile> missing;  // default-constructed, i.e. null

  auto [pos, was_inserted] = files.insert(missing);

  EXPECT_FALSE(was_inserted);
  EXPECT_EQ(pos, files.end());
  EXPECT_TRUE(files.empty());
  EXPECT_EQ(files.size(), 0);
}
189+
190+
// insert() accepts both rvalues and lvalues; the test checks what is left in
// the caller's pointer in each case.
TEST_F(DataFileSetTest, InsertMoveSemantics) {
  DataFileSet files;
  auto moved_in = CreateDataFile("/path/to/file1.parquet");
  auto copied_in = CreateDataFile("/path/to/file2.parquet");

  // Rvalue insert: the caller's pointer is expected to be emptied.
  auto [move_pos, move_inserted] = files.insert(std::move(moved_in));
  EXPECT_TRUE(move_inserted);
  EXPECT_EQ(moved_in, nullptr);

  // Lvalue insert: the caller's pointer is expected to remain set.
  auto [copy_pos, copy_inserted] = files.insert(copied_in);
  EXPECT_TRUE(copy_inserted);
  EXPECT_NE(copied_in, nullptr);

  EXPECT_EQ(files.size(), 2);
}
207+
208+
// clear() removes every element from a populated set.
TEST_F(DataFileSetTest, Clear) {
  DataFileSet files;
  files.insert(CreateDataFile("/path/to/file1.parquet"));
  files.insert(CreateDataFile("/path/to/file2.parquet"));
  EXPECT_EQ(files.size(), 2);

  files.clear();

  EXPECT_TRUE(files.empty());
  EXPECT_EQ(files.size(), 0);
  EXPECT_EQ(files.begin(), files.end());  // nothing left to iterate
}
219+
220+
// Exercises const iterator accessors and stepping an iterator through the set.
TEST_F(DataFileSetTest, IteratorOperations) {
  DataFileSet set;
  auto file1 = CreateDataFile("/path/to/file1.parquet");
  auto file2 = CreateDataFile("/path/to/file2.parquet");
  auto file3 = CreateDataFile("/path/to/file3.parquet");

  set.insert(file1);
  set.insert(file2);
  set.insert(file3);

  // Test const iterators
  const auto& const_set = set;
  EXPECT_NE(const_set.begin(), const_set.end());
  EXPECT_NE(const_set.cbegin(), const_set.cend());

  // Test iterator increment. Each dereference is preceded by a fatal
  // not-at-end check, so a set with too few elements fails the test
  // instead of dereferencing end().
  auto it = set.begin();
  ASSERT_NE(it, set.end());
  EXPECT_EQ((*it)->file_path, "/path/to/file1.parquet");
  ++it;
  ASSERT_NE(it, set.end());
  EXPECT_EQ((*it)->file_path, "/path/to/file2.parquet");
  ++it;
  ASSERT_NE(it, set.end());
  EXPECT_EQ((*it)->file_path, "/path/to/file3.parquet");
  ++it;
  EXPECT_EQ(it, set.end());
}
245+
246+
// A range-based for loop visits every stored file, and none of them is null.
TEST_F(DataFileSetTest, RangeBasedForLoop) {
  DataFileSet files;
  files.insert(CreateDataFile("/path/to/file1.parquet"));
  files.insert(CreateDataFile("/path/to/file2.parquet"));
  files.insert(CreateDataFile("/path/to/file3.parquet"));

  int visited = 0;
  for (const auto& entry : files) {
    EXPECT_NE(entry, nullptr);
    ++visited;
  }

  EXPECT_EQ(visited, 3);
}
259+
260+
// Paths that differ only in letter case count as two distinct files.
TEST_F(DataFileSetTest, CaseSensitivePaths) {
  DataFileSet files;
  auto lower_case = CreateDataFile("/path/to/file.parquet");
  auto upper_case = CreateDataFile("/path/to/FILE.parquet");  // same path, different case

  files.insert(lower_case);
  files.insert(upper_case);

  // Both entries are expected to be kept.
  EXPECT_EQ(files.size(), 2);
}
271+
272+
// Inserting the very same pointer repeatedly stores it only once.
TEST_F(DataFileSetTest, MultipleInsertsSameFile) {
  DataFileSet files;
  auto data_file = CreateDataFile("/path/to/file.parquet");

  // Three inserts of the identical object.
  for (int attempt = 0; attempt < 3; ++attempt) {
    files.insert(data_file);
  }

  EXPECT_EQ(files.size(), 1);
}
283+
284+
} // namespace iceberg

src/iceberg/test/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ iceberg_tests = {
8484
'sources': files(
8585
'bucket_util_test.cc',
8686
'config_test.cc',
87+
'data_file_set_test.cc',
8788
'decimal_test.cc',
8889
'endian_test.cc',
8990
'formatter_test.cc',

src/iceberg/update/fast_append.cc

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
#include "iceberg/update/fast_append.h"
2121

2222
#include <iterator>
23-
#include <ranges>
2423
#include <vector>
2524

2625
#include "iceberg/constants.h"
@@ -198,10 +197,8 @@ Result<std::vector<ManifestFile>> FastAppend::WriteNewManifests() {
198197
if (new_manifests_.empty() && !new_data_files_by_spec_.empty()) {
199198
for (const auto& [spec_id, data_files] : new_data_files_by_spec_) {
200199
ICEBERG_ASSIGN_OR_RAISE(auto spec, Spec(spec_id));
201-
std::vector<std::shared_ptr<DataFile>> files;
202-
files.reserve(data_files.size());
203-
std::ranges::copy(data_files, std::back_inserter(files));
204-
ICEBERG_ASSIGN_OR_RAISE(auto written_manifests, WriteDataManifests(files, spec));
200+
ICEBERG_ASSIGN_OR_RAISE(auto written_manifests,
201+
WriteDataManifests(data_files.as_span(), spec));
205202
new_manifests_.insert(new_manifests_.end(),
206203
std::make_move_iterator(written_manifests.begin()),
207204
std::make_move_iterator(written_manifests.end()));

src/iceberg/update/fast_append.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
#include "iceberg/result.h"
3131
#include "iceberg/type_fwd.h"
3232
#include "iceberg/update/snapshot_update.h"
33-
#include "iceberg/util/content_file_util.h"
33+
#include "iceberg/util/data_file_set.h"
3434

3535
namespace iceberg {
3636

src/iceberg/update/snapshot_update.cc

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
#include "iceberg/manifest/manifest_writer.h"
3131
#include "iceberg/manifest/rolling_manifest_writer.h"
3232
#include "iceberg/partition_summary_internal.h"
33-
#include "iceberg/snapshot.h"
3433
#include "iceberg/table.h"
3534
#include "iceberg/transaction.h"
3635
#include "iceberg/util/macros.h"
@@ -166,10 +165,10 @@ SnapshotUpdate::SnapshotUpdate(std::shared_ptr<Transaction> transaction)
166165

167166
// TODO(xxx): write manifests in parallel
168167
Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDataManifests(
169-
const std::vector<std::shared_ptr<DataFile>>& data_files,
168+
std::span<const std::shared_ptr<DataFile>> files,
170169
const std::shared_ptr<PartitionSpec>& spec,
171170
std::optional<int64_t> data_sequence_number) {
172-
if (data_files.empty()) {
171+
if (files.empty()) {
173172
return std::vector<ManifestFile>{};
174173
}
175174

@@ -185,7 +184,7 @@ Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDataManifests(
185184
},
186185
target_manifest_size_bytes_);
187186

188-
for (const auto& file : data_files) {
187+
for (const auto& file : files) {
189188
ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(file, data_sequence_number));
190189
}
191190
ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close());
@@ -194,9 +193,9 @@ Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDataManifests(
194193

195194
// TODO(xxx): write manifests in parallel
196195
Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDeleteManifests(
197-
const std::vector<std::shared_ptr<DataFile>>& delete_files,
196+
std::span<const std::shared_ptr<DataFile>> files,
198197
const std::shared_ptr<PartitionSpec>& spec) {
199-
if (delete_files.empty()) {
198+
if (files.empty()) {
200199
return std::vector<ManifestFile>{};
201200
}
202201

@@ -211,9 +210,9 @@ Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDeleteManifests(
211210
},
212211
target_manifest_size_bytes_);
213212

214-
for (const auto& file : delete_files) {
215-
/// FIXME: Java impl wrap it with `PendingDeleteFile` and deals with
216-
/// file->data_sequenece_number
213+
for (const auto& file : files) {
214+
// FIXME: Java impl wrap it with `PendingDeleteFile` and deals with
215+
// file->data_sequence_number
217216
ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(file));
218217
}
219218
ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close());

0 commit comments

Comments
 (0)