Skip to content

Commit 23ea120

Browse files
author
shuxu.li
committed
feat: Implement NoopAuthManager and integrate AuthManager into RestCatalog
1 parent 2e00ce0 commit 23ea120

File tree

5 files changed

+269
-30
lines changed

5 files changed

+269
-30
lines changed

src/iceberg/catalog/rest/auth/auth_managers.cc

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919

2020
#include "iceberg/catalog/rest/auth/auth_managers.h"
2121

22+
#include <unordered_set>
23+
2224
#include "iceberg/catalog/rest/auth/auth_properties.h"
25+
#include "iceberg/catalog/rest/auth/auth_session.h"
2326
#include "iceberg/util/string_util.h"
2427

2528
namespace iceberg::rest::auth {
@@ -30,6 +33,17 @@ namespace {
3033
using AuthManagerRegistry =
3134
std::unordered_map<std::string, AuthManagerFactory, StringHash, StringEqual>;
3235

36+
/// \brief Known authentication types that are defined in the Iceberg spec.
37+
const std::unordered_set<std::string, StringHash, StringEqual>& KnownAuthTypes() {
38+
static const std::unordered_set<std::string, StringHash, StringEqual> types = {
39+
AuthProperties::kAuthTypeNone,
40+
AuthProperties::kAuthTypeBasic,
41+
AuthProperties::kAuthTypeOAuth2,
42+
AuthProperties::kAuthTypeSigV4,
43+
};
44+
return types;
45+
}
46+
3347
// Infer the authentication type from properties.
3448
std::string InferAuthType(
3549
const std::unordered_map<std::string, std::string>& properties) {
@@ -48,9 +62,30 @@ std::string InferAuthType(
4862
return AuthProperties::kAuthTypeNone;
4963
}
5064

65+
/// \brief Authentication manager that performs no authentication.
66+
class NoopAuthManager : public AuthManager {
67+
public:
68+
Result<std::shared_ptr<AuthSession>> CatalogSession(
69+
[[maybe_unused]] HttpClient& shared_client,
70+
[[maybe_unused]] const std::unordered_map<std::string, std::string>& properties)
71+
override {
72+
return AuthSession::MakeDefault({});
73+
}
74+
};
75+
5176
// Get the global registry of auth manager factories.
5277
AuthManagerRegistry& GetRegistry() {
53-
static AuthManagerRegistry registry;
78+
static AuthManagerRegistry registry = [] {
79+
AuthManagerRegistry r;
80+
// Register built-in "none" auth manager
81+
r[AuthProperties::kAuthTypeNone] =
82+
[]([[maybe_unused]] std::string_view name,
83+
[[maybe_unused]] const std::unordered_map<std::string, std::string>& props)
84+
-> Result<std::unique_ptr<AuthManager>> {
85+
return std::make_unique<NoopAuthManager>();
86+
};
87+
return r;
88+
}();
5489
return registry;
5590
}
5691

@@ -68,8 +103,10 @@ Result<std::unique_ptr<AuthManager>> AuthManagers::Load(
68103
auto& registry = GetRegistry();
69104
auto it = registry.find(auth_type);
70105
if (it == registry.end()) {
71-
// TODO(Li Shuxu): Fallback to default auth manager implementations
72-
return NotImplemented("Authentication type '{}' is not supported", auth_type);
106+
if (KnownAuthTypes().contains(auth_type)) {
107+
return NotImplemented("Authentication type '{}' is not yet supported", auth_type);
108+
}
109+
return InvalidArgument("Unknown authentication type: '{}'", auth_type);
73110
}
74111

75112
return it->second(name, properties);

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 82 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626

2727
#include <nlohmann/json.hpp>
2828

29+
#include "iceberg/catalog/rest/auth/auth_manager.h"
30+
#include "iceberg/catalog/rest/auth/auth_managers.h"
31+
#include "iceberg/catalog/rest/auth/auth_session.h"
2932
#include "iceberg/catalog/rest/catalog_properties.h"
3033
#include "iceberg/catalog/rest/constant.h"
3134
#include "iceberg/catalog/rest/endpoint.h"
@@ -66,12 +69,18 @@ std::unordered_set<Endpoint> GetDefaultEndpoints() {
6669
}
6770

6871
/// \brief Fetch server config and merge it with client config
69-
Result<CatalogConfig> FetchServerConfig(const ResourcePaths& paths,
70-
const RestCatalogProperties& current_config) {
72+
Result<CatalogConfig> FetchServerConfig(
73+
const ResourcePaths& paths, const RestCatalogProperties& current_config,
74+
const std::shared_ptr<auth::AuthSession>& session) {
7175
ICEBERG_ASSIGN_OR_RAISE(auto config_path, paths.Config());
7276
HttpClient client(current_config.ExtractHeaders());
77+
78+
// Get authentication headers
79+
std::unordered_map<std::string, std::string> auth_headers;
80+
ICEBERG_RETURN_UNEXPECTED(session->Authenticate(auth_headers));
81+
7382
ICEBERG_ASSIGN_OR_RAISE(const auto response,
74-
client.Get(config_path, /*params=*/{}, /*headers=*/{},
83+
client.Get(config_path, /*params=*/{}, auth_headers,
7584
*DefaultErrorHandler::Instance()));
7685
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
7786
return CatalogConfigFromJson(json);
@@ -114,11 +123,23 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
114123
if (!file_io) {
115124
return InvalidArgument("FileIO is required to create RestCatalog");
116125
}
126+
127+
std::string catalog_name = config.Get(RestCatalogProperties::kName);
128+
ICEBERG_ASSIGN_OR_RAISE(auto auth_manager,
129+
auth::AuthManagers::Load(catalog_name, config.configs()));
130+
117131
ICEBERG_ASSIGN_OR_RAISE(
118132
auto paths, ResourcePaths::Make(std::string(TrimTrailingSlash(uri)),
119133
config.Get(RestCatalogProperties::kPrefix)));
120-
ICEBERG_ASSIGN_OR_RAISE(auto server_config, FetchServerConfig(*paths, config));
121134

135+
// Create init session for fetching server configuration
136+
HttpClient init_client(config.ExtractHeaders());
137+
ICEBERG_ASSIGN_OR_RAISE(auto init_session,
138+
auth_manager->InitSession(init_client, config.configs()));
139+
ICEBERG_ASSIGN_OR_RAISE(auto server_config,
140+
FetchServerConfig(*paths, config, init_session));
141+
142+
// Merge client config with server defaults and overrides
122143
std::unique_ptr<RestCatalogProperties> final_config = RestCatalogProperties::FromMap(
123144
MergeConfigs(server_config.defaults, config.configs(), server_config.overrides));
124145

@@ -139,27 +160,43 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
139160
paths, ResourcePaths::Make(std::string(TrimTrailingSlash(final_uri)),
140161
final_config->Get(RestCatalogProperties::kPrefix)));
141162

142-
return std::shared_ptr<RestCatalog>(
143-
new RestCatalog(std::move(final_config), std::move(file_io), std::move(paths),
144-
std::move(endpoints)));
163+
auto client = std::make_unique<HttpClient>(final_config->ExtractHeaders());
164+
ICEBERG_ASSIGN_OR_RAISE(auto catalog_session,
165+
auth_manager->CatalogSession(*client, final_config->configs()));
166+
return std::shared_ptr<RestCatalog>(new RestCatalog(
167+
std::move(final_config), std::move(file_io), std::move(client), std::move(paths),
168+
std::move(endpoints), std::move(auth_manager), std::move(catalog_session)));
145169
}
146170

147171
RestCatalog::RestCatalog(std::unique_ptr<RestCatalogProperties> config,
148172
std::shared_ptr<FileIO> file_io,
173+
std::unique_ptr<HttpClient> client,
149174
std::unique_ptr<ResourcePaths> paths,
150-
std::unordered_set<Endpoint> endpoints)
175+
std::unordered_set<Endpoint> endpoints,
176+
std::unique_ptr<auth::AuthManager> auth_manager,
177+
std::shared_ptr<auth::AuthSession> catalog_session)
151178
: config_(std::move(config)),
152179
file_io_(std::move(file_io)),
153-
client_(std::make_unique<HttpClient>(config_->ExtractHeaders())),
180+
client_(std::move(client)),
154181
paths_(std::move(paths)),
155182
name_(config_->Get(RestCatalogProperties::kName)),
156-
supported_endpoints_(std::move(endpoints)) {}
183+
supported_endpoints_(std::move(endpoints)),
184+
auth_manager_(std::move(auth_manager)),
185+
catalog_session_(std::move(catalog_session)) {}
157186

158187
std::string_view RestCatalog::name() const { return name_; }
159188

189+
Result<std::unordered_map<std::string, std::string>> RestCatalog::AuthHeaders() const {
190+
std::unordered_map<std::string, std::string> headers;
191+
ICEBERG_RETURN_UNEXPECTED(catalog_session_->Authenticate(headers));
192+
return headers;
193+
}
194+
160195
Result<std::vector<Namespace>> RestCatalog::ListNamespaces(const Namespace& ns) const {
161196
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::ListNamespaces());
162197
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespaces());
198+
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());
199+
163200
std::vector<Namespace> result;
164201
std::string next_token;
165202
while (true) {
@@ -172,7 +209,7 @@ Result<std::vector<Namespace>> RestCatalog::ListNamespaces(const Namespace& ns)
172209
}
173210
ICEBERG_ASSIGN_OR_RAISE(
174211
const auto response,
175-
client_->Get(path, params, /*headers=*/{}, *NamespaceErrorHandler::Instance()));
212+
client_->Get(path, params, auth_headers, *NamespaceErrorHandler::Instance()));
176213
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
177214
ICEBERG_ASSIGN_OR_RAISE(auto list_response, ListNamespacesResponseFromJson(json));
178215
result.insert(result.end(), list_response.namespaces.begin(),
@@ -189,10 +226,12 @@ Status RestCatalog::CreateNamespace(
189226
const Namespace& ns, const std::unordered_map<std::string, std::string>& properties) {
190227
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateNamespace());
191228
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespaces());
229+
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());
230+
192231
CreateNamespaceRequest request{.namespace_ = ns, .properties = properties};
193232
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
194233
ICEBERG_ASSIGN_OR_RAISE(const auto response,
195-
client_->Post(path, json_request, /*headers=*/{},
234+
client_->Post(path, json_request, auth_headers,
196235
*NamespaceErrorHandler::Instance()));
197236
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
198237
ICEBERG_ASSIGN_OR_RAISE(auto create_response, CreateNamespaceResponseFromJson(json));
@@ -203,8 +242,10 @@ Result<std::unordered_map<std::string, std::string>> RestCatalog::GetNamespacePr
203242
const Namespace& ns) const {
204243
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::GetNamespaceProperties());
205244
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns));
245+
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());
246+
206247
ICEBERG_ASSIGN_OR_RAISE(const auto response,
207-
client_->Get(path, /*params=*/{}, /*headers=*/{},
248+
client_->Get(path, /*params=*/{}, auth_headers,
208249
*NamespaceErrorHandler::Instance()));
209250
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
210251
ICEBERG_ASSIGN_OR_RAISE(auto get_response, GetNamespaceResponseFromJson(json));
@@ -214,8 +255,10 @@ Result<std::unordered_map<std::string, std::string>> RestCatalog::GetNamespacePr
214255
Status RestCatalog::DropNamespace(const Namespace& ns) {
215256
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::DropNamespace());
216257
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns));
258+
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());
259+
217260
ICEBERG_ASSIGN_OR_RAISE(const auto response,
218-
client_->Delete(path, /*params=*/{}, /*headers=*/{},
261+
client_->Delete(path, /*params=*/{}, auth_headers,
219262
*DropNamespaceErrorHandler::Instance()));
220263
return {};
221264
}
@@ -227,21 +270,25 @@ Result<bool> RestCatalog::NamespaceExists(const Namespace& ns) const {
227270
}
228271

