Skip to content

Commit bcbdc9c

Browse files
Sreesh Maheshwar and claude committed
feat: S3 credential vending for REST catalog
Implement per-table credential vending in RestCatalog, following the pattern from iceberg-java's RESTSessionCatalog and PyIceberg's REST catalog.

- Set X-Iceberg-Access-Delegation header at session level on HttpClient
- ResolveTableFileIO merges catalog properties with per-table config and storage credentials (longest-prefix match), creating per-table FileIO via FileIORegistry when credentials are vended
- StorageCredential struct with JSON serde and prefix-based resolution
- Applied consistently to LoadTable, CreateTable, RegisterTable, StageCreateTable
- Table exposes io_properties() for consumers that need vended configuration
- Table::Refresh() updates io_properties_ to prevent stale credentials
- Comprehensive unit tests (property merging, storage credential resolution, io_properties lifecycle) and REST integration tests

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 633965f commit bcbdc9c

15 files changed

Lines changed: 638 additions & 22 deletions

src/iceberg/catalog/rest/constant.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ inline const std::string kMimeTypeFormUrlEncoded = "application/x-www-form-urlen
3838
inline const std::string kUserAgentPrefix = "iceberg-cpp/";
3939
inline const std::string kUserAgent = "iceberg-cpp/" ICEBERG_VERSION_STRING;
4040

41+
/// Name of the HTTP header RestCatalog adds to the HttpClient's default headers
/// to ask the server for access delegation.
inline const std::string kHeaderAccessDelegation{"X-Iceberg-Access-Delegation"};
/// Value sent with kHeaderAccessDelegation to request vended credentials.
inline const std::string kAccessDelegationVendedCredentials{"vended-credentials"};
43+
4144
inline const std::string kQueryParamParent = "parent";
4245
inline const std::string kQueryParamPageToken = "page_token";
4346

src/iceberg/catalog/rest/json_serde.cc

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ constexpr std::string_view kStack = "stack";
7272
constexpr std::string_view kError = "error";
7373
constexpr std::string_view kIdentifier = "identifier";
7474
constexpr std::string_view kRequirements = "requirements";
75+
/// JSON key under which a load-table result carries its list of storage credentials.
constexpr std::string_view kStorageCredentials{"storage-credentials"};
/// JSON key of a storage credential's location prefix.
constexpr std::string_view kPrefix{"prefix"};
7577
constexpr std::string_view kAccessToken = "access_token";
7678
constexpr std::string_view kTokenType = "token_type";
7779
constexpr std::string_view kExpiresIn = "expires_in";
@@ -217,12 +219,35 @@ Result<RenameTableRequest> RenameTableRequestFromJson(const nlohmann::json& json
217219
return request;
218220
}
219221

222+
// StorageCredential serialization
223+
nlohmann::json ToJson(const StorageCredential& cred) {
224+
nlohmann::json json;
225+
json[kPrefix] = cred.prefix;
226+
json[kConfig] = cred.config;
227+
return json;
228+
}
229+
230+
/// \brief Parse a StorageCredential from a JSON object.
///
/// Reads the prefix with GetJsonValue and the config map with
/// GetJsonValueOrDefault (presumably so that a missing config yields a
/// default-constructed map -- confirm against the helper). A failure from
/// either lookup is returned to the caller through ICEBERG_ASSIGN_OR_RAISE.
Result<StorageCredential> StorageCredentialFromJson(const nlohmann::json& json) {
  StorageCredential parsed;
  using ConfigMap = decltype(parsed.config);
  ICEBERG_ASSIGN_OR_RAISE(parsed.prefix, GetJsonValue<std::string>(json, kPrefix));
  ICEBERG_ASSIGN_OR_RAISE(parsed.config,
                          GetJsonValueOrDefault<ConfigMap>(json, kConfig));
  return parsed;
}
237+
220238
// LoadTableResult (used by CreateTableResponse, LoadTableResponse)
221239
nlohmann::json ToJson(const LoadTableResult& result) {
222240
nlohmann::json json;
223241
SetOptionalStringField(json, kMetadataLocation, result.metadata_location);
224242
json[kMetadata] = ToJson(*result.metadata);
225243
SetContainerField(json, kConfig, result.config);
244+
if (!result.storage_credentials.empty()) {
245+
nlohmann::json creds_json = nlohmann::json::array();
246+
for (const auto& cred : result.storage_credentials) {
247+
creds_json.push_back(ToJson(cred));
248+
}
249+
json[kStorageCredentials] = std::move(creds_json);
250+
}
226251
return json;
227252
}
228253

