Skip to content

Commit d3176cd

Browse files
committed
feat(rest): support snapshot loading mode
1 parent e7f1d0f commit d3176cd

File tree

8 files changed

+236
-14
lines changed

8 files changed

+236
-14
lines changed

src/iceberg/catalog/rest/catalog_properties.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,17 @@ Result<std::string_view> RestCatalogProperties::Uri() const {
4848
return it->second;
4949
}
5050

51+
/// \brief Resolve the configured snapshot loading mode.
///
/// Reads the `kSnapshotLoadingMode` property and maps it to a SnapshotMode. The value
/// is compared case-insensitively (ASCII only), so "refs", "Refs" and "REFS" all select
/// SnapshotMode::REFS.
///
/// \return SnapshotMode::ALL or SnapshotMode::REFS, or an InvalidArgument error that
/// echoes the offending value when it is neither.
Result<SnapshotMode> RestCatalogProperties::SnapshotLoadingMode() const {
  const auto mode_str = Get(kSnapshotLoadingMode);

  // Locale-independent ASCII folding: only 'a'..'z' are mapped to upper case, so the
  // comparison cannot be affected by the process locale.
  const auto equals_ignore_case = [](std::string_view value,
                                     std::string_view expected_upper) {
    if (value.size() != expected_upper.size()) {
      return false;
    }
    for (std::string_view::size_type i = 0; i < value.size(); ++i) {
      const char ch = value[i];
      const char folded =
          (ch >= 'a' && ch <= 'z') ? static_cast<char>(ch - 'a' + 'A') : ch;
      if (folded != expected_upper[i]) {
        return false;
      }
    }
    return true;
  };

  if (equals_ignore_case(mode_str, "ALL")) {
    return SnapshotMode::ALL;
  }
  if (equals_ignore_case(mode_str, "REFS")) {
    return SnapshotMode::REFS;
  }
  // Report the value exactly as configured so the user can spot the typo.
  return InvalidArgument(
      "Invalid snapshot loading mode: '{}'. Expected 'ALL' or 'REFS'.", mode_str);
}
63+
5164
} // namespace iceberg::rest

src/iceberg/catalog/rest/catalog_properties.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,14 @@
3232

3333
namespace iceberg::rest {
3434

35+
/// \brief Snapshot loading mode for REST catalog.
///
/// Chosen through the "snapshot-loading-mode" catalog property (values "ALL" or
/// "REFS") and resolved by RestCatalogProperties::SnapshotLoadingMode().
36+
enum class SnapshotMode {
37+
/// Load all snapshots from the table metadata. No extra query parameter is sent
/// on the load-table request in this mode.
38+
ALL,
39+
/// Load only snapshots referenced in snapshot refs (branches/tags). The load-table
/// request carries the query parameter snapshots=refs in this mode.
40+
REFS
41+
};
42+
3543
/// \brief Configuration class for a REST Catalog.
3644
class ICEBERG_REST_EXPORT RestCatalogProperties
3745
: public ConfigBase<RestCatalogProperties> {
@@ -47,6 +55,8 @@ class ICEBERG_REST_EXPORT RestCatalogProperties
4755
inline static Entry<std::string> kWarehouse{"warehouse", ""};
4856
/// \brief The optional prefix for REST API paths.
4957
inline static Entry<std::string> kPrefix{"prefix", ""};
58+
/// \brief The snapshot loading mode (ALL or REFS).
59+
inline static Entry<std::string> kSnapshotLoadingMode{"snapshot-loading-mode", "ALL"};
5060
/// \brief The prefix for HTTP headers.
5161
inline static constexpr std::string_view kHeaderPrefix = "header.";
5262

@@ -64,6 +74,11 @@ class ICEBERG_REST_EXPORT RestCatalogProperties
6474
/// \return The URI if configured, or an error if not set or empty.
6575
Result<std::string_view> Uri() const;
6676

77+
/// \brief Get the snapshot loading mode.
78+
/// \return SnapshotMode::ALL if configured as "ALL", SnapshotMode::REFS if "REFS",
79+
/// or an error if the value is invalid.
80+
Result<SnapshotMode> SnapshotLoadingMode() const;
81+
6782
private:
6883
RestCatalogProperties() = default;
6984
};

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -139,21 +139,26 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
139139
paths, ResourcePaths::Make(std::string(TrimTrailingSlash(final_uri)),
140140
final_config->Get(RestCatalogProperties::kPrefix)));
141141

