Skip to content

Commit 141eadc

Browse files
committed
feat: add FastAppend
1 parent 20a961a commit 141eadc

File tree

14 files changed

+698
-3
lines changed

14 files changed

+698
-3
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ set(ICEBERG_SOURCES
8585
transform_function.cc
8686
type.cc
8787
update/expire_snapshots.cc
88+
update/fast_append.cc
8889
update/pending_update.cc
8990
update/snapshot_update.cc
9091
update/update_location.cc

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ iceberg_sources = files(
103103
'transform_function.cc',
104104
'type.cc',
105105
'update/expire_snapshots.cc',
106+
'update/fast_append.cc',
106107
'update/pending_update.cc',
107108
'update/snapshot_update.cc',
108109
'update/update_location.cc',

src/iceberg/table.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,13 @@ Result<std::shared_ptr<UpdateLocation>> Table::NewUpdateLocation() {
199199
return transaction->NewUpdateLocation();
200200
}
201201

202+
/// Opens an update transaction on this table with auto-commit turned on and returns
/// the FastAppend created by that transaction.
Result<std::shared_ptr<FastAppend>> Table::NewFastAppend() {
  constexpr bool kAutoCommit = true;
  ICEBERG_ASSIGN_OR_RAISE(auto txn,
                          Transaction::Make(shared_from_this(),
                                            Transaction::Kind::kUpdate, kAutoCommit));
  return txn->NewFastAppend();
}
208+
202209
Result<std::shared_ptr<StagedTable>> StagedTable::Make(
203210
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
204211
std::string metadata_location, std::shared_ptr<FileIO> io,

src/iceberg/table.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
156156
/// changes.
157157
virtual Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
158158

159+
/// \brief Create a new FastAppend to append data files and commit the changes.
160+
virtual Result<std::shared_ptr<FastAppend>> NewFastAppend();
161+
159162
protected:
160163
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
161164
std::string metadata_location, std::shared_ptr<FileIO> io,

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ if(ICEBERG_BUILD_BUNDLE)
172172
USE_BUNDLE
173173
SOURCES
174174
expire_snapshots_test.cc
175+
fast_append_test.cc
175176
transaction_test.cc
176177
update_location_test.cc
177178
update_partition_spec_test.cc

src/iceberg/test/fast_append_test.cc

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/update/fast_append.h"
21+
22+
#include <format>
23+
24+
#include <gmock/gmock.h>
25+
#include <gtest/gtest.h>
26+
27+
#include "iceberg/avro/avro_register.h"
28+
#include "iceberg/partition_spec.h"
29+
#include "iceberg/schema.h"
30+
#include "iceberg/table_metadata.h"
31+
#include "iceberg/test/matchers.h"
32+
#include "iceberg/test/test_resource.h"
33+
#include "iceberg/test/update_test_base.h"
34+
#include "iceberg/util/uuid.h"
35+
36+
namespace iceberg {
37+
38+
/// Fixture for the FastAppend tests.
///
/// Every test starts from a table registered from the minimal V2 metadata resource and
/// has two prepared data files (file_a_ and file_b_) available for appending.
class FastAppendTest : public UpdateTestBase {
 protected:
  // Runs avro::RegisterAll() once for the whole suite.
  static void SetUpTestSuite() { avro::RegisterAll(); }

  void SetUp() override {
    UpdateTestBase::SetUp();

    // Drop the table currently registered under table_ident_ (presumably created by the
    // base fixture) so it can be re-registered from the metadata written below.
    ASSERT_THAT(catalog_->DropTable(table_ident_, /*purge=*/false), IsOk());

    // Write a copy of the minimal V2 metadata, relocated to this test's table location,
    // and register the table from that file.
    std::string metadata_path =
        std::format("{}/metadata/00001-{}.metadata.json", table_location_,
                    Uuid::GenerateV7().ToString());
    ICEBERG_UNWRAP_OR_FAIL(
        auto base_metadata,
        ReadTableMetadataFromResource("TableMetadataV2ValidMinimal.json"));
    base_metadata->location = table_location_;
    ASSERT_THAT(TableMetadataUtil::Write(*file_io_, metadata_path, *base_metadata),
                IsOk());
    ICEBERG_UNWRAP_OR_FAIL(table_, catalog_->RegisterTable(table_ident_, metadata_path));

    // Cache the partition spec and schema of the freshly registered table.
    ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec());
    ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema());

    // Data files shared by most tests.
    file_a_ = CreateDataFile("/data/file_a.parquet", /*record_count=*/100, /*size=*/1024);
    file_b_ = CreateDataFile("/data/file_b.parquet", /*record_count=*/200, /*size=*/2048);
  }

  /// Builds a Parquet data-file descriptor for the cached partition spec.
  ///
  /// \param path Path of the file, appended to the table location
  /// \param record_count Value stored as the file's record count
  /// \param size Value stored as the file's size in bytes
  /// \param partition_value Single partition value stored with the file
  std::shared_ptr<DataFile> CreateDataFile(const std::string& path, int64_t record_count,
                                           int64_t size, int64_t partition_value = 0) {
    auto file = std::make_shared<DataFile>();
    file->file_path = table_location_ + path;
    file->file_format = FileFormatType::kParquet;
    file->content = DataFile::Content::kData;
    file->record_count = record_count;
    file->file_size_in_bytes = size;
    file->partition_spec_id = spec_->spec_id();
    // The base table is partitioned by identity(x), so exactly one partition value is
    // supplied.
    file->partition =
        PartitionValues(std::vector<Literal>{Literal::Long(partition_value)});
    return file;
  }

  std::shared_ptr<PartitionSpec> spec_;
  std::shared_ptr<Schema> schema_;
  std::shared_ptr<DataFile> file_a_;
  std::shared_ptr<DataFile> file_b_;
};
86+
87+
// Appending a single file produces a snapshot whose summary describes exactly that file.
TEST_F(FastAppendTest, AppendDataFile) {
  ICEBERG_UNWRAP_OR_FAIL(auto append, table_->NewFastAppend());
  append->AppendFile(file_a_);
  EXPECT_THAT(append->Commit(), IsOk());

  // Reload the table before looking at the current snapshot.
  EXPECT_THAT(table_->Refresh(), IsOk());
  ICEBERG_UNWRAP_OR_FAIL(auto committed, table_->current_snapshot());
  const auto& summary = committed->summary;
  EXPECT_EQ(summary.at("added-data-files"), "1");
  EXPECT_EQ(summary.at("added-records"), "100");
  EXPECT_EQ(summary.at("added-files-size"), "1024");
}
100+
101+
// Appending two files in one update sums their counts, records and sizes in the summary.
TEST_F(FastAppendTest, AppendMultipleDataFiles) {
  ICEBERG_UNWRAP_OR_FAIL(auto append, table_->NewFastAppend());
  append->AppendFile(file_a_);
  append->AppendFile(file_b_);
  EXPECT_THAT(append->Commit(), IsOk());

  // Reload the table before looking at the current snapshot.
  EXPECT_THAT(table_->Refresh(), IsOk());
  ICEBERG_UNWRAP_OR_FAIL(auto committed, table_->current_snapshot());
  const auto& summary = committed->summary;
  EXPECT_EQ(summary.at("added-data-files"), "2");   // file_a_ and file_b_
  EXPECT_EQ(summary.at("added-records"), "300");    // 100 + 200
  EXPECT_EQ(summary.at("added-files-size"), "3072");  // 1024 + 2048
}
115+
116+
// Appends ten generated files, alternating between two partition values, and checks the
// summary totals against values accumulated while generating the files.
TEST_F(FastAppendTest, AppendManyFiles) {
  ICEBERG_UNWRAP_OR_FAIL(auto append, table_->NewFastAppend());

  constexpr int kFileCount = 10;
  int64_t expected_records = 0;
  int64_t expected_bytes = 0;
  for (int i = 0; i < kFileCount; ++i) {
    auto generated = CreateDataFile(std::format("/data/file_{}.parquet", i),
                                    /*record_count=*/10 + i,
                                    /*size=*/100 + i * 10,
                                    /*partition_value=*/i % 2);
    // Accumulate before the file is moved into the update.
    expected_records += generated->record_count;
    expected_bytes += generated->file_size_in_bytes;
    append->AppendFile(std::move(generated));
  }

  EXPECT_THAT(append->Commit(), IsOk());

  // Reload the table before looking at the current snapshot.
  EXPECT_THAT(table_->Refresh(), IsOk());
  ICEBERG_UNWRAP_OR_FAIL(auto committed, table_->current_snapshot());
  const auto& summary = committed->summary;
  EXPECT_EQ(summary.at("added-data-files"), std::to_string(kFileCount));
  EXPECT_EQ(summary.at("added-records"), std::to_string(expected_records));
  EXPECT_EQ(summary.at("added-files-size"), std::to_string(expected_bytes));
}
141+
142+
// The first append to a table without snapshots advances the sequence number by one,
// both on the new snapshot and in the table metadata.
TEST_F(FastAppendTest, EmptyTableAppendUpdatesSequenceNumbers) {
  // Precondition: the table starts without a current snapshot.
  EXPECT_THAT(table_->current_snapshot(), HasErrorMessage("No current snapshot"));
  const int64_t initial_sequence_number = table_->metadata()->last_sequence_number;

  ICEBERG_UNWRAP_OR_FAIL(auto append, table_->NewFastAppend());
  append->AppendFile(file_a_);
  EXPECT_THAT(append->Commit(), IsOk());

  // Reload the table before looking at the committed state.
  EXPECT_THAT(table_->Refresh(), IsOk());
  ICEBERG_UNWRAP_OR_FAIL(auto committed, table_->current_snapshot());
  EXPECT_EQ(committed->sequence_number, initial_sequence_number + 1);
  EXPECT_EQ(table_->metadata()->last_sequence_number, initial_sequence_number + 1);
}
157+
158+
// Appending a null data file must make the commit fail and must not create a snapshot.
TEST_F(FastAppendTest, AppendNullFile) {
  std::shared_ptr<FastAppend> fast_append;
  ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend());
  fast_append->AppendFile(nullptr);

  auto result = fast_append->Commit();
  EXPECT_FALSE(result.has_value());
  EXPECT_THAT(result, HasErrorMessage("Invalid data file: null"));

  // The table handle held by the fixture must not have gained a snapshot.
  EXPECT_THAT(table_->current_snapshot(), HasErrorMessage("No current snapshot"));

  // Check again after a refresh, as the other tests do before reading the current
  // snapshot, so that a snapshot which reached the catalog despite the error is caught.
  EXPECT_THAT(table_->Refresh(), IsOk());
  EXPECT_THAT(table_->current_snapshot(), HasErrorMessage("No current snapshot"));
}
168+
169+
// Appending the same data file twice must add it to the snapshot only once.
TEST_F(FastAppendTest, AppendDuplicateFile) {
  std::shared_ptr<FastAppend> fast_append;
  ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend());
  fast_append->AppendFile(file_a_);
  fast_append->AppendFile(file_a_);  // Add same file twice

  EXPECT_THAT(fast_append->Commit(), IsOk());

  EXPECT_THAT(table_->Refresh(), IsOk());
  ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
  // Should only count the file once: every summary total must match a single append of
  // file_a_ (100 records, 1024 bytes), including the byte size.
  EXPECT_EQ(snapshot->summary.at("added-data-files"), "1");
  EXPECT_EQ(snapshot->summary.at("added-records"), "100");
  EXPECT_EQ(snapshot->summary.at("added-files-size"), "1024");
}
183+
184+
// A property set on the update shows up in the summary of the committed snapshot.
TEST_F(FastAppendTest, SetSnapshotProperty) {
  ICEBERG_UNWRAP_OR_FAIL(auto append, table_->NewFastAppend());
  append->Set("custom-property", "custom-value");
  append->AppendFile(file_a_);
  EXPECT_THAT(append->Commit(), IsOk());

  // Reload the table before looking at the current snapshot.
  EXPECT_THAT(table_->Refresh(), IsOk());
  ICEBERG_UNWRAP_OR_FAIL(auto committed, table_->current_snapshot());
  const auto& summary = committed->summary;
  EXPECT_EQ(summary.at("custom-property"), "custom-value");
}
196+
197+
} // namespace iceberg