@@ -235,6 +260,14 @@ Result<LoadTableResult> LoadTableResultFromJson(const nlohmann::json& json) {
235260
ICEBERG_ASSIGN_OR_RAISE(result.metadata, TableMetadataFromJson(metadata_json));
236261
ICEBERG_ASSIGN_OR_RAISE(result.config,
237262
GetJsonValueOrDefault<decltype(result.config)>(json, kConfig));
263+
if (json.contains(kStorageCredentials)) {
264+
ICEBERG_ASSIGN_OR_RAISE(auto creds_json,
265+
GetJsonValue<nlohmann::json>(json, kStorageCredentials));
266+
for (const auto& cred_json : creds_json) {
267+
ICEBERG_ASSIGN_OR_RAISE(auto cred, StorageCredentialFromJson(cred_json));
268+
result.storage_credentials.push_back(std::move(cred));
269+
}
270+
}
238271
ICEBERG_RETURN_UNEXPECTED(result.Validate());
239272
return result;
240273
}
@@ -528,5 +561,6 @@ ICEBERG_DEFINE_FROM_JSON(CreateTableRequest)
528561
ICEBERG_DEFINE_FROM_JSON(CommitTableRequest)
529562
ICEBERG_DEFINE_FROM_JSON(CommitTableResponse)
530563
ICEBERG_DEFINE_FROM_JSON(OAuthTokenResponse)
564+
ICEBERG_DEFINE_FROM_JSON(StorageCredential)
531565

532566
} // namespace iceberg::rest

src/iceberg/catalog/rest/json_serde_internal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ ICEBERG_DECLARE_JSON_SERDE(CreateTableRequest)
5959
ICEBERG_DECLARE_JSON_SERDE(CommitTableRequest)
6060
ICEBERG_DECLARE_JSON_SERDE(CommitTableResponse)
6161
ICEBERG_DECLARE_JSON_SERDE(OAuthTokenResponse)
62+
ICEBERG_DECLARE_JSON_SERDE(StorageCredential)
6263

6364
#undef ICEBERG_DECLARE_JSON_SERDE
6465

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include "iceberg/catalog/rest/json_serde_internal.h"
3636
#include "iceberg/catalog/rest/resource_paths.h"
3737
#include "iceberg/catalog/rest/rest_file_io.h"
38+
#include "iceberg/catalog/rest/rest_file_io_internal.h"
3839
#include "iceberg/catalog/rest/rest_util.h"
3940
#include "iceberg/catalog/rest/types.h"
4041
#include "iceberg/json_serde_internal.h"
@@ -166,7 +167,9 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
166167
// Get snapshot loading mode
167168
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_mode, final_config.SnapshotLoadingMode());
168169

169-
auto client = std::make_unique<HttpClient>(final_config.ExtractHeaders());
170+
auto default_headers = final_config.ExtractHeaders();
171+
default_headers.emplace(kHeaderAccessDelegation, kAccessDelegationVendedCredentials);
172+
auto client = std::make_unique<HttpClient>(std::move(default_headers));
170173
ICEBERG_ASSIGN_OR_RAISE(auto catalog_session,
171174
auth_manager->CatalogSession(*client, final_config.configs()));
172175

@@ -361,8 +364,13 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
361364
ICEBERG_ASSIGN_OR_RAISE(auto result,
362365
CreateTableInternal(identifier, schema, spec, order, location,
363366
properties, /*stage_create=*/false));
367+
ICEBERG_ASSIGN_OR_RAISE(
368+
auto table_io,
369+
ResolveTableFileIO(file_io_, config_.configs(),
370+
config_.Get(RestCatalogProperties::kWarehouse), result));
364371
return Table::Make(identifier, std::move(result.metadata),
365-
std::move(result.metadata_location), file_io_, shared_from_this());
372+
std::move(result.metadata_location), std::move(table_io),
373+
shared_from_this());
366374
}
367375

