Skip to content

Commit 805973f

Browse files
committed
feat: InMemoryCatalog create table
1 parent 25daf33 commit 805973f

File tree

15 files changed

+358
-37
lines changed

15 files changed

+358
-37
lines changed

src/iceberg/catalog.h

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,14 +105,15 @@ class ICEBERG_EXPORT Catalog {
105105
/// \brief Create a table
106106
///
107107
/// \param identifier a table identifier
108+
/// \param location a location for the table; leave empty if unspecified
108109
/// \param schema a schema
109110
/// \param spec a partition spec
110-
/// \param location a location for the table; leave empty if unspecified
111+
/// \param sort_order a sort order
111112
/// \param properties a string map of table properties
112113
/// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists
113114
virtual Result<std::unique_ptr<Table>> CreateTable(
114-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
115-
const std::string& location,
115+
const TableIdentifier& identifier, const std::string& location,
116+
const Schema& schema, const PartitionSpec& spec, const SortOrder& sort_order,
116117
const std::unordered_map<std::string, std::string>& properties) = 0;
117118

118119
/// \brief Update a table
@@ -129,15 +130,16 @@ class ICEBERG_EXPORT Catalog {
129130
/// \brief Start a transaction to create a table
130131
///
131132
/// \param identifier a table identifier
133+
/// \param location a location for the table; leave empty if unspecified
132134
/// \param schema a schema
133135
/// \param spec a partition spec
134-
/// \param location a location for the table; leave empty if unspecified
136+
/// \param sort_order a sort order
135137
/// \param properties a string map of table properties
136138
/// \return a Transaction to create the table or ErrorKind::kAlreadyExists if the
137139
/// table already exists
138140
virtual Result<std::shared_ptr<Transaction>> StageCreateTable(
139-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
140-
const std::string& location,
141+
const TableIdentifier& identifier, const std::string& location,
142+
const Schema& schema, const PartitionSpec& spec, const SortOrder& sort_order,
141143
const std::unordered_map<std::string, std::string>& properties) = 0;
142144

143145
/// \brief Check whether table exists

src/iceberg/catalog/memory/in_memory_catalog.cc

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <algorithm>
2323
#include <iterator>
2424

25+
#include "iceberg/sort_order.h"
2526
#include "iceberg/table.h"
2627
#include "iceberg/table_identifier.h"
2728
#include "iceberg/table_metadata.h"
@@ -318,7 +319,7 @@ Result<std::string> InMemoryNamespace::GetTableMetadataLocation(
318319
ICEBERG_RETURN_UNEXPECTED(ns);
319320
const auto it = ns.value()->table_metadata_locations_.find(table_ident.name);
320321
if (it == ns.value()->table_metadata_locations_.end()) {
321-
return NotFound("{} does not exist", table_ident.name);
322+
return NotFound("Table does not exist: {}", table_ident);
322323
}
323324
return it->second;
324325
}
@@ -400,11 +401,25 @@ Result<std::vector<TableIdentifier>> InMemoryCatalog::ListTables(
400401
}
401402

402403
Result<std::unique_ptr<Table>> InMemoryCatalog::CreateTable(
403-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
404-
const std::string& location,
404+
const TableIdentifier& identifier, const std::string& location, const Schema& schema,
405+
const PartitionSpec& spec, const SortOrder& sort_order,
405406
const std::unordered_map<std::string, std::string>& properties) {
406407
std::unique_lock lock(mutex_);
407-
return NotImplemented("create table");
408+
if (root_namespace_->TableExists(identifier).value_or(false)) {
409+
return AlreadyExists("Table already exists: {}", identifier);
410+
}
411+
412+
std::string base_location =
413+
location.empty() ? warehouse_location_ + "/" + identifier.ToString() : location;
414+
ICEBERG_ASSIGN_OR_RAISE(auto metadata, TableMetadata::Make(base_location, schema, spec,
415+
sort_order, properties));
416+
ICEBERG_ASSIGN_OR_RAISE(auto metadata_file_location,
417+
TableMetadataUtil::Write(*file_io_, nullptr, "", *metadata));
418+
ICEBERG_RETURN_UNEXPECTED(
419+
root_namespace_->UpdateTableMetadataLocation(identifier, metadata_file_location));
420+
return std::make_unique<Table>(identifier, std::move(metadata),
421+
std::move(metadata_file_location), file_io_,
422+
std::static_pointer_cast<Catalog>(shared_from_this()));
408423
}
409424

410425
Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
@@ -440,11 +455,20 @@ Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
440455
}
441456

442457
Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
443-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
444-
const std::string& location,
458+
const TableIdentifier& identifier, const std::string& location, const Schema& schema,
459+
const PartitionSpec& spec, const SortOrder& sort_order,
445460
const std::unordered_map<std::string, std::string>& properties) {
446461
std::unique_lock lock(mutex_);
447-
return NotImplemented("stage create table");
462+
if (root_namespace_->TableExists(identifier).value_or(false)) {
463+
return AlreadyExists("Table already exists: {}", identifier);
464+
}
465+
466+
std::string base_location =
467+
location.empty() ? warehouse_location_ + "/" + identifier.ToString() : location;
468+
ICEBERG_ASSIGN_OR_RAISE(auto metadata, TableMetadata::Make(base_location, schema, spec,
469+
sort_order, properties));
470+
// TODO(zhuo.wang) Init transaction with metadata
471+
return nullptr;
448472
}
449473