229272
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns));
273+
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());
274+
230275
return CaptureNoSuchNamespace(
231-
client_->Head(path, /*headers=*/{}, *NamespaceErrorHandler::Instance()));
276+
client_->Head(path, auth_headers, *NamespaceErrorHandler::Instance()));
232277
}
233278

234279
Status RestCatalog::UpdateNamespaceProperties(
235280
const Namespace& ns, const std::unordered_map<std::string, std::string>& updates,
236281
const std::unordered_set<std::string>& removals) {
237282
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::UpdateNamespace());
238283
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->NamespaceProperties(ns));
284+
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());
285+
239286
UpdateNamespacePropertiesRequest request{
240287
.removals = std::vector<std::string>(removals.begin(), removals.end()),
241288
.updates = updates};
242289
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
243290
ICEBERG_ASSIGN_OR_RAISE(const auto response,
244-
client_->Post(path, json_request, /*headers=*/{},
291+
client_->Post(path, json_request, auth_headers,
245292
*NamespaceErrorHandler::Instance()));
246293
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
247294
ICEBERG_ASSIGN_OR_RAISE(auto update_response,
@@ -251,8 +298,9 @@ Status RestCatalog::UpdateNamespaceProperties(
251298

252299
Result<std::vector<TableIdentifier>> RestCatalog::ListTables(const Namespace& ns) const {
253300
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::ListTables());
254-
255301
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(ns));
302+
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());
303+
256304
std::vector<TableIdentifier> result;
257305
std::string next_token;
258306
while (true) {
@@ -262,7 +310,7 @@ Result<std::vector<TableIdentifier>> RestCatalog::ListTables(const Namespace& ns
262310
}
263311
ICEBERG_ASSIGN_OR_RAISE(
264312
const auto response,
265-
client_->Get(path, params, /*headers=*/{}, *TableErrorHandler::Instance()));
313+
client_->Get(path, params, auth_headers, *TableErrorHandler::Instance()));
266314
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
267315
ICEBERG_ASSIGN_OR_RAISE(auto list_response, ListTablesResponseFromJson(json));
268316
result.insert(result.end(), list_response.identifiers.begin(),
@@ -282,6 +330,7 @@ Result<LoadTableResult> RestCatalog::CreateTableInternal(
282330
const std::unordered_map<std::string, std::string>& properties, bool stage_create) {
283331
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateTable());
284332
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(identifier.ns));
333+
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());
285334

