Skip to content

Commit 188acdf

Browse files
committed
feat: add FastAppend
1 parent 08e8127 commit 188acdf

6 files changed

Lines changed: 488 additions & 1 deletion

File tree

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ set(ICEBERG_SOURCES
8181
transform.cc
8282
transform_function.cc
8383
type.cc
84+
update/fast_append.cc
8485
update/pending_update.cc
8586
update/snapshot_update.cc
8687
update/update_partition_spec.cc

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ iceberg_sources = files(
102102
'transform.cc',
103103
'transform_function.cc',
104104
'type.cc',
105+
'update/fast_append.cc',
105106
'update/pending_update.cc',
106107
'update/snapshot_update.cc',
107108
'update/update_partition_spec.cc',

src/iceberg/update/fast_append.cc

Lines changed: 327 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,327 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/update/fast_append.h"
21+
22+
#include <format>
23+
#include <iterator>
24+
#include <ranges>
25+
#include <vector>
26+
27+
#include "iceberg/constants.h"
28+
#include "iceberg/file_io.h"
29+
#include "iceberg/manifest/manifest_entry.h"
30+
#include "iceberg/manifest/manifest_reader.h"
31+
#include "iceberg/manifest/manifest_writer.h"
32+
#include "iceberg/snapshot.h"
33+
#include "iceberg/table.h"
34+
#include "iceberg/table_metadata.h"
35+
#include "iceberg/transaction.h"
36+
#include "iceberg/util/error_collector.h"
37+
#include "iceberg/util/macros.h"
38+
#include "iceberg/util/string_util.h"
39+
40+
namespace iceberg {
41+
42+
// Factory for FastAppend updates.
//
// Takes `table_name` and `transaction` by value and moves both into the
// constructor. No validation happens here, so this function itself never
// produces an error result.
Result<std::unique_ptr<FastAppend>> FastAppend::Make(
    std::string table_name, std::shared_ptr<Transaction> transaction) {
  std::unique_ptr<FastAppend> update(
      new FastAppend(std::move(table_name), std::move(transaction)));
  return update;
}
47+
48+
FastAppend::FastAppend(std::string table_name, std::shared_ptr<Transaction> transaction)
49+
: SnapshotUpdate(std::move(transaction)), table_name_(std::move(table_name)) {}
50+
51+
// Queues a data file to be appended by this update.
//
// The file must be non-null and carry a partition spec ID that resolves via
// Spec(). Files are grouped per spec ID; a file that is already present in its
// group is ignored, so the summary counters are only bumped once per file.
// Validation and summary failures are reported through the ICEBERG_BUILDER_*
// macros; the builder itself is always returned for chaining.
FastAppend& FastAppend::AppendFile(std::shared_ptr<DataFile> file) {
  ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null");
  ICEBERG_BUILDER_CHECK(file->partition_spec_id.has_value(),
                        "Data file must have partition spec ID");

  const int32_t file_spec_id = file->partition_spec_id.value();
  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto partition_spec, Spec(file_spec_id));

  // operator[] creates the per-spec set on first use.
  const bool newly_added = new_data_files_by_spec_[file_spec_id].insert(file).second;
  if (!newly_added) {
    return *this;
  }

  // Flag is raised before the summary update, so it stays set even if that fails.
  has_new_files_ = true;
  ICEBERG_BUILDER_RETURN_IF_ERROR(AddFileToSummary(partition_spec, file));
  return *this;
}
68+
69+
// Queues an existing manifest to be appended by this update.
//
// The manifest is rejected (error recorded via AddError, nothing queued) when it
// reports existing or deleted files, or when it already carries an assigned
// snapshot ID or sequence number. Otherwise it is either appended as-is (when
// the snapshot ID can be inherited) or copied into a new manifest that is
// stamped with this update's snapshot ID.
FastAppend& FastAppend::AppendManifest(const ManifestFile& manifest) {
  if (manifest.existing_files_count.value_or(0) != 0) {
    AddError(ErrorKind::kInvalidArgument, "Cannot append manifest with existing files");
    return *this;
  }
  if (manifest.deleted_files_count.value_or(0) != 0) {
    AddError(ErrorKind::kInvalidArgument, "Cannot append manifest with deleted files");
    return *this;
  }

  const bool snapshot_id_unassigned =
      manifest.added_snapshot_id == kInvalidSnapshotId || manifest.added_snapshot_id == -1;
  if (!snapshot_id_unassigned) {
    AddError(ErrorKind::kInvalidArgument, "Snapshot id must be assigned during commit");
    return *this;
  }

  const bool sequence_number_unassigned =
      manifest.sequence_number == -1 ||
      manifest.sequence_number == TableMetadata::kInitialSequenceNumber;
  if (!sequence_number_unassigned) {
    AddError(ErrorKind::kInvalidArgument,
             "Sequence number must be assigned during commit");
    return *this;
  }

  const bool append_as_is = can_inherit_snapshot_id() && snapshot_id_unassigned;
  if (!append_as_is) {
    // The manifest must be rewritten with this update's snapshot ID.
    auto rewritten = CopyManifest(manifest);
    if (!rewritten.has_value()) {
      AddError(rewritten.error());
      return *this;
    }
    rewritten_append_manifests_.push_back(rewritten.value());
    return *this;
  }

  // Only queue the manifest once its counts made it into the summary.
  ICEBERG_BUILDER_RETURN_IF_ERROR(AddManifestToSummary(manifest));
  append_manifests_.push_back(manifest);
  return *this;
}
106+
107+
// Selects the branch this append targets by delegating to SetTargetBranch().
// A failure from SetTargetBranch() is handled by ICEBERG_BUILDER_RETURN_IF_ERROR;
// the builder is returned for chaining.
FastAppend& FastAppend::ToBranch(const std::string& branch) {
  ICEBERG_BUILDER_RETURN_IF_ERROR(SetTargetBranch(branch));

  return *this;
}
111+
112+
// Stores `value` under `property` in the snapshot summary, replacing any value
// already recorded for that key. Returns the builder for chaining.
FastAppend& FastAppend::Set(const std::string& property, const std::string& value) {
  summary_.insert_or_assign(property, value);
  return *this;
}
116+
117+
// Operation name reported for snapshots produced by this update.
std::string FastAppend::operation() {
  return DataOperation::kAppend;
}
118+
119+
// Builds the manifest list for the snapshot this update will produce.
//
// The result is ordered as:
//   1. manifests written for files added through AppendFile(),
//   2. manifests appended as-is, stamped with this update's snapshot ID,
//   3. rewritten (copied) manifests, stamped with this update's snapshot ID,
//   4. every manifest of `snapshot`, when one is given.
//
// `metadata_to_update` is not consulted. Returns the first error raised while
// writing new manifests or loading the snapshot's manifests.
Result<std::vector<ManifestFile>> FastAppend::Apply(
    const TableMetadata& metadata_to_update, const std::shared_ptr<Snapshot>& snapshot) {
  ICEBERG_ASSIGN_OR_RAISE(auto freshly_written, WriteNewManifests());
  std::vector<ManifestFile> result(freshly_written.begin(), freshly_written.end());

  // Appended and rewritten manifests get this update's snapshot ID on a copy;
  // the stored originals stay untouched.
  const int64_t assigned_snapshot_id = SnapshotId();
  const auto stamp_and_collect = [&result, assigned_snapshot_id](const auto& pending) {
    for (const auto& source : pending) {
      ManifestFile& stamped = result.emplace_back(source);
      stamped.added_snapshot_id = assigned_snapshot_id;
    }
  };
  stamp_and_collect(append_manifests_);
  stamp_and_collect(rewritten_append_manifests_);

  if (snapshot == nullptr) {
    return result;
  }

  // Carry over every manifest already referenced by the given snapshot.
  auto snapshot_cache = SnapshotCache(snapshot.get());
  ICEBERG_ASSIGN_OR_RAISE(auto existing_manifests,
                          snapshot_cache.Manifests(transaction_->table()->io()));
  result.insert(result.end(), existing_manifests.begin(), existing_manifests.end());

  return result;
}
157+
158+
std::unordered_map<std::string, std::string> FastAppend::Summary() {
159+
// Note: In Java, the summary builder has a method to set partition summary limit.
160+
// The limit affects how many partition summaries are included in manifest metadata.
161+
// Here we just return the summary map. The partition summary limit is typically
162+
// used when building partition summaries, which happens during manifest writing.
163+
return summary_;
164+
}
165+
166+
void FastAppend::CleanUncommitted(const std::unordered_set<std::string>& committed) {
167+
// Clean up new manifests that were written but not committed
168+
if (!new_manifests_.empty()) {
169+
for (const auto& manifest : new_manifests_) {
170+
if (committed.find(manifest.manifest_path) == committed.end()) {
171+
std::ignore = DeleteFile(manifest.manifest_path);
172+
}
173+
}
174+
new_manifests_.clear();
175+
}
176+
177+
// Clean up only rewritten_append_manifests as they are always owned by the table
178+
// Don't clean up append_manifests as they are added to the manifest list and are
179+
// not compacted
180+
if (!rewritten_append_manifests_.empty()) {
181+
for (const auto& manifest : rewritten_append_manifests_) {
182+
if (committed.find(manifest.manifest_path) == committed.end()) {
183+
std::ignore = DeleteFile(manifest.manifest_path);
184+
}
185+
}
186+
}
187+
}
188+
189+
bool FastAppend::CleanupAfterCommit() const {
190+
// Cleanup after committing is disabled for FastAppend unless there are
191+
// rewritten_append_manifests because:
192+
// 1.) Appended manifests are never rewritten
193+
// 2.) Manifests which are written out as part of appendFile are already cleaned
194+
// up between commit attempts in writeNewManifests
195+
return !rewritten_append_manifests_.empty();
196+
}
197+
198+
// Looks up the partition spec with the given ID in the base table metadata.
// Returns whatever PartitionSpecById() yields, including its error.
Result<std::shared_ptr<PartitionSpec>> FastAppend::Spec(int32_t spec_id) {
  const TableMetadata& metadata = base();
  return metadata.PartitionSpecById(spec_id);
}
201+
202+
// Rewrites `manifest` into a new manifest file stamped with this update's
// snapshot ID.
//
// Every entry of `manifest` is read with the current schema and the manifest's
// partition spec, then written as an added entry to a new file named
// "copy-m<N>.avro" (N is a per-update counter) whose path comes from the
// transaction's MetadataFileLocation(). The new manifest's counts are folded
// into the snapshot summary before it is returned.
//
// Returns the ManifestFile describing the copy, or the first error from the
// schema/spec lookup, reading, writing, or the summary update. The precheck
// fails if any entry is not in the kAdded state.
Result<ManifestFile> FastAppend::CopyManifest(const ManifestFile& manifest) {
  const TableMetadata& current = base();
  ICEBERG_ASSIGN_OR_RAISE(auto schema, current.Schema());
  ICEBERG_ASSIGN_OR_RAISE(auto spec,
                          current.PartitionSpecById(manifest.partition_spec_id));

  // Read the manifest entries
  ICEBERG_ASSIGN_OR_RAISE(
      auto reader,
      ManifestReader::Make(manifest, transaction_->table()->io(), schema, spec));
  ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());

  // Create a new manifest writer. The counter is bumped for every attempt so
  // that each copy gets its own file name.
  std::string filename = std::format("copy-m{}.avro", copy_manifest_count_++);
  std::string new_manifest_path = transaction_->MetadataFileLocation(filename);
  int64_t snapshot_id = SnapshotId();
  ICEBERG_ASSIGN_OR_RAISE(
      auto writer, ManifestWriter::MakeWriter(
                       current.format_version, snapshot_id, new_manifest_path,
                       transaction_->table()->io(), spec, schema, ManifestContent::kData,
                       /*first_row_id=*/current.next_row_id));

  // Write all entries as added entries with the new snapshot ID
  for (auto& entry : entries) {
    ICEBERG_PRECHECK(entry.status == ManifestStatus::kAdded,
                     "Manifest to copy must only contain added entries");
    entry.snapshot_id = snapshot_id;
    ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry));
  }

  ICEBERG_RETURN_UNEXPECTED(writer->Close());
  ICEBERG_ASSIGN_OR_RAISE(auto new_manifest, writer->ToManifestFile());

  // Update summary with the copied manifest. The status must not be dropped:
  // a failed summary update would otherwise go unnoticed and the copy would be
  // reported as successful with stale counters.
  // NOTE(review): on this error path the already written copy is not deleted
  // here — confirm whether the caller's cleanup covers it.
  ICEBERG_RETURN_UNEXPECTED(AddManifestToSummary(new_manifest));

  return new_manifest;
}
241+
242+
// Writes (or re-uses) the manifests covering files added through AppendFile().
//
// If files were added after manifests had already been written, the old
// manifests are deleted and forgotten first. Manifests are then written once
// per partition spec and cached, so repeated calls without new files return
// the cached list. Returns a copy of the cached manifests, or the first error
// from deleting, resolving a spec, or writing.
Result<std::vector<ManifestFile>> FastAppend::WriteNewManifests() {
  // Previously written manifests are stale as soon as more files were added.
  const bool cached_manifests_stale = has_new_files_ && !new_manifests_.empty();
  if (cached_manifests_stale) {
    for (const auto& stale_manifest : new_manifests_) {
      ICEBERG_RETURN_UNEXPECTED(DeleteFile(stale_manifest.manifest_path));
    }
    new_manifests_.clear();
  }

  // Nothing to write: either the cache is still valid or no files were added.
  if (!new_manifests_.empty() || new_data_files_by_spec_.empty()) {
    return new_manifests_;
  }

  for (const auto& [spec_id, data_files] : new_data_files_by_spec_) {
    ICEBERG_ASSIGN_OR_RAISE(auto spec, Spec(spec_id));

    std::vector<std::shared_ptr<DataFile>> files;
    files.reserve(data_files.size());
    for (const auto& data_file : data_files) {
      files.push_back(data_file);
    }

    ICEBERG_ASSIGN_OR_RAISE(auto written_manifests, WriteDataManifests(files, spec));
    new_manifests_.insert(new_manifests_.end(), written_manifests.begin(),
                          written_manifests.end());
  }
  // Only reset once every spec was written successfully.
  has_new_files_ = false;

  return new_manifests_;
}
267+
268+
// Adds one data file to the running snapshot summary.
//
// Increments the added-data-files counter by one and adds the file's record
// count and size in bytes to the added-records and added-file-size counters.
// Counters are stored as decimal strings in `summary_`; a missing counter
// counts as zero. `spec` is currently unused.
//
// Returns the parse error if an existing counter value is not a valid integer.
// Counters updated before the failing one keep their new values.
Status FastAppend::AddFileToSummary(const std::shared_ptr<PartitionSpec>& spec,
                                    const std::shared_ptr<DataFile>& file) {
  // Reads the counter stored under `field` (zero when absent), adds `delta`,
  // and writes the sum back as a string.
  const auto accumulate = [this](const std::string& field, int64_t delta) -> Status {
    int64_t running_total = 0;
    if (const auto found = summary_.find(field); found != summary_.end()) {
      ICEBERG_ASSIGN_OR_RAISE(running_total,
                              StringUtils::ParseInt<int64_t>(found->second));
    }
    summary_[field] = std::to_string(running_total + delta);
    return {};
  };

  ICEBERG_RETURN_UNEXPECTED(accumulate(SnapshotSummaryFields::kAddedDataFiles, 1));
  ICEBERG_RETURN_UNEXPECTED(
      accumulate(SnapshotSummaryFields::kAddedRecords, file->record_count));
  ICEBERG_RETURN_UNEXPECTED(
      accumulate(SnapshotSummaryFields::kAddedFileSize, file->file_size_in_bytes));

  // Note: no partition summary is built here.
  return {};
}
300+
301+
// Adds a manifest's counts to the running snapshot summary.
//
// When present, `added_files_count` is added to the added-data-files counter
// and `added_rows_count` to the added-records counter. Counters are stored as
// decimal strings in `summary_`; a missing counter counts as zero.
//
// Returns the parse error if an existing counter value is not a valid integer.
Status FastAppend::AddManifestToSummary(const ManifestFile& manifest) {
  // Reads the counter stored under `field` (zero when absent), adds `delta`,
  // and writes the sum back as a string.
  const auto accumulate = [this](const std::string& field, int64_t delta) -> Status {
    int64_t running_total = 0;
    if (const auto found = summary_.find(field); found != summary_.end()) {
      ICEBERG_ASSIGN_OR_RAISE(running_total,
                              StringUtils::ParseInt<int64_t>(found->second));
    }
    summary_[field] = std::to_string(running_total + delta);
    return {};
  };

  if (manifest.added_files_count.has_value()) {
    ICEBERG_RETURN_UNEXPECTED(accumulate(SnapshotSummaryFields::kAddedDataFiles,
                                         manifest.added_files_count.value()));
  }

  if (manifest.added_rows_count.has_value()) {
    ICEBERG_RETURN_UNEXPECTED(accumulate(SnapshotSummaryFields::kAddedRecords,
                                         manifest.added_rows_count.value()));
  }

  return {};
}
326+
327+
} // namespace iceberg

0 commit comments

Comments
 (0)