Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ set(ICEBERG_SOURCES
update/pending_update.cc
update/update_partition_spec.cc
update/update_properties.cc
update/update_schema.cc
update/update_sort_order.cc
util/bucket_util.cc
util/content_file_util.cc
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ iceberg_sources = files(
'update/pending_update.cc',
'update/update_partition_spec.cc',
'update/update_properties.cc',
'update/update_schema.cc',
'update/update_sort_order.cc',
'util/bucket_util.cc',
'util/content_file_util.cc',
Expand Down
12 changes: 12 additions & 0 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "iceberg/transaction.h"
#include "iceberg/update/update_partition_spec.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/update/update_schema.h"
#include "iceberg/util/macros.h"

namespace iceberg {
Expand Down Expand Up @@ -171,6 +172,13 @@ Result<std::shared_ptr<UpdateSortOrder>> Table::NewUpdateSortOrder() {
return transaction->NewUpdateSortOrder();
}

/// \brief Start a schema update backed by a fresh auto-committing transaction.
///
/// A Transaction of kind kUpdate is created on this table with auto-commit
/// enabled, and the UpdateSchema produced by that transaction is returned.
/// A failure from Transaction::Make is handed back to the caller through
/// ICEBERG_ASSIGN_OR_RAISE.
Result<std::shared_ptr<UpdateSchema>> Table::NewUpdateSchema() {
  constexpr bool kAutoCommit = true;
  ICEBERG_ASSIGN_OR_RAISE(
      auto txn,
      Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, kAutoCommit));
  return txn->NewUpdateSchema();
}

Result<std::shared_ptr<StagedTable>> StagedTable::Make(
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
Expand Down Expand Up @@ -221,4 +229,8 @@ Result<std::shared_ptr<UpdateProperties>> StaticTable::NewUpdateProperties() {
return NotSupported("Cannot create an update properties for a static table");
}

/// \brief Schema updates are rejected for static tables.
///
/// \return Always a NotSupported error; no UpdateSchema is ever created.
Result<std::shared_ptr<UpdateSchema>> StaticTable::NewUpdateSchema() {
  return NotSupported(
      "Cannot create an update schema for a static table");
}

} // namespace iceberg
6 changes: 6 additions & 0 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
/// changes.
virtual Result<std::shared_ptr<UpdateSortOrder>> NewUpdateSortOrder();

/// \brief Create a new UpdateSchema to alter the columns of this table and commit the
/// changes.
virtual Result<std::shared_ptr<UpdateSchema>> NewUpdateSchema();

protected:
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
Expand Down Expand Up @@ -187,6 +191,8 @@ class ICEBERG_EXPORT StaticTable final : public Table {

Result<std::shared_ptr<UpdateProperties>> NewUpdateProperties() override;

Result<std::shared_ptr<UpdateSchema>> NewUpdateSchema() override;

private:
using Table::Table;
};
Expand Down
15 changes: 15 additions & 0 deletions src/iceberg/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <memory>

#include "iceberg/catalog.h"
#include "iceberg/schema.h"
#include "iceberg/table.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_requirement.h"
Expand All @@ -30,6 +31,7 @@
#include "iceberg/update/pending_update.h"
#include "iceberg/update/update_partition_spec.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/update/update_schema.h"
#include "iceberg/update/update_sort_order.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"
Expand Down Expand Up @@ -105,6 +107,12 @@ Status Transaction::Apply(PendingUpdate& update) {
metadata_builder_->AddPartitionSpec(std::move(result.spec));
}
} break;
case PendingUpdate::Kind::kUpdateSchema: {
auto& update_schema = internal::checked_cast<UpdateSchema&>(update);
ICEBERG_ASSIGN_OR_RAISE(auto result, update_schema.Apply());
metadata_builder_->SetCurrentSchema(std::move(result.schema),
result.new_last_column_id);
} break;
default:
return NotSupported("Unsupported pending update: {}",
static_cast<int32_t>(update.kind()));
Expand Down Expand Up @@ -178,4 +186,11 @@ Result<std::shared_ptr<UpdateSortOrder>> Transaction::NewUpdateSortOrder() {
return update_sort_order;
}

/// \brief Create an UpdateSchema bound to this transaction and register it.
///
/// The update is built with UpdateSchema::Make(shared_from_this()) and then
/// passed to AddUpdate(). A failure in either step is returned to the caller
/// through the ICEBERG_* macros instead of the update.
Result<std::shared_ptr<UpdateSchema>> Transaction::NewUpdateSchema() {
  ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateSchema> pending_schema_update,
                          UpdateSchema::Make(shared_from_this()));
  ICEBERG_RETURN_UNEXPECTED(AddUpdate(pending_schema_update));
  return pending_schema_update;
}

} // namespace iceberg
4 changes: 4 additions & 0 deletions src/iceberg/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
/// changes.
Result<std::shared_ptr<UpdateSortOrder>> NewUpdateSortOrder();