src/iceberg/transaction.cc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@
2323
#include <optional>
2424

2525
#include "iceberg/catalog.h"
26-
#include "iceberg/schema.h"
2726
#include "iceberg/table.h"
2827
#include "iceberg/table_metadata.h"
2928
#include "iceberg/table_properties.h"
3029
#include "iceberg/table_requirement.h"
3130
#include "iceberg/table_requirements.h"
3231
#include "iceberg/table_update.h"
3332
#include "iceberg/update/expire_snapshots.h"
33+
#include "iceberg/update/fast_append.h"
3434
#include "iceberg/update/pending_update.h"
3535
#include "iceberg/update/snapshot_update.h"
3636
#include "iceberg/update/update_location.h"
@@ -293,4 +293,11 @@ Result<std::shared_ptr<UpdateLocation>> Transaction::NewUpdateLocation() {
293293
return update_location;
294294
}
295295

296+
/// Creates a FastAppend bound to this transaction, registers it through AddUpdate, and
/// returns it to the caller.
Result<std::shared_ptr<FastAppend>> Transaction::NewFastAppend() {
  // The update is created with the table's name and a shared handle to this transaction.
  ICEBERG_ASSIGN_OR_RAISE(
      std::shared_ptr<FastAppend> pending_append,
      FastAppend::Make(table_->name().name, shared_from_this()));

  // Hand the update to the caller only once AddUpdate has accepted it.
  ICEBERG_RETURN_UNEXPECTED(AddUpdate(pending_append));
  return pending_append;
}
302+
296303
} // namespace iceberg

