Skip to content

Commit 4db7c67

Browse files
authored
feat(rest): support snapshot loading mode (#543)
1 parent 72d3831 commit 4db7c67

File tree

9 files changed

+578
-703
lines changed

9 files changed

+578
-703
lines changed

src/iceberg/catalog/rest/catalog_properties.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
#include "iceberg/catalog/rest/catalog_properties.h"
2121

22+
#include <algorithm>
23+
#include <string>
2224
#include <string_view>
2325

2426
namespace iceberg::rest {
@@ -47,4 +49,16 @@ Result<std::string_view> RestCatalogProperties::Uri() const {
4749
return it->second;
4850
}
4951

52+
Result<SnapshotMode> RestCatalogProperties::SnapshotLoadingMode() const {
53+
std::string upper = StringUtils::ToUpper(Get(kSnapshotLoadingMode));
54+
if (upper == "ALL") {
55+
return SnapshotMode::kAll;
56+
} else if (upper == "REFS") {
57+
return SnapshotMode::kRefs;
58+
} else {
59+
return InvalidArgument("Invalid snapshot loading mode: '{}'.",
60+
Get(kSnapshotLoadingMode));
61+
}
62+
}
63+
5064
} // namespace iceberg::rest

src/iceberg/catalog/rest/catalog_properties.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131

3232
namespace iceberg::rest {
3333

34+
/// \brief Snapshot loading mode for REST catalog.
35+
enum class SnapshotMode : uint8_t { kAll, kRefs };
36+
3437
/// \brief Configuration class for a REST Catalog.
3538
class ICEBERG_REST_EXPORT RestCatalogProperties
3639
: public ConfigBase<RestCatalogProperties> {
@@ -46,6 +49,8 @@ class ICEBERG_REST_EXPORT RestCatalogProperties
4649
inline static Entry<std::string> kWarehouse{"warehouse", ""};
4750
/// \brief The optional prefix for REST API paths.
4851
inline static Entry<std::string> kPrefix{"prefix", ""};
52+
/// \brief The snapshot loading mode (ALL or REFS).
53+
inline static Entry<std::string> kSnapshotLoadingMode{"snapshot-loading-mode", "ALL"};
4954
/// \brief The prefix for HTTP headers.
5055
inline static constexpr std::string_view kHeaderPrefix = "header.";
5156

@@ -62,6 +67,12 @@ class ICEBERG_REST_EXPORT RestCatalogProperties
6267
/// \brief Get the URI of the REST catalog server.
6368
/// \return The URI if configured, or an error if not set or empty.
6469
Result<std::string_view> Uri() const;
70+
71+
/// \brief Get the snapshot loading mode.
72+
/// \return SnapshotMode::kAll if configured as "ALL", SnapshotMode::kRefs if
73+
/// "REFS", or an error if the value is invalid. Parsing is
74+
/// case-insensitive to match Java behavior.
75+
Result<SnapshotMode> SnapshotLoadingMode() const;
6576
};
6677

6778
} // namespace iceberg::rest

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -161,29 +161,35 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
161161
paths, ResourcePaths::Make(std::string(TrimTrailingSlash(final_uri)),
162162
final_config.Get(RestCatalogProperties::kPrefix)));
163163

164+
// Get snapshot loading mode
165+
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_mode, final_config.SnapshotLoadingMode());
166+
164167
auto client = std::make_unique<HttpClient>(final_config.ExtractHeaders());
165168
ICEBERG_ASSIGN_OR_RAISE(auto catalog_session,
166169
auth_manager->CatalogSession(*client, final_config.configs()));
167170

168-
return std::shared_ptr<RestCatalog>(new RestCatalog(
169-
std::move(final_config), std::move(file_io), std::move(client), std::move(paths),
170-
std::move(endpoints), std::move(auth_manager), std::move(catalog_session)));
171+
return std::shared_ptr<RestCatalog>(
172+
new RestCatalog(std::move(final_config), std::move(file_io), std::move(client),
173+
std::move(paths), std::move(endpoints), std::move(auth_manager),
174+
std::move(catalog_session), snapshot_mode));
171175
}
172176