/// \brief Create a new UpdateSchema to alter the columns of this table and commit the
/// changes.
Result<std::shared_ptr<UpdateSchema>> NewUpdateSchema();

private:
Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
std::unique_ptr<TableMetadataBuilder> metadata_builder);
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ class Transaction;
class PendingUpdate;
class UpdatePartitionSpec;
class UpdateProperties;
class UpdateSchema;
class UpdateSortOrder;

/// ----------------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/update/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ install_headers(
[
'pending_update.h',
'update_partition_spec.h',
'update_schema.h',
'update_sort_order.h',
'update_properties.h',
],
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/update/pending_update.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
// Tag naming the concrete kind of a pending update. Stored in a uint8_t.
// Enumerators carry no explicit values, so their numeric values follow
// declaration order; inserting one in the middle shifts the values after it.
enum class Kind : uint8_t {
kUpdatePartitionSpec,
kUpdateProperties,
// Schema updates; Transaction::Apply casts such an update to UpdateSchema.
kUpdateSchema,
kUpdateSortOrder,
};

Expand Down
235 changes: 235 additions & 0 deletions src/iceberg/update/update_schema.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/update/update_schema.h"

#include <memory>
#include <optional>
#include <ranges>
#include <string>
#include <string_view>
#include <unordered_set>
#include <utility>
#include <vector>

#include "iceberg/schema.h"
#include "iceberg/table_metadata.h"
#include "iceberg/transaction.h"
#include "iceberg/type.h"
#include "iceberg/util/error_collector.h"
#include "iceberg/util/macros.h"

