Skip to content

Commit 620041b

Browse files
Sreesh Maheshwarclaude
andcommitted
feat: S3 credential vending for REST catalog
Implement per-table credential vending in RestCatalog, following the pattern from iceberg-java's RESTSessionCatalog and PyIceberg's REST catalog.

- Set the X-Iceberg-Access-Delegation header at session level on HttpClient.
- ResolveTableFileIO merges catalog properties with per-table config and storage credentials (longest-prefix match), creating a per-table FileIO via FileIORegistry when credentials are vended.
- Add a StorageCredential struct with JSON serde and prefix-based resolution.
- Applied consistently to LoadTable, CreateTable, RegisterTable, and StageCreateTable.
- Table exposes io_properties() for consumers that need vended configuration.
- Table::Refresh() updates io_properties_ to prevent stale credentials.
- Comprehensive unit tests (property merging, storage credential resolution, io_properties lifecycle) and REST integration tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 633965f commit 620041b

15 files changed

Lines changed: 591 additions & 19 deletions

src/iceberg/catalog/rest/constant.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ inline const std::string kMimeTypeFormUrlEncoded = "application/x-www-form-urlen
3838
inline const std::string kUserAgentPrefix = "iceberg-cpp/";
3939
inline const std::string kUserAgent = "iceberg-cpp/" ICEBERG_VERSION_STRING;
4040

41+
inline const std::string kHeaderAccessDelegation = "X-Iceberg-Access-Delegation";
42+
inline const std::string kAccessDelegationVendedCredentials = "vended-credentials";
43+
4144
inline const std::string kQueryParamParent = "parent";
4245
inline const std::string kQueryParamPageToken = "page_token";
4346

src/iceberg/catalog/rest/json_serde.cc

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ constexpr std::string_view kStack = "stack";
7272
constexpr std::string_view kError = "error";
7373
constexpr std::string_view kIdentifier = "identifier";
7474
constexpr std::string_view kRequirements = "requirements";
75+
constexpr std::string_view kStorageCredentials = "storage-credentials";
76+
constexpr std::string_view kPrefix = "prefix";
7577
constexpr std::string_view kAccessToken = "access_token";
7678
constexpr std::string_view kTokenType = "token_type";
7779
constexpr std::string_view kExpiresIn = "expires_in";
@@ -217,12 +219,35 @@ Result<RenameTableRequest> RenameTableRequestFromJson(const nlohmann::json& json
217219
return request;
218220
}
219221

222+
// StorageCredential serialization
223+
nlohmann::json ToJson(const StorageCredential& cred) {
224+
nlohmann::json json;
225+
json[kPrefix] = cred.prefix;
226+
json[kConfig] = cred.config;
227+
return json;
228+
}
229+
230+
/// \brief Parse a StorageCredential from JSON.
///
/// Reads the kPrefix entry via GetJsonValue and the kConfig entry via
/// GetJsonValueOrDefault. A failure reported by either lookup is handed back to
/// the caller through ICEBERG_ASSIGN_OR_RAISE.
///
/// NOTE(review): presumably GetJsonValueOrDefault yields an empty map when the
/// config entry is absent -- confirm against the helper's definition.
Result<StorageCredential> StorageCredentialFromJson(const nlohmann::json& json) {
  StorageCredential parsed;
  using ConfigMap = decltype(parsed.config);
  ICEBERG_ASSIGN_OR_RAISE(parsed.prefix, GetJsonValue<std::string>(json, kPrefix));
  ICEBERG_ASSIGN_OR_RAISE(parsed.config,
                          GetJsonValueOrDefault<ConfigMap>(json, kConfig));
  return parsed;
}
237+
220238
// LoadTableResult (used by CreateTableResponse, LoadTableResponse)
221239
nlohmann::json ToJson(const LoadTableResult& result) {
222240
nlohmann::json json;
223241
SetOptionalStringField(json, kMetadataLocation, result.metadata_location);
224242
json[kMetadata] = ToJson(*result.metadata);
225243
SetContainerField(json, kConfig, result.config);
244+
if (!result.storage_credentials.empty()) {
245+
nlohmann::json creds_json = nlohmann::json::array();
246+
for (const auto& cred : result.storage_credentials) {
247+
creds_json.push_back(ToJson(cred));
248+
}
249+
json[kStorageCredentials] = std::move(creds_json);
250+
}
226251
return json;
227252
}
228253

