Skip to content

Commit b521855

Browse files
committed
feat: add FastAppend
1 parent 08e8127 commit b521855

5 files changed

Lines changed: 450 additions & 0 deletions

File tree

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ set(ICEBERG_SOURCES
8181
transform.cc
8282
transform_function.cc
8383
type.cc
84+
update/fast_append.cc
8485
update/pending_update.cc
8586
update/snapshot_update.cc
8687
update/update_partition_spec.cc

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ iceberg_sources = files(
102102
'transform.cc',
103103
'transform_function.cc',
104104
'type.cc',
105+
'update/fast_append.cc',
105106
'update/pending_update.cc',
106107
'update/snapshot_update.cc',
107108
'update/update_partition_spec.cc',

src/iceberg/update/fast_append.cc

Lines changed: 320 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,320 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/update/fast_append.h"
21+
22+
#include <format>
23+
#include <vector>
24+
25+
#include "iceberg/constants.h"
26+
#include "iceberg/file_io.h"
27+
#include "iceberg/manifest/manifest_entry.h"
28+
#include "iceberg/manifest/manifest_reader.h"
29+
#include "iceberg/manifest/manifest_writer.h"
30+
#include "iceberg/snapshot.h"
31+
#include "iceberg/table.h"
32+
#include "iceberg/table_metadata.h"
33+
#include "iceberg/transaction.h"
34+
#include "iceberg/util/error_collector.h"
35+
#include "iceberg/util/macros.h"
36+
37+
namespace iceberg {
38+
39+
// Creates a FastAppend bound to `transaction`.
//
// `table_name` is stored on the new object; `transaction` is handed to the
// SnapshotUpdate base. Returns an error when `transaction` is null.
Result<std::unique_ptr<FastAppend>> FastAppend::Make(
    std::string table_name, std::shared_ptr<Transaction> transaction) {
  // Later steps dereference transaction_ without a null check (for example
  // transaction_->table()->io()), so reject a null transaction up front rather
  // than crashing in the middle of a commit.
  ICEBERG_PRECHECK(transaction != nullptr,
                   "Cannot create FastAppend without a transaction");

  // NOTE(review): explicit `new` is kept from the original; presumably the
  // constructor is not accessible to std::make_unique -- confirm in the header.
  return std::unique_ptr<FastAppend>(
      new FastAppend(std::move(table_name), std::move(transaction)));
}
44+
45+
// Forwards the transaction to the SnapshotUpdate base and keeps the table name
// on this object. Both arguments are taken by value and moved into place.
FastAppend::FastAppend(std::string table_name, std::shared_ptr<Transaction> transaction)
    : SnapshotUpdate(std::move(transaction)),
      table_name_(std::move(table_name)) {
}
47+
48+
// Queues a data file to be added by this append and returns *this for chaining.
//
// The file must be non-null and carry a partition spec ID, and that ID must
// resolve through Spec(); violations are reported through the ICEBERG_BUILDER_*
// macros and the file is not queued. A file whose path equals one already queued
// under the same spec ID is skipped without an error.
FastAppend& FastAppend::AppendFile(std::shared_ptr<DataFile> file) {
  ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null");
  ICEBERG_BUILDER_CHECK(file->partition_spec_id.has_value(),
                        "Data file must have partition spec ID");

  const int32_t spec_id = file->partition_spec_id.value();
  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto spec, Spec(spec_id));

  auto& data_files = new_data_files_by_spec_[spec_id];
  // De-duplicate by path only; no other file attribute is compared.
  for (const auto& existing_file : data_files) {
    if (existing_file->file_path == file->file_path) {
      return *this;
    }
  }

  // Update the summary before queuing the file and propagate its status: a
  // failure is then surfaced through the builder instead of being dropped, and
  // no file is queued whose counts are missing from the summary.
  ICEBERG_BUILDER_RETURN_IF_ERROR(AddFileToSummary(spec, file));
  data_files.push_back(std::move(file));
  has_new_files_ = true;

  return *this;
}
74+
75+
// Adds an existing manifest to this append and returns *this for chaining.
//
// The manifest is rejected (an error is recorded via AddError) when it reports a
// non-zero count of existing or deleted files, or when it already carries a
// snapshot ID or sequence number. Accepted manifests are either appended as-is
// (when the snapshot ID can be inherited) or rewritten through CopyManifest().
FastAppend& FastAppend::AppendManifest(const ManifestFile& manifest) {
  if (manifest.existing_files_count.has_value() && manifest.existing_files_count != 0) {
    AddError(ErrorKind::kInvalidArgument, "Cannot append manifest with existing files");
    return *this;
  }
  if (manifest.deleted_files_count.has_value() && manifest.deleted_files_count != 0) {
    AddError(ErrorKind::kInvalidArgument, "Cannot append manifest with deleted files");
    return *this;
  }

  // Both kInvalidSnapshotId and a literal -1 are accepted as "not assigned yet".
  const bool snapshot_id_unassigned =
      manifest.added_snapshot_id == kInvalidSnapshotId ||
      manifest.added_snapshot_id == -1;
  if (!snapshot_id_unassigned) {
    AddError(ErrorKind::kInvalidArgument, "Snapshot id must be assigned during commit");
    return *this;
  }
  if (manifest.sequence_number != -1 &&
      manifest.sequence_number != TableMetadata::kInitialSequenceNumber) {
    AddError(ErrorKind::kInvalidArgument,
             "Sequence number must be assigned during commit");
    return *this;
  }

  // The snapshot ID is known to be unassigned here, so whether the manifest can
  // be appended unchanged depends only on snapshot ID inheritance.
  if (can_inherit_snapshot_id()) {
    // Propagate a summary failure instead of dropping it; the manifest is only
    // recorded once its counts are reflected in the summary.
    ICEBERG_BUILDER_RETURN_IF_ERROR(AddManifestToSummary(manifest));
    append_manifests_.push_back(manifest);
  } else {
    // The manifest must be rewritten with this update's snapshot ID
    auto copied_manifest_result = CopyManifest(manifest);
    if (!copied_manifest_result.has_value()) {
      AddError(copied_manifest_result.error());
      return *this;
    }
    rewritten_append_manifests_.push_back(std::move(copied_manifest_result.value()));
  }

  return *this;
}
112+
113+
// Selects the branch this append commits to by delegating to SetTargetBranch().
// A failure from SetTargetBranch() is handled by ICEBERG_BUILDER_RETURN_IF_ERROR;
// otherwise *this is returned for chaining.
FastAppend& FastAppend::ToBranch(const std::string& branch) {
  ICEBERG_BUILDER_RETURN_IF_ERROR(SetTargetBranch(branch));

  return *this;
}
117+
118+
// Stores `value` under `property` in the snapshot summary, replacing any value
// already stored for that key. Returns *this for chaining.
FastAppend& FastAppend::Set(const std::string& property, const std::string& value) {
  summary_.insert_or_assign(property, value);
  return *this;
}
122+
123+
// Names the snapshot operation produced by this update: always
// DataOperation::kAppend.
std::string FastAppend::operation() {
  std::string op = DataOperation::kAppend;
  return op;
}
124+
125+
// Builds the list of manifests for the snapshot produced by this append.
//
// The result contains, in order:
//   1. manifests written for files queued through AppendFile(),
//   2. manifests appended as-is, stamped with this update's snapshot ID,
//   3. rewritten (copied) manifests, stamped with this update's snapshot ID,
//   4. all manifests of `snapshot`, when `snapshot` is not null.
//
// `metadata_to_update` is not read by this implementation. Errors from writing
// new manifests or from loading the snapshot's manifests are returned to the
// caller.
Result<std::vector<ManifestFile>> FastAppend::Apply(
    const TableMetadata& metadata_to_update, const std::shared_ptr<Snapshot>& snapshot) {
  // WriteNewManifests() hands back its own vector, so adopt it instead of
  // copying it element by element into a second one.
  ICEBERG_ASSIGN_OR_RAISE(auto new_written_manifests, WriteNewManifests());
  std::vector<ManifestFile> manifests = std::move(new_written_manifests);
  manifests.reserve(manifests.size() + append_manifests_.size() +
                    rewritten_append_manifests_.size());

  // Appended and rewritten manifests are stored without a final snapshot ID;
  // stamp the copy that goes into the result, leaving the stored one untouched.
  const int64_t snapshot_id = SnapshotId();
  for (const auto& manifest : append_manifests_) {
    manifests.push_back(manifest);
    manifests.back().added_snapshot_id = snapshot_id;
  }
  for (const auto& manifest : rewritten_append_manifests_) {
    manifests.push_back(manifest);
    manifests.back().added_snapshot_id = snapshot_id;
  }

  // Carry over every manifest of the snapshot being appended to.
  if (snapshot != nullptr) {
    auto cached_snapshot = SnapshotCache(snapshot.get());
    ICEBERG_ASSIGN_OR_RAISE(auto snapshot_manifests_span,
                            cached_snapshot.Manifests(transaction_->table()->io()));
    // Insert straight from the span; no intermediate vector is needed.
    manifests.insert(manifests.end(), snapshot_manifests_span.begin(),
                     snapshot_manifests_span.end());
  }

  return manifests;
}
163+
164+
std::unordered_map<std::string, std::string> FastAppend::Summary() {
165+
// Set partition summary limit from table properties
166+
// Note: Partition summary limit is handled during manifest writing, not here
167+
// The limit affects how many partition summaries are included in manifest metadata
168+
169+
// Note: In Java, the summary builder has a method to set partition summary limit.
170+
// Here we just return the summary map. The partition summary limit is typically
171+
// used when building partition summaries, which happens during manifest writing.
172+
// We'll handle it in the manifest writing phase if needed.
173+
174+
return summary_;
175+
}
176+
177+
void FastAppend::CleanUncommitted(const std::unordered_set<std::string>& committed) {
178+
// Clean up new manifests that were written but not committed
179+
if (!new_manifests_.empty()) {
180+
for (const auto& manifest : new_manifests_) {
181+
if (committed.find(manifest.manifest_path) == committed.end()) {
182+
std::ignore = transaction_->table()->io()->DeleteFile(manifest.manifest_path);
183+
}
184+
}
185+
new_manifests_.clear();
186+
}
187+
188+
// Clean up only rewritten_append_manifests as they are always owned by the table
189+
// Don't clean up append_manifests as they are added to the manifest list and are
190+
// not compacted
191+
if (!rewritten_append_manifests_.empty()) {
192+
for (const auto& manifest : rewritten_append_manifests_) {
193+
if (committed.find(manifest.manifest_path) == committed.end()) {
194+
std::ignore = transaction_->table()->io()->DeleteFile(manifest.manifest_path);
195+
}
196+
}
197+
}
198+
}
199+
200+
bool FastAppend::CleanupAfterCommit() const {
201+
// Cleanup after committing is disabled for FastAppend unless there are
202+
// rewritten_append_manifests because:
203+
// 1.) Appended manifests are never rewritten
204+
// 2.) Manifests which are written out as part of appendFile are already cleaned
205+
// up between commit attempts in writeNewManifests
206+
return !rewritten_append_manifests_.empty();
207+
}
208+
209+
// Looks up the partition spec with ID `spec_id` in the base table metadata and
// returns whatever PartitionSpecById() yields, including its error.
Result<std::shared_ptr<PartitionSpec>> FastAppend::Spec(int32_t spec_id) {
  const TableMetadata& current = base();
  return current.PartitionSpecById(spec_id);
}
212+
213+
// Rewrites `manifest` into a new manifest file whose entries carry this update's
// snapshot ID, and returns the description of the new file.
//
// Every entry of the source manifest must have status kAdded; otherwise an
// error is returned before any output is produced. On success the new
// manifest's counts are added to the summary. Errors from reading, writing or
// updating the summary are returned to the caller.
Result<ManifestFile> FastAppend::CopyManifest(const ManifestFile& manifest) {
  const TableMetadata& current = base();
  ICEBERG_ASSIGN_OR_RAISE(auto schema, current.Schema());
  ICEBERG_ASSIGN_OR_RAISE(auto spec,
                          current.PartitionSpecById(manifest.partition_spec_id));

  // Read the manifest entries
  ICEBERG_ASSIGN_OR_RAISE(
      auto reader,
      ManifestReader::Make(manifest, transaction_->table()->io(), schema, spec));
  ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());

  // Validate all entries before a writer is created, so an invalid source
  // manifest neither consumes a copy number nor leaves a writer half-used.
  for (const auto& entry : entries) {
    ICEBERG_PRECHECK(entry.status == ManifestStatus::kAdded,
                     "Manifest to copy must only contain added entries");
  }

  // Name the copy with a per-update counter and place it at the transaction's
  // metadata file location.
  std::string filename = std::format("copy-m{}.avro", copy_manifest_count_++);
  std::string new_manifest_path = transaction_->MetadataFileLocation(filename);
  const int64_t snapshot_id = SnapshotId();
  ICEBERG_ASSIGN_OR_RAISE(
      auto writer, ManifestWriter::MakeWriter(
                       current.format_version, snapshot_id, new_manifest_path,
                       transaction_->table()->io(), spec, schema, ManifestContent::kData,
                       /*first_row_id=*/current.next_row_id));

  // Write all entries as added entries with the new snapshot ID
  for (auto& entry : entries) {
    entry.snapshot_id = snapshot_id;
    ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry));
  }

  ICEBERG_RETURN_UNEXPECTED(writer->Close());
  ICEBERG_ASSIGN_OR_RAISE(auto new_manifest, writer->ToManifestFile());

  // Update summary with the copied manifest; surface a failure instead of
  // silently dropping it.
  ICEBERG_RETURN_UNEXPECTED(AddManifestToSummary(new_manifest));

  return new_manifest;
}
252+
253+
// Returns the manifests that describe the data files queued via AppendFile(),
// writing them first when necessary.
//
// Manifests written by an earlier call are reused unless files were queued
// since then; in that case the stale manifest files are deleted (deletion
// results are ignored) and a fresh set is written, one batch per partition
// spec. A failure to resolve a spec or to write manifests is returned to the
// caller.
Result<std::vector<ManifestFile>> FastAppend::WriteNewManifests() {
  // Previously written manifests no longer cover every queued file: drop them.
  const bool written_manifests_are_stale = has_new_files_ && !new_manifests_.empty();
  if (written_manifests_are_stale) {
    for (const auto& stale_manifest : new_manifests_) {
      std::ignore = transaction_->table()->io()->DeleteFile(stale_manifest.manifest_path);
    }
    new_manifests_.clear();
  }

  // Nothing to write: either the cached manifests are still valid or no data
  // files have been queued at all.
  if (!new_manifests_.empty() || new_data_files_by_spec_.empty()) {
    return new_manifests_;
  }

  for (const auto& [spec_id, data_files] : new_data_files_by_spec_) {
    ICEBERG_ASSIGN_OR_RAISE(auto spec, Spec(spec_id));
    ICEBERG_ASSIGN_OR_RAISE(auto written_manifests,
                            WriteDataManifests(data_files, spec));
    for (const auto& written_manifest : written_manifests) {
      new_manifests_.push_back(written_manifest);
    }
  }
  has_new_files_ = false;

  return new_manifests_;
}
276+
277+
// Adds one data file to the running summary counters: the added-data-files
// count grows by one, the added-records count by the file's record count and
// the added-file-size total by the file's size in bytes.
//
// Counters are kept as decimal strings in summary_; a missing counter is
// treated as 0. `spec` is not read by this implementation. Always returns a
// default-constructed Status.
Status FastAppend::AddFileToSummary(const std::shared_ptr<PartitionSpec>& spec,
                                    const std::shared_ptr<DataFile>& file) {
  // Parses the stored counter (0 when absent), adds `delta` and stores it back.
  const auto add_to_counter = [this](const auto& field, int64_t delta) {
    int64_t current = 0;
    if (auto it = summary_.find(field); it != summary_.end()) {
      current = std::stoll(it->second);
    }
    summary_[field] = std::to_string(current + delta);
  };

  add_to_counter(SnapshotSummaryFields::kAddedDataFiles, 1);
  add_to_counter(SnapshotSummaryFields::kAddedRecords, file->record_count);
  add_to_counter(SnapshotSummaryFields::kAddedFileSize, file->file_size_in_bytes);

  // Partition-level summary entries are not produced here.
  return {};
}
300+
301+
// Adds a manifest's counts to the running summary counters: its added-files
// count goes into the added-data-files counter and its added-rows count into
// the added-records counter. A count the manifest does not carry leaves the
// matching counter untouched.
//
// Counters are kept as decimal strings in summary_; a missing counter is
// treated as 0. Always returns a default-constructed Status.
Status FastAppend::AddManifestToSummary(const ManifestFile& manifest) {
  // Adds `maybe_count` to the stored counter when the manifest provides it.
  const auto add_to_counter = [this](const auto& field, const auto& maybe_count) {
    if (!maybe_count.has_value()) {
      return;
    }
    auto it = summary_.find(field);
    const int64_t current = (it == summary_.end()) ? 0 : std::stoll(it->second);
    summary_[field] = std::to_string(current + maybe_count.value());
  };

  add_to_counter(SnapshotSummaryFields::kAddedDataFiles, manifest.added_files_count);
  add_to_counter(SnapshotSummaryFields::kAddedRecords, manifest.added_rows_count);

  return {};
}
319+
320+
} // namespace iceberg

0 commit comments

Comments
 (0)