368376
Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
@@ -404,10 +412,14 @@ Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
404412
ICEBERG_ASSIGN_OR_RAISE(auto result,
405413
CreateTableInternal(identifier, schema, spec, order, location,
406414
properties, /*stage_create=*/true));
415+
ICEBERG_ASSIGN_OR_RAISE(
416+
auto table_io,
417+
ResolveTableFileIO(file_io_, config_.configs(),
418+
config_.Get(RestCatalogProperties::kWarehouse), result));
407419
ICEBERG_ASSIGN_OR_RAISE(auto staged_table,
408420
StagedTable::Make(identifier, std::move(result.metadata),
409-
std::move(result.metadata_location), file_io_,
410-
shared_from_this()));
421+
std::move(result.metadata_location),
422+
std::move(table_io), shared_from_this()));
411423
return Transaction::Make(std::move(staged_table), TransactionKind::kCreate);
412424
}
413425

@@ -474,9 +486,13 @@ Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& ide
474486
ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier));
475487
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body));
476488
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
477-
/// FIXME: support per-table FileIO creation
489+
490+
ICEBERG_ASSIGN_OR_RAISE(
491+
auto table_io,
492+
ResolveTableFileIO(file_io_, config_.configs(),
493+
config_.Get(RestCatalogProperties::kWarehouse), load_result));
478494
return Table::Make(identifier, std::move(load_result.metadata),
479-
std::move(load_result.metadata_location), file_io_,
495+
std::move(load_result.metadata_location), std::move(table_io),
480496
shared_from_this());
481497
}
482498

@@ -498,8 +514,12 @@ Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
498514

499515
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
500516
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
517+
ICEBERG_ASSIGN_OR_RAISE(
518+
auto table_io,
519+
ResolveTableFileIO(file_io_, config_.configs(),
520+
config_.Get(RestCatalogProperties::kWarehouse), load_result));
501521
return Table::Make(identifier, std::move(load_result.metadata),
502-
std::move(load_result.metadata_location), file_io_,
522+
std::move(load_result.metadata_location), std::move(table_io),
503523
shared_from_this());
504524
}
505525

src/iceberg/catalog/rest/rest_file_io.cc

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
#include "iceberg/catalog/rest/rest_file_io.h"
2121

2222
#include <string>
23+
#include <unordered_map>
2324

25+
#include "iceberg/catalog/rest/catalog_properties.h"
26+
#include "iceberg/catalog/rest/rest_file_io_internal.h"
2427
#include "iceberg/file_io_registry.h"
2528
#include "iceberg/util/macros.h"
2629