142+
// Get snapshot loading mode
143+
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_mode, final_config->SnapshotLoadingMode());
144+
142145
return std::shared_ptr<RestCatalog>(
143146
new RestCatalog(std::move(final_config), std::move(file_io), std::move(paths),
144-
std::move(endpoints)));
147+
std::move(endpoints), snapshot_mode));
145148
}
146149

147150
/// \brief Construct a catalog from already-resolved components.
///
/// Private; instances are created through RestCatalog::Make, which resolves the
/// snapshot loading mode from the final configuration before calling this.
///
/// \param config Final catalog properties; ownership is taken.
/// \param file_io File IO shared with the tables this catalog creates.
/// \param paths Resource path builder; ownership is taken.
/// \param endpoints Endpoints stored as supported_endpoints_.
/// \param snapshot_mode Mode LoadTable passes to LoadTableInternal.
RestCatalog::RestCatalog(std::unique_ptr<RestCatalogProperties> config,
148151
std::shared_ptr<FileIO> file_io,
149152
std::unique_ptr<ResourcePaths> paths,
150-
std::unordered_set<Endpoint> endpoints)
153+
std::unordered_set<Endpoint> endpoints,
154+
SnapshotMode snapshot_mode)
151155
: config_(std::move(config)),
152156
file_io_(std::move(file_io)),
153157
// client_ and name_ below read through config_ (the member, not the moved-from
// parameter). NOTE(review): this relies on config_ being declared before client_
// and name_ in the class — confirm against the header.
client_(std::make_unique<HttpClient>(config_->ExtractHeaders())),
154158
paths_(std::move(paths)),
155159
name_(config_->Get(RestCatalogProperties::kName)),
156-
supported_endpoints_(std::move(endpoints)) {}
160+
supported_endpoints_(std::move(endpoints)),
161+
snapshot_mode_(snapshot_mode) {}
157162

158163
std::string_view RestCatalog::name() const { return name_; }
159164

@@ -376,8 +381,8 @@ Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
376381

377382
Result<bool> RestCatalog::TableExists(const TableIdentifier& identifier) const {
378383
if (!supported_endpoints_.contains(Endpoint::TableExists())) {
379-
// Fall back to call LoadTable
380-
return CaptureNoSuchTable(LoadTableInternal(identifier));
384+
// Fall back to call LoadTable with ALL mode (just checking existence)
385+
return CaptureNoSuchTable(LoadTableInternal(identifier, SnapshotMode::ALL));
381386
}
382387

383388
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
@@ -398,21 +403,31 @@ Status RestCatalog::RenameTable(const TableIdentifier& from, const TableIdentifi
398403
return {};
399404
}
400405

401-
Result<std::string> RestCatalog::LoadTableInternal(
402-
const TableIdentifier& identifier) const {
406+
/// \brief Issue the load-table GET request and return the raw response body.
///
/// \param identifier Table to load; used to build the request path.
/// \param mode Snapshot loading mode. REFS adds the query parameter snapshots=refs;
/// ALL sends no query parameters.
/// \return The response body on success; failures are surfaced through the Result
/// by the ICEBERG_* macros used below.
Result<std::string> RestCatalog::LoadTableInternal(const TableIdentifier& identifier,
407+
SnapshotMode mode) const {
403408
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::LoadTable());
404409
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
410+
411+
// Only REFS needs an explicit parameter; for ALL the map stays empty.
std::unordered_map<std::string, std::string> params;
412+
if (mode == SnapshotMode::REFS) {
413+
params["snapshots"] = "refs";
414+
}
415+
405416
ICEBERG_ASSIGN_OR_RAISE(
406417
const auto response,
407-
client_->Get(path, /*params=*/{}, /*headers=*/{}, *TableErrorHandler::Instance()));
418+
client_->Get(path, params, /*headers=*/{}, *TableErrorHandler::Instance()));
408419
return response.body();
409420
}
410421

411422
Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& identifier) {
412-
ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier));
423+
ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier, snapshot_mode_));
413424
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body));
414425
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
415426

427+
// In REFS mode, the server filters snapshots in the response to reduce payload size.
428+
// Unlike the Java implementation, we do not perform implicit lazy-loading of full
429+
// snapshots. We store only the returned (filtered) snapshots. Users requiring full
430+
// snapshot history must load the table through a catalog whose
// "snapshot-loading-mode" property is "ALL"; LoadTable takes no per-call mode.
416431
return Table::Make(identifier, std::move(load_result.metadata),
417432
std::move(load_result.metadata_location), file_io_,
418433
shared_from_this());

src/iceberg/catalog/rest/rest_catalog.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <unordered_set>
2525