namespace iceberg {

/// \brief Factory for UpdateSchema.
///
/// \param transaction Transaction the update will belong to; checked to be
///        non-null by ICEBERG_PRECHECK before anything is constructed.
/// \return A shared pointer to the newly constructed UpdateSchema.
Result<std::shared_ptr<UpdateSchema>> UpdateSchema::Make(
    std::shared_ptr<Transaction> transaction) {
  ICEBERG_PRECHECK(transaction != nullptr,
                   "Cannot create UpdateSchema without transaction");
  // The constructor is invoked directly with `new` here, as in the original
  // code, rather than through std::make_shared.
  std::shared_ptr<UpdateSchema> created{new UpdateSchema(std::move(transaction))};
  return created;
}

/// \brief Seed the update from the transaction's current table metadata.
///
/// Initialization happens in three steps, stopping at the first failure:
///   1. schema_ is taken from the current metadata's Schema();
///   2. last_column_id_ is copied from the metadata;
///   3. identifier_field_names_ is filled from the schema's identifier
///      field names.
/// A failure in step 1 or 3 is passed to AddError() and the constructor
/// returns, leaving the members of the later steps unassigned by this code.
UpdateSchema::UpdateSchema(std::shared_ptr<Transaction> transaction)
    : PendingUpdate(std::move(transaction)) {
  const TableMetadata& metadata = transaction_->current();

  // Step 1: the schema currently in effect.
  auto current_schema = metadata.Schema();
  if (!current_schema.has_value()) {
    AddError(current_schema.error());
    return;
  }
  schema_ = std::move(current_schema.value());

  // Step 2: the column id recorded in the base metadata.
  last_column_id_ = metadata.last_column_id;

  // Step 3: identifier field names, collected into a set.
  auto identifier_names = schema_->IdentifierFieldNames();
  if (!identifier_names.has_value()) {
    AddError(identifier_names.error());
    return;
  }
  identifier_field_names_ =
      std::ranges::to<std::unordered_set<std::string>>(identifier_names.value());
}

// Compiler-generated destructor, defined out of line in this translation unit.
UpdateSchema::~UpdateSchema() = default;

/// \brief Turn on the allow-incompatible-changes flag for this update.
///
/// \return *this, so further calls can be chained.
UpdateSchema& UpdateSchema::AllowIncompatibleChanges() {
  this->allow_incompatible_changes_ = true;
  return *this;
}

/// \brief Store the requested case-sensitivity setting.
///
/// \param case_sensitive Value copied into case_sensitive_.
/// \return *this, so further calls can be chained.
UpdateSchema& UpdateSchema::CaseSensitive(bool case_sensitive) {
  this->case_sensitive_ = case_sensitive;
  return *this;
}

/// \brief Add a top-level column without documentation.
///
/// Delegates to the (name, type, doc) overload with an empty doc string.
UpdateSchema& UpdateSchema::AddColumn(std::string_view name,
                                      std::shared_ptr<Type> type) {
  return AddColumn(name, std::move(type), /*doc=*/"");
}

/// \brief Add a top-level column with documentation.
///
/// A name containing '.' fails the ICEBERG_BUILDER_CHECK below, because it
/// is ambiguous with a nested path (what the macro does on failure is not
/// visible here — presumably it records the error and returns; confirm
/// against its definition). Otherwise the work is forwarded to
/// AddColumnInternal() with no parent and is_optional set to true.
UpdateSchema& UpdateSchema::AddColumn(std::string_view name, std::shared_ptr<Type> type,
                                      std::string_view doc) {
  ICEBERG_BUILDER_CHECK(name.find('.') == std::string_view::npos,
                        "Cannot add column with ambiguous name: {}, use "
                        "AddColumn(parent, name, type, doc)",
                        name);

  constexpr bool kOptional = true;
  return AddColumnInternal(std::nullopt, name, kOptional, std::move(type), doc);
}

/// \brief Add a column under an optional parent, without documentation.
///
/// Forwards to AddColumnInternal() with is_optional set to true and an empty
/// doc string.
UpdateSchema& UpdateSchema::AddColumn(std::optional<std::string> parent,
                                      std::string_view name,
                                      std::shared_ptr<Type> type) {
  constexpr bool kOptional = true;
  return AddColumnInternal(std::move(parent), name, kOptional, std::move(type),
                           /*doc=*/"");
}

/// \brief Add a column under an optional parent, with documentation.
///
/// Forwards every argument to AddColumnInternal() with is_optional set to
/// true.
UpdateSchema& UpdateSchema::AddColumn(std::optional<std::string> parent,
                                      std::string_view name,
                                      std::shared_ptr<Type> type,
                                      std::string_view doc) {
  constexpr bool kOptional = true;
  return AddColumnInternal(std::move(parent), name, kOptional, std::move(type), doc);
}

/// \brief Add a required top-level column without documentation.
///
/// Delegates to the (name, type, doc) overload with an empty doc string.
UpdateSchema& UpdateSchema::AddRequiredColumn(std::string_view name,
                                              std::shared_ptr<Type> type) {
  return AddRequiredColumn(name, std::move(type), /*doc=*/"");
}

/// \brief Add a required top-level column with documentation.
///
/// A name containing '.' fails the ICEBERG_BUILDER_CHECK below, because it
/// is ambiguous with a nested path (what the macro does on failure is not
/// visible here — presumably it records the error and returns; confirm
/// against its definition). Otherwise the work is forwarded to
/// AddColumnInternal() with no parent and is_optional set to false.
UpdateSchema& UpdateSchema::AddRequiredColumn(std::string_view name,
                                              std::shared_ptr<Type> type,
                                              std::string_view doc) {
  ICEBERG_BUILDER_CHECK(name.find('.') == std::string_view::npos,
                        "Cannot add column with ambiguous name: {}, use "
                        "AddRequiredColumn(parent, name, type, doc)",
                        name);

  constexpr bool kOptional = false;
  return AddColumnInternal(std::nullopt, name, kOptional, std::move(type), doc);
}

/// \brief Add a required column under an optional parent, without
/// documentation.
///
/// Forwards to AddColumnInternal() with is_optional set to false and an
/// empty doc string.
UpdateSchema& UpdateSchema::AddRequiredColumn(std::optional<std::string> parent,
                                              std::string_view name,
                                              std::shared_ptr<Type> type) {
  constexpr bool kOptional = false;
  return AddColumnInternal(std::move(parent), name, kOptional, std::move(type),
                           /*doc=*/"");
}

/// \brief Add a required column under an optional parent, with
/// documentation.
///
/// Forwards every argument to AddColumnInternal() with is_optional set to
/// false.
UpdateSchema& UpdateSchema::AddRequiredColumn(std::optional<std::string> parent,
                                              std::string_view name,
                                              std::shared_ptr<Type> type,
                                              std::string_view doc) {
  constexpr bool kOptional = false;
  return AddColumnInternal(std::move(parent), name, kOptional, std::move(type), doc);
}

/// \brief Change the type of a column (placeholder).
///
/// Not implemented yet: both arguments are ignored, a NotImplemented error
/// is passed to AddError(), and *this is returned so calls can be chained.
UpdateSchema& UpdateSchema::UpdateColumn(std::string_view name,
                                         std::shared_ptr<PrimitiveType> new_type) {
  // TODO(Guotao Yu): Implement UpdateColumn
  AddError(
      NotImplemented("UpdateSchema::UpdateColumn not implemented"));
  return *this;
}

/// \brief Change both the type and the documentation of a column.
///
/// Runs the type update first and the doc update second; the result of the
/// doc update is returned.
UpdateSchema& UpdateSchema::UpdateColumn(std::string_view name,
                                         std::shared_ptr<PrimitiveType> new_type,
                                         std::string_view new_doc) {
  UpdateSchema& after_type_change = UpdateColumn(name, std::move(new_type));
  return after_type_change.UpdateColumnDoc(name, new_doc);
}

/// \brief Shared implementation behind every AddColumn / AddRequiredColumn
/// overload (placeholder).
///
/// Not implemented yet: all arguments are ignored, a NotImplemented error is
/// passed to AddError(), and *this is returned so calls can be chained.
UpdateSchema& UpdateSchema::AddColumnInternal(std::optional<std::string> parent,
                                              std::string_view name, bool is_optional,
                                              std::shared_ptr<Type> type,
                                              std::string_view doc) {
  // TODO(Guotao Yu): Implement AddColumnInternal logic
  // The real work belongs here: finding the parent, validating, etc.
  AddError(
      NotImplemented("UpdateSchema::AddColumnInternal not implemented"));
  return *this;
}

/// \brief Rename a column (placeholder).
///
/// Not implemented yet: both arguments are ignored, a NotImplemented error
/// is passed to AddError(), and *this is returned so calls can be chained.
UpdateSchema& UpdateSchema::RenameColumn(std::string_view name,
                                         std::string_view new_name) {
  // TODO(Guotao Yu): Implement RenameColumn
  AddError(
      NotImplemented("UpdateSchema::RenameColumn not implemented"));
  return *this;
}

/// \brief Change the documentation of a column (placeholder).
///
/// Not implemented yet: both arguments are ignored, a NotImplemented error
/// is passed to AddError(), and *this is returned so calls can be chained.
UpdateSchema& UpdateSchema::UpdateColumnDoc(std::string_view name,
                                            std::string_view new_doc) {
  // TODO(Guotao Yu): Implement UpdateColumnDoc
  AddError(
      NotImplemented("UpdateSchema::UpdateColumnDoc not implemented"));
  return *this;
}

/// \brief Make a column optional (placeholder).
///
/// Not implemented yet: the argument is ignored, a NotImplemented error is
/// passed to AddError(), and *this is returned so calls can be chained.
UpdateSchema& UpdateSchema::MakeColumnOptional(std::string_view name) {
  // TODO(Guotao Yu): Implement MakeColumnOptional
  AddError(
      NotImplemented("UpdateSchema::MakeColumnOptional not implemented"));
  return *this;
}

/// \brief Make a column required (placeholder).
///
/// Not implemented yet: the argument is ignored, a NotImplemented error is
/// passed to AddError(), and *this is returned so calls can be chained.
UpdateSchema& UpdateSchema::RequireColumn(std::string_view name) {
  // TODO(Guotao Yu): Implement RequireColumn
  AddError(
      NotImplemented("UpdateSchema::RequireColumn not implemented"));
  return *this;
}

/// \brief Delete a column (placeholder).
///
/// Not implemented yet: the argument is ignored, a NotImplemented error is
/// passed to AddError(), and *this is returned so calls can be chained.
UpdateSchema& UpdateSchema::DeleteColumn(std::string_view name) {
  // TODO(Guotao Yu): Implement DeleteColumn
  AddError(
      NotImplemented("UpdateSchema::DeleteColumn not implemented"));
  return *this;
}

/// \brief Move a column to the first position (placeholder).
///
/// Not implemented yet: the argument is ignored, a NotImplemented error is
/// passed to AddError(), and *this is returned so calls can be chained.
UpdateSchema& UpdateSchema::MoveFirst(std::string_view name) {
  // TODO(Guotao Yu): Implement MoveFirst
  AddError(
      NotImplemented("UpdateSchema::MoveFirst not implemented"));
  return *this;
}

/// \brief Move a column in front of another column (placeholder).
///
/// Not implemented yet: both arguments are ignored, a NotImplemented error
/// is passed to AddError(), and *this is returned so calls can be chained.
UpdateSchema& UpdateSchema::MoveBefore(std::string_view name,
                                       std::string_view before_name) {
  // TODO(Guotao Yu): Implement MoveBefore
  AddError(
      NotImplemented("UpdateSchema::MoveBefore not implemented"));
  return *this;
}

/// \brief Move a column behind another column (placeholder).
///
/// Not implemented yet: both arguments are ignored, a NotImplemented error
/// is passed to AddError(), and *this is returned so calls can be chained.
UpdateSchema& UpdateSchema::MoveAfter(std::string_view name,
                                      std::string_view after_name) {
  // TODO(Guotao Yu): Implement MoveAfter
  AddError(
      NotImplemented("UpdateSchema::MoveAfter not implemented"));
  return *this;
}

/// \brief Union this schema with another one by name (placeholder).
///
/// Not implemented yet: the argument is ignored, a NotImplemented error is
/// passed to AddError(), and *this is returned so calls can be chained.
UpdateSchema& UpdateSchema::UnionByNameWith(std::shared_ptr<Schema> new_schema) {
  // TODO(Guotao Yu): Implement UnionByNameWith
  AddError(
      NotImplemented("UpdateSchema::UnionByNameWith not implemented"));
  return *this;
}

/// \brief Replace the set of identifier field names.
///
/// \param names New identifier field names; they are copied into an
///        unordered set, so duplicates collapse and order is not kept.
/// \return *this, so further calls can be chained.
UpdateSchema& UpdateSchema::SetIdentifierFields(const std::vector<std::string>& names) {
  identifier_field_names_ = std::ranges::to<std::unordered_set<std::string>>(names);
  return *this;
}

/// \brief Produce the result of the accumulated schema changes (placeholder).
///
/// \return Always a NotImplemented error for now; no ApplyResult is built.
Result<UpdateSchema::ApplyResult> UpdateSchema::Apply() {
  // TODO(Guotao Yu): Implement Apply
  return NotImplemented(
      "UpdateSchema::Apply not implemented");
}

} // namespace iceberg
Loading
Loading