Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ set(ICEBERG_SOURCES
transform_function.cc
type.cc
update/expire_snapshots.cc
update/fast_append.cc
update/pending_update.cc
update/snapshot_update.cc
update/update_location.cc
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ iceberg_sources = files(
'transform_function.cc',
'type.cc',
'update/expire_snapshots.cc',
'update/fast_append.cc',
'update/pending_update.cc',
'update/snapshot_update.cc',
'update/update_location.cc',
Expand Down
7 changes: 7 additions & 0 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,13 @@ Result<std::shared_ptr<UpdateLocation>> Table::NewUpdateLocation() {
return transaction->NewUpdateLocation();
}

Result<std::shared_ptr<FastAppend>> Table::NewFastAppend() {
  // A FastAppend created directly from a table runs in its own
  // single-operation transaction; auto_commit=true means Commit() on the
  // returned update applies the new snapshot to this table immediately.
  ICEBERG_ASSIGN_OR_RAISE(auto txn,
                          Transaction::Make(shared_from_this(),
                                            Transaction::Kind::kUpdate,
                                            /*auto_commit=*/true));
  return txn->NewFastAppend();
}

Result<std::shared_ptr<StagedTable>> StagedTable::Make(
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
/// changes.
virtual Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();

/// \brief Create a new FastAppend to append data files and commit the changes.
virtual Result<std::shared_ptr<FastAppend>> NewFastAppend();

protected:
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ if(ICEBERG_BUILD_BUNDLE)
USE_BUNDLE
SOURCES
expire_snapshots_test.cc
fast_append_test.cc
transaction_test.cc
update_location_test.cc
update_partition_spec_test.cc
Expand Down
197 changes: 197 additions & 0 deletions src/iceberg/test/fast_append_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/update/fast_append.h"

#include <format>

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "iceberg/avro/avro_register.h"
#include "iceberg/partition_spec.h"
#include "iceberg/schema.h"
#include "iceberg/table_metadata.h"
#include "iceberg/test/matchers.h"
#include "iceberg/test/test_resource.h"
#include "iceberg/test/update_test_base.h"
#include "iceberg/util/uuid.h"

namespace iceberg {

// Test fixture for FastAppend: registers a table from a known minimal V2
// metadata file so the partition spec and schema are predictable, and
// prepares two in-memory data-file descriptors for the tests to append.
class FastAppendTest : public UpdateTestBase {
 protected:
  // Manifests are written as Avro; register the Avro readers/writers once
  // for the whole suite.
  static void SetUpTestSuite() { avro::RegisterAll(); }

  void SetUp() override {
    UpdateTestBase::SetUp();

    // Replace the table created by UpdateTestBase with one registered from a
    // resource metadata file (TableMetadataV2ValidMinimal.json).
    ASSERT_THAT(catalog_->DropTable(table_ident_, /*purge=*/false), IsOk());

    auto metadata_location = std::format("{}/metadata/00001-{}.metadata.json",
                                         table_location_, Uuid::GenerateV7().ToString());
    ICEBERG_UNWRAP_OR_FAIL(
        auto metadata, ReadTableMetadataFromResource("TableMetadataV2ValidMinimal.json"));
    metadata->location = table_location_;
    ASSERT_THAT(TableMetadataUtil::Write(*file_io_, metadata_location, *metadata),
                IsOk());
    ICEBERG_UNWRAP_OR_FAIL(table_,
                           catalog_->RegisterTable(table_ident_, metadata_location));

    // Get partition spec and schema from the base table
    ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec());
    ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema());

    // Create test data files
    file_a_ = CreateDataFile("/data/file_a.parquet", 100, 1024);
    file_b_ = CreateDataFile("/data/file_b.parquet", 200, 2048);
  }

  /// \brief Build an in-memory DataFile descriptor under the table location.
  ///
  /// \param path Path relative to the table location.
  /// \param record_count Number of records the file claims to contain.
  /// \param size Claimed file size in bytes.
  /// \param partition_value Value for the single identity partition field.
  std::shared_ptr<DataFile> CreateDataFile(const std::string& path, int64_t record_count,
                                           int64_t size, int64_t partition_value = 0) {
    auto data_file = std::make_shared<DataFile>();
    data_file->content = DataFile::Content::kData;
    data_file->file_path = table_location_ + path;
    data_file->file_format = FileFormatType::kParquet;
    // The base table has partition spec with identity(x), so we need 1 partition value
    data_file->partition =
        PartitionValues(std::vector<Literal>{Literal::Long(partition_value)});
    data_file->file_size_in_bytes = size;
    data_file->record_count = record_count;
    data_file->partition_spec_id = spec_->spec_id();
    return data_file;
  }

  std::shared_ptr<PartitionSpec> spec_;    // partition spec of the registered table
  std::shared_ptr<Schema> schema_;         // schema of the registered table
  std::shared_ptr<DataFile> file_a_;       // 100 records, 1024 bytes
  std::shared_ptr<DataFile> file_b_;       // 200 records, 2048 bytes
};

TEST_F(FastAppendTest, AppendDataFile) {
  // Appending one file must yield a snapshot whose summary reflects exactly
  // that file's stats.
  std::shared_ptr<FastAppend> append;
  ICEBERG_UNWRAP_OR_FAIL(append, table_->NewFastAppend());
  append->AppendFile(file_a_);
  EXPECT_THAT(append->Commit(), IsOk());

  EXPECT_THAT(table_->Refresh(), IsOk());
  ICEBERG_UNWRAP_OR_FAIL(auto snap, table_->current_snapshot());
  EXPECT_EQ("1", snap->summary.at("added-data-files"));
  EXPECT_EQ("100", snap->summary.at("added-records"));
  EXPECT_EQ("1024", snap->summary.at("added-files-size"));
}

TEST_F(FastAppendTest, AppendMultipleDataFiles) {
  // Two files appended in one operation: the summary sums their counts.
  std::shared_ptr<FastAppend> append;
  ICEBERG_UNWRAP_OR_FAIL(append, table_->NewFastAppend());
  append->AppendFile(file_a_);
  append->AppendFile(file_b_);
  EXPECT_THAT(append->Commit(), IsOk());

  EXPECT_THAT(table_->Refresh(), IsOk());
  ICEBERG_UNWRAP_OR_FAIL(auto snap, table_->current_snapshot());
  EXPECT_EQ("2", snap->summary.at("added-data-files"));
  EXPECT_EQ("300", snap->summary.at("added-records"));    // 100 + 200
  EXPECT_EQ("3072", snap->summary.at("added-files-size"));  // 1024 + 2048
}

TEST_F(FastAppendTest, AppendManyFiles) {
  // Append a batch of files; totals must accumulate in the snapshot summary.
  // Partition values alternate so more than one partition is exercised.
  constexpr int kNumFiles = 10;
  std::shared_ptr<FastAppend> append;
  ICEBERG_UNWRAP_OR_FAIL(append, table_->NewFastAppend());

  int64_t expected_records = 0;
  int64_t expected_bytes = 0;
  for (int i = 0; i < kNumFiles; ++i) {
    auto file = CreateDataFile(std::format("/data/file_{}.parquet", i),
                               /*record_count=*/10 + i,
                               /*size=*/100 + i * 10,
                               /*partition_value=*/i % 2);
    expected_records += file->record_count;
    expected_bytes += file->file_size_in_bytes;
    append->AppendFile(std::move(file));
  }

  EXPECT_THAT(append->Commit(), IsOk());

  EXPECT_THAT(table_->Refresh(), IsOk());
  ICEBERG_UNWRAP_OR_FAIL(auto snap, table_->current_snapshot());
  EXPECT_EQ(std::to_string(kNumFiles), snap->summary.at("added-data-files"));
  EXPECT_EQ(std::to_string(expected_records), snap->summary.at("added-records"));
  EXPECT_EQ(std::to_string(expected_bytes), snap->summary.at("added-files-size"));
}

TEST_F(FastAppendTest, EmptyTableAppendUpdatesSequenceNumbers) {
  // Starting from a table with no snapshot, one append bumps the sequence
  // number by exactly one on both the snapshot and the table metadata.
  EXPECT_THAT(table_->current_snapshot(), HasErrorMessage("No current snapshot"));
  const int64_t seq_before = table_->metadata()->last_sequence_number;

  std::shared_ptr<FastAppend> append;
  ICEBERG_UNWRAP_OR_FAIL(append, table_->NewFastAppend());
  append->AppendFile(file_a_);
  EXPECT_THAT(append->Commit(), IsOk());

  EXPECT_THAT(table_->Refresh(), IsOk());
  ICEBERG_UNWRAP_OR_FAIL(auto snap, table_->current_snapshot());
  EXPECT_EQ(seq_before + 1, snap->sequence_number);
  EXPECT_EQ(seq_before + 1, table_->metadata()->last_sequence_number);
}

TEST_F(FastAppendTest, AppendNullFile) {
  // A null data file must fail the commit and leave the table snapshot-less.
  std::shared_ptr<FastAppend> append;
  ICEBERG_UNWRAP_OR_FAIL(append, table_->NewFastAppend());
  append->AppendFile(nullptr);

  auto commit_result = append->Commit();
  EXPECT_FALSE(commit_result.has_value());
  EXPECT_THAT(commit_result, HasErrorMessage("Invalid data file: null"));
  EXPECT_THAT(table_->current_snapshot(), HasErrorMessage("No current snapshot"));
}

TEST_F(FastAppendTest, AppendDuplicateFile) {
  // Appending the same file twice must deduplicate: the snapshot summary
  // counts it only once.
  std::shared_ptr<FastAppend> append;
  ICEBERG_UNWRAP_OR_FAIL(append, table_->NewFastAppend());
  append->AppendFile(file_a_);
  append->AppendFile(file_a_);  // Add same file twice
  EXPECT_THAT(append->Commit(), IsOk());

  EXPECT_THAT(table_->Refresh(), IsOk());
  ICEBERG_UNWRAP_OR_FAIL(auto snap, table_->current_snapshot());
  EXPECT_EQ("1", snap->summary.at("added-data-files"));
  EXPECT_EQ("100", snap->summary.at("added-records"));
}

TEST_F(FastAppendTest, SetSnapshotProperty) {
  // A user-set property must be carried into the committed snapshot summary.
  std::shared_ptr<FastAppend> append;
  ICEBERG_UNWRAP_OR_FAIL(append, table_->NewFastAppend());
  append->Set("custom-property", "custom-value");
  append->AppendFile(file_a_);
  EXPECT_THAT(append->Commit(), IsOk());

  EXPECT_THAT(table_->Refresh(), IsOk());
  ICEBERG_UNWRAP_OR_FAIL(auto snap, table_->current_snapshot());
  EXPECT_EQ("custom-value", snap->summary.at("custom-property"));
}

} // namespace iceberg
9 changes: 8 additions & 1 deletion src/iceberg/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@
#include <optional>

#include "iceberg/catalog.h"
#include "iceberg/schema.h"
#include "iceberg/table.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_properties.h"
#include "iceberg/table_requirement.h"
#include "iceberg/table_requirements.h"
#include "iceberg/table_update.h"
#include "iceberg/update/expire_snapshots.h"
#include "iceberg/update/fast_append.h"
#include "iceberg/update/pending_update.h"
#include "iceberg/update/snapshot_update.h"
#include "iceberg/update/update_location.h"
Expand Down Expand Up @@ -293,4 +293,11 @@ Result<std::shared_ptr<UpdateLocation>> Transaction::NewUpdateLocation() {
return update_location;
}

Result<std::shared_ptr<FastAppend>> Transaction::NewFastAppend() {
  // Register the pending update with this transaction before handing it to
  // the caller, so its Commit() is routed through the transaction.
  ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<FastAppend> append,
                          FastAppend::Make(table_->name().name, shared_from_this()));
  ICEBERG_RETURN_UNEXPECTED(AddUpdate(append));
  return append;
}

} // namespace iceberg
3 changes: 3 additions & 0 deletions src/iceberg/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
/// changes.
Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();