2626
#include "iceberg/catalog.h"
27+
#include "iceberg/catalog/rest/catalog_properties.h"
2728
#include "iceberg/catalog/rest/endpoint.h"
2829
#include "iceberg/catalog/rest/iceberg_rest_export.h"
2930
#include "iceberg/catalog/rest/type_fwd.h"
@@ -106,9 +107,10 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
106107
private:
107108
RestCatalog(std::unique_ptr<RestCatalogProperties> config,
108109
std::shared_ptr<FileIO> file_io, std::unique_ptr<ResourcePaths> paths,
109-
std::unordered_set<Endpoint> endpoints);
110+
std::unordered_set<Endpoint> endpoints, SnapshotMode snapshot_mode);
110111

111-
Result<std::string> LoadTableInternal(const TableIdentifier& identifier) const;
112+
Result<std::string> LoadTableInternal(const TableIdentifier& identifier,
113+
SnapshotMode mode) const;
112114

113115
Result<LoadTableResult> CreateTableInternal(
114116
const TableIdentifier& identifier, const std::shared_ptr<Schema>& schema,
@@ -122,6 +124,7 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
122124
std::unique_ptr<ResourcePaths> paths_;
123125
std::string name_;
124126
std::unordered_set<Endpoint> supported_endpoints_;
127+
SnapshotMode snapshot_mode_;
125128
};
126129

127130
} // namespace iceberg::rest

src/iceberg/test/resources/iceberg-rest-fixture/docker-compose.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,5 @@ services:
2424
- CATALOG_WAREHOUSE=file:///tmp/iceberg_warehouse
2525
ports:
2626
- "8181:8181"
27+
volumes:
28+
- ${ICEBERG_TEST_DATA_DIR}:/tmp

src/iceberg/test/rest_catalog_test.cc

Lines changed: 140 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@
2121

2222
#include <unistd.h>
2323

24-
#include <algorithm>
2524
#include <chrono>
25+
#include <filesystem>
26+
#include <fstream>
2627
#include <memory>
2728
#include <print>
2829
#include <string>
@@ -687,4 +688,142 @@ TEST_F(RestCatalogIntegrationTest, StageCreateTable) {
687688
EXPECT_EQ(props.at("key1"), "value1");
688689
}
689690