450474
Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier) const {
@@ -495,7 +519,7 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
495519

496520
std::unique_lock lock(mutex_);
497521
if (!root_namespace_->NamespaceExists(identifier.ns)) {
498-
return NoSuchNamespace("table namespace does not exist.");
522+
return NoSuchNamespace("Table namespace does not exist: {}", identifier.ns);
499523
}
500524
if (!root_namespace_->RegisterTable(identifier, metadata_file_location)) {
501525
return UnknownError("The registry failed.");

src/iceberg/catalog/memory/in_memory_catalog.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ class ICEBERG_EXPORT InMemoryCatalog
7171
Result<std::vector<TableIdentifier>> ListTables(const Namespace& ns) const override;
7272

7373
Result<std::unique_ptr<Table>> CreateTable(
74-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
75-
const std::string& location,
74+
const TableIdentifier& identifier, const std::string& location,
75+
const Schema& schema, const PartitionSpec& spec, const SortOrder& sort_order,
7676
const std::unordered_map<std::string, std::string>& properties) override;
7777

7878
Result<std::unique_ptr<Table>> UpdateTable(
@@ -81,8 +81,8 @@ class ICEBERG_EXPORT InMemoryCatalog
8181
const std::vector<std::unique_ptr<TableUpdate>>& updates) override;
8282

8383
Result<std::shared_ptr<Transaction>> StageCreateTable(
84-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
85-
const std::string& location,
84+
const TableIdentifier& identifier, const std::string& location,
85+
const Schema& schema, const PartitionSpec& spec, const SortOrder& sort_order,
8686
const std::unordered_map<std::string, std::string>& properties) override;
8787

8888
Result<bool> TableExists(const TableIdentifier& identifier) const override;

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,9 @@ Result<std::vector<TableIdentifier>> RestCatalog::ListTables(
242242

243243
Result<std::unique_ptr<Table>> RestCatalog::CreateTable(
244244
[[maybe_unused]] const TableIdentifier& identifier,
245-
[[maybe_unused]] const Schema& schema, [[maybe_unused]] const PartitionSpec& spec,
246-
[[maybe_unused]] const std::string& location,
245+
[[maybe_unused]] const std::string& location, [[maybe_unused]] const Schema& schema,
246+
[[maybe_unused]] const PartitionSpec& spec,
247+
[[maybe_unused]] const SortOrder& sort_order,
247248
[[maybe_unused]] const std::unordered_map<std::string, std::string>& properties) {
248249
return NotImplemented("Not implemented");
249250
}
@@ -257,8 +258,9 @@ Result<std::unique_ptr<Table>> RestCatalog::UpdateTable(
257258

258259
Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
259260
[[maybe_unused]] const TableIdentifier& identifier,
260-
[[maybe_unused]] const Schema& schema, [[maybe_unused]] const PartitionSpec& spec,
261-
[[maybe_unused]] const std::string& location,
261+
[[maybe_unused]] const std::string& location, [[maybe_unused]] const Schema& schema,
262+
[[maybe_unused]] const PartitionSpec& spec,
263+
[[maybe_unused]] const SortOrder& sort_order,
262264
[[maybe_unused]] const std::unordered_map<std::string, std::string>& properties) {
263265
return NotImplemented("Not implemented");
264266
}

src/iceberg/catalog/rest/rest_catalog.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
#pragma once
2121

2222
#include <memory>
23-
#include <set>
2423
#include <string>
2524

2625
#include "iceberg/catalog.h"
@@ -72,8 +71,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
7271
Result<std::vector<TableIdentifier>> ListTables(const Namespace& ns) const override;
7372

7473
Result<std::unique_ptr<Table>> CreateTable(
75-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
76-
const std::string& location,
74+
const TableIdentifier& identifier, const std::string& location,
75+
const Schema& schema, const PartitionSpec& spec, const SortOrder& sort_order,
7776
const std::unordered_map<std::string, std::string>& properties) override;
7877

7978
Result<std::unique_ptr<Table>> UpdateTable(
@@ -82,8 +81,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog {
8281
const std::vector<std::unique_ptr<TableUpdate>>& updates) override;
8382

8483
Result<std::shared_ptr<Transaction>> StageCreateTable(
85-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
86-
const std::string& location,
84+
const TableIdentifier& identifier, const std::string& location,
85+
const Schema& schema, const PartitionSpec& spec, const SortOrder& sort_order,
8786
const std::unordered_map<std::string, std::string>& properties) override;
8887

8988
Result<bool> TableExists(const TableIdentifier& identifier) const override;

src/iceberg/schema.cc

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,11 @@
3333

3434
namespace iceberg {
3535

36-
Schema::Schema(std::vector<SchemaField> fields, std::optional<int32_t> schema_id)
37-
: StructType(std::move(fields)), schema_id_(schema_id) {}
36+
Schema::Schema(std::vector<SchemaField> fields, std::optional<int32_t> schema_id,
37+
std::vector<int32_t> identifier_field_ids)
38+
: StructType(std::move(fields)),
39+
schema_id_(schema_id),
40+
identifier_field_ids_(std::move(identifier_field_ids)) {}
3841

3942
std::optional<int32_t> Schema::schema_id() const { return schema_id_; }
4043

@@ -179,4 +182,20 @@ Result<std::unique_ptr<Schema>> Schema::Project(
179182
std::nullopt);
180183
}
181184

185+
// Accessor for the identifier field ids stored on this schema.
std::vector<int32_t> Schema::IdentifierFieldIds() const {
  // Returned by value, so the caller receives its own copy of the ids.
  return identifier_field_ids_;
}
186+
187+
// Resolves every stored identifier field id to the name of the matching field.
//
// Names are returned in the same order as identifier_field_ids_. An id for which
// FindFieldById yields no field produces an InvalidSchema error. A failure of
// FindFieldById itself is handed to ICEBERG_ASSIGN_OR_RAISE (presumably an early
// return of that error -- confirm against the macro definition).
Result<std::vector<std::string>> Schema::IdentifierFieldNames() const {
  std::vector<std::string> names;
  // One name per id at most, so a single allocation is enough.
  names.reserve(identifier_field_ids_.size());
  for (const auto id : identifier_field_ids_) {
    ICEBERG_ASSIGN_OR_RAISE(auto field, FindFieldById(id));
    if (!field.has_value()) {
      return InvalidSchema("Can not find field to the specified id: {}", id);
    }
    names.emplace_back(field.value().get().name());
  }
  return names;
}
200+
182201
} // namespace iceberg

src/iceberg/schema.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ class ICEBERG_EXPORT Schema : public StructType {
4949
static constexpr int32_t kInvalidColumnId = -1;
5050

5151
explicit Schema(std::vector<SchemaField> fields,
52-
std::optional<int32_t> schema_id = std::nullopt);
52+
std::optional<int32_t> schema_id = std::nullopt,
53+
std::vector<int32_t> identifier_field_ids = {});
5354

5455
/// \brief Get the schema ID.
5556
///
@@ -100,6 +101,9 @@ class ICEBERG_EXPORT Schema : public StructType {
100101
Result<std::unique_ptr<Schema>> Project(
101102
const std::unordered_set<int32_t>& field_ids) const;
102103

104+
std::vector<int32_t> IdentifierFieldIds() const;
105+
Result<std::vector<std::string>> IdentifierFieldNames() const;
106+
103107
friend bool operator==(const Schema& lhs, const Schema& rhs) { return lhs.Equals(rhs); }
104108

105109
private:
@@ -116,6 +120,8 @@ class ICEBERG_EXPORT Schema : public StructType {
116120
const Schema&);
117121

118122
const std::optional<int32_t> schema_id_;
123+
std::vector<int32_t> identifier_field_ids_;
124+
119125
/// Mapping from field id to field.
120126
Lazy<InitIdToFieldMap> id_to_field_;
121127
/// Mapping from field name to field id.

src/iceberg/table_identifier.h

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
/// \file iceberg/table_identifier.h
2323
/// A TableIdentifier is a unique identifier for a table
2424

25+
#include <format>
26+
#include <sstream>
2527
#include <string>
2628
#include <vector>
2729

@@ -35,6 +37,15 @@ struct ICEBERG_EXPORT Namespace {
3537
std::vector<std::string> levels;
3638

3739
bool operator==(const Namespace& other) const { return levels == other.levels; }
40+
41+
std::string ToString() const {
42+
std::ostringstream oss;
43+
for (size_t i = 0; i < levels.size(); ++i) {
44+
if (i) oss << '.';
45+
oss << levels[i];
46+
}
47+
return oss.str();
48+
}
3849
};
3950

4051
/// \brief Identifies a table in iceberg catalog.
@@ -53,6 +64,27 @@ struct ICEBERG_EXPORT TableIdentifier {
5364
}
5465
return {};
5566
}
67+
68+
/// \brief Render the identifier as "<namespace>.<name>".
///
/// When the namespace has no levels only the table name is returned, so the
/// result never starts with a stray '.'.
std::string ToString() const {
  if (ns.levels.empty()) {
    return name;
  }
  return ns.ToString() + '.' + name;
}
5669
};
5770

5871
} // namespace iceberg
72+
73+
namespace std {
74+
75+
/// \brief std::format support for iceberg::Namespace.
///
/// Formats the string produced by Namespace::ToString(). parse() is inherited
/// from std::formatter<std::string>, so "{}" keeps working and the standard
/// string format specs (fill, alignment, width, precision) are accepted.
template <>
struct formatter<iceberg::Namespace> : std::formatter<std::string> {
  auto format(const iceberg::Namespace& ns, format_context& ctx) const {
    return std::formatter<std::string>::format(ns.ToString(), ctx);
  }
};
82+
83+
/// \brief std::format support for iceberg::TableIdentifier.
///
/// Formats the string produced by TableIdentifier::ToString(). parse() is
/// inherited from std::formatter<std::string>, so "{}" keeps working and the
/// standard string format specs (fill, alignment, width, precision) are accepted.
template <>
struct formatter<iceberg::TableIdentifier> : std::formatter<std::string> {
  auto format(const iceberg::TableIdentifier& id, format_context& ctx) const {
    return std::formatter<std::string>::format(id.ToString(), ctx);
  }
};
90+
} // namespace std

0 commit comments

Comments
 (0)