@@ -69,12 +72,14 @@ Result<std::unique_ptr<FileIO>> MakeCatalogFileIO(const RestCatalogProperties& c
6972

7073
if (io_impl.empty()) {
7174
if (warehouse.empty()) {
72-
return InvalidArgument(R"("{}" or "{}" property is required to create FileIO)",
73-
RestCatalogProperties::kIOImpl.key(),
74-
RestCatalogProperties::kWarehouse.key());
75+
// No io-impl or warehouse configured. Fall back to a local FileIO as a
76+
// default — the per-table ResolveTableFileIO will resolve the real FileIO
77+
// when a table is loaded (using the table's location to detect S3, etc.).
78+
io_impl = std::string(FileIORegistry::kArrowLocalFileIO);
79+
} else {
80+
ICEBERG_ASSIGN_OR_RAISE(const auto detected_kind, DetectBuiltinFileIO(warehouse));
81+
io_impl = std::string(BuiltinFileIOName(detected_kind));
7582
}
76-
ICEBERG_ASSIGN_OR_RAISE(const auto detected_kind, DetectBuiltinFileIO(warehouse));
77-
io_impl = std::string(BuiltinFileIOName(detected_kind));
7883
}
7984

8085
if (!warehouse.empty() && IsBuiltinImpl(io_impl)) {
@@ -92,4 +97,73 @@ Result<std::unique_ptr<FileIO>> MakeCatalogFileIO(const RestCatalogProperties& c
9297
return FileIORegistry::Load(io_impl, config.configs());
9398
}
9499

100+
namespace {
101+
102+
/// \brief Pick the storage credential whose prefix best matches a location.
///
/// A credential is a candidate when \p location starts with its prefix. Among
/// the candidates the one with the longest prefix is chosen; when several
/// share that length, the one appearing first in \p credentials is kept.
///
/// \return A pointer into \p credentials, or nullptr when nothing matches.
const StorageCredential* ResolveStorageCredential(
    const std::vector<StorageCredential>& credentials, std::string_view location) {
  const StorageCredential* winner = nullptr;
  for (const StorageCredential& candidate : credentials) {
    if (!location.starts_with(candidate.prefix)) {
      continue;
    }
    // Strictly longer only: an equally long later match never replaces the winner.
    if (winner == nullptr || winner->prefix.size() < candidate.prefix.size()) {
      winner = &candidate;
    }
  }
  return winner;
}
114+
115+
/// \brief Layer three property maps into one.
///
/// Starts from a copy of \p catalog_props, then applies \p table_config and
/// finally \p credential_config, so for a key present in several maps the
/// precedence is: credential_config > table_config > catalog_props.
std::unordered_map<std::string, std::string> MergeTableProperties(
    const std::unordered_map<std::string, std::string>& catalog_props,
    const std::unordered_map<std::string, std::string>& table_config,
    const std::unordered_map<std::string, std::string>& credential_config) {
  std::unordered_map<std::string, std::string> layered(catalog_props);
  // Later layers win, so they are applied in increasing order of precedence.
  for (const auto* overrides : {&table_config, &credential_config}) {
    for (const auto& entry : *overrides) {
      layered.insert_or_assign(entry.first, entry.second);
    }
  }
  return layered;
}
128+
129+
} // namespace
130+
131+
/// \brief Resolve the FileIO to use for one table from a create/load/register result.
///
/// Steps:
///  1. If the result carries neither table config nor storage credentials,
///     \p catalog_io is returned unchanged.
///  2. A reference location is chosen: the metadata location, or -- when that is
///     empty, as for a staged create that has not written a metadata file yet --
///     the table's base location taken from the metadata.
///  3. The storage credential with the longest prefix matching that location is
///     merged on top of the table config, which is merged on top of
///     \p catalog_props.
///  4. The FileIO implementation is taken from the merged io-impl property,
///     else detected from \p warehouse, else detected from the reference location.
///     When none of these is available \p catalog_io is returned.
///
/// \param catalog_io    Catalog-level FileIO used as the fallback.
/// \param catalog_props Catalog properties forming the lowest-precedence layer.
/// \param warehouse     Warehouse location; may be empty.
/// \param result        Server response holding metadata, config and credentials.
/// \return The FileIO for the table, or an error from scheme detection or from
///         FileIORegistry::Load.
Result<std::shared_ptr<FileIO>> ResolveTableFileIO(
    const std::shared_ptr<FileIO>& catalog_io,
    const std::unordered_map<std::string, std::string>& catalog_props,
    const std::string& warehouse, const LoadTableResult& result) {
  if (result.config.empty() && result.storage_credentials.empty()) {
    return catalog_io;
  }

  // Staged creates come back without a metadata location. Matching only on
  // metadata_location would then silently drop every vended credential, so fall
  // back to the table's base location for matching and scheme detection.
  const std::string* location = &result.metadata_location;
  if (location->empty() && result.metadata != nullptr) {
    location = &result.metadata->location;
  }

  // TODO: Python also merges result.metadata->properties into the chain here
  // (metadata props < catalog < table config < credentials). Java does not.
  // Consider adding this for parity with Python if needed.
  const StorageCredential* cred = nullptr;
  if (!location->empty()) {
    cred = ResolveStorageCredential(result.storage_credentials, *location);
  }
  static const std::unordered_map<std::string, std::string> kEmpty;
  auto merged =
      MergeTableProperties(catalog_props, result.config, cred ? cred->config : kEmpty);

  // Detect FileIO type: explicit io-impl > warehouse scheme > table location scheme.
  std::string io_impl;
  if (auto it = merged.find(std::string(RestCatalogProperties::kIOImpl.key()));
      it != merged.end()) {
    io_impl = it->second;
  } else if (!warehouse.empty()) {
    ICEBERG_ASSIGN_OR_RAISE(auto kind, DetectBuiltinFileIO(warehouse));
    io_impl = std::string(BuiltinFileIOName(kind));
  } else if (!location->empty()) {
    ICEBERG_ASSIGN_OR_RAISE(auto kind, DetectBuiltinFileIO(*location));
    io_impl = std::string(BuiltinFileIOName(kind));
  } else {
    // Nothing to derive an implementation from: keep using the catalog FileIO.
    return catalog_io;
  }

  ICEBERG_ASSIGN_OR_RAISE(auto table_io, FileIORegistry::Load(io_impl, merged));
  return std::shared_ptr<FileIO>(std::move(table_io));
}
168+
95169
} // namespace iceberg::rest
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
#include <memory>
23+
#include <string>
24+
#include <unordered_map>
25+
26+
#include "iceberg/catalog/rest/iceberg_rest_export.h"
27+
#include "iceberg/catalog/rest/types.h"
28+
#include "iceberg/file_io.h"
29+
#include "iceberg/result.h"
30+
31+
/// \file iceberg/catalog/rest/rest_file_io_internal.h
32+
/// Internal helpers for per-table FileIO resolution in the REST catalog.
33+
34+
namespace iceberg::rest {
35+
36+
/// \brief Resolve a per-table FileIO from a LoadTableResult.
37+
///
38+
/// Merges catalog properties, table config, and the best-matching storage
39+
/// credential (longest prefix match on metadata_location), then creates a
40+
/// per-table FileIO via FileIORegistry. Falls back to \p catalog_io when
41+
/// no per-table config is present or the FileIO type cannot be detected.
42+
ICEBERG_REST_EXPORT Result<std::shared_ptr<FileIO>> ResolveTableFileIO(
43+
const std::shared_ptr<FileIO>& catalog_io,
44+
const std::unordered_map<std::string, std::string>& catalog_props,
45+
const std::string& warehouse, const LoadTableResult& result);
46+
47+
} // namespace iceberg::rest

src/iceberg/catalog/rest/types.h

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,12 +169,25 @@ struct ICEBERG_REST_EXPORT CreateTableRequest {
169169
/// \brief An opaque token that allows clients to make use of pagination for list APIs.
170170
using PageToken = std::string;
171171

172+
/// \brief A storage credential returned by the REST catalog server.
173+
///
174+
/// Each credential has a prefix (a location prefix like "s3://bucket/path") and
175+
/// a config map containing cloud-provider-specific properties (e.g., temporary
176+
/// access keys). Clients resolve the best credential via longest prefix match
177+
/// against the table's metadata location.
178+
struct ICEBERG_REST_EXPORT StorageCredential {
179+
std::string prefix;
180+
std::unordered_map<std::string, std::string> config;
181+
182+
bool operator==(const StorageCredential&) const = default;
183+
};
184+
172185
/// \brief Result body for table create/load/register APIs.
173186
struct ICEBERG_REST_EXPORT LoadTableResult {
174187
std::string metadata_location;
175188
std::shared_ptr<TableMetadata> metadata; // required
176189
std::unordered_map<std::string, std::string> config;
177-
// TODO(Li Feiyang): Add std::shared_ptr<StorageCredential> storage_credential;
190+
std::vector<StorageCredential> storage_credentials;
178191

179192
/// \brief Validates the LoadTableResult.
180193
Status Validate() const {

src/iceberg/file_io.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <optional>
2323
#include <string>
2424
#include <string_view>
25+
#include <unordered_map>
2526

2627
#include "iceberg/iceberg_export.h"
2728
#include "iceberg/result.h"
@@ -42,6 +43,14 @@ class ICEBERG_EXPORT FileIO {
4243
FileIO() = default;
4344
virtual ~FileIO() = default;
4445

46+
/// \brief Returns the configuration properties used to initialize this FileIO.
47+
///
48+
/// Engines that need to configure their own storage access (e.g., for credential
49+
/// vending) can read these properties to obtain the resolved credentials.
50+
const std::unordered_map<std::string, std::string>& properties() const {
51+
return properties_;
52+
}
53+
4554
/// \brief Read the content of the file at the given location.
4655
///
4756
/// \param file_location The location of the file to read.
@@ -73,6 +82,10 @@ class ICEBERG_EXPORT FileIO {
7382
virtual Status DeleteFile(const std::string& file_location) {
7483
return NotImplemented("DeleteFile not implemented");
7584
}
85+
86+
private:
87+
friend class FileIORegistry;
88+
std::unordered_map<std::string, std::string> properties_;
7689
};
7790

7891
} // namespace iceberg

src/iceberg/file_io_registry.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
#include <mutex>
2121
#include <utility>
2222

23+
#include "iceberg/util/macros.h"
24+
2325
namespace iceberg {
2426

2527
namespace {
@@ -55,7 +57,9 @@ Result<std::unique_ptr<FileIO>> FileIORegistry::Load(
5557
}
5658
factory = it->second;
5759
}
58-
return factory(properties);
60+
ICEBERG_ASSIGN_OR_RAISE(auto io, factory(properties));
61+
io->properties_ = properties;
62+
return io;
5963
}
6064

6165
} // namespace iceberg

0 commit comments

Comments
 (0)