286335
CreateTableRequest request{
287336
.name = identifier.name,
@@ -296,7 +345,7 @@ Result<LoadTableResult> RestCatalog::CreateTableInternal(
296345
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
297346
ICEBERG_ASSIGN_OR_RAISE(
298347
const auto response,
299-
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
348+
client_->Post(path, json_request, auth_headers, *TableErrorHandler::Instance()));
300349

301350
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
302351
return LoadTableResultFromJson(json);
@@ -320,6 +369,7 @@ Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
320369
const std::vector<std::unique_ptr<TableUpdate>>& updates) {
321370
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::UpdateTable());
322371
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
372+
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());
323373

324374
CommitTableRequest request{.identifier = identifier};
325375
request.requirements.reserve(requirements.size());
@@ -334,7 +384,7 @@ Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
334384
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
335385
ICEBERG_ASSIGN_OR_RAISE(
336386
const auto response,
337-
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
387+
client_->Post(path, json_request, auth_headers, *TableErrorHandler::Instance()));
338388

339389
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
340390
ICEBERG_ASSIGN_OR_RAISE(auto commit_response, CommitTableResponseFromJson(json));
@@ -363,14 +413,15 @@ Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
363413
Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
364414
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::DeleteTable());
365415
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
416+
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());
366417

367418
std::unordered_map<std::string, std::string> params;
368419
if (purge) {
369420
params["purgeRequested"] = "true";
370421
}
371422
ICEBERG_ASSIGN_OR_RAISE(
372423
const auto response,
373-
client_->Delete(path, params, /*headers=*/{}, *TableErrorHandler::Instance()));
424+
client_->Delete(path, params, auth_headers, *TableErrorHandler::Instance()));
374425
return {};
375426
}
376427

