Skip to content

Commit e9dad1a

Browse files
committed
feat: add FastAppend
1 parent 08e8127 commit e9dad1a

6 files changed

Lines changed: 472 additions & 1 deletion

File tree

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ set(ICEBERG_SOURCES
8181
transform.cc
8282
transform_function.cc
8383
type.cc
84+
update/fast_append.cc
8485
update/pending_update.cc
8586
update/snapshot_update.cc
8687
update/update_partition_spec.cc

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ iceberg_sources = files(
102102
'transform.cc',
103103
'transform_function.cc',
104104
'type.cc',
105+
'update/fast_append.cc',
105106
'update/pending_update.cc',
106107
'update/snapshot_update.cc',
107108
'update/update_partition_spec.cc',

src/iceberg/update/fast_append.cc

Lines changed: 311 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,311 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/update/fast_append.h"
21+
22+
#include <format>
23+
#include <iterator>
24+
#include <ranges>
25+
#include <vector>
26+
27+
#include "iceberg/constants.h"
28+
#include "iceberg/file_io.h"
29+
#include "iceberg/manifest/manifest_entry.h"
30+
#include "iceberg/manifest/manifest_reader.h"
31+
#include "iceberg/manifest/manifest_writer.h"
32+
#include "iceberg/snapshot.h"
33+
#include "iceberg/table.h"
34+
#include "iceberg/table_metadata.h"
35+
#include "iceberg/transaction.h"
36+
#include "iceberg/util/error_collector.h"
37+
#include "iceberg/util/macros.h"
38+
#include "iceberg/util/string_util.h"
39+
40+
namespace iceberg {
41+
42+
Result<std::unique_ptr<FastAppend>> FastAppend::Make(
43+
std::string table_name, std::shared_ptr<Transaction> transaction) {
44+
return std::unique_ptr<FastAppend>(
45+
new FastAppend(std::move(table_name), std::move(transaction)));
46+
}
47+
48+
FastAppend::FastAppend(std::string table_name, std::shared_ptr<Transaction> transaction)
49+
: SnapshotUpdate(std::move(transaction)), table_name_(std::move(table_name)) {}
50+
51+
FastAppend& FastAppend::AppendFile(std::shared_ptr<DataFile> file) {
52+
ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null");
53+
ICEBERG_BUILDER_CHECK(file->partition_spec_id.has_value(),
54+
"Data file must have partition spec ID");
55+
56+
int32_t spec_id = file->partition_spec_id.value();
57+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto spec, Spec(spec_id));
58+
59+
auto& data_files = new_data_files_by_spec_[spec_id];
60+
auto [iter, inserted] = data_files.insert(file);
61+
if (inserted) {
62+
has_new_files_ = true;
63+
ICEBERG_BUILDER_RETURN_IF_ERROR(AddFileToSummary(spec, file));
64+
}
65+
66+
return *this;
67+
}
68+
69+
FastAppend& FastAppend::AppendManifest(const ManifestFile& manifest) {
70+
ICEBERG_BUILDER_CHECK(!manifest.has_existing_files(),
71+
"Cannot append manifest with existing files");
72+
ICEBERG_BUILDER_CHECK(!manifest.has_deleted_files(),
73+
"Cannot append manifest with deleted files");
74+
ICEBERG_BUILDER_CHECK(manifest.added_snapshot_id == kInvalidSnapshotId,
75+
"Snapshot id must be assigned during commit");
76+
ICEBERG_BUILDER_CHECK(manifest.sequence_number == TableMetadata::kInvalidSequenceNumber,
77+
"Sequence number must be assigned during commit");
78+
79+
if (can_inherit_snapshot_id() && (manifest.added_snapshot_id == kInvalidSnapshotId)) {
80+
ICEBERG_BUILDER_RETURN_IF_ERROR(AddManifestToSummary(manifest));
81+
append_manifests_.push_back(manifest);
82+
} else {
83+
// The manifest must be rewritten with this update's snapshot ID
84+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto copied_manifest, CopyManifest(manifest));
85+
rewritten_append_manifests_.push_back(copied_manifest);
86+
}
87+
88+
return *this;
89+
}
90+
91+
FastAppend& FastAppend::ToBranch(const std::string& branch) {
92+
ICEBERG_BUILDER_RETURN_IF_ERROR(SetTargetBranch(branch));
93+
return *this;
94+
}
95+
96+
FastAppend& FastAppend::Set(const std::string& property, const std::string& value) {
97+
summary_[property] = value;
98+
return *this;
99+
}
100+
101+
std::string FastAppend::operation() { return DataOperation::kAppend; }
102+
103+
Result<std::vector<ManifestFile>> FastAppend::Apply(
104+
const TableMetadata& metadata_to_update, const std::shared_ptr<Snapshot>& snapshot) {
105+
std::vector<ManifestFile> manifests;
106+
107+
ICEBERG_ASSIGN_OR_RAISE(auto new_written_manifests, WriteNewManifests());
108+
if (!new_written_manifests.empty()) {
109+
manifests.insert(manifests.end(), new_written_manifests.begin(),
110+
new_written_manifests.end());
111+
}
112+
113+
// Transform append manifests and rewritten append manifests with snapshot ID
114+
int64_t snapshot_id = SnapshotId();
115+
for (const auto& manifest : append_manifests_) {
116+
ManifestFile updated = manifest;
117+
updated.added_snapshot_id = snapshot_id;
118+
manifests.push_back(updated);
119+
}
120+
121+
for (const auto& manifest : rewritten_append_manifests_) {
122+
ManifestFile updated = manifest;
123+
updated.added_snapshot_id = snapshot_id;
124+
manifests.push_back(updated);
125+
}
126+
127+
// Add all manifests from the snapshot
128+
if (snapshot != nullptr) {
129+
// Use SnapshotCache to get manifests, similar to snapshot_update.cc
130+
auto cached_snapshot = SnapshotCache(snapshot.get());
131+
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_manifests_span,
132+
cached_snapshot.Manifests(transaction_->table()->io()));
133+
std::vector<ManifestFile> snapshot_manifests(snapshot_manifests_span.begin(),
134+
snapshot_manifests_span.end());
135+
manifests.insert(manifests.end(), snapshot_manifests.begin(),
136+
snapshot_manifests.end());
137+
}
138+
139+
return manifests;
140+
}
141+
142+
std::unordered_map<std::string, std::string> FastAppend::Summary() {
143+
// Note: In Java, the summary builder has a method to set partition summary limit.
144+
// The limit affects how many partition summaries are included in manifest metadata.
145+
// Here we just return the summary map. The partition summary limit is typically
146+
// used when building partition summaries, which happens during manifest writing.
147+
return summary_;
148+
}
149+
150+
void FastAppend::CleanUncommitted(const std::unordered_set<std::string>& committed) {
151+
// Clean up new manifests that were written but not committed
152+
if (!new_manifests_.empty()) {
153+
for (const auto& manifest : new_manifests_) {
154+
if (committed.find(manifest.manifest_path) == committed.end()) {
155+
std::ignore = DeleteFile(manifest.manifest_path);
156+
}
157+
}
158+
new_manifests_.clear();
159+
}
160+
161+
// Clean up only rewritten_append_manifests as they are always owned by the table
162+
// Don't clean up append_manifests as they are added to the manifest list and are
163+
// not compacted
164+
if (!rewritten_append_manifests_.empty()) {
165+
for (const auto& manifest : rewritten_append_manifests_) {
166+
if (committed.find(manifest.manifest_path) == committed.end()) {
167+
std::ignore = DeleteFile(manifest.manifest_path);
168+
}
169+
}
170+
}
171+
}
172+
173+
bool FastAppend::CleanupAfterCommit() const {
174+
// Cleanup after committing is disabled for FastAppend unless there are
175+
// rewritten_append_manifests because:
176+
// 1.) Appended manifests are never rewritten
177+
// 2.) Manifests which are written out as part of appendFile are already cleaned
178+
// up between commit attempts in writeNewManifests
179+
return !rewritten_append_manifests_.empty();
180+
}
181+
182+
Result<std::shared_ptr<PartitionSpec>> FastAppend::Spec(int32_t spec_id) {
183+
return base().PartitionSpecById(spec_id);
184+
}
185+
186+
Result<ManifestFile> FastAppend::CopyManifest(const ManifestFile& manifest) {
187+
const TableMetadata& current = base();
188+
ICEBERG_ASSIGN_OR_RAISE(auto schema, current.Schema());
189+
ICEBERG_ASSIGN_OR_RAISE(auto spec,
190+
current.PartitionSpecById(manifest.partition_spec_id));
191+
192+
// Read the manifest entries
193+
ICEBERG_ASSIGN_OR_RAISE(
194+
auto reader,
195+
ManifestReader::Make(manifest, transaction_->table()->io(), schema, spec));
196+
ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
197+
198+
// Create a new manifest writer
199+
// Generate a unique manifest path using the transaction's metadata location
200+
std::string filename = std::format("copy-m{}.avro", copy_manifest_count_++);
201+
std::string new_manifest_path = transaction_->MetadataFileLocation(filename);
202+
int64_t snapshot_id = SnapshotId();
203+
ICEBERG_ASSIGN_OR_RAISE(
204+
auto writer, ManifestWriter::MakeWriter(
205+
current.format_version, snapshot_id, new_manifest_path,
206+
transaction_->table()->io(), spec, schema, ManifestContent::kData,
207+
/*first_row_id=*/current.next_row_id));
208+
209+
// Write all entries as added entries with the new snapshot ID
210+
for (auto& entry : entries) {
211+
ICEBERG_PRECHECK(entry.status == ManifestStatus::kAdded,
212+
"Manifest to copy must only contain added entries");
213+
entry.snapshot_id = snapshot_id;
214+
ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry));
215+
}
216+
217+
ICEBERG_RETURN_UNEXPECTED(writer->Close());
218+
ICEBERG_ASSIGN_OR_RAISE(auto new_manifest, writer->ToManifestFile());
219+
220+
// Update summary with the copied manifest
221+
ICEBERG_RETURN_UNEXPECTED(AddManifestToSummary(new_manifest));
222+
223+
return new_manifest;
224+
}
225+
226+
Result<std::vector<ManifestFile>> FastAppend::WriteNewManifests() {
227+
// If there are new files and manifests were already written, clean them up
228+
if (has_new_files_ && !new_manifests_.empty()) {
229+
for (const auto& manifest : new_manifests_) {
230+
ICEBERG_RETURN_UNEXPECTED(DeleteFile(manifest.manifest_path));
231+
}
232+
new_manifests_.clear();
233+
}
234+
235+
// Write new manifests if there are new data files
236+
if (new_manifests_.empty() && !new_data_files_by_spec_.empty()) {
237+
for (const auto& [spec_id, data_files] : new_data_files_by_spec_) {
238+
ICEBERG_ASSIGN_OR_RAISE(auto spec, Spec(spec_id));
239+
std::vector<std::shared_ptr<DataFile>> files;
240+
files.reserve(data_files.size());
241+
std::ranges::copy(data_files, std::back_inserter(files));
242+
ICEBERG_ASSIGN_OR_RAISE(auto written_manifests, WriteDataManifests(files, spec));
243+
new_manifests_.insert(new_manifests_.end(), written_manifests.begin(),
244+
written_manifests.end());
245+
}
246+
has_new_files_ = false;
247+
}
248+
249+
return new_manifests_;
250+
}
251+
252+
Status FastAppend::AddFileToSummary(const std::shared_ptr<PartitionSpec>& spec,
253+
const std::shared_ptr<DataFile>& file) {
254+
// Update added data files count
255+
auto it = summary_.find(SnapshotSummaryFields::kAddedDataFiles);
256+
int64_t added_data_files = 0;
257+
if (it != summary_.end()) {
258+
ICEBERG_ASSIGN_OR_RAISE(added_data_files, StringUtils::ParseInt<int64_t>(it->second));
259+
}
260+
summary_[SnapshotSummaryFields::kAddedDataFiles] = std::to_string(added_data_files + 1);
261+
262+
// Update added records count
263+
it = summary_.find(SnapshotSummaryFields::kAddedRecords);
264+
int64_t added_records = 0;
265+
if (it != summary_.end()) {
266+
ICEBERG_ASSIGN_OR_RAISE(added_records, StringUtils::ParseInt<int64_t>(it->second));
267+
}
268+
summary_[SnapshotSummaryFields::kAddedRecords] =
269+
std::to_string(added_records + file->record_count);
270+
271+
// Update added file size
272+
it = summary_.find(SnapshotSummaryFields::kAddedFileSize);
273+
int64_t added_file_size = 0;
274+
if (it != summary_.end()) {
275+
ICEBERG_ASSIGN_OR_RAISE(added_file_size, StringUtils::ParseInt<int64_t>(it->second));
276+
}
277+
summary_[SnapshotSummaryFields::kAddedFileSize] =
278+
std::to_string(added_file_size + file->file_size_in_bytes);
279+
280+
// Note: Partition summary is built during manifest writing, not here.
281+
// The partition summary limit is handled in the manifest writer.
282+
return {};
283+
}
284+
285+
Status FastAppend::AddManifestToSummary(const ManifestFile& manifest) {
286+
// Update added data files count from manifest
287+
if (manifest.added_files_count.has_value()) {
288+
auto it = summary_.find(SnapshotSummaryFields::kAddedDataFiles);
289+
int64_t added_data_files = 0;
290+
if (it != summary_.end()) {
291+
ICEBERG_ASSIGN_OR_RAISE(added_data_files,
292+
StringUtils::ParseInt<int64_t>(it->second));
293+
}
294+
summary_[SnapshotSummaryFields::kAddedDataFiles] =
295+
std::to_string(added_data_files + manifest.added_files_count.value());
296+
}
297+
298+
// Update added records count from manifest
299+
if (manifest.added_rows_count.has_value()) {
300+
auto it = summary_.find(SnapshotSummaryFields::kAddedRecords);
301+
int64_t added_records = 0;
302+
if (it != summary_.end()) {
303+
ICEBERG_ASSIGN_OR_RAISE(added_records, StringUtils::ParseInt<int64_t>(it->second));
304+
}
305+
summary_[SnapshotSummaryFields::kAddedRecords] =
306+
std::to_string(added_records + manifest.added_rows_count.value());
307+
}
308+
return {};
309+
}
310+
311+
} // namespace iceberg

0 commit comments

Comments
 (0)