@@ -235,6 +260,14 @@ Result<LoadTableResult> LoadTableResultFromJson(const nlohmann::json& json) {
235260
ICEBERG_ASSIGN_OR_RAISE(result.metadata, TableMetadataFromJson(metadata_json));
236261
ICEBERG_ASSIGN_OR_RAISE(result.config,
237262
GetJsonValueOrDefault<decltype(result.config)>(json, kConfig));
263+
if (json.contains(kStorageCredentials)) {
264+
ICEBERG_ASSIGN_OR_RAISE(auto creds_json,
265+
GetJsonValue<nlohmann::json>(json, kStorageCredentials));
266+
for (const auto& cred_json : creds_json) {
267+
ICEBERG_ASSIGN_OR_RAISE(auto cred, StorageCredentialFromJson(cred_json));
268+
result.storage_credentials.push_back(std::move(cred));
269+
}
270+
}
238271
ICEBERG_RETURN_UNEXPECTED(result.Validate());
239272
return result;
240273
}
@@ -528,5 +561,6 @@ ICEBERG_DEFINE_FROM_JSON(CreateTableRequest)
528561
ICEBERG_DEFINE_FROM_JSON(CommitTableRequest)
529562
ICEBERG_DEFINE_FROM_JSON(CommitTableResponse)
530563
ICEBERG_DEFINE_FROM_JSON(OAuthTokenResponse)
564+
ICEBERG_DEFINE_FROM_JSON(StorageCredential)
531565

532566
} // namespace iceberg::rest

src/iceberg/catalog/rest/json_serde_internal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ ICEBERG_DECLARE_JSON_SERDE(CreateTableRequest)
5959
ICEBERG_DECLARE_JSON_SERDE(CommitTableRequest)
6060
ICEBERG_DECLARE_JSON_SERDE(CommitTableResponse)
6161
ICEBERG_DECLARE_JSON_SERDE(OAuthTokenResponse)
62+
ICEBERG_DECLARE_JSON_SERDE(StorageCredential)
6263

6364
#undef ICEBERG_DECLARE_JSON_SERDE
6465

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,9 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
166166
// Get snapshot loading mode
167167
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_mode, final_config.SnapshotLoadingMode());
168168

169-
auto client = std::make_unique<HttpClient>(final_config.ExtractHeaders());
169+
auto default_headers = final_config.ExtractHeaders();
170+
default_headers.emplace(kHeaderAccessDelegation, kAccessDelegationVendedCredentials);
171+
auto client = std::make_unique<HttpClient>(std::move(default_headers));
170172
ICEBERG_ASSIGN_OR_RAISE(auto catalog_session,
171173
auth_manager->CatalogSession(*client, final_config.configs()));
172174

@@ -361,8 +363,13 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
361363
ICEBERG_ASSIGN_OR_RAISE(auto result,
362364
CreateTableInternal(identifier, schema, spec, order, location,
363365
properties, /*stage_create=*/false));
366+
ICEBERG_ASSIGN_OR_RAISE(
367+
auto table_io,
368+
ResolveTableFileIO(file_io_, config_.configs(),
369+
config_.Get(RestCatalogProperties::kWarehouse), result));
364370
return Table::Make(identifier, std::move(result.metadata),
365-
std::move(result.metadata_location), file_io_, shared_from_this());
371+
std::move(result.metadata_location), std::move(table_io),
372+
shared_from_this());
366373
}
367374

368375
Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
@@ -404,10 +411,14 @@ Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
404411
ICEBERG_ASSIGN_OR_RAISE(auto result,
405412
CreateTableInternal(identifier, schema, spec, order, location,
406413
properties, /*stage_create=*/true));
414+
ICEBERG_ASSIGN_OR_RAISE(
415+
auto table_io,
416+
ResolveTableFileIO(file_io_, config_.configs(),
417+
config_.Get(RestCatalogProperties::kWarehouse), result));
407418
ICEBERG_ASSIGN_OR_RAISE(auto staged_table,
408419
StagedTable::Make(identifier, std::move(result.metadata),
409-
std::move(result.metadata_location), file_io_,
410-
shared_from_this()));
420+
std::move(result.metadata_location),
421+
std::move(table_io), shared_from_this()));
411422
return Transaction::Make(std::move(staged_table), TransactionKind::kCreate);
412423
}
413424

