Skip to content

Commit e94fcc8

Browse files
committed
chore: refactor DataFileSet and make WriteManifests use iterators
- Move DataFileSet out of content_file_util.h to data_file_set.h to reduce header dependencies
- Refactor WriteDataManifests and WriteDeleteManifests to accept iterator begin/end instead of vectors
1 parent a457099 commit e94fcc8

8 files changed

Lines changed: 409 additions & 144 deletions

File tree

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ add_iceberg_test(util_test
109109
SOURCES
110110
bucket_util_test.cc
111111
config_test.cc
112+
data_file_set_test.cc
112113
decimal_test.cc
113114
endian_test.cc
114115
formatter_test.cc
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/util/data_file_set.h"
21+
22+
#include <gtest/gtest.h>
23+
24+
#include "iceberg/file_format.h"
25+
#include "iceberg/manifest/manifest_entry.h"
26+
#include "iceberg/row/partition_values.h"
27+
28+
namespace iceberg {
29+
30+
class DataFileSetTest : public ::testing::Test {
31+
protected:
32+
std::shared_ptr<DataFile> CreateDataFile(const std::string& path, int64_t size = 100) {
33+
auto file = std::make_shared<DataFile>();
34+
file->file_path = path;
35+
file->file_format = FileFormatType::kParquet;
36+
file->file_size_in_bytes = size;
37+
file->record_count = 10;
38+
file->content = DataFile::Content::kData;
39+
return file;
40+
}
41+
};
42+
43+
TEST_F(DataFileSetTest, EmptySet) {
44+
DataFileSet set;
45+
EXPECT_TRUE(set.empty());
46+
EXPECT_EQ(set.size(), 0);
47+
EXPECT_EQ(set.begin(), set.end());
48+
}
49+
50+
TEST_F(DataFileSetTest, InsertSingleFile) {
51+
DataFileSet set;
52+
auto file = CreateDataFile("/path/to/file.parquet");
53+
54+
auto [iter, inserted] = set.insert(file);
55+
EXPECT_TRUE(inserted);
56+
EXPECT_EQ(*iter, file);
57+
EXPECT_FALSE(set.empty());
58+
EXPECT_EQ(set.size(), 1);
59+
}
60+
61+
TEST_F(DataFileSetTest, InsertDuplicateFile) {
62+
DataFileSet set;
63+
auto file1 = CreateDataFile("/path/to/file.parquet");
64+
auto file2 = CreateDataFile("/path/to/file.parquet"); // Same path
65+
66+
auto [iter1, inserted1] = set.insert(file1);
67+
EXPECT_TRUE(inserted1);
68+
69+
auto [iter2, inserted2] = set.insert(file2);
70+
EXPECT_FALSE(inserted2);
71+
EXPECT_EQ(iter1, iter2); // Should point to the same element
72+
EXPECT_EQ(set.size(), 1); // Should still be size 1
73+
}
74+
75+
TEST_F(DataFileSetTest, InsertDifferentFiles) {
76+
DataFileSet set;
77+
auto file1 = CreateDataFile("/path/to/file1.parquet");
78+
auto file2 = CreateDataFile("/path/to/file2.parquet");
79+
auto file3 = CreateDataFile("/path/to/file3.parquet");
80+
81+
set.insert(file1);
82+
set.insert(file2);
83+
set.insert(file3);
84+
85+
EXPECT_EQ(set.size(), 3);
86+
EXPECT_FALSE(set.empty());
87+
}
88+
89+
TEST_F(DataFileSetTest, InsertionOrderPreserved) {
90+
DataFileSet set;
91+
auto file1 = CreateDataFile("/path/to/file1.parquet");
92+
auto file2 = CreateDataFile("/path/to/file2.parquet");
93+
auto file3 = CreateDataFile("/path/to/file3.parquet");
94+
95+
set.insert(file1);
96+
set.insert(file2);
97+
set.insert(file3);
98+
99+
// Iterate and verify order
100+
std::vector<std::string> paths;
101+
for (const auto& file : set) {
102+
paths.push_back(file->file_path);
103+
}
104+
105+
EXPECT_EQ(paths.size(), 3);
106+
EXPECT_EQ(paths[0], "/path/to/file1.parquet");
107+
EXPECT_EQ(paths[1], "/path/to/file2.parquet");
108+
EXPECT_EQ(paths[2], "/path/to/file3.parquet");
109+
}
110+
111+
TEST_F(DataFileSetTest, InsertDuplicatePreservesOrder) {
112+
DataFileSet set;
113+
auto file1 = CreateDataFile("/path/to/file1.parquet");
114+
auto file2 = CreateDataFile("/path/to/file2.parquet");
115+
auto file3 = CreateDataFile("/path/to/file1.parquet"); // Duplicate of file1
116+
117+
set.insert(file1);
118+
set.insert(file2);
119+
set.insert(file3); // Should not insert, but order should be preserved
120+
121+
EXPECT_EQ(set.size(), 2);
122+
123+
std::vector<std::string> paths;
124+
for (const auto& file : set) {
125+
paths.push_back(file->file_path);
126+
}
127+
128+
EXPECT_EQ(paths[0], "/path/to/file1.parquet");
129+
EXPECT_EQ(paths[1], "/path/to/file2.parquet");
130+
}
131+
132+
TEST_F(DataFileSetTest, InsertNullFile) {
133+
DataFileSet set;
134+
std::shared_ptr<DataFile> null_file = nullptr;
135+
136+
auto [iter, inserted] = set.insert(null_file);
137+
EXPECT_FALSE(inserted);
138+
EXPECT_EQ(iter, set.end());
139+
EXPECT_TRUE(set.empty());
140+
EXPECT_EQ(set.size(), 0);
141+
}
142+
143+
TEST_F(DataFileSetTest, InsertMoveSemantics) {
144+
DataFileSet set;
145+
auto file1 = CreateDataFile("/path/to/file1.parquet");
146+
auto file2 = CreateDataFile("/path/to/file2.parquet");
147+
148+
// Insert using move
149+
auto [iter1, inserted1] = set.insert(std::move(file1));
150+
EXPECT_TRUE(inserted1);
151+
EXPECT_EQ(file1, nullptr); // Should be moved
152+
153+
// Insert using copy
154+
auto [iter2, inserted2] = set.insert(file2);
155+
EXPECT_TRUE(inserted2);
156+
EXPECT_NE(file2, nullptr); // Should still be valid
157+
158+
EXPECT_EQ(set.size(), 2);
159+
}
160+
161+
TEST_F(DataFileSetTest, Clear) {
162+
DataFileSet set;
163+
set.insert(CreateDataFile("/path/to/file1.parquet"));
164+
set.insert(CreateDataFile("/path/to/file2.parquet"));
165+
166+
EXPECT_EQ(set.size(), 2);
167+
set.clear();
168+
EXPECT_TRUE(set.empty());
169+
EXPECT_EQ(set.size(), 0);
170+
EXPECT_EQ(set.begin(), set.end());
171+
}
172+
173+
TEST_F(DataFileSetTest, IteratorOperations) {
174+
DataFileSet set;
175+
auto file1 = CreateDataFile("/path/to/file1.parquet");
176+
auto file2 = CreateDataFile("/path/to/file2.parquet");
177+
auto file3 = CreateDataFile("/path/to/file3.parquet");
178+
179+
set.insert(file1);
180+
set.insert(file2);
181+
set.insert(file3);
182+
183+
// Test const iterators
184+
const auto& const_set = set;
185+
EXPECT_NE(const_set.begin(), const_set.end());
186+
EXPECT_NE(const_set.cbegin(), const_set.cend());
187+
188+
// Test iterator increment
189+
auto it = set.begin();
190+
EXPECT_EQ((*it)->file_path, "/path/to/file1.parquet");
191+
++it;
192+
EXPECT_EQ((*it)->file_path, "/path/to/file2.parquet");
193+
++it;
194+
EXPECT_EQ((*it)->file_path, "/path/to/file3.parquet");
195+
++it;
196+
EXPECT_EQ(it, set.end());
197+
}
198+
199+
TEST_F(DataFileSetTest, RangeBasedForLoop) {
200+
DataFileSet set;
201+
set.insert(CreateDataFile("/path/to/file1.parquet"));
202+
set.insert(CreateDataFile("/path/to/file2.parquet"));
203+
set.insert(CreateDataFile("/path/to/file3.parquet"));
204+
205+
int count = 0;
206+
for (const auto& file : set) {
207+
EXPECT_NE(file, nullptr);
208+
++count;
209+
}
210+
EXPECT_EQ(count, 3);
211+
}
212+
213+
TEST_F(DataFileSetTest, CaseSensitivePaths) {
214+
DataFileSet set;
215+
auto file1 = CreateDataFile("/path/to/file.parquet");
216+
auto file2 = CreateDataFile("/path/to/FILE.parquet"); // Different case
217+
218+
set.insert(file1);
219+
set.insert(file2);
220+
221+
// Should be treated as different files
222+
EXPECT_EQ(set.size(), 2);
223+
}
224+
225+
TEST_F(DataFileSetTest, MultipleInsertsSameFile) {
226+
DataFileSet set;
227+
auto file = CreateDataFile("/path/to/file.parquet");
228+
229+
// Insert the same file multiple times
230+
set.insert(file);
231+
set.insert(file);
232+
set.insert(file);
233+
234+
EXPECT_EQ(set.size(), 1);
235+
}
236+
237+
} // namespace iceberg

src/iceberg/update/fast_append.cc

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
#include "iceberg/update/fast_append.h"
2121

2222
#include <iterator>
23-
#include <ranges>
2423
#include <vector>
2524

2625
#include "iceberg/constants.h"
@@ -198,10 +197,9 @@ Result<std::vector<ManifestFile>> FastAppend::WriteNewManifests() {
198197
if (new_manifests_.empty() && !new_data_files_by_spec_.empty()) {
199198
for (const auto& [spec_id, data_files] : new_data_files_by_spec_) {
200199
ICEBERG_ASSIGN_OR_RAISE(auto spec, Spec(spec_id));
201-
std::vector<std::shared_ptr<DataFile>> files;
202-
files.reserve(data_files.size());
203-
std::ranges::copy(data_files, std::back_inserter(files));
204-
ICEBERG_ASSIGN_OR_RAISE(auto written_manifests, WriteDataManifests(files, spec));
200+
ICEBERG_ASSIGN_OR_RAISE(
201+
auto written_manifests,
202+
WriteDataManifests(data_files.begin(), data_files.end(), spec));
205203
new_manifests_.insert(new_manifests_.end(),
206204
std::make_move_iterator(written_manifests.begin()),
207205
std::make_move_iterator(written_manifests.end()));

src/iceberg/update/fast_append.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
#include "iceberg/result.h"
3131
#include "iceberg/type_fwd.h"
3232
#include "iceberg/update/snapshot_update.h"
33-
#include "iceberg/util/content_file_util.h"
33+
#include "iceberg/util/data_file_set.h"
3434

3535
namespace iceberg {
3636

src/iceberg/update/snapshot_update.cc

Lines changed: 0 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,7 @@
2727
#include "iceberg/manifest/manifest_entry.h"
2828
#include "iceberg/manifest/manifest_list.h"
2929
#include "iceberg/manifest/manifest_reader.h"
30-
#include "iceberg/manifest/manifest_writer.h"
31-
#include "iceberg/manifest/rolling_manifest_writer.h"
3230
#include "iceberg/partition_summary_internal.h"
33-
#include "iceberg/snapshot.h"
34-
#include "iceberg/table.h"
35-
#include "iceberg/transaction.h"
3631
#include "iceberg/util/macros.h"
3732
#include "iceberg/util/snapshot_util_internal.h"
3833
#include "iceberg/util/string_util.h"
@@ -164,62 +159,6 @@ SnapshotUpdate::SnapshotUpdate(std::shared_ptr<Transaction> transaction)
164159
target_manifest_size_bytes_(
165160
base().properties.Get(TableProperties::kManifestTargetSizeBytes)) {}
166161

167-
// TODO(xxx): write manifests in parallel
168-
Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDataManifests(
169-
const std::vector<std::shared_ptr<DataFile>>& data_files,
170-
const std::shared_ptr<PartitionSpec>& spec,
171-
std::optional<int64_t> data_sequence_number) {
172-
if (data_files.empty()) {
173-
return std::vector<ManifestFile>{};
174-
}
175-
176-
ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema());
177-
RollingManifestWriter rolling_writer(
178-
[this, spec, schema = std::move(current_schema),
179-
snapshot_id = SnapshotId()]() -> Result<std::unique_ptr<ManifestWriter>> {
180-
return ManifestWriter::MakeWriter(base().format_version, snapshot_id,
181-
ManifestPath(), transaction_->table()->io(),
182-
std::move(spec), std::move(schema),
183-
ManifestContent::kData,
184-
/*first_row_id=*/base().next_row_id);
185-
},
186-
target_manifest_size_bytes_);
187-
188-
for (const auto& file : data_files) {
189-
ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(file, data_sequence_number));
190-
}
191-
ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close());
192-
return rolling_writer.ToManifestFiles();
193-
}
194-
195-
// TODO(xxx): write manifests in parallel
196-
Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDeleteManifests(
197-
const std::vector<std::shared_ptr<DataFile>>& delete_files,
198-
const std::shared_ptr<PartitionSpec>& spec) {
199-
if (delete_files.empty()) {
200-
return std::vector<ManifestFile>{};
201-
}
202-
203-
ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema());
204-
RollingManifestWriter rolling_writer(
205-
[this, spec, schema = std::move(current_schema),
206-
snapshot_id = SnapshotId()]() -> Result<std::unique_ptr<ManifestWriter>> {
207-
return ManifestWriter::MakeWriter(base().format_version, snapshot_id,
208-
ManifestPath(), transaction_->table()->io(),
209-
std::move(spec), std::move(schema),
210-
ManifestContent::kDeletes);
211-
},
212-
target_manifest_size_bytes_);
213-
214-
for (const auto& file : delete_files) {
215-
/// FIXME: Java impl wraps it with `PendingDeleteFile` and deals with
216-
/// file->data_sequence_number
217-
ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(file));
218-
}
219-
ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close());
220-
return rolling_writer.ToManifestFiles();
221-
}
222-
223162
int64_t SnapshotUpdate::SnapshotId() {
224163
if (!snapshot_id_.has_value()) {
225164
snapshot_id_ = SnapshotUtil::GenerateSnapshotId(base());

0 commit comments

Comments
 (0)