@@ -381,19 +432,22 @@ Result<bool> RestCatalog::TableExists(const TableIdentifier& identifier) const {
381432
}
382433

383434
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
435+
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());
436+
384437
return CaptureNoSuchTable(
385-
client_->Head(path, /*headers=*/{}, *TableErrorHandler::Instance()));
438+
client_->Head(path, auth_headers, *TableErrorHandler::Instance()));
386439
}
387440

388441
Status RestCatalog::RenameTable(const TableIdentifier& from, const TableIdentifier& to) {
389442
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::RenameTable());
390443
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Rename());
444+
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());
391445

392446
RenameTableRequest request{.source = from, .destination = to};
393447
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
394448
ICEBERG_ASSIGN_OR_RAISE(
395449
const auto response,
396-
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
450+
client_->Post(path, json_request, auth_headers, *TableErrorHandler::Instance()));
397451

398452
return {};
399453
}
@@ -402,9 +456,11 @@ Result<std::string> RestCatalog::LoadTableInternal(
402456
const TableIdentifier& identifier) const {
403457
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::LoadTable());
404458
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
459+
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());
460+
405461
ICEBERG_ASSIGN_OR_RAISE(
406462
const auto response,
407-
client_->Get(path, /*params=*/{}, /*headers=*/{}, *TableErrorHandler::Instance()));
463+
client_->Get(path, /*params=*/{}, auth_headers, *TableErrorHandler::Instance()));
408464
return response.body();
409465
}
410466

@@ -422,6 +478,7 @@ Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
422478
const TableIdentifier& identifier, const std::string& metadata_file_location) {
423479
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::RegisterTable());
424480
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Register(identifier.ns));
481+
ICEBERG_ASSIGN_OR_RAISE(auto auth_headers, AuthHeaders());
425482

426483
RegisterTableRequest request{
427484
.name = identifier.name,
@@ -431,7 +488,7 @@ Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
431488
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
432489
ICEBERG_ASSIGN_OR_RAISE(
433490
const auto response,
434-
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
491+
client_->Post(path, json_request, auth_headers, *TableErrorHandler::Instance()));
435492

436493
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
437494
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));

0 commit comments

Comments
 (0)