@@ -474,9 +485,13 @@ Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& ide
474485
ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier));
475486
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body));
476487
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
477-
/// FIXME: support per-table FileIO creation
488+
489+
ICEBERG_ASSIGN_OR_RAISE(
490+
auto table_io,
491+
ResolveTableFileIO(file_io_, config_.configs(),
492+
config_.Get(RestCatalogProperties::kWarehouse), load_result));
478493
return Table::Make(identifier, std::move(load_result.metadata),
479-
std::move(load_result.metadata_location), file_io_,
494+
std::move(load_result.metadata_location), std::move(table_io),
480495
shared_from_this());
481496
}
482497

@@ -498,8 +513,12 @@ Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
498513

499514
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
500515
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
516+
ICEBERG_ASSIGN_OR_RAISE(
517+
auto table_io,
518+
ResolveTableFileIO(file_io_, config_.configs(),
519+
config_.Get(RestCatalogProperties::kWarehouse), load_result));
501520
return Table::Make(identifier, std::move(load_result.metadata),
502-
std::move(load_result.metadata_location), file_io_,
521+
std::move(load_result.metadata_location), std::move(table_io),
503522
shared_from_this());
504523
}
505524

src/iceberg/catalog/rest/rest_file_io.cc

Lines changed: 75 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
#include "iceberg/catalog/rest/rest_file_io.h"
2121

2222
#include <string>
23+
#include <unordered_map>
2324

25+
#include "iceberg/catalog/rest/catalog_properties.h"
2426
#include "iceberg/file_io_registry.h"
2527
#include "iceberg/util/macros.h"
2628

