Skip to content

Commit 4f2fd1c

Browse files
committed
feat(rest):implement stage create table
1 parent d328d0a commit 4f2fd1c

File tree

4 files changed

+85
-17
lines changed

4 files changed

+85
-17
lines changed

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
#include "iceberg/schema.h"
4343
#include "iceberg/sort_order.h"
4444
#include "iceberg/table.h"
45+
#include "iceberg/transaction.h"
4546
#include "iceberg/util/macros.h"
4647

4748
namespace iceberg::rest {
@@ -249,11 +250,11 @@ Result<std::vector<TableIdentifier>> RestCatalog::ListTables(
249250
return NotImplemented("Not implemented");
250251
}
251252

252-
Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
253+
Result<LoadTableResult> RestCatalog::CreateTableInternal(
253254
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
254255
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
255256
const std::string& location,
256-
const std::unordered_map<std::string, std::string>& properties) {
257+
const std::unordered_map<std::string, std::string>& properties, bool stage_create) {
257258
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateTable());
258259
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(identifier.ns));
259260

@@ -263,7 +264,7 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
263264
.schema = schema,
264265
.partition_spec = spec,
265266
.write_order = order,
266-
.stage_create = false,
267+
.stage_create = stage_create,
267268
.properties = properties,
268269
};
269270

@@ -273,10 +274,19 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
273274
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
274275

275276
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
276-
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
277-
return Table::Make(identifier, load_result.metadata,
278-
std::move(load_result.metadata_location), file_io_,
279-
shared_from_this());
277+
return LoadTableResultFromJson(json);
278+
}
279+
280+
Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
281+
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
282+
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
283+
const std::string& location,
284+
const std::unordered_map<std::string, std::string>& properties) {
285+
ICEBERG_ASSIGN_OR_RAISE(
286+
auto result,
287+
CreateTableInternal(identifier, schema, spec, order, location, properties, false));
288+
return Table::Make(identifier, result.metadata, std::move(result.metadata_location),
289+
file_io_, shared_from_this());
280290
}
281291

282292
Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
@@ -287,13 +297,19 @@ Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
287297
}
288298

289299
Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
290-
[[maybe_unused]] const TableIdentifier& identifier,
291-
[[maybe_unused]] const std::shared_ptr<Schema>& schema,
292-
[[maybe_unused]] const std::shared_ptr<PartitionSpec>& spec,
293-
[[maybe_unused]] const std::shared_ptr<SortOrder>& order,
294-
[[maybe_unused]] const std::string& location,
295-
[[maybe_unused]] const std::unordered_map<std::string, std::string>& properties) {
296-
return NotImplemented("Not implemented");
300+
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
301+
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
302+
const std::string& location,
303+
const std::unordered_map<std::string, std::string>& properties) {
304+
ICEBERG_ASSIGN_OR_RAISE(
305+
auto result,
306+
CreateTableInternal(identifier, schema, spec, order, location, properties, true));
307+
ICEBERG_ASSIGN_OR_RAISE(
308+
auto staged_table,
309+
StagedTable::Make(identifier, result.metadata, std::move(result.metadata_location),
310+
file_io_, shared_from_this()));
311+
return Transaction::Make(staged_table, Transaction::Kind::kCreate,
312+
/*auto_commit=*/false);
297313
}
298314

299315
Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
@@ -337,9 +353,6 @@ Result<std::string> RestCatalog::LoadTableInternal(
337353
}
338354

339355
Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& identifier) {
340-
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::LoadTable());
341-
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
342-
343356
ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier));
344357
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body));
345358
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));

src/iceberg/catalog/rest/rest_catalog.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,12 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
110110

111111
Result<std::string> LoadTableInternal(const TableIdentifier& identifier) const;
112112

113+
Result<LoadTableResult> CreateTableInternal(
114+
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
115+
const std::shared_ptr<PartitionSpec>& spec, const std::shared_ptr<SortOrder>& order,
116+
const std::string& location,
117+
const std::unordered_map<std::string, std::string>& properties, bool stage_create);
118+
113119
std::unique_ptr<RestCatalogProperties> config_;
114120
std::shared_ptr<FileIO> file_io_;
115121
std::unique_ptr<HttpClient> client_;

src/iceberg/catalog/rest/type_fwd.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
namespace iceberg::rest {
2626

2727
struct ErrorResponse;
28+
struct LoadTableResult;
2829

2930
class Endpoint;
3031
class ErrorHandler;

src/iceberg/test/rest_catalog_test.cc

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
#include "iceberg/test/std_io.h"
5050
#include "iceberg/test/test_resource.h"
5151
#include "iceberg/test/util/docker_compose_util.h"
52+
#include "iceberg/transaction.h"
5253

5354
namespace iceberg::rest {
5455

@@ -484,4 +485,51 @@ TEST_F(RestCatalogIntegrationTest, DropTable) {
484485
EXPECT_FALSE(load_result.value());
485486
}
486487

488+
TEST_F(RestCatalogIntegrationTest, StageCreateTable) {
489+
auto catalog_result = CreateCatalog();
490+
ASSERT_THAT(catalog_result, IsOk());
491+
auto& catalog = catalog_result.value();
492+
493+
// Create namespace
494+
Namespace ns{.levels = {"test_stage_create"}};
495+
auto status = catalog->CreateNamespace(ns, {});
496+
EXPECT_THAT(status, IsOk());
497+
498+
// Stage create table
499+
auto schema = CreateDefaultSchema();
500+
auto partition_spec = PartitionSpec::Unpartitioned();
501+
auto sort_order = SortOrder::Unsorted();
502+
503+
TableIdentifier table_id{.ns = ns, .name = "staged_table"};
504+
std::unordered_map<std::string, std::string> table_properties{{"key1", "value1"}};
505+
auto txn_result = catalog->StageCreateTable(table_id, schema, partition_spec,
506+
sort_order, "", table_properties);
507+
ASSERT_THAT(txn_result, IsOk());
508+
auto& txn = txn_result.value();
509+
510+
// Verify the staged table in transaction
511+
EXPECT_NE(txn->table(), nullptr);
512+
EXPECT_EQ(txn->table()->name(), table_id);
513+
514+
// Table should NOT exist in catalog yet (staged but not committed)
515+
auto exists_result = catalog->TableExists(table_id);
516+
ASSERT_THAT(exists_result, IsOk());
517+
EXPECT_FALSE(exists_result.value());
518+
519+
// Commit the transaction
520+
auto commit_result = txn->Commit();
521+
ASSERT_THAT(commit_result, IsOk());
522+
auto& committed_table = commit_result.value();
523+
524+
// Verify table now exists
525+
exists_result = catalog->TableExists(table_id);
526+
ASSERT_THAT(exists_result, IsOk());
527+
EXPECT_TRUE(exists_result.value());
528+
529+
// Verify table properties
530+
EXPECT_EQ(committed_table->name(), table_id);
531+
auto& props = committed_table->metadata()->properties.configs();
532+
EXPECT_EQ(props.at("key1"), "value1");
533+
}
534+
487535
} // namespace iceberg::rest

0 commit comments

Comments
 (0)