691+
/// Verifies that the "snapshot-loading-mode" catalog property controls which snapshots
/// LoadTable returns: a hand-written metadata file with one unreferenced and one
/// branch-referenced snapshot is registered, then loaded through an ALL-mode and a
/// REFS-mode catalog.
TEST_F(RestCatalogIntegrationTest, LoadTableWithSnapshotLoadingMode) {
  auto catalog_result = CreateCatalog();
  ASSERT_THAT(catalog_result, IsOk());
  auto& catalog = catalog_result.value();

  Namespace ns{.levels = {"test_snapshot_mode"}};
  auto status = catalog->CreateNamespace(ns, {});
  ASSERT_THAT(status, IsOk());

  auto schema = CreateDefaultSchema();
  auto partition_spec = PartitionSpec::Unpartitioned();
  auto sort_order = SortOrder::Unsorted();

  // Create the table through the catalog, then drop it without purging so the same
  // identifier can be re-registered below against the hand-written metadata file.
  TableIdentifier table_id{.ns = ns, .name = "snapshot_mode_table"};
  std::unordered_map<std::string, std::string> table_properties;
  auto table_result = catalog->CreateTable(table_id, schema, partition_spec, sort_order,
                                           "", table_properties);
  ASSERT_THAT(table_result, IsOk());

  status = catalog->DropTable(table_id, /*purge=*/false);
  ASSERT_THAT(status, IsOk());

  // Create a fake metadata JSON with 2 snapshots:
  // - Snapshot 1: not referenced by any branch/tag (will be filtered in REFS mode)
  // - Snapshot 2: referenced by main branch (will be loaded in both modes)
  auto& test_data_dir = docker_compose_->test_data_dir();
  auto metadata_filename = std::format("00000-{}.metadata.json", getpid());
  auto metadata_path = test_data_dir / metadata_filename;
  // The host test data directory is mounted at /tmp inside the container.
  auto container_metadata_path = std::format("/tmp/{}", metadata_filename);

  // Removes the metadata file when the test body exits, including the early returns
  // taken by a failing ASSERT_*. Best effort: a removal error is ignored.
  struct ScopedFileRemover {
    std::filesystem::path path;
    ~ScopedFileRemover() {
      std::error_code ec;
      std::filesystem::remove(path, ec);
    }
  };
  ScopedFileRemover metadata_cleanup{metadata_path};

  std::string fake_metadata_json = std::format(R"({{
  "format-version": 2,
  "table-uuid": "12345678-1234-5678-1234-123456789abc",
  "location": "file:/tmp/iceberg_warehouse/{}",
  "last-sequence-number": 2,
  "last-updated-ms": 1602638573590,
  "last-column-id": 2,
  "current-schema-id": 0,
  "schemas": [{{"type": "struct", "schema-id": 0, "fields": [
    {{"id": 1, "name": "id", "required": true, "type": "int"}},
    {{"id": 2, "name": "data", "required": false, "type": "string"}}
  ]}}],
  "default-spec-id": 0,
  "partition-specs": [{{"spec-id": 0, "fields": []}}],
  "last-partition-id": 1000,
  "default-sort-order-id": 0,
  "sort-orders": [{{"order-id": 0, "fields": []}}],
  "properties": {{}},
  "current-snapshot-id": 2,
  "snapshots": [
    {{
      "snapshot-id": 1,
      "timestamp-ms": 1515100955770,
      "sequence-number": 1,
      "summary": {{"operation": "append"}},
      "manifest-list": "file:/tmp/iceberg_warehouse/{}/metadata/snap-1.avro"
    }},
    {{
      "snapshot-id": 2,
      "parent-snapshot-id": 1,
      "timestamp-ms": 1525100955770,
      "sequence-number": 2,
      "summary": {{"operation": "append"}},
      "manifest-list": "file:/tmp/iceberg_warehouse/{}/metadata/snap-2.avro"
    }}
  ],
  "snapshot-log": [
    {{"snapshot-id": 1, "timestamp-ms": 1515100955770}},
    {{"snapshot-id": 2, "timestamp-ms": 1525100955770}}
  ],
  "metadata-log": [],
  "refs": {{
    "main": {{
      "snapshot-id": 2,
      "type": "branch"
    }}
  }}
}})",
                                               ns.levels[0], ns.levels[0], ns.levels[0]);

  // Write metadata file and register the table. Fail here, with the path in the
  // message, rather than later with an opaque RegisterTable error.
  {
    std::ofstream metadata_file(metadata_path.string());
    ASSERT_TRUE(metadata_file.is_open())
        << "Failed to open metadata file: " << metadata_path.string();
    metadata_file << fake_metadata_json;
    metadata_file.close();
    ASSERT_FALSE(metadata_file.fail())
        << "Failed to write metadata file: " << metadata_path.string();
  }
  auto register_metadata_location = std::format("file:{}", container_metadata_path);
  auto register_result = catalog->RegisterTable(table_id, register_metadata_location);
  ASSERT_THAT(register_result, IsOk());

  // Test with ALL mode (default)
  auto config_all = RestCatalogProperties::default_properties();
  config_all
      ->Set(RestCatalogProperties::kUri,
            std::format("{}:{}", kLocalhostUri, kRestCatalogPort))
      .Set(RestCatalogProperties::kName, std::string(kCatalogName))
      .Set(RestCatalogProperties::kWarehouse, std::string(kWarehouseName))
      .Set(RestCatalogProperties::kSnapshotLoadingMode, std::string("ALL"));
  auto catalog_all_result =
      RestCatalog::Make(*config_all, std::make_shared<test::StdFileIO>());
  ASSERT_THAT(catalog_all_result, IsOk());
  auto& catalog_all = catalog_all_result.value();

  // Load table with ALL mode and verify both snapshots are loaded
  auto table_all_result = catalog_all->LoadTable(table_id);
  ASSERT_THAT(table_all_result, IsOk());
  auto& table_all = table_all_result.value();
  EXPECT_EQ(table_all->metadata()->snapshots.size(), 2);

  // Test with REFS mode
  auto config_refs = RestCatalogProperties::default_properties();
  config_refs
      ->Set(RestCatalogProperties::kUri,
            std::format("{}:{}", kLocalhostUri, kRestCatalogPort))
      .Set(RestCatalogProperties::kName, std::string(kCatalogName))
      .Set(RestCatalogProperties::kWarehouse, std::string(kWarehouseName))
      .Set(RestCatalogProperties::kSnapshotLoadingMode, std::string("REFS"));
  auto catalog_refs_result =
      RestCatalog::Make(*config_refs, std::make_shared<test::StdFileIO>());
  ASSERT_THAT(catalog_refs_result, IsOk());
  auto& catalog_refs = catalog_refs_result.value();

  // Load table with REFS mode and verify only referenced snapshot is loaded
  auto table_refs_result = catalog_refs->LoadTable(table_id);
  ASSERT_THAT(table_refs_result, IsOk());
  auto& table_refs = table_refs_result.value();
  // Fatal assertion: snapshots[0] below must not be evaluated on an empty vector.
  ASSERT_EQ(table_refs->metadata()->snapshots.size(), 1);
  EXPECT_EQ(table_refs->metadata()->snapshots[0]->snapshot_id, 2);

  // Verify refs are preserved in both modes
  EXPECT_EQ(table_all->metadata()->refs.size(), 1);
  EXPECT_EQ(table_refs->metadata()->refs.size(), 1);
  EXPECT_TRUE(table_all->metadata()->refs.contains("main"));
  EXPECT_TRUE(table_refs->metadata()->refs.contains("main"));
}
828+
690829
} // namespace iceberg::rest