@@ -69,12 +71,13 @@ Result<std::unique_ptr<FileIO>> MakeCatalogFileIO(const RestCatalogProperties& c
6971

7072
if (io_impl.empty()) {
7173
if (warehouse.empty()) {
72-
return InvalidArgument(R"("{}" or "{}" property is required to create FileIO)",
73-
RestCatalogProperties::kIOImpl.key(),
74-
RestCatalogProperties::kWarehouse.key());
74+
// No io-impl or warehouse configured. Fall back to a local FileIO as a
75+
// default — enabling per-table ResolveTableFileIO (vending).
76+
io_impl = std::string(FileIORegistry::kArrowLocalFileIO);
77+
} else {
78+
ICEBERG_ASSIGN_OR_RAISE(const auto detected_kind, DetectBuiltinFileIO(warehouse));
79+
io_impl = std::string(BuiltinFileIOName(detected_kind));
7580
}
76-
ICEBERG_ASSIGN_OR_RAISE(const auto detected_kind, DetectBuiltinFileIO(warehouse));
77-
io_impl = std::string(BuiltinFileIOName(detected_kind));
7881
}
7982

8083
if (!warehouse.empty() && IsBuiltinImpl(io_impl)) {
@@ -92,4 +95,71 @@ Result<std::unique_ptr<FileIO>> MakeCatalogFileIO(const RestCatalogProperties& c
9295
return FileIORegistry::Load(io_impl, config.configs());
9396
}
9497

98+
namespace {
99+
100+
/// \brief Select the credential whose prefix is the longest prefix of \p location.
///
/// \param credentials Candidate credentials, scanned in order.
/// \param location Location to match against each credential's prefix.
/// \return Pointer into \p credentials for the best match, or nullptr when no
/// prefix matches. When several matching prefixes share the same length, the
/// one that appears first is kept.
const StorageCredential* ResolveStorageCredential(
    const std::vector<StorageCredential>& credentials, std::string_view location) {
  const StorageCredential* winner = nullptr;
  for (const auto& candidate : credentials) {
    if (!location.starts_with(candidate.prefix)) {
      continue;
    }
    // Only a strictly longer prefix displaces the current winner.
    const bool is_better =
        winner == nullptr || candidate.prefix.size() > winner->prefix.size();
    if (is_better) {
      winner = &candidate;
    }
  }
  return winner;
}
112+
113+
/// \brief Merge three property maps into one, later sources overriding earlier ones.
///
/// Precedence, lowest to highest: \p catalog_props, \p table_config,
/// \p credential_config. A key present in several maps takes its value from the
/// highest-precedence map that contains it.
///
/// \param catalog_props Catalog-level properties (base layer).
/// \param table_config Per-table configuration; overrides catalog properties.
/// \param credential_config Storage-credential configuration; overrides both.
/// \return A new map holding the union of all keys with the resolved values.
std::unordered_map<std::string, std::string> MergeTableProperties(
    const std::unordered_map<std::string, std::string>& catalog_props,
    const std::unordered_map<std::string, std::string>& table_config,
    const std::unordered_map<std::string, std::string>& credential_config) {
  std::unordered_map<std::string, std::string> merged;
  // Size for the worst case (no overlapping keys) so the table is not rehashed
  // while the overrides are applied.
  merged.reserve(catalog_props.size() + table_config.size() + credential_config.size());
  merged.insert(catalog_props.begin(), catalog_props.end());

  // Apply the override layers in increasing precedence. insert_or_assign
  // overwrites in a single lookup without default-constructing a value first.
  for (const auto* overrides : {&table_config, &credential_config}) {
    for (const auto& [key, value] : *overrides) {
      merged.insert_or_assign(key, value);
    }
  }
  return merged;
}
126+
127+
} // namespace
128+
129+
/// \brief Choose the FileIO a table should use, given the catalog's response for it.
///
/// Returns \p catalog_io unchanged when \p result carries neither per-table
/// config nor storage credentials, or when there is no explicit io-impl and both
/// \p warehouse and the metadata location are empty. Otherwise a new FileIO is
/// loaded through FileIORegistry with the merged properties.
///
/// Property precedence (lowest to highest): catalog props, table config, config
/// of the storage credential matched against the metadata location.
///
/// FileIO implementation precedence: explicit io-impl property, then
/// \p warehouse, then the metadata location. Failures from DetectBuiltinFileIO or
/// FileIORegistry::Load are propagated via ICEBERG_ASSIGN_OR_RAISE rather than
/// falling back to \p catalog_io.
///
/// NOTE(review): \p warehouse is consulted before the metadata location; confirm
/// DetectBuiltinFileIO does the right thing when the warehouse property is a
/// catalog-side name rather than a storage URI.
Result<std::shared_ptr<FileIO>> ResolveTableFileIO(
    const std::shared_ptr<FileIO>& catalog_io,
    const std::unordered_map<std::string, std::string>& catalog_props,
    const std::string& warehouse, const LoadTableResult& result) {
  const bool has_table_overrides =
      !result.config.empty() || !result.storage_credentials.empty();
  if (!has_table_overrides) {
    return catalog_io;
  }

  const std::string& metadata_location = result.metadata_location;

  // A credential can only be matched when the metadata location is known.
  const StorageCredential* matched =
      metadata_location.empty()
          ? nullptr
          : ResolveStorageCredential(result.storage_credentials, metadata_location);

  const std::unordered_map<std::string, std::string> no_credential_config;
  auto effective_props = MergeTableProperties(
      catalog_props, result.config,
      matched != nullptr ? matched->config : no_credential_config);

  std::string impl_name;
  const auto explicit_impl =
      effective_props.find(std::string(RestCatalogProperties::kIOImpl.key()));
  if (explicit_impl != effective_props.end()) {
    impl_name = explicit_impl->second;
  } else {
    // No explicit io-impl: detect from the warehouse, else the metadata location.
    const std::string& detect_from = warehouse.empty() ? metadata_location : warehouse;
    if (detect_from.empty()) {
      return catalog_io;
    }
    ICEBERG_ASSIGN_OR_RAISE(auto detected_kind, DetectBuiltinFileIO(detect_from));
    impl_name = std::string(BuiltinFileIOName(detected_kind));
  }

  ICEBERG_ASSIGN_OR_RAISE(auto owned_io,
                          FileIORegistry::Load(impl_name, effective_props));
  return std::shared_ptr<FileIO>(std::move(owned_io));
}
164+
95165
} // namespace iceberg::rest

src/iceberg/catalog/rest/rest_file_io.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@
2121

2222
#include <cstdint>
2323
#include <memory>
24+
#include <string>
2425
#include <string_view>
26+
#include <unordered_map>
2527

2628
#include "iceberg/catalog/rest/catalog_properties.h"
2729
#include "iceberg/catalog/rest/iceberg_rest_export.h"
30+
#include "iceberg/catalog/rest/types.h"
2831
#include "iceberg/file_io.h"
2932
#include "iceberg/file_io_registry.h"
3033
#include "iceberg/result.h"
@@ -44,4 +47,15 @@ ICEBERG_REST_EXPORT std::string_view BuiltinFileIOName(BuiltinFileIOKind kind);
4447
ICEBERG_REST_EXPORT Result<std::unique_ptr<FileIO>> MakeCatalogFileIO(
4548
const RestCatalogProperties& config);
4649

50+
/// \brief Resolve a per-table FileIO from a LoadTableResult.
51+
///
52+
/// Merges catalog properties, table config, and the best-matching storage
53+
/// credential (longest prefix match on metadata_location), then creates a
54+
/// per-table FileIO via FileIORegistry. Falls back to \p catalog_io when
55+
/// no per-table config is present or the FileIO type cannot be detected.
56+
ICEBERG_REST_EXPORT Result<std::shared_ptr<FileIO>> ResolveTableFileIO(
57+
const std::shared_ptr<FileIO>& catalog_io,
58+
const std::unordered_map<std::string, std::string>& catalog_props,
59+
const std::string& warehouse, const LoadTableResult& result);
60+
4761
} // namespace iceberg::rest