src/iceberg/transaction.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
8686
/// changes.
8787
Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
8888

89+
/// \brief Create a new FastAppend to append data files and commit the changes.
90+
Result<std::shared_ptr<FastAppend>> NewFastAppend();
91+
8992
private:
9093
Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
9194
std::unique_ptr<TableMetadataBuilder> metadata_builder);

src/iceberg/type_fwd.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,9 @@ class TableUpdateContext;
187187
class Transaction;
188188

189189
/// \brief Update family.
190+
class AppendFiles;
190191
class ExpireSnapshots;
192+
class FastAppend;
191193
class PendingUpdate;
192194
class SnapshotUpdate;
193195
class UpdateLocation;
@@ -200,7 +202,6 @@ class UpdateSortOrder;
200202
/// TODO: Forward declarations below are not added yet.
201203
/// ----------------------------------------------------------------------------
202204

203-
class AppendFiles;
204205
class EncryptedKey;
205206

206207
} // namespace iceberg

src/iceberg/update/append_files.h

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <memory>
23+
24+
#include "iceberg/iceberg_export.h"
25+
#include "iceberg/type_fwd.h"
26+
27+
namespace iceberg {
28+
29+
/// \brief API for appending new files in a table.
30+
///
31+
/// This API accumulates file additions, produces a new Snapshot of the table, and commits
32+
/// that snapshot as the current.
33+
///
34+
/// When committing, these changes will be applied to the latest table snapshot. Commit
35+
/// conflicts will be resolved by applying the changes to the new latest snapshot and
36+
/// reattempting the commit.
37+
class ICEBERG_EXPORT AppendFiles {
38+
public:
39+
virtual ~AppendFiles() = default;
40+
41+
/// \brief Append a DataFile to the table.
42+
///
43+
/// \param file A data file
44+
/// \return Reference to this for method chaining
45+
virtual AppendFiles& AppendFile(std::shared_ptr<DataFile> file) = 0;
46+
47+
/// \brief Append a ManifestFile to the table.
48+
///
49+
/// The manifest must contain only appended files. All files in the manifest will be
50+
/// appended to the table in the snapshot created by this update.
51+
///
52+
/// The manifest will be used directly if snapshot ID inheritance is enabled (all tables
53+
/// with the format version > 1 or if the inheritance is enabled explicitly via table
54+
/// properties). Otherwise, the manifest will be rewritten to assign all entries this
55+
/// update's snapshot ID.
56+
///
57+
/// If the manifest is rewritten, it is always the responsibility of the caller to
58+
/// manage the lifecycle of the original manifest. If the manifest is used directly, it
59+
/// should never be deleted manually if the commit succeeds as it will become part of
60+
/// the table metadata and will be cleaned upon expiry. If the manifest gets merged with
61+
/// others while preparing a new snapshot, it will be deleted automatically if this
62+
/// operation is successful. If the commit fails, the manifest will never be deleted,
63+
/// and it is up to the caller whether to delete or reuse it.
64+
///
65+
/// \param file A manifest file
66+
/// \return Reference to this for method chaining
67+
virtual AppendFiles& AppendManifest(const ManifestFile& file) = 0;
68+
};
69+
70+
} // namespace iceberg

0 commit comments

Comments
 (0)