src/iceberg/test/util/docker_compose_util.cc

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,35 @@
1919

2020
#include "iceberg/test/util/docker_compose_util.h"
2121

22+
#include <unistd.h>
23+
2224
#include <cctype>
25+
#include <chrono>
26+
#include <format>
27+
#include <print>
2328

2429
#include "iceberg/test/util/cmd_util.h"
2530

2631
namespace iceberg {
2732

33+
namespace {
34+
/// \brief Build a host directory path for this process's test data.
///
/// The path has the form /tmp/iceberg-test-<epoch-millis>-<pid>, combining the current
/// wall-clock time in milliseconds with the process id. The directory itself is not
/// created here.
std::filesystem::path GenerateTestDataDir() {
  const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
  const auto epoch_millis =
      std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch).count();
  const auto process_id = getpid();
  return std::filesystem::path{
      std::format("/tmp/iceberg-test-{}-{}", epoch_millis, process_id)};
}
42+
} // namespace
43+
2844
/// \brief Create a compose wrapper and its per-instance host test data directory.
///
/// The directory path comes from GenerateTestDataDir() and is created immediately.
/// BuildDockerCommand() exports it to docker compose as ICEBERG_TEST_DATA_DIR.
///
/// \param project_name Name passed to `docker compose -p`.
/// \param docker_compose_dir Working directory for the docker command.
///
/// NOTE(review): std::filesystem::create_directories throws on failure here — confirm
/// that an exception escaping this constructor is acceptable for the test harness.
DockerCompose::DockerCompose(std::string project_name,
2945
std::filesystem::path docker_compose_dir)
3046
: project_name_(std::move(project_name)),
31-
docker_compose_dir_(std::move(docker_compose_dir)) {}
47+
docker_compose_dir_(std::move(docker_compose_dir)),
48+
test_data_dir_(GenerateTestDataDir()) {
49+
std::filesystem::create_directories(test_data_dir_);
50+
}
3251

3352
DockerCompose::~DockerCompose() { Down(); }
3453

@@ -39,13 +58,24 @@ void DockerCompose::Up() {
3958

4059
/// \brief Stop the compose project and remove the host test data directory.
///
/// Runs `docker compose down -v --remove-orphans`, then deletes test_data_dir_.
/// This is also invoked from the destructor, so the filesystem cleanup uses only the
/// non-throwing std::error_code overloads; a failed removal is logged, not raised.
void DockerCompose::Down() {
  auto cmd = BuildDockerCommand({"down", "-v", "--remove-orphans"});
  cmd.RunCommand("docker compose down");

  // Clean up the test data directory
  if (test_data_dir_.empty()) {
    return;
  }

  std::error_code ec;
  // remove_all reports 0 (and no error) when the directory does not exist, which
  // covers a second Down() call after an earlier successful cleanup.
  const auto removed_count = std::filesystem::remove_all(test_data_dir_, ec);
  if (ec) {
    std::println("[WARN] Failed to clean up test data directory {}: {}",
                 test_data_dir_.string(), ec.message());
  } else if (removed_count > 0) {
    std::println("[INFO] Cleaned up test data directory: {}", test_data_dir_.string());
  }
}
4472

4573
Command DockerCompose::BuildDockerCommand(const std::vector<std::string>& args) const {
4674
Command cmd("docker");
4775
// Set working directory
4876
cmd.CurrentDir(docker_compose_dir_);
77+
// Set the test data directory environment variable
78+
cmd.Env("ICEBERG_TEST_DATA_DIR", test_data_dir_.string());
4979
// Use 'docker compose' subcommand with project name
5080
cmd.Arg("compose").Arg("-p").Arg(project_name_).Args(args);
5181
return cmd;

0 commit comments

Comments
 (0)