Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ set(ICEBERG_SOURCES
sort_order.cc
statistics_file.cc
table.cc
table_identifier.cc
table_metadata.cc
table_properties.cc
table_requirement.cc
Expand All @@ -84,6 +85,7 @@ set(ICEBERG_SOURCES
util/decimal.cc
util/gzip_internal.cc
util/murmurhash3_internal.cc
util/property_util.cc
util/snapshot_util.cc
util/temporal_util.cc
util/timepoint.cc
Expand Down
73 changes: 62 additions & 11 deletions src/iceberg/catalog/memory/in_memory_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
#include "iceberg/table_identifier.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_requirement.h"
#include "iceberg/table_requirements.h"
#include "iceberg/table_update.h"
#include "iceberg/transaction.h"
#include "iceberg/util/macros.h"

namespace iceberg {
Expand Down Expand Up @@ -318,7 +320,7 @@ Result<std::string> InMemoryNamespace::GetTableMetadataLocation(
ICEBERG_RETURN_UNEXPECTED(ns);
const auto it = ns.value()->table_metadata_locations_.find(table_ident.name);
if (it == ns.value()->table_metadata_locations_.end()) {
return NotFound("{} does not exist", table_ident.name);
return NotFound("Table does not exist: {}", table_ident);
}
return it->second;
}
Expand Down Expand Up @@ -405,32 +407,68 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::CreateTable(
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) {
std::unique_lock lock(mutex_);
return NotImplemented("create table");
if (root_namespace_->TableExists(identifier).value_or(false)) {
return AlreadyExists("Table already exists: {}", identifier);
}

std::string base_location =
location.empty() ? warehouse_location_ + "/" + identifier.ToString() : location;

ICEBERG_ASSIGN_OR_RAISE(auto table_metadata, TableMetadata::Make(*schema, *spec, *order,
location, properties));

ICEBERG_ASSIGN_OR_RAISE(
auto metadata_file_location,
TableMetadataUtil::Write(*file_io_, nullptr, "", *table_metadata));
ICEBERG_RETURN_UNEXPECTED(
root_namespace_->UpdateTableMetadataLocation(identifier, metadata_file_location));
return Table::Make(identifier, std::move(table_metadata),
std::move(metadata_file_location), file_io_,
std::static_pointer_cast<Catalog>(shared_from_this()));
Comment thread
wgtmac marked this conversation as resolved.
Outdated
}

Result<std::shared_ptr<Table>> InMemoryCatalog::UpdateTable(
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
const std::vector<std::unique_ptr<TableUpdate>>& updates) {
std::unique_lock lock(mutex_);
ICEBERG_ASSIGN_OR_RAISE(auto base_metadata_location,
root_namespace_->GetTableMetadataLocation(identifier));

ICEBERG_ASSIGN_OR_RAISE(auto base,
TableMetadataUtil::Read(*file_io_, base_metadata_location));
auto base_metadata_location = root_namespace_->GetTableMetadataLocation(identifier);
std::unique_ptr<TableMetadata> base;
std::unique_ptr<TableMetadataBuilder> builder;
ICEBERG_ASSIGN_OR_RAISE(auto is_create, TableRequirements::IsCreate(requirements));
if (is_create) {
if (base_metadata_location.has_value()) {
return AlreadyExists("Table already exists: {}", identifier);
}
int8_t format_version = TableMetadata::kDefaultTableFormatVersion;
for (const auto& update : updates) {
if (update->kind() == TableUpdate::Kind::kUpgradeFormatVersion) {
format_version =
dynamic_cast<const table::UpgradeFormatVersion&>(*update).format_version();
Comment thread
wgtmac marked this conversation as resolved.
Outdated
}
}
builder = TableMetadataBuilder::BuildFromEmpty(format_version);
} else {
ICEBERG_RETURN_UNEXPECTED(base_metadata_location);
ICEBERG_ASSIGN_OR_RAISE(
base, TableMetadataUtil::Read(*file_io_, base_metadata_location.value()));
builder = TableMetadataBuilder::BuildFrom(base.get());
}

for (const auto& requirement : requirements) {
ICEBERG_RETURN_UNEXPECTED(requirement->Validate(base.get()));
}

auto builder = TableMetadataBuilder::BuildFrom(base.get());
for (const auto& update : updates) {
update->ApplyTo(*builder);
}
ICEBERG_ASSIGN_OR_RAISE(auto updated, builder->Build());
ICEBERG_ASSIGN_OR_RAISE(
auto new_metadata_location,
TableMetadataUtil::Write(*file_io_, base.get(), base_metadata_location, *updated));
TableMetadataUtil::Write(
*file_io_, base.get(),
base_metadata_location.has_value() ? base_metadata_location.value() : "",
*updated));
ICEBERG_RETURN_UNEXPECTED(
root_namespace_->UpdateTableMetadataLocation(identifier, new_metadata_location));
TableMetadataUtil::DeleteRemovedMetadataFiles(*file_io_, base.get(), *updated);
Expand All @@ -445,7 +483,20 @@ Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
const std::string& location,
const std::unordered_map<std::string, std::string>& properties) {
std::unique_lock lock(mutex_);
return NotImplemented("stage create table");
if (root_namespace_->TableExists(identifier).value_or(false)) {
return AlreadyExists("Table already exists: {}", identifier);
}

std::string base_location =
location.empty() ? warehouse_location_ + "/" + identifier.ToString() : location;

ICEBERG_ASSIGN_OR_RAISE(
auto table_metadata,
TableMetadata::Make(*schema, *spec, *order, base_location, properties));
ICEBERG_ASSIGN_OR_RAISE(
auto table, StagedTable::Make(identifier, std::move(table_metadata), "", file_io_,
shared_from_this()));
return Transaction::Make(std::move(table), Transaction::Kind::kCreate, false);
Comment thread
wgtmac marked this conversation as resolved.
Outdated
}

Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier) const {
Expand Down Expand Up @@ -495,7 +546,7 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(

std::unique_lock lock(mutex_);
if (!root_namespace_->NamespaceExists(identifier.ns)) {
return NoSuchNamespace("table namespace does not exist.");
return NoSuchNamespace("Table namespace does not exist: {}", identifier.ns);
}
if (!root_namespace_->RegisterTable(identifier, metadata_file_location)) {
return UnknownError("The registry failed.");
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ iceberg_sources = files(
'sort_order.cc',
'statistics_file.cc',
'table.cc',
'table_identifier.cc',
'table_metadata.cc',
'table_properties.cc',
'table_requirement.cc',
Expand All @@ -106,6 +107,7 @@ iceberg_sources = files(
'util/decimal.cc',
'util/gzip_internal.cc',
'util/murmurhash3_internal.cc',
'util/property_util.cc',
'util/snapshot_util.cc',
'util/temporal_util.cc',
'util/timepoint.cc',
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ Result<std::vector<std::string>> Schema::IdentifierFieldNames() const {
for (auto id : identifier_field_ids_) {
ICEBERG_ASSIGN_OR_RAISE(auto name, FindColumnNameById(id));
if (!name.has_value()) {
return InvalidSchema("Cannot find the field of the specified field id: {}", id);
return InvalidSchema("Cannot find identifier field id: {}", id);
}
names.emplace_back(name.value());
}
Expand Down
32 changes: 32 additions & 0 deletions src/iceberg/table_identifier.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/table_identifier.h"

#include "iceberg/util/formatter_internal.h"

namespace iceberg {

/// \brief Render this namespace as a single string.
///
/// Delegates to FormatRange over `levels` with "." and two empty strings
/// (presumably separator, prefix and suffix -- confirm against
/// formatter_internal.h).
std::string Namespace::ToString() const {
  std::string rendered = FormatRange(levels, ".", "", "");
  return rendered;
}

/// \brief Render the identifier as a dotted name.
///
/// \return `name` alone when the namespace has no levels; otherwise the
/// rendered namespace and `name` joined by a single '.'.
std::string TableIdentifier::ToString() const {
  // The format string below always emits the '.' separator, so without this
  // guard an identifier with no namespace levels would get a stray dot in
  // front of the table name.
  if (ns.levels.empty()) {
    return name;
  }
  return std::format("{}.{}", ns.ToString(), name);
}

} // namespace iceberg
11 changes: 11 additions & 0 deletions src/iceberg/table_identifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/util/formatter.h" // IWYU pragma: keep

namespace iceberg {

Expand All @@ -35,8 +36,12 @@ struct ICEBERG_EXPORT Namespace {
std::vector<std::string> levels;

bool operator==(const Namespace& other) const { return levels == other.levels; }

std::string ToString() const;
};

/// \brief Free-function form of Namespace::ToString().
///
/// \param ns Namespace to render.
/// \return The result of `ns.ToString()`.
ICEBERG_EXPORT inline std::string ToString(const Namespace& ns) {
  std::string rendered = ns.ToString();
  return rendered;
}

/// \brief Identifies a table in iceberg catalog.
struct ICEBERG_EXPORT TableIdentifier {
Namespace ns;
Expand All @@ -53,6 +58,12 @@ struct ICEBERG_EXPORT TableIdentifier {
}
return {};
}

std::string ToString() const;
};

/// \brief Free-function form of TableIdentifier::ToString().
///
/// \param ident Table identifier to render.
/// \return The result of `ident.ToString()`.
ICEBERG_EXPORT inline std::string ToString(const TableIdentifier& ident) {
  std::string rendered = ident.ToString();
  return rendered;
}

} // namespace iceberg
Loading
Loading