Skip to content

Commit 300b721

Browse files
committed
fix: add an as_span method for DataFileSet
1 parent e94fcc8 commit 300b721

7 files changed

Lines changed: 93 additions & 63 deletions

File tree

src/iceberg/test/data_file_set_test.cc

Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,7 @@ TEST_F(DataFileSetTest, EmptySet) {
4545
EXPECT_TRUE(set.empty());
4646
EXPECT_EQ(set.size(), 0);
4747
EXPECT_EQ(set.begin(), set.end());
48+
EXPECT_TRUE(set.as_span().empty());
4849
}
4950

5051
TEST_F(DataFileSetTest, InsertSingleFile) {
@@ -108,6 +109,24 @@ TEST_F(DataFileSetTest, InsertionOrderPreserved) {
108109
EXPECT_EQ(paths[2], "/path/to/file3.parquet");
109110
}
110111

112+
// Checks that as_span() mirrors the set's contents: empty for a fresh set,
// in insertion order after two inserts, and empty again after clear().
TEST_F(DataFileSetTest, AsSpan) {
  DataFileSet set;
  EXPECT_TRUE(set.as_span().empty());

  auto file1 = CreateDataFile("/path/to/file1.parquet");
  auto file2 = CreateDataFile("/path/to/file2.parquet");
  set.insert(file1);
  set.insert(file2);

  auto span = set.as_span();
  // Fatal assertion on purpose: the element accesses below would index past
  // the end of the span if the size were wrong, so the test must stop here.
  ASSERT_EQ(span.size(), 2);
  EXPECT_EQ(span[0]->file_path, "/path/to/file1.parquet");
  EXPECT_EQ(span[1]->file_path, "/path/to/file2.parquet");

  // `span` is not read after clear(); a fresh view is requested instead.
  set.clear();
  EXPECT_TRUE(set.as_span().empty());
}
129+
111130
TEST_F(DataFileSetTest, InsertDuplicatePreservesOrder) {
112131
DataFileSet set;
113132
auto file1 = CreateDataFile("/path/to/file1.parquet");

src/iceberg/test/meson.build

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -84,6 +84,7 @@ iceberg_tests = {
8484
'sources': files(
8585
'bucket_util_test.cc',
8686
'config_test.cc',
87+
'data_file_set_test.cc',
8788
'decimal_test.cc',
8889
'endian_test.cc',
8990
'formatter_test.cc',

src/iceberg/update/fast_append.cc

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -197,9 +197,8 @@ Result<std::vector<ManifestFile>> FastAppend::WriteNewManifests() {
197197
if (new_manifests_.empty() && !new_data_files_by_spec_.empty()) {
198198
for (const auto& [spec_id, data_files] : new_data_files_by_spec_) {
199199
ICEBERG_ASSIGN_OR_RAISE(auto spec, Spec(spec_id));
200-
ICEBERG_ASSIGN_OR_RAISE(
201-
auto written_manifests,
202-
WriteDataManifests(data_files.begin(), data_files.end(), spec));
200+
ICEBERG_ASSIGN_OR_RAISE(auto written_manifests,
201+
WriteDataManifests(data_files.as_span(), spec));
203202
new_manifests_.insert(new_manifests_.end(),
204203
std::make_move_iterator(written_manifests.begin()),
205204
std::make_move_iterator(written_manifests.end()));

src/iceberg/update/snapshot_update.cc

Lines changed: 56 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,8 @@
2727
#include "iceberg/manifest/manifest_entry.h"
2828
#include "iceberg/manifest/manifest_list.h"
2929
#include "iceberg/manifest/manifest_reader.h"
30+
#include "iceberg/manifest/manifest_writer.h"
31+
#include "iceberg/manifest/rolling_manifest_writer.h"
3032
#include "iceberg/partition_summary_internal.h"
3133
#include "iceberg/util/macros.h"
3234
#include "iceberg/util/snapshot_util_internal.h"
@@ -166,6 +168,60 @@ int64_t SnapshotUpdate::SnapshotId() {
166168
return snapshot_id_.value();
167169
}
168170

171+
// Writes the given data files as added entries into data manifests through a
// RollingManifestWriter bounded by target_manifest_size_bytes_.
//
// files: data files to write; an empty span returns an empty vector without
//        creating any writer.
// spec: partition spec handed to every manifest writer that gets created.
// data_sequence_number: optional data sequence number forwarded with each
//        added entry.
//
// Returns the manifest files collected from the rolling writer. Failures from
// the schema lookup, entry writes, or close are returned through the
// ICEBERG_* macros.
Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDataManifests(
    std::span<const std::shared_ptr<DataFile>> files,
    const std::shared_ptr<PartitionSpec>& spec,
    std::optional<int64_t> data_sequence_number) {
  if (files.empty()) {
    return std::vector<ManifestFile>{};
  }

  ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema());
  RollingManifestWriter rolling_writer(
      // The factory is not `mutable`, so its by-value captures are const and
      // `spec` / `schema` are copied into MakeWriter on every call. They are
      // passed without std::move because a move from a const capture is only a
      // copy in disguise. NOTE(review): the rolling writer presumably invokes
      // this factory once per rolled manifest, which is why the captures must
      // stay intact between calls -- confirm against RollingManifestWriter.
      [this, spec, schema = std::move(current_schema),
       snapshot_id = SnapshotId()]() -> Result<std::unique_ptr<ManifestWriter>> {
        return ManifestWriter::MakeWriter(base().format_version, snapshot_id,
                                          ManifestPath(), transaction_->table()->io(),
                                          spec, schema, ManifestContent::kData,
                                          /*first_row_id=*/base().next_row_id);
      },
      target_manifest_size_bytes_);

  for (const auto& file : files) {
    ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(file, data_sequence_number));
  }
  ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close());
  return rolling_writer.ToManifestFiles();
}
197+
198+
// Writes the given delete files as added entries into delete manifests through
// a RollingManifestWriter bounded by target_manifest_size_bytes_.
//
// files: delete files to write; an empty span returns an empty vector without
//        creating any writer.
// spec: partition spec handed to every manifest writer that gets created.
//
// Returns the manifest files collected from the rolling writer. Failures from
// the schema lookup, entry writes, or close are returned through the
// ICEBERG_* macros.
Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDeleteManifests(
    std::span<const std::shared_ptr<DataFile>> files,
    const std::shared_ptr<PartitionSpec>& spec) {
  if (files.empty()) {
    return std::vector<ManifestFile>{};
  }

  ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema());
  RollingManifestWriter rolling_writer(
      // The factory is not `mutable`, so its by-value captures are const and
      // `spec` / `schema` are copied into MakeWriter on every call. They are
      // passed without std::move because a move from a const capture is only a
      // copy in disguise. NOTE(review): the rolling writer presumably invokes
      // this factory once per rolled manifest, which is why the captures must
      // stay intact between calls -- confirm against RollingManifestWriter.
      [this, spec, schema = std::move(current_schema),
       snapshot_id = SnapshotId()]() -> Result<std::unique_ptr<ManifestWriter>> {
        return ManifestWriter::MakeWriter(base().format_version, snapshot_id,
                                          ManifestPath(), transaction_->table()->io(),
                                          spec, schema, ManifestContent::kDeletes);
      },
      target_manifest_size_bytes_);

  for (const auto& file : files) {
    // FIXME: Java impl wrap it with `PendingDeleteFile` and deals with
    // file->data_sequence_number
    ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(file));
  }
  ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close());
  return rolling_writer.ToManifestFiles();
}
224+
169225
Result<SnapshotUpdate::ApplyResult> SnapshotUpdate::Apply() {
170226
ICEBERG_RETURN_UNEXPECTED(CheckErrors());
171227
ICEBERG_ASSIGN_OR_RAISE(auto parent_snapshot,

src/iceberg/update/snapshot_update.h

Lines changed: 8 additions & 60 deletions
Original file line number | Diff line number | Diff line change
@@ -22,15 +22,13 @@
2222
#include <functional>
2323
#include <memory>
2424
#include <optional>
25+
#include <span>
2526
#include <string>
2627
#include <unordered_map>
2728
#include <unordered_set>
2829
#include <vector>
2930

3031
#include "iceberg/iceberg_export.h"
31-
#include "iceberg/manifest/manifest_list.h"
32-
#include "iceberg/manifest/manifest_writer.h"
33-
#include "iceberg/manifest/rolling_manifest_writer.h"
3432
#include "iceberg/result.h"
3533
#include "iceberg/snapshot.h"
3634
#include "iceberg/table.h"
@@ -107,75 +105,25 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate {
107105

108106
/// \brief Write data manifests for the given data files
109107
///
110-
/// \tparam Iterator Iterator type that dereferences to std::shared_ptr<DataFile>
111-
/// \param begin Iterator to the beginning of the data files range
112-
/// \param end Iterator to the end of the data files range
108+
/// \param files Data files to write
113109
/// \param spec The partition spec to use
114110
/// \param data_sequence_number Optional data sequence number for the files
115111
/// \return A vector of manifest files
116112
// TODO(xxx): write manifests in parallel
117-
template <typename Iterator>
118113
Result<std::vector<ManifestFile>> WriteDataManifests(
119-
Iterator begin, Iterator end, const std::shared_ptr<PartitionSpec>& spec,
120-
std::optional<int64_t> data_sequence_number = std::nullopt) {
121-
if (begin == end) {
122-
return std::vector<ManifestFile>{};
123-
}
124-
125-
ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema());
126-
RollingManifestWriter rolling_writer(
127-
[this, spec, schema = std::move(current_schema),
128-
snapshot_id = SnapshotId()]() -> Result<std::unique_ptr<ManifestWriter>> {
129-
return ManifestWriter::MakeWriter(base().format_version, snapshot_id,
130-
ManifestPath(), transaction_->table()->io(),
131-
std::move(spec), std::move(schema),
132-
ManifestContent::kData,
133-
/*first_row_id=*/base().next_row_id);
134-
},
135-
target_manifest_size_bytes_);
136-
137-
for (auto it = begin; it != end; ++it) {
138-
ICEBERG_RETURN_UNEXPECTED(
139-
rolling_writer.WriteAddedEntry(*it, data_sequence_number));
140-
}
141-
ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close());
142-
return rolling_writer.ToManifestFiles();
143-
}
114+
std::span<const std::shared_ptr<DataFile>> files,
115+
const std::shared_ptr<PartitionSpec>& spec,
116+
std::optional<int64_t> data_sequence_number = std::nullopt);
144117

145118
/// \brief Write delete manifests for the given delete files
146119
///
147-
/// \tparam Iterator Iterator type that dereferences to std::shared_ptr<DataFile>
148-
/// \param begin Iterator to the beginning of the delete files range
149-
/// \param end Iterator to the end of the delete files range
120+
/// \param files Delete files to write
150121
/// \param spec The partition spec to use
151122
/// \return A vector of manifest files
152123
// TODO(xxx): write manifests in parallel
153-
template <typename Iterator>
154124
Result<std::vector<ManifestFile>> WriteDeleteManifests(
155-
Iterator begin, Iterator end, const std::shared_ptr<PartitionSpec>& spec) {
156-
if (begin == end) {
157-
return std::vector<ManifestFile>{};
158-
}
159-
160-
ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema());
161-
RollingManifestWriter rolling_writer(
162-
[this, spec, schema = std::move(current_schema),
163-
snapshot_id = SnapshotId()]() -> Result<std::unique_ptr<ManifestWriter>> {
164-
return ManifestWriter::MakeWriter(base().format_version, snapshot_id,
165-
ManifestPath(), transaction_->table()->io(),
166-
std::move(spec), std::move(schema),
167-
ManifestContent::kDeletes);
168-
},
169-
target_manifest_size_bytes_);
170-
171-
for (auto it = begin; it != end; ++it) {
172-
/// FIXME: Java impl wrap it with `PendingDeleteFile` and deals with
173-
/// (*it)->data_sequenece_number
174-
ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(*it));
175-
}
176-
ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close());
177-
return rolling_writer.ToManifestFiles();
178-
}
125+
std::span<const std::shared_ptr<DataFile>> files,
126+
const std::shared_ptr<PartitionSpec>& spec);
179127

180128
Status SetTargetBranch(const std::string& branch);
181129
const std::string& target_branch() const { return target_branch_; }

src/iceberg/util/data_file_set.h

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@
2525

2626
#include <iterator>
2727
#include <memory>
28+
#include <span>
2829
#include <string_view>
2930
#include <unordered_map>
3031
#include <vector>
@@ -79,6 +80,11 @@ class ICEBERG_EXPORT DataFileSet {
7980
const_iterator end() const { return elements_.end(); }
8081
const_iterator cend() const { return elements_.cend(); }
8182

83+
/// \brief Get a non-owning view of the data files in insertion order.
84+
std::span<const value_type> as_span() const {
85+
return std::span<const value_type>(elements_.data(), elements_.size());
86+
}
87+
8288
private:
8389
std::pair<iterator, bool> InsertImpl(value_type file) {
8490
if (!file) {

src/iceberg/util/meson.build

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@ install_headers(
2222
'config.h',
2323
'content_file_util.h',
2424
'conversions.h',
25+
'data_file_set.h',
2526
'decimal.h',
2627
'endian.h',
2728
'error_collector.h',

0 commit comments

Comments
 (0)