/// \brief Create a new FastAppend to append data files and commit the changes.
Result<std::shared_ptr<FastAppend>> NewFastAppend();

private:
Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
std::unique_ptr<TableMetadataBuilder> metadata_builder);
Expand Down
3 changes: 2 additions & 1 deletion src/iceberg/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ class TableUpdateContext;
class Transaction;

/// \brief Update family.
class AppendFiles;
class ExpireSnapshots;
class FastAppend;
class PendingUpdate;
class SnapshotUpdate;
class UpdateLocation;
Expand All @@ -200,7 +202,6 @@ class UpdateSortOrder;
/// TODO: Forward declarations below are not added yet.
/// ----------------------------------------------------------------------------

class AppendFiles;
class EncryptedKey;

} // namespace iceberg
70 changes: 70 additions & 0 deletions src/iceberg/update/append_files.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <memory>

#include "iceberg/iceberg_export.h"
#include "iceberg/type_fwd.h"

namespace iceberg {

/// \brief API for appending new files in a table.
///
/// This API accumulates file additions, produces a new Snapshot of the table, and commits
/// that snapshot as the current.
///
/// When committing, these changes will be applied to the latest table snapshot. Commit
/// conflicts will be resolved by applying the changes to the new latest snapshot and
/// reattempting the commit.
class ICEBERG_EXPORT AppendFiles {
Comment thread
zhjwpku marked this conversation as resolved.
Outdated
public:
virtual ~AppendFiles() = default;

/// \brief Append a DataFile to the table.
///
/// \param file A data file
/// \return Reference to this for method chaining
virtual AppendFiles& AppendFile(std::shared_ptr<DataFile> file) = 0;

/// \brief Append a ManifestFile to the table.
///
/// The manifest must contain only appended files. All files in the manifest will be
/// appended to the table in the snapshot created by this update.
///
/// The manifest will be used directly if snapshot ID inheritance is enabled (all tables
/// with the format version > 1 or if the inheritance is enabled explicitly via table
/// properties). Otherwise, the manifest will be rewritten to assign all entries this
/// update's snapshot ID.
///
/// If the manifest is rewritten, it is always the responsibility of the caller to
/// manage the lifecycle of the original manifest. If the manifest is used directly, it
/// should never be deleted manually if the commit succeeds as it will become part of
/// the table metadata and will be cleaned upon expiry. If the manifest gets merged with
/// others while preparing a new snapshot, it will be deleted automatically if this
/// operation is successful. If the commit fails, the manifest will never be deleted,
/// and it is up to the caller whether to delete or reuse it.
///
/// \param file A manifest file
/// \return Reference to this for method chaining
virtual AppendFiles& AppendManifest(const ManifestFile& file) = 0;
};

} // namespace iceberg
Loading
Loading