src/iceberg/catalog/rest/types.h

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,12 +169,25 @@ struct ICEBERG_REST_EXPORT CreateTableRequest {
169169
/// \brief An opaque token that allows clients to make use of pagination for list APIs.
170170
using PageToken = std::string;
171171

172+
/// \brief A storage credential returned by the REST catalog server.
173+
///
174+
/// Each credential has a prefix (a location prefix like "s3://bucket/path") and
175+
/// a config map containing cloud-provider-specific properties (e.g., temporary
176+
/// access keys). Clients resolve the best credential via longest prefix match
177+
/// against the table's metadata location.
178+
struct ICEBERG_REST_EXPORT StorageCredential {
179+
std::string prefix;
180+
std::unordered_map<std::string, std::string> config;
181+
182+
bool operator==(const StorageCredential&) const = default;
183+
};
184+
172185
/// \brief Result body for table create/load/register APIs.
173186
struct ICEBERG_REST_EXPORT LoadTableResult {
174187
std::string metadata_location;
175188
std::shared_ptr<TableMetadata> metadata; // required
176189
std::unordered_map<std::string, std::string> config;
177-
// TODO(Li Feiyang): Add std::shared_ptr<StorageCredential> storage_credential;
190+
std::vector<StorageCredential> storage_credentials;
178191

179192
/// \brief Validates the LoadTableResult.
180193
Status Validate() const {

src/iceberg/file_io.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <optional>
2323
#include <string>
2424
#include <string_view>
25+
#include <unordered_map>
2526

2627
#include "iceberg/iceberg_export.h"
2728
#include "iceberg/result.h"
@@ -42,6 +43,14 @@ class ICEBERG_EXPORT FileIO {
4243
FileIO() = default;
4344
virtual ~FileIO() = default;
4445

46+
/// \brief Returns the configuration properties used to initialize this FileIO.
47+
///
48+
/// Engines that need to configure their own storage access (e.g., for credential
49+
/// vending) can read these properties to obtain the resolved credentials.
50+
const std::unordered_map<std::string, std::string>& properties() const {
51+
return properties_;
52+
}
53+
4554
/// \brief Read the content of the file at the given location.
4655
///
4756
/// \param file_location The location of the file to read.
@@ -73,6 +82,10 @@ class ICEBERG_EXPORT FileIO {
7382
virtual Status DeleteFile(const std::string& file_location) {
7483
return NotImplemented("DeleteFile not implemented");
7584
}
85+
86+
private:
87+
friend class FileIORegistry;
88+
std::unordered_map<std::string, std::string> properties_;
7689
};
7790

7891
} // namespace iceberg

src/iceberg/file_io_registry.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
#include <mutex>
2121
#include <utility>
2222

23+
#include "iceberg/util/macros.h"
24+
2325
namespace iceberg {
2426

2527
namespace {
@@ -55,7 +57,9 @@ Result<std::unique_ptr<FileIO>> FileIORegistry::Load(
5557
}
5658
factory = it->second;
5759
}
58-
return factory(properties);
60+
ICEBERG_ASSIGN_OR_RAISE(auto io, factory(properties));
61+
io->properties_ = properties;
62+
return io;
5963
}
6064

6165
} // namespace iceberg

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ if(ICEBERG_BUILD_REST)
232232
add_rest_iceberg_test(rest_catalog_test
233233
SOURCES
234234
auth_manager_test.cc
235+
credential_vending_test.cc
235236
endpoint_test.cc
236237
rest_file_io_test.cc
237238
rest_json_serde_test.cc

0 commit comments

Comments
 (0)