Skip to content

Commit d4a21b5

Browse files
author
shuxu.li
committed
feat: RegisterTable support for InMemoryCatalog
1 parent be514fc commit d4a21b5

5 files changed

Lines changed: 136 additions & 231 deletions

File tree

src/iceberg/catalog/in_memory_catalog.cc

Lines changed: 42 additions & 181 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,14 @@
2121

2222
#include <algorithm>
2323
#include <iterator> // IWYU pragma: keep
24-
#include <mutex>
25-
#include <unordered_map>
2624

2725
#include "iceberg/exception.h"
2826
#include "iceberg/table.h"
27+
#include "iceberg/table_metadata.h"
2928
#include "iceberg/util/macros.h"
3029

3130
namespace iceberg {
3231

33-
namespace {
34-
3532
/// \brief A hierarchical namespace that manages namespaces and table metadata in-memory.
3633
///
3734
/// Each InMemoryNamespace represents a namespace level and can contain properties,
@@ -317,117 +314,56 @@ Result<std::string> InMemoryNamespace::GetTableMetadataLocation(
317314
return it->second;
318315
}
319316

320-
} // namespace
321-
322-
class ICEBERG_EXPORT InMemoryCatalogImpl {
323-
public:
324-
InMemoryCatalogImpl(std::string name, std::shared_ptr<FileIO> file_io,
325-
std::string warehouse_location,
326-
std::unordered_map<std::string, std::string> properties);
327-
328-
std::string_view name() const;
329-
330-
Status CreateNamespace(const Namespace& ns,
331-
const std::unordered_map<std::string, std::string>& properties);
332-
333-
Result<std::vector<Namespace>> ListNamespaces(const Namespace& ns) const;
334-
335-
Status DropNamespace(const Namespace& ns);
336-
337-
Result<bool> NamespaceExists(const Namespace& ns) const;
338-
339-
Result<std::unordered_map<std::string, std::string>> GetNamespaceProperties(
340-
const Namespace& ns) const;
341-
342-
Status UpdateNamespaceProperties(
343-
const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
344-
const std::unordered_set<std::string>& removals);
345-
346-
Result<std::vector<TableIdentifier>> ListTables(const Namespace& ns) const;
347-
348-
Result<std::unique_ptr<Table>> CreateTable(
349-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
350-
const std::string& location,
351-
const std::unordered_map<std::string, std::string>& properties);
352-
353-
Result<std::unique_ptr<Table>> UpdateTable(
354-
const TableIdentifier& identifier,
355-
const std::vector<std::unique_ptr<UpdateRequirement>>& requirements,
356-
const std::vector<std::unique_ptr<MetadataUpdate>>& updates);
357-
358-
Result<std::shared_ptr<Transaction>> StageCreateTable(
359-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
360-
const std::string& location,
361-
const std::unordered_map<std::string, std::string>& properties);
362-
363-
Result<bool> TableExists(const TableIdentifier& identifier) const;
364-
365-
Status DropTable(const TableIdentifier& identifier, bool purge);
366-
367-
Result<std::shared_ptr<Table>> LoadTable(const TableIdentifier& identifier) const;
368-
369-
Result<std::shared_ptr<Table>> RegisterTable(const TableIdentifier& identifier,
370-
const std::string& metadata_file_location);
371-
372-
std::unique_ptr<TableBuilder> BuildTable(const TableIdentifier& identifier,
373-
const Schema& schema) const;
374-
375-
private:
376-
std::string catalog_name_;
377-
std::unordered_map<std::string, std::string> properties_;
378-
std::shared_ptr<FileIO> file_io_;
379-
std::string warehouse_location_;
380-
std::unique_ptr<class InMemoryNamespace> root_namespace_;
381-
mutable std::recursive_mutex mutex_;
382-
};
383-
384-
InMemoryCatalogImpl::InMemoryCatalogImpl(
385-
std::string name, std::shared_ptr<FileIO> file_io, std::string warehouse_location,
386-
std::unordered_map<std::string, std::string> properties)
317+
InMemoryCatalog::InMemoryCatalog(
318+
std::string const& name, std::shared_ptr<FileIO> const& file_io,
319+
std::string const& warehouse_location,
320+
std::unordered_map<std::string, std::string> const& properties)
387321
: catalog_name_(std::move(name)),
388322
properties_(std::move(properties)),
389323
file_io_(std::move(file_io)),
390324
warehouse_location_(std::move(warehouse_location)),
391325
root_namespace_(std::make_unique<InMemoryNamespace>()) {}
392326

393-
std::string_view InMemoryCatalogImpl::name() const { return catalog_name_; }
327+
InMemoryCatalog::~InMemoryCatalog() = default;
328+
329+
std::string_view InMemoryCatalog::name() const { return catalog_name_; }
394330

395-
Status InMemoryCatalogImpl::CreateNamespace(
331+
Status InMemoryCatalog::CreateNamespace(
396332
const Namespace& ns, const std::unordered_map<std::string, std::string>& properties) {
397333
std::unique_lock lock(mutex_);
398334
return root_namespace_->CreateNamespace(ns, properties);
399335
}
400336

401-
Result<std::vector<Namespace>> InMemoryCatalogImpl::ListNamespaces(
337+
Result<std::unordered_map<std::string, std::string>>
338+
InMemoryCatalog::GetNamespaceProperties(const Namespace& ns) const {
339+
std::unique_lock lock(mutex_);
340+
return root_namespace_->GetProperties(ns);
341+
}
342+
343+
Result<std::vector<Namespace>> InMemoryCatalog::ListNamespaces(
402344
const Namespace& ns) const {
403345
std::unique_lock lock(mutex_);
404346
return root_namespace_->ListNamespaces(ns);
405347
}
406348

407-
Status InMemoryCatalogImpl::DropNamespace(const Namespace& ns) {
349+
Status InMemoryCatalog::DropNamespace(const Namespace& ns) {
408350
std::unique_lock lock(mutex_);
409351
return root_namespace_->DropNamespace(ns);
410352
}
411353

412-
Result<bool> InMemoryCatalogImpl::NamespaceExists(const Namespace& ns) const {
354+
Result<bool> InMemoryCatalog::NamespaceExists(const Namespace& ns) const {
413355
std::unique_lock lock(mutex_);
414356
return root_namespace_->NamespaceExists(ns);
415357
}
416358

417-
Result<std::unordered_map<std::string, std::string>>
418-
InMemoryCatalogImpl::GetNamespaceProperties(const Namespace& ns) const {
419-
std::unique_lock lock(mutex_);
420-
return root_namespace_->GetProperties(ns);
421-
}
422-
423-
Status InMemoryCatalogImpl::UpdateNamespaceProperties(
359+
Status InMemoryCatalog::UpdateNamespaceProperties(
424360
const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
425361
const std::unordered_set<std::string>& removals) {
426362
std::unique_lock lock(mutex_);
427363
return root_namespace_->UpdateNamespaceProperties(ns, updates, removals);
428364
}
429365

430-
Result<std::vector<TableIdentifier>> InMemoryCatalogImpl::ListTables(
366+
Result<std::vector<TableIdentifier>> InMemoryCatalog::ListTables(
431367
const Namespace& ns) const {
432368
std::unique_lock lock(mutex_);
433369
const auto& table_names = root_namespace_->ListTables(ns);
@@ -440,44 +376,58 @@ Result<std::vector<TableIdentifier>> InMemoryCatalogImpl::ListTables(
440376
return table_idents;
441377
}
442378

443-
Result<std::unique_ptr<Table>> InMemoryCatalogImpl::CreateTable(
379+
Result<std::unique_ptr<Table>> InMemoryCatalog::CreateTable(
444380
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
445381
const std::string& location,
446382
const std::unordered_map<std::string, std::string>& properties) {
447383
return NotImplemented("create table");
448384
}
449385

450-
Result<std::unique_ptr<Table>> InMemoryCatalogImpl::UpdateTable(
386+
Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
451387
const TableIdentifier& identifier,
452388
const std::vector<std::unique_ptr<UpdateRequirement>>& requirements,
453389
const std::vector<std::unique_ptr<MetadataUpdate>>& updates) {
454390
return NotImplemented("update table");
455391
}
456392

457-
Result<std::shared_ptr<Transaction>> InMemoryCatalogImpl::StageCreateTable(
393+
Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
458394
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
459395
const std::string& location,
460396
const std::unordered_map<std::string, std::string>& properties) {
461397
return NotImplemented("stage create table");
462398
}
463399

464-
Result<bool> InMemoryCatalogImpl::TableExists(const TableIdentifier& identifier) const {
400+
Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier) const {
465401
std::unique_lock lock(mutex_);
466402
return root_namespace_->TableExists(identifier);
467403
}
468404

469-
Status InMemoryCatalogImpl::DropTable(const TableIdentifier& identifier, bool purge) {
405+
Status InMemoryCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
470406
std::unique_lock lock(mutex_);
471407
// TODO(Guotao): Delete all metadata files if purge is true.
472408
return root_namespace_->UnregisterTable(identifier);
473409
}
474410

475-
Result<std::shared_ptr<Table>> InMemoryCatalogImpl::LoadTable(
411+
Result<std::shared_ptr<Table>> InMemoryCatalog::LoadTable(
476412
const TableIdentifier& identifier) const {
477-
return NotImplemented("load table");
413+
if (!file_io_) [[unlikely]] {
414+
return NotSupported("file_io is not set for catalog {}", catalog_name_);
415+
}
416+
417+
std::unique_lock lock(mutex_);
418+
auto metadata_location = root_namespace_->GetTableMetadataLocation(identifier);
419+
ICEBERG_RETURN_UNEXPECTED(metadata_location);
420+
421+
auto metadata = TableMetadataUtil::Read(*file_io_, metadata_location.value());
422+
ICEBERG_RETURN_UNEXPECTED(metadata);
423+
424+
return std::make_shared<Table>(
425+
identifier, std::move(metadata.value()), metadata_location.value(), file_io_,
426+
std::static_pointer_cast<Catalog>(
427+
std::const_pointer_cast<InMemoryCatalog>(shared_from_this())));
478428
}
479429

480-
Result<std::shared_ptr<Table>> InMemoryCatalogImpl::RegisterTable(
430+
Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
481431
const TableIdentifier& identifier, const std::string& metadata_file_location) {
482432
std::unique_lock lock(mutex_);
483433
if (!root_namespace_->NamespaceExists(identifier.ns)) {
@@ -489,95 +439,6 @@ Result<std::shared_ptr<Table>> InMemoryCatalogImpl::RegisterTable(
489439
return LoadTable(identifier);
490440
}
491441

492-
std::unique_ptr<TableBuilder> InMemoryCatalogImpl::BuildTable(
493-
const TableIdentifier& identifier, const Schema& schema) const {
494-
throw IcebergError("not implemented");
495-
}
496-
497-
InMemoryCatalog::InMemoryCatalog(
498-
std::string const& name, std::shared_ptr<FileIO> const& file_io,
499-
std::string const& warehouse_location,
500-
std::unordered_map<std::string, std::string> const& properties)
501-
: impl_(std::make_unique<InMemoryCatalogImpl>(name, file_io, warehouse_location,
502-
properties)) {}
503-
504-
InMemoryCatalog::~InMemoryCatalog() = default;
505-
506-
std::string_view InMemoryCatalog::name() const { return impl_->name(); }
507-
508-
Status InMemoryCatalog::CreateNamespace(
509-
const Namespace& ns, const std::unordered_map<std::string, std::string>& properties) {
510-
return impl_->CreateNamespace(ns, properties);
511-
}
512-
513-
Result<std::unordered_map<std::string, std::string>>
514-
InMemoryCatalog::GetNamespaceProperties(const Namespace& ns) const {
515-
return impl_->GetNamespaceProperties(ns);
516-
}
517-
518-
Result<std::vector<Namespace>> InMemoryCatalog::ListNamespaces(
519-
const Namespace& ns) const {
520-
return impl_->ListNamespaces(ns);
521-
}
522-
523-
Status InMemoryCatalog::DropNamespace(const Namespace& ns) {
524-
return impl_->DropNamespace(ns);
525-
}
526-
527-
Result<bool> InMemoryCatalog::NamespaceExists(const Namespace& ns) const {
528-
return impl_->NamespaceExists(ns);
529-
}
530-
531-
Status InMemoryCatalog::UpdateNamespaceProperties(
532-
const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
533-
const std::unordered_set<std::string>& removals) {
534-
return impl_->UpdateNamespaceProperties(ns, updates, removals);
535-
}
536-
537-
Result<std::vector<TableIdentifier>> InMemoryCatalog::ListTables(
538-
const Namespace& ns) const {
539-
return impl_->ListTables(ns);
540-
}
541-
542-
Result<std::unique_ptr<Table>> InMemoryCatalog::CreateTable(
543-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
544-
const std::string& location,
545-
const std::unordered_map<std::string, std::string>& properties) {
546-
return impl_->CreateTable(identifier, schema, spec, location, properties);
547-
}
548-
549-
Result<std::unique_ptr<Table>> InMemoryCatalog::UpdateTable(
550-
const TableIdentifier& identifier,
551-
const std::vector<std::unique_ptr<UpdateRequirement>>& requirements,
552-
const std::vector<std::unique_ptr<MetadataUpdate>>& updates) {
553-
return impl_->UpdateTable(identifier, requirements, updates);
554-
}
555-
556-
Result<std::shared_ptr<Transaction>> InMemoryCatalog::StageCreateTable(
557-
const TableIdentifier& identifier, const Schema& schema, const PartitionSpec& spec,
558-
const std::string& location,
559-
const std::unordered_map<std::string, std::string>& properties) {
560-
return impl_->StageCreateTable(identifier, schema, spec, location, properties);
561-
}
562-
563-
Result<bool> InMemoryCatalog::TableExists(const TableIdentifier& identifier) const {
564-
return impl_->TableExists(identifier);
565-
}
566-
567-
Status InMemoryCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
568-
return impl_->DropTable(identifier, purge);
569-
}
570-
571-
Result<std::shared_ptr<Table>> InMemoryCatalog::LoadTable(
572-
const TableIdentifier& identifier) const {
573-
return impl_->LoadTable(identifier);
574-
}
575-
576-
Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(
577-
const TableIdentifier& identifier, const std::string& metadata_file_location) {
578-
return impl_->RegisterTable(identifier, metadata_file_location);
579-
}
580-
581442
std::unique_ptr<TableBuilder> InMemoryCatalog::BuildTable(
582443
const TableIdentifier& identifier, const Schema& schema) const {
583444
throw IcebergError("not implemented");

src/iceberg/catalog/in_memory_catalog.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@
1919

2020
#pragma once
2121

22+
#include <mutex>
23+
2224
#include "iceberg/catalog.h"
2325

2426
namespace iceberg {
27+
2528
/**
2629
* @brief An in-memory implementation of the Iceberg Catalog interface.
2730
*
@@ -32,7 +35,9 @@ namespace iceberg {
3235
* @note This class is **not** suitable for production use.
3336
* All data will be lost when the process exits.
3437
*/
35-
class ICEBERG_EXPORT InMemoryCatalog : public Catalog {
38+
class ICEBERG_EXPORT InMemoryCatalog
39+
: public Catalog,
40+
public std::enable_shared_from_this<InMemoryCatalog> {
3641
public:
3742
InMemoryCatalog(std::string const& name, std::shared_ptr<FileIO> const& file_io,
3843
std::string const& warehouse_location,
@@ -90,7 +95,12 @@ class ICEBERG_EXPORT InMemoryCatalog : public Catalog {
9095
const Schema& schema) const override;
9196

9297
private:
93-
std::unique_ptr<class InMemoryCatalogImpl> impl_;
98+
std::string catalog_name_;
99+
std::unordered_map<std::string, std::string> properties_;
100+
std::shared_ptr<FileIO> file_io_;
101+
std::string warehouse_location_;
102+
std::unique_ptr<class InMemoryNamespace> root_namespace_;
103+
mutable std::recursive_mutex mutex_;
94104
};
95105

96106
} // namespace iceberg

test/CMakeLists.txt

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,6 @@ target_sources(schema_test
4444
target_link_libraries(schema_test PRIVATE iceberg_static GTest::gtest_main GTest::gmock)
4545
add_test(NAME schema_test COMMAND schema_test)
4646

47-
add_executable(catalog_test)
48-
target_sources(catalog_test PRIVATE in_memory_catalog_test.cc)
49-
target_link_libraries(catalog_test PRIVATE iceberg_static GTest::gtest_main GTest::gmock)
50-
add_test(NAME catalog_test COMMAND catalog_test)
51-
5247
add_executable(table_test)
5348
target_include_directories(table_test PRIVATE "${CMAKE_BINARY_DIR}")
5449
target_sources(table_test PRIVATE test_common.cc json_internal_test.cc table_test.cc
@@ -89,4 +84,11 @@ if(ICEBERG_BUILD_BUNDLE)
8984
target_link_libraries(arrow_test PRIVATE iceberg_bundle_static GTest::gtest_main
9085
GTest::gmock)
9186
add_test(NAME arrow_test COMMAND arrow_test)
87+
88+
add_executable(catalog_test)
89+
target_include_directories(catalog_test PRIVATE "${CMAKE_BINARY_DIR}")
90+
target_sources(catalog_test PRIVATE test_common.cc in_memory_catalog_test.cc)
91+
target_link_libraries(catalog_test PRIVATE iceberg_bundle_static GTest::gtest_main
92+
GTest::gmock)
93+
add_test(NAME catalog_test COMMAND catalog_test)
9294
endif()

0 commit comments

Comments
 (0)