Skip to content

Commit 4f8b7bc

Browse files
committed
feat: add FastAppend
1 parent 08e8127 commit 4f8b7bc

2 files changed

Lines changed: 471 additions & 0 deletions

File tree

src/iceberg/update/fast_append.cc

Lines changed: 340 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,340 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/update/fast_append.h"
21+
22+
#include <format>
23+
#include <vector>
24+
25+
#include "iceberg/constants.h"
26+
#include "iceberg/file_io.h"
27+
#include "iceberg/manifest/manifest_entry.h"
28+
#include "iceberg/manifest/manifest_reader.h"
29+
#include "iceberg/manifest/manifest_writer.h"
30+
#include "iceberg/snapshot.h"
31+
#include "iceberg/table_metadata.h"
32+
#include "iceberg/transaction.h"
33+
#include "iceberg/util/macros.h"
34+
35+
namespace iceberg {
36+
37+
Result<std::unique_ptr<FastAppend>> FastAppend::Make(
38+
const std::string& table_name, std::shared_ptr<Transaction> transaction) {
39+
return std::make_unique<FastAppend>(table_name, std::move(transaction));
40+
}
41+
42+
FastAppend::FastAppend(const std::string& table_name,
43+
std::shared_ptr<Transaction> transaction)
44+
: SnapshotUpdate(std::move(transaction)), table_name_(table_name) {}
45+
46+
FastAppend& FastAppend::AppendFile(std::shared_ptr<DataFile> file) {
47+
if (file == nullptr) {
48+
AddError(ErrorKind::kInvalidArgument, "Invalid data file: null");
49+
return *this;
50+
}
51+
52+
if (!file->partition_spec_id.has_value()) {
53+
AddError(ErrorKind::kInvalidArgument, "Data file must have partition spec ID");
54+
return *this;
55+
}
56+
57+
int32_t spec_id = file->partition_spec_id.value();
58+
auto spec_result = Spec(spec_id);
59+
if (!spec_result.has_value()) {
60+
AddError(spec_result.error());
61+
return *this;
62+
}
63+
64+
auto spec = spec_result.value();
65+
if (spec == nullptr) {
66+
AddError(ErrorKind::kInvalidArgument,
67+
"Cannot find partition spec {} for data file: {}", spec_id, file->file_path);
68+
return *this;
69+
}
70+
71+
auto& data_files = new_data_files_by_spec_[spec_id];
72+
// Check if file already exists (simple check by path)
73+
bool is_new = true;
74+
for (const auto& existing_file : data_files) {
75+
if (existing_file->file_path == file->file_path) {
76+
is_new = false;
77+
break;
78+
}
79+
}
80+
81+
if (is_new) {
82+
data_files.push_back(file);
83+
has_new_files_ = true;
84+
AddFileToSummary(spec, file);
85+
}
86+
87+
return *this;
88+
}
89+
90+
FastAppend& FastAppend::AppendManifest(const ManifestFile& manifest) {
91+
if (manifest.existing_files_count.has_value() && manifest.existing_files_count != 0) {
92+
AddError(ErrorKind::kInvalidArgument, "Cannot append manifest with existing files");
93+
return *this;
94+
}
95+
if (manifest.deleted_files_count.has_value() && manifest.deleted_files_count != 0) {
96+
AddError(ErrorKind::kInvalidArgument, "Cannot append manifest with deleted files");
97+
return *this;
98+
}
99+
if (manifest.added_snapshot_id != kInvalidSnapshotId &&
100+
manifest.added_snapshot_id != -1) {
101+
AddError(ErrorKind::kInvalidArgument, "Snapshot id must be assigned during commit");
102+
return *this;
103+
}
104+
if (manifest.sequence_number != -1 &&
105+
manifest.sequence_number != TableMetadata::kInitialSequenceNumber) {
106+
AddError(ErrorKind::kInvalidArgument,
107+
"Sequence number must be assigned during commit");
108+
return *this;
109+
}
110+
111+
if (can_inherit_snapshot_id() && (manifest.added_snapshot_id == kInvalidSnapshotId ||
112+
manifest.added_snapshot_id == -1)) {
113+
AddManifestToSummary(manifest);
114+
append_manifests_.push_back(manifest);
115+
} else {
116+
// The manifest must be rewritten with this update's snapshot ID
117+
auto copied_manifest_result = CopyManifest(manifest);
118+
if (!copied_manifest_result.has_value()) {
119+
AddError(copied_manifest_result.error());
120+
return *this;
121+
}
122+
rewritten_append_manifests_.push_back(copied_manifest_result.value());
123+
}
124+
125+
return *this;
126+
}
127+
128+
FastAppend& FastAppend::ToBranch(const std::string& branch) {
129+
auto status = SetTargetBranch(branch);
130+
if (!status.has_value()) {
131+
AddError(status.error());
132+
}
133+
return *this;
134+
}
135+
136+
FastAppend& FastAppend::Set(const std::string& property, const std::string& value) {
137+
summary_[property] = value;
138+
return *this;
139+
}
140+
141+
std::string FastAppend::operation() { return DataOperation::kAppend; }
142+
143+
Result<std::vector<ManifestFile>> FastAppend::Apply(
144+
const TableMetadata& metadata_to_update, const std::shared_ptr<Snapshot>& snapshot) {
145+
std::vector<ManifestFile> manifests;
146+
147+
ICEBERG_ASSIGN_OR_RAISE(auto new_written_manifests, WriteNewManifests());
148+
if (!new_written_manifests.empty()) {
149+
manifests.insert(manifests.end(), new_written_manifests.begin(),
150+
new_written_manifests.end());
151+
}
152+
153+
// Transform append manifests and rewritten append manifests with snapshot ID
154+
int64_t snapshot_id = SnapshotId();
155+
for (const auto& manifest : append_manifests_) {
156+
ManifestFile updated = manifest;
157+
updated.added_snapshot_id = snapshot_id;
158+
manifests.push_back(updated);
159+
}
160+
161+
for (const auto& manifest : rewritten_append_manifests_) {
162+
ManifestFile updated = manifest;
163+
updated.added_snapshot_id = snapshot_id;
164+
manifests.push_back(updated);
165+
}
166+
167+
// Add all manifests from the snapshot
168+
if (snapshot != nullptr) {
169+
// Use SnapshotCache to get manifests, similar to snapshot_update.cc
170+
auto cached_snapshot = SnapshotCache(snapshot.get());
171+
auto snapshot_manifests_result =
172+
cached_snapshot.Manifests(transaction_->table()->io());
173+
if (!snapshot_manifests_result.has_value()) {
174+
return snapshot_manifests_result.error();
175+
}
176+
auto snapshot_manifests_span = snapshot_manifests_result.value();
177+
std::vector<ManifestFile> snapshot_manifests(snapshot_manifests_span.begin(),
178+
snapshot_manifests_span.end());
179+
manifests.insert(manifests.end(), snapshot_manifests.begin(),
180+
snapshot_manifests.end());
181+
}
182+
183+
return manifests;
184+
}
185+
186+
std::unordered_map<std::string, std::string> FastAppend::Summary() {
187+
// Set partition summary limit from table properties
188+
// Note: Partition summary limit is handled during manifest writing, not here
189+
// The limit affects how many partition summaries are included in manifest metadata
190+
191+
// Note: In Java, the summary builder has a method to set partition summary limit.
192+
// Here we just return the summary map. The partition summary limit is typically
193+
// used when building partition summaries, which happens during manifest writing.
194+
// We'll handle it in the manifest writing phase if needed.
195+
196+
return summary_;
197+
}
198+
199+
void FastAppend::CleanUncommitted(const std::unordered_set<std::string>& committed) {
200+
// Clean up new manifests that were written but not committed
201+
if (!new_manifests_.empty()) {
202+
for (const auto& manifest : new_manifests_) {
203+
if (committed.find(manifest.manifest_path) == committed.end()) {
204+
std::ignore = transaction_->table()->io()->DeleteFile(manifest.manifest_path);
205+
}
206+
}
207+
new_manifests_.clear();
208+
}
209+
210+
// Clean up only rewritten_append_manifests as they are always owned by the table
211+
// Don't clean up append_manifests as they are added to the manifest list and are
212+
// not compacted
213+
if (!rewritten_append_manifests_.empty()) {
214+
for (const auto& manifest : rewritten_append_manifests_) {
215+
if (committed.find(manifest.manifest_path) == committed.end()) {
216+
std::ignore = transaction_->table()->io()->DeleteFile(manifest.manifest_path);
217+
}
218+
}
219+
}
220+
}
221+
222+
bool FastAppend::CleanupAfterCommit() const {
223+
// Cleanup after committing is disabled for FastAppend unless there are
224+
// rewritten_append_manifests because:
225+
// 1.) Appended manifests are never rewritten
226+
// 2.) Manifests which are written out as part of appendFile are already cleaned
227+
// up between commit attempts in writeNewManifests
228+
return !rewritten_append_manifests_.empty();
229+
}
230+
231+
Result<std::shared_ptr<PartitionSpec>> FastAppend::Spec(int32_t spec_id) {
232+
return base().PartitionSpecById(spec_id);
233+
}
234+
235+
Result<ManifestFile> FastAppend::CopyManifest(const ManifestFile& manifest) {
236+
const TableMetadata& current = base();
237+
ICEBERG_ASSIGN_OR_RAISE(auto schema, current.Schema());
238+
ICEBERG_ASSIGN_OR_RAISE(auto spec,
239+
current.PartitionSpecById(manifest.partition_spec_id));
240+
241+
// Read the manifest entries
242+
ICEBERG_ASSIGN_OR_RAISE(
243+
auto reader,
244+
ManifestReader::Make(manifest, transaction_->table()->io(), schema, spec));
245+
ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
246+
247+
// Create a new manifest writer
248+
// Generate a unique manifest path using the transaction's metadata location
249+
std::string filename = std::format("copy-m{}.avro", copy_manifest_count_++);
250+
std::string new_manifest_path = transaction_->MetadataFileLocation(filename);
251+
int64_t snapshot_id = SnapshotId();
252+
ICEBERG_ASSIGN_OR_RAISE(
253+
auto writer, ManifestWriter::MakeWriter(
254+
current.format_version, snapshot_id, new_manifest_path,
255+
transaction_->table()->io(), spec, schema, ManifestContent::kData,
256+
/*first_row_id=*/current.next_row_id));
257+
258+
// Write all entries as added entries with the new snapshot ID
259+
for (auto& entry : entries) {
260+
ICEBERG_PRECHECK(entry.status == ManifestStatus::kAdded,
261+
"Manifest to copy must only contain added entries");
262+
entry.snapshot_id = snapshot_id;
263+
ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry));
264+
}
265+
266+
ICEBERG_RETURN_UNEXPECTED(writer->Close());
267+
ICEBERG_ASSIGN_OR_RAISE(auto new_manifest, writer->ToManifestFile());
268+
269+
// Update summary with the copied manifest
270+
AddManifestToSummary(new_manifest);
271+
272+
return new_manifest;
273+
}
274+
275+
Result<std::vector<ManifestFile>> FastAppend::WriteNewManifests() {
276+
// If there are new files and manifests were already written, clean them up
277+
if (has_new_files_ && !new_manifests_.empty()) {
278+
for (const auto& manifest : new_manifests_) {
279+
std::ignore = transaction_->table()->io()->DeleteFile(manifest.manifest_path);
280+
}
281+
new_manifests_.clear();
282+
}
283+
284+
// Write new manifests if there are new data files
285+
if (new_manifests_.empty() && !new_data_files_by_spec_.empty()) {
286+
for (const auto& [spec_id, data_files] : new_data_files_by_spec_) {
287+
ICEBERG_ASSIGN_OR_RAISE(auto spec, Spec(spec_id));
288+
ICEBERG_ASSIGN_OR_RAISE(auto written_manifests,
289+
WriteDataManifests(data_files, spec));
290+
new_manifests_.insert(new_manifests_.end(), written_manifests.begin(),
291+
written_manifests.end());
292+
}
293+
has_new_files_ = false;
294+
}
295+
296+
return new_manifests_;
297+
}
298+
299+
void FastAppend::AddFileToSummary(const std::shared_ptr<PartitionSpec>& spec,
300+
const std::shared_ptr<DataFile>& file) {
301+
// Update added data files count
302+
auto it = summary_.find(SnapshotSummaryFields::kAddedDataFiles);
303+
int64_t added_data_files = (it != summary_.end()) ? std::stoll(it->second) : 0;
304+
summary_[SnapshotSummaryFields::kAddedDataFiles] = std::to_string(added_data_files + 1);
305+
306+
// Update added records count
307+
it = summary_.find(SnapshotSummaryFields::kAddedRecords);
308+
int64_t added_records = (it != summary_.end()) ? std::stoll(it->second) : 0;
309+
summary_[SnapshotSummaryFields::kAddedRecords] =
310+
std::to_string(added_records + file->record_count);
311+
312+
// Update added file size
313+
it = summary_.find(SnapshotSummaryFields::kAddedFileSize);
314+
int64_t added_file_size = (it != summary_.end()) ? std::stoll(it->second) : 0;
315+
summary_[SnapshotSummaryFields::kAddedFileSize] =
316+
std::to_string(added_file_size + file->file_size_in_bytes);
317+
318+
// Note: Partition summary is built during manifest writing, not here.
319+
// The partition summary limit is handled in the manifest writer.
320+
}
321+
322+
void FastAppend::AddManifestToSummary(const ManifestFile& manifest) {
323+
// Update added data files count from manifest
324+
if (manifest.added_files_count.has_value()) {
325+
auto it = summary_.find(SnapshotSummaryFields::kAddedDataFiles);
326+
int64_t added_data_files = (it != summary_.end()) ? std::stoll(it->second) : 0;
327+
summary_[SnapshotSummaryFields::kAddedDataFiles] =
328+
std::to_string(added_data_files + manifest.added_files_count.value());
329+
}
330+
331+
// Update added records count from manifest
332+
if (manifest.added_rows_count.has_value()) {
333+
auto it = summary_.find(SnapshotSummaryFields::kAddedRecords);
334+
int64_t added_records = (it != summary_.end()) ? std::stoll(it->second) : 0;
335+
summary_[SnapshotSummaryFields::kAddedRecords] =
336+
std::to_string(added_records + manifest.added_rows_count.value());
337+
}
338+
}
339+
340+
} // namespace iceberg

0 commit comments

Comments
 (0)