2626
2727#include < nlohmann/json.hpp>
2828
29+ #include " iceberg/catalog/rest/auth/auth_manager.h"
30+ #include " iceberg/catalog/rest/auth/auth_managers.h"
31+ #include " iceberg/catalog/rest/auth/auth_session.h"
2932#include " iceberg/catalog/rest/catalog_properties.h"
3033#include " iceberg/catalog/rest/constant.h"
3134#include " iceberg/catalog/rest/endpoint.h"
@@ -66,12 +69,18 @@ std::unordered_set<Endpoint> GetDefaultEndpoints() {
6669}
6770
6871// / \brief Fetch server config and merge it with client config
69- Result<CatalogConfig> FetchServerConfig (const ResourcePaths& paths,
70- const RestCatalogProperties& current_config) {
72+ Result<CatalogConfig> FetchServerConfig (
73+ const ResourcePaths& paths, const RestCatalogProperties& current_config,
74+ const std::shared_ptr<auth::AuthSession>& session) {
7175 ICEBERG_ASSIGN_OR_RAISE (auto config_path, paths.Config ());
7276 HttpClient client (current_config.ExtractHeaders ());
77+
78+ // Get authentication headers
79+ std::unordered_map<std::string, std::string> auth_headers;
80+ ICEBERG_RETURN_UNEXPECTED (session->Authenticate (auth_headers));
81+
7382 ICEBERG_ASSIGN_OR_RAISE (const auto response,
74- client.Get (config_path, /* params=*/ {}, /* headers= */ {} ,
83+ client.Get (config_path, /* params=*/ {}, auth_headers ,
7584 *DefaultErrorHandler::Instance ()));
7685 ICEBERG_ASSIGN_OR_RAISE (auto json, FromJsonString (response.body ()));
7786 return CatalogConfigFromJson (json);
@@ -114,11 +123,23 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
114123 if (!file_io) {
115124 return InvalidArgument (" FileIO is required to create RestCatalog" );
116125 }
126+
127+ std::string catalog_name = config.Get (RestCatalogProperties::kName );
128+ ICEBERG_ASSIGN_OR_RAISE (auto auth_manager,
129+ auth::AuthManagers::Load (catalog_name, config.configs ()));
130+
117131 ICEBERG_ASSIGN_OR_RAISE (
118132 auto paths, ResourcePaths::Make (std::string (TrimTrailingSlash (uri)),
119133 config.Get (RestCatalogProperties::kPrefix )));
120- ICEBERG_ASSIGN_OR_RAISE (auto server_config, FetchServerConfig (*paths, config));
121134
135+ // Create init session for fetching server configuration
136+ HttpClient init_client (config.ExtractHeaders ());
137+ ICEBERG_ASSIGN_OR_RAISE (auto init_session,
138+ auth_manager->InitSession (init_client, config.configs ()));
139+ ICEBERG_ASSIGN_OR_RAISE (auto server_config,
140+ FetchServerConfig (*paths, config, init_session));
141+
142+ // Merge client config with server defaults and overrides
122143 std::unique_ptr<RestCatalogProperties> final_config = RestCatalogProperties::FromMap (
123144 MergeConfigs (server_config.defaults , config.configs (), server_config.overrides ));
124145
@@ -139,27 +160,43 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
139160 paths, ResourcePaths::Make (std::string (TrimTrailingSlash (final_uri)),
140161 final_config->Get (RestCatalogProperties::kPrefix )));
141162
142- return std::shared_ptr<RestCatalog>(
143- new RestCatalog (std::move (final_config), std::move (file_io), std::move (paths),
144- std::move (endpoints)));
163+ auto client = std::make_unique<HttpClient>(final_config->ExtractHeaders ());
164+ ICEBERG_ASSIGN_OR_RAISE (auto catalog_session,
165+ auth_manager->CatalogSession (*client, final_config->configs ()));
166+ return std::shared_ptr<RestCatalog>(new RestCatalog (
167+ std::move (final_config), std::move (file_io), std::move (client), std::move (paths),
168+ std::move (endpoints), std::move (auth_manager), std::move (catalog_session)));
145169}
146170
147171RestCatalog::RestCatalog (std::unique_ptr<RestCatalogProperties> config,
148172 std::shared_ptr<FileIO> file_io,
173+ std::unique_ptr<HttpClient> client,
149174 std::unique_ptr<ResourcePaths> paths,
150- std::unordered_set<Endpoint> endpoints)
175+ std::unordered_set<Endpoint> endpoints,
176+ std::unique_ptr<auth::AuthManager> auth_manager,
177+ std::shared_ptr<auth::AuthSession> catalog_session)
151178 : config_(std::move(config)),
152179 file_io_ (std::move(file_io)),
153- client_(std::make_unique<HttpClient>(config_-> ExtractHeaders () )),
180+ client_(std::move(client )),
154181 paths_(std::move(paths)),
155182 name_(config_->Get (RestCatalogProperties::kName )),
156- supported_endpoints_(std::move(endpoints)) {}
183+ supported_endpoints_(std::move(endpoints)),
184+ auth_manager_(std::move(auth_manager)),
185+ catalog_session_(std::move(catalog_session)) {}
157186
158187std::string_view RestCatalog::name () const { return name_; }
159188
189+ Result<std::unordered_map<std::string, std::string>> RestCatalog::AuthHeaders () const {
190+ std::unordered_map<std::string, std::string> headers;
191+ ICEBERG_RETURN_UNEXPECTED (catalog_session_->Authenticate (headers));
192+ return headers;
193+ }
194+
160195Result<std::vector<Namespace>> RestCatalog::ListNamespaces (const Namespace& ns) const {
161196 ICEBERG_ENDPOINT_CHECK (supported_endpoints_, Endpoint::ListNamespaces ());
162197 ICEBERG_ASSIGN_OR_RAISE (auto path, paths_->Namespaces ());
198+ ICEBERG_ASSIGN_OR_RAISE (auto auth_headers, AuthHeaders ());
199+
163200 std::vector<Namespace> result;
164201 std::string next_token;
165202 while (true ) {
@@ -172,7 +209,7 @@ Result<std::vector<Namespace>> RestCatalog::ListNamespaces(const Namespace& ns)
172209 }
173210 ICEBERG_ASSIGN_OR_RAISE (
174211 const auto response,
175- client_->Get (path, params, /* headers= */ {} , *NamespaceErrorHandler::Instance ()));
212+ client_->Get (path, params, auth_headers , *NamespaceErrorHandler::Instance ()));
176213 ICEBERG_ASSIGN_OR_RAISE (auto json, FromJsonString (response.body ()));
177214 ICEBERG_ASSIGN_OR_RAISE (auto list_response, ListNamespacesResponseFromJson (json));
178215 result.insert (result.end (), list_response.namespaces .begin (),
@@ -189,10 +226,12 @@ Status RestCatalog::CreateNamespace(
189226 const Namespace& ns, const std::unordered_map<std::string, std::string>& properties) {
190227 ICEBERG_ENDPOINT_CHECK (supported_endpoints_, Endpoint::CreateNamespace ());
191228 ICEBERG_ASSIGN_OR_RAISE (auto path, paths_->Namespaces ());
229+ ICEBERG_ASSIGN_OR_RAISE (auto auth_headers, AuthHeaders ());
230+
192231 CreateNamespaceRequest request{.namespace_ = ns, .properties = properties};
193232 ICEBERG_ASSIGN_OR_RAISE (auto json_request, ToJsonString (ToJson (request)));
194233 ICEBERG_ASSIGN_OR_RAISE (const auto response,
195- client_->Post (path, json_request, /* headers= */ {} ,
234+ client_->Post (path, json_request, auth_headers ,
196235 *NamespaceErrorHandler::Instance ()));
197236 ICEBERG_ASSIGN_OR_RAISE (auto json, FromJsonString (response.body ()));
198237 ICEBERG_ASSIGN_OR_RAISE (auto create_response, CreateNamespaceResponseFromJson (json));
@@ -203,8 +242,10 @@ Result<std::unordered_map<std::string, std::string>> RestCatalog::GetNamespacePr
203242 const Namespace& ns) const {
204243 ICEBERG_ENDPOINT_CHECK (supported_endpoints_, Endpoint::GetNamespaceProperties ());
205244 ICEBERG_ASSIGN_OR_RAISE (auto path, paths_->Namespace_ (ns));
245+ ICEBERG_ASSIGN_OR_RAISE (auto auth_headers, AuthHeaders ());
246+
206247 ICEBERG_ASSIGN_OR_RAISE (const auto response,
207- client_->Get (path, /* params=*/ {}, /* headers= */ {} ,
248+ client_->Get (path, /* params=*/ {}, auth_headers ,
208249 *NamespaceErrorHandler::Instance ()));
209250 ICEBERG_ASSIGN_OR_RAISE (auto json, FromJsonString (response.body ()));
210251 ICEBERG_ASSIGN_OR_RAISE (auto get_response, GetNamespaceResponseFromJson (json));
@@ -214,8 +255,10 @@ Result<std::unordered_map<std::string, std::string>> RestCatalog::GetNamespacePr
214255Status RestCatalog::DropNamespace (const Namespace& ns) {
215256 ICEBERG_ENDPOINT_CHECK (supported_endpoints_, Endpoint::DropNamespace ());
216257 ICEBERG_ASSIGN_OR_RAISE (auto path, paths_->Namespace_ (ns));
258+ ICEBERG_ASSIGN_OR_RAISE (auto auth_headers, AuthHeaders ());
259+
217260 ICEBERG_ASSIGN_OR_RAISE (const auto response,
218- client_->Delete (path, /* params=*/ {}, /* headers= */ {} ,
261+ client_->Delete (path, /* params=*/ {}, auth_headers ,
219262 *DropNamespaceErrorHandler::Instance ()));
220263 return {};
221264}
@@ -227,21 +270,25 @@ Result<bool> RestCatalog::NamespaceExists(const Namespace& ns) const {
227270 }
228271
229272 ICEBERG_ASSIGN_OR_RAISE (auto path, paths_->Namespace_ (ns));
273+ ICEBERG_ASSIGN_OR_RAISE (auto auth_headers, AuthHeaders ());
274+
230275 return CaptureNoSuchNamespace (
231- client_->Head (path, /* headers= */ {} , *NamespaceErrorHandler::Instance ()));
276+ client_->Head (path, auth_headers , *NamespaceErrorHandler::Instance ()));
232277}
233278
234279Status RestCatalog::UpdateNamespaceProperties (
235280 const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
236281 const std::unordered_set<std::string>& removals) {
237282 ICEBERG_ENDPOINT_CHECK (supported_endpoints_, Endpoint::UpdateNamespace ());
238283 ICEBERG_ASSIGN_OR_RAISE (auto path, paths_->NamespaceProperties (ns));
284+ ICEBERG_ASSIGN_OR_RAISE (auto auth_headers, AuthHeaders ());
285+
239286 UpdateNamespacePropertiesRequest request{
240287 .removals = std::vector<std::string>(removals.begin (), removals.end ()),
241288 .updates = updates};
242289 ICEBERG_ASSIGN_OR_RAISE (auto json_request, ToJsonString (ToJson (request)));
243290 ICEBERG_ASSIGN_OR_RAISE (const auto response,
244- client_->Post (path, json_request, /* headers= */ {} ,
291+ client_->Post (path, json_request, auth_headers ,
245292 *NamespaceErrorHandler::Instance ()));
246293 ICEBERG_ASSIGN_OR_RAISE (auto json, FromJsonString (response.body ()));
247294 ICEBERG_ASSIGN_OR_RAISE (auto update_response,
@@ -251,8 +298,9 @@ Status RestCatalog::UpdateNamespaceProperties(
251298
252299Result<std::vector<TableIdentifier>> RestCatalog::ListTables (const Namespace& ns) const {
253300 ICEBERG_ENDPOINT_CHECK (supported_endpoints_, Endpoint::ListTables ());
254-
255301 ICEBERG_ASSIGN_OR_RAISE (auto path, paths_->Tables (ns));
302+ ICEBERG_ASSIGN_OR_RAISE (auto auth_headers, AuthHeaders ());
303+
256304 std::vector<TableIdentifier> result;
257305 std::string next_token;
258306 while (true ) {
@@ -262,7 +310,7 @@ Result<std::vector<TableIdentifier>> RestCatalog::ListTables(const Namespace& ns
262310 }
263311 ICEBERG_ASSIGN_OR_RAISE (
264312 const auto response,
265- client_->Get (path, params, /* headers= */ {} , *TableErrorHandler::Instance ()));
313+ client_->Get (path, params, auth_headers , *TableErrorHandler::Instance ()));
266314 ICEBERG_ASSIGN_OR_RAISE (auto json, FromJsonString (response.body ()));
267315 ICEBERG_ASSIGN_OR_RAISE (auto list_response, ListTablesResponseFromJson (json));
268316 result.insert (result.end (), list_response.identifiers .begin (),
@@ -282,6 +330,7 @@ Result<LoadTableResult> RestCatalog::CreateTableInternal(
282330 const std::unordered_map<std::string, std::string>& properties, bool stage_create) {
283331 ICEBERG_ENDPOINT_CHECK (supported_endpoints_, Endpoint::CreateTable ());
284332 ICEBERG_ASSIGN_OR_RAISE (auto path, paths_->Tables (identifier.ns ));
333+ ICEBERG_ASSIGN_OR_RAISE (auto auth_headers, AuthHeaders ());
285334
286335 CreateTableRequest request{
287336 .name = identifier.name ,
@@ -296,7 +345,7 @@ Result<LoadTableResult> RestCatalog::CreateTableInternal(
296345 ICEBERG_ASSIGN_OR_RAISE (auto json_request, ToJsonString (ToJson (request)));
297346 ICEBERG_ASSIGN_OR_RAISE (
298347 const auto response,
299- client_->Post (path, json_request, /* headers= */ {} , *TableErrorHandler::Instance ()));
348+ client_->Post (path, json_request, auth_headers , *TableErrorHandler::Instance ()));
300349
301350 ICEBERG_ASSIGN_OR_RAISE (auto json, FromJsonString (response.body ()));
302351 return LoadTableResultFromJson (json);
@@ -320,6 +369,7 @@ Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
320369 const std::vector<std::unique_ptr<TableUpdate>>& updates) {
321370 ICEBERG_ENDPOINT_CHECK (supported_endpoints_, Endpoint::UpdateTable ());
322371 ICEBERG_ASSIGN_OR_RAISE (auto path, paths_->Table (identifier));
372+ ICEBERG_ASSIGN_OR_RAISE (auto auth_headers, AuthHeaders ());
323373
324374 CommitTableRequest request{.identifier = identifier};
325375 request.requirements .reserve (requirements.size ());
@@ -334,7 +384,7 @@ Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
334384 ICEBERG_ASSIGN_OR_RAISE (auto json_request, ToJsonString (ToJson (request)));
335385 ICEBERG_ASSIGN_OR_RAISE (
336386 const auto response,
337- client_->Post (path, json_request, /* headers= */ {} , *TableErrorHandler::Instance ()));
387+ client_->Post (path, json_request, auth_headers , *TableErrorHandler::Instance ()));
338388
339389 ICEBERG_ASSIGN_OR_RAISE (auto json, FromJsonString (response.body ()));
340390 ICEBERG_ASSIGN_OR_RAISE (auto commit_response, CommitTableResponseFromJson (json));
@@ -363,14 +413,15 @@ Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
363413Status RestCatalog::DropTable (const TableIdentifier& identifier, bool purge) {
364414 ICEBERG_ENDPOINT_CHECK (supported_endpoints_, Endpoint::DeleteTable ());
365415 ICEBERG_ASSIGN_OR_RAISE (auto path, paths_->Table (identifier));
416+ ICEBERG_ASSIGN_OR_RAISE (auto auth_headers, AuthHeaders ());
366417
367418 std::unordered_map<std::string, std::string> params;
368419 if (purge) {
369420 params[" purgeRequested" ] = " true" ;
370421 }
371422 ICEBERG_ASSIGN_OR_RAISE (
372423 const auto response,
373- client_->Delete (path, params, /* headers= */ {} , *TableErrorHandler::Instance ()));
424+ client_->Delete (path, params, auth_headers , *TableErrorHandler::Instance ()));
374425 return {};
375426}
376427
@@ -381,19 +432,22 @@ Result<bool> RestCatalog::TableExists(const TableIdentifier& identifier) const {
381432 }
382433
383434 ICEBERG_ASSIGN_OR_RAISE (auto path, paths_->Table (identifier));
435+ ICEBERG_ASSIGN_OR_RAISE (auto auth_headers, AuthHeaders ());
436+
384437 return CaptureNoSuchTable (
385- client_->Head (path, /* headers= */ {} , *TableErrorHandler::Instance ()));
438+ client_->Head (path, auth_headers , *TableErrorHandler::Instance ()));
386439}
387440
388441Status RestCatalog::RenameTable (const TableIdentifier& from, const TableIdentifier& to) {
389442 ICEBERG_ENDPOINT_CHECK (supported_endpoints_, Endpoint::RenameTable ());
390443 ICEBERG_ASSIGN_OR_RAISE (auto path, paths_->Rename ());
444+ ICEBERG_ASSIGN_OR_RAISE (auto auth_headers, AuthHeaders ());
391445
392446 RenameTableRequest request{.source = from, .destination = to};
393447 ICEBERG_ASSIGN_OR_RAISE (auto json_request, ToJsonString (ToJson (request)));
394448 ICEBERG_ASSIGN_OR_RAISE (
395449 const auto response,
396- client_->Post (path, json_request, /* headers= */ {} , *TableErrorHandler::Instance ()));
450+ client_->Post (path, json_request, auth_headers , *TableErrorHandler::Instance ()));
397451
398452 return {};
399453}
@@ -402,9 +456,11 @@ Result<std::string> RestCatalog::LoadTableInternal(
402456 const TableIdentifier& identifier) const {
403457 ICEBERG_ENDPOINT_CHECK (supported_endpoints_, Endpoint::LoadTable ());
404458 ICEBERG_ASSIGN_OR_RAISE (auto path, paths_->Table (identifier));
459+ ICEBERG_ASSIGN_OR_RAISE (auto auth_headers, AuthHeaders ());
460+
405461 ICEBERG_ASSIGN_OR_RAISE (
406462 const auto response,
407- client_->Get (path, /* params=*/ {}, /* headers= */ {} , *TableErrorHandler::Instance ()));
463+ client_->Get (path, /* params=*/ {}, auth_headers , *TableErrorHandler::Instance ()));
408464 return response.body ();
409465}
410466
@@ -422,6 +478,7 @@ Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
422478 const TableIdentifier& identifier, const std::string& metadata_file_location) {
423479 ICEBERG_ENDPOINT_CHECK (supported_endpoints_, Endpoint::RegisterTable ());
424480 ICEBERG_ASSIGN_OR_RAISE (auto path, paths_->Register (identifier.ns ));
481+ ICEBERG_ASSIGN_OR_RAISE (auto auth_headers, AuthHeaders ());
425482
426483 RegisterTableRequest request{
427484 .name = identifier.name ,
@@ -431,7 +488,7 @@ Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
431488 ICEBERG_ASSIGN_OR_RAISE (auto json_request, ToJsonString (ToJson (request)));
432489 ICEBERG_ASSIGN_OR_RAISE (
433490 const auto response,
434- client_->Post (path, json_request, /* headers= */ {} , *TableErrorHandler::Instance ()));
491+ client_->Post (path, json_request, auth_headers , *TableErrorHandler::Instance ()));
435492
436493 ICEBERG_ASSIGN_OR_RAISE (auto json, FromJsonString (response.body ()));
437494 ICEBERG_ASSIGN_OR_RAISE (auto load_result, LoadTableResultFromJson (json));
0 commit comments