173177
RestCatalog::RestCatalog(RestCatalogProperties config, std::shared_ptr<FileIO> file_io,
174178
std::unique_ptr<HttpClient> client,
175179
std::unique_ptr<ResourcePaths> paths,
176180
std::unordered_set<Endpoint> endpoints,
177181
std::unique_ptr<auth::AuthManager> auth_manager,
178-
std::shared_ptr<auth::AuthSession> catalog_session)
182+
std::shared_ptr<auth::AuthSession> catalog_session,
183+
SnapshotMode snapshot_mode)
179184
: config_(std::move(config)),
180185
file_io_(std::move(file_io)),
181186
client_(std::move(client)),
182187
paths_(std::move(paths)),
183188
name_(config_.Get(RestCatalogProperties::kName)),
184189
supported_endpoints_(std::move(endpoints)),
185190
auth_manager_(std::move(auth_manager)),
186-
catalog_session_(std::move(catalog_session)) {
191+
catalog_session_(std::move(catalog_session)),
192+
snapshot_mode_(snapshot_mode) {
187193
ICEBERG_DCHECK(catalog_session_ != nullptr, "catalog_session must not be null");
188194
}
189195

@@ -442,9 +448,17 @@ Result<std::string> RestCatalog::LoadTableInternal(
442448
const TableIdentifier& identifier) const {
443449
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::LoadTable());
444450
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
451+
452+
std::unordered_map<std::string, std::string> params;
453+
if (snapshot_mode_ == SnapshotMode::kRefs) {
454+
params["snapshots"] = "refs";
455+
} else {
456+
params["snapshots"] = "all";
457+
}
458+
445459
ICEBERG_ASSIGN_OR_RAISE(
446460
const auto response,
447-
client_->Get(path, /*params=*/{}, /*headers=*/{}, *TableErrorHandler::Instance(),
461+
client_->Get(path, params, /*headers=*/{}, *TableErrorHandler::Instance(),
448462
*catalog_session_));
449463
return response.body();
450464
}
@@ -453,7 +467,6 @@ Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& ide
453467
ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier));
454468
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body));
455469
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
456-
457470
return Table::Make(identifier, std::move(load_result.metadata),
458471
std::move(load_result.metadata_location), file_io_,
459472
shared_from_this());

src/iceberg/catalog/rest/rest_catalog.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
109109
std::unique_ptr<HttpClient> client, std::unique_ptr<ResourcePaths> paths,
110110
std::unordered_set<Endpoint> endpoints,
111111
std::unique_ptr<auth::AuthManager> auth_manager,
112-
std::shared_ptr<auth::AuthSession> catalog_session);
112+
std::shared_ptr<auth::AuthSession> catalog_session,
113+
SnapshotMode snapshot_mode);
113114

114115
Result<std::string> LoadTableInternal(const TableIdentifier& identifier) const;
115116

@@ -127,6 +128,7 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
127128
std::unordered_set<Endpoint> supported_endpoints_;
128129
std::unique_ptr<auth::AuthManager> auth_manager_;
129130
std::shared_ptr<auth::AuthSession> catalog_session_;
131+
SnapshotMode snapshot_mode_;
130132
};
131133

132134
} // namespace iceberg::rest

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ if(ICEBERG_BUILD_REST)
225225
if(ICEBERG_BUILD_REST_INTEGRATION_TESTS)
226226
add_rest_iceberg_test(rest_catalog_integration_test
227227
SOURCES
228-
rest_catalog_test.cc
228+
rest_catalog_integration_test.cc
229229
util/cmd_util.cc
230230
util/docker_compose_util.cc)
231231
endif()

src/iceberg/test/meson.build

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ if get_option('rest').enabled()
120120
iceberg_tests += {
121121
'rest_integration_test': {
122122
'sources': files(
123-
'rest_catalog_test.cc',
123+
'rest_catalog_integration_test.cc',
124124
'util/cmd_util.cc',
125125
'util/docker_compose_util.cc',
126126
),

0 commit comments

Comments
 (0)