Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/catalog/rest/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ set(ICEBERG_REST_SOURCES
auth/auth_manager.cc
auth/auth_managers.cc
auth/auth_session.cc
auth/oauth2_util.cc
catalog_properties.cc
endpoint.cc
error_handlers.cc
Expand Down
135 changes: 135 additions & 0 deletions src/iceberg/catalog/rest/auth/auth_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@

#include "iceberg/catalog/rest/auth/auth_manager.h"

#include <optional>

#include "iceberg/catalog/rest/auth/auth_manager_internal.h"
#include "iceberg/catalog/rest/auth/auth_properties.h"
#include "iceberg/catalog/rest/auth/auth_session.h"
#include "iceberg/catalog/rest/auth/oauth2_util.h"
#include "iceberg/catalog/rest/catalog_properties.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/transform_util.h"

Expand Down Expand Up @@ -90,4 +94,135 @@ Result<std::unique_ptr<AuthManager>> MakeBasicAuthManager(
return std::make_unique<BasicAuthManager>();
}

/// \brief OAuth2 authentication manager.
///
/// Two-phase init: InitSession fetches and caches a token for the config request;
/// CatalogSession reuses the cached token and enables refresh.
class OAuth2AuthManager : public AuthManager {
public:
Result<std::shared_ptr<AuthSession>> InitSession(
HttpClient& init_client,
const std::unordered_map<std::string, std::string>& properties) override {
// Credential takes priority: fetch a fresh token for the config request.
auto credential_it = properties.find(AuthProperties::kOAuth2Credential);
if (credential_it != properties.end() && !credential_it->second.empty()) {
ICEBERG_ASSIGN_OR_RAISE(auto ctx, ParseOAuth2Context(properties));
auto noop_session = AuthSession::MakeDefault({});
ICEBERG_ASSIGN_OR_RAISE(init_token_response_,
FetchToken(init_client, ctx.token_endpoint, ctx.client_id,
ctx.client_secret, ctx.scope, *noop_session));
return AuthSession::MakeDefault(
{{"Authorization", "Bearer " + init_token_response_->access_token}});
}

auto token_it = properties.find(AuthProperties::kOAuth2Token);
if (token_it != properties.end() && !token_it->second.empty()) {
return AuthSession::MakeDefault({{"Authorization", "Bearer " + token_it->second}});
}

return AuthSession::MakeDefault({});
}

Result<std::shared_ptr<AuthSession>> CatalogSession(
HttpClient& client,
const std::unordered_map<std::string, std::string>& properties) override {
if (init_token_response_.has_value()) {
auto token_response = std::move(*init_token_response_);
init_token_response_.reset();
ICEBERG_ASSIGN_OR_RAISE(auto ctx, ParseOAuth2Context(properties));
return AuthSession::MakeOAuth2(token_response, ctx.token_endpoint, ctx.client_id,
ctx.client_secret, ctx.scope, client);
}

// If token is provided, use it directly.
auto token_it = properties.find(AuthProperties::kOAuth2Token);
if (token_it != properties.end() && !token_it->second.empty()) {
return AuthSession::MakeDefault({{"Authorization", "Bearer " + token_it->second}});
}

// Fetch a new token using client_credentials grant.
auto credential_it = properties.find(AuthProperties::kOAuth2Credential);
if (credential_it != properties.end() && !credential_it->second.empty()) {
ICEBERG_ASSIGN_OR_RAISE(auto ctx, ParseOAuth2Context(properties));
auto noop_session = AuthSession::MakeDefault({});
OAuthTokenResponse token_response;
ICEBERG_ASSIGN_OR_RAISE(token_response,
FetchToken(client, ctx.token_endpoint, ctx.client_id,
ctx.client_secret, ctx.scope, *noop_session));
return AuthSession::MakeOAuth2(token_response, ctx.token_endpoint, ctx.client_id,
ctx.client_secret, ctx.scope, client);
}

return AuthSession::MakeDefault({});
}

// TODO(lishuxu): Override TableSession() to support token exchange (RFC 8693).
// TODO(lishuxu): Override ContextualSession() to support per-context token exchange.

private:
struct OAuth2Context {
std::string client_id;
std::string client_secret;
std::string token_endpoint;
std::string scope;
};

/// \brief Parse credential, token endpoint, and scope from properties.
static Result<OAuth2Context> ParseOAuth2Context(
const std::unordered_map<std::string, std::string>& properties) {
OAuth2Context ctx;

auto credential_it = properties.find(AuthProperties::kOAuth2Credential);
if (credential_it == properties.end() || credential_it->second.empty()) {
return InvalidArgument("OAuth2 authentication requires '{}' property",
AuthProperties::kOAuth2Credential);
}
const auto& credential = credential_it->second;
auto colon_pos = credential.find(':');
if (colon_pos == std::string::npos) {
return InvalidArgument(
"Invalid OAuth2 credential format: expected 'client_id:client_secret'");
}
ctx.client_id = credential.substr(0, colon_pos);
ctx.client_secret = credential.substr(colon_pos + 1);

auto uri_it = properties.find(AuthProperties::kOAuth2ServerUri);
if (uri_it != properties.end() && !uri_it->second.empty()) {
ctx.token_endpoint = uri_it->second;
} else {
// {uri}/v1/oauth/tokens.
auto catalog_uri_it = properties.find(RestCatalogProperties::kUri.key());
if (catalog_uri_it == properties.end() || catalog_uri_it->second.empty()) {
return InvalidArgument(
"OAuth2 authentication requires '{}' or '{}' property to determine "
"token endpoint",
AuthProperties::kOAuth2ServerUri, RestCatalogProperties::kUri.key());
}
std::string_view base = catalog_uri_it->second;
while (!base.empty() && base.back() == '/') {
base.remove_suffix(1);
}
ctx.token_endpoint =
std::string(base) + "/" + std::string(AuthProperties::kOAuth2DefaultTokenPath);
}

ctx.scope = AuthProperties::kOAuth2DefaultScope;
auto scope_it = properties.find(AuthProperties::kOAuth2Scope);
if (scope_it != properties.end() && !scope_it->second.empty()) {
ctx.scope = scope_it->second;
}

return ctx;
}

/// Cached token from InitSession
std::optional<OAuthTokenResponse> init_token_response_;
};

Result<std::unique_ptr<AuthManager>> MakeOAuth2AuthManager(
[[maybe_unused]] std::string_view name,
[[maybe_unused]] const std::unordered_map<std::string, std::string>& properties) {
return std::make_unique<OAuth2AuthManager>();
}

} // namespace iceberg::rest::auth
5 changes: 5 additions & 0 deletions src/iceberg/catalog/rest/auth/auth_manager_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,9 @@ Result<std::unique_ptr<AuthManager>> MakeBasicAuthManager(
std::string_view name,
const std::unordered_map<std::string, std::string>& properties);

/// \brief Create an OAuth2 authentication manager.
Result<std::unique_ptr<AuthManager>> MakeOAuth2AuthManager(
std::string_view name,
const std::unordered_map<std::string, std::string>& properties);

} // namespace iceberg::rest::auth
1 change: 1 addition & 0 deletions src/iceberg/catalog/rest/auth/auth_managers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ AuthManagerRegistry CreateDefaultRegistry() {
return {
{AuthProperties::kAuthTypeNone, MakeNoopAuthManager},
{AuthProperties::kAuthTypeBasic, MakeBasicAuthManager},
{AuthProperties::kAuthTypeOAuth2, MakeOAuth2AuthManager},
};
}

Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/catalog/rest/auth/auth_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ struct AuthProperties {
inline static const std::string kOAuth2TokenRefreshEnabled = "token-refresh-enabled";
/// \brief Default OAuth2 scope for catalog operations.
inline static const std::string kOAuth2DefaultScope = "catalog";
/// \brief Default OAuth2 token endpoint path (relative to catalog URI).
inline static constexpr std::string_view kOAuth2DefaultTokenPath = "v1/oauth/tokens";

/// \brief Property key for SigV4 region.
inline static const std::string kSigV4Region = "rest.auth.sigv4.region";
Expand Down
10 changes: 10 additions & 0 deletions src/iceberg/catalog/rest/auth/auth_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

#include <utility>

#include "iceberg/catalog/rest/auth/oauth2_util.h"

namespace iceberg::rest::auth {

namespace {
Expand Down Expand Up @@ -49,4 +51,12 @@ std::shared_ptr<AuthSession> AuthSession::MakeDefault(
return std::make_shared<DefaultAuthSession>(std::move(headers));
}

std::shared_ptr<AuthSession> AuthSession::MakeOAuth2(
const OAuthTokenResponse& initial_token, const std::string& /*token_endpoint*/,
const std::string& /*client_id*/, const std::string& /*client_secret*/,
const std::string& /*scope*/, HttpClient& /*client*/) {
// TODO(lishuxu): Create OAuth2AuthSession with auto-refresh support.
return MakeDefault({{"Authorization", "Bearer " + initial_token.access_token}});
}

} // namespace iceberg::rest::auth
22 changes: 22 additions & 0 deletions src/iceberg/catalog/rest/auth/auth_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <unordered_map>

#include "iceberg/catalog/rest/iceberg_rest_export.h"
#include "iceberg/catalog/rest/type_fwd.h"
#include "iceberg/result.h"

/// \file iceberg/catalog/rest/auth/auth_session.h
Expand Down Expand Up @@ -70,6 +71,27 @@ class ICEBERG_REST_EXPORT AuthSession {
/// \return A new session that adds the given headers to requests.
static std::shared_ptr<AuthSession> MakeDefault(
std::unordered_map<std::string, std::string> headers);

/// \brief Create an OAuth2 session with automatic token refresh.
///
/// This factory method creates a session that holds an access token and
/// optionally a refresh token. When Authenticate() is called and the token
/// is expired, it transparently refreshes the token before setting the
/// Authorization header.
///
/// \param initial_token The initial token response from FetchToken().
/// \param token_endpoint Full URL of the OAuth2 token endpoint for refresh.
/// \param client_id OAuth2 client ID for refresh requests.
/// \param client_secret OAuth2 client secret for re-fetch if refresh fails.
/// \param scope OAuth2 scope for refresh requests.
/// \param client HTTP client for making refresh requests.
/// \return A new session that manages token lifecycle automatically.
static std::shared_ptr<AuthSession> MakeOAuth2(const OAuthTokenResponse& initial_token,
const std::string& token_endpoint,
const std::string& client_id,
const std::string& client_secret,
const std::string& scope,
HttpClient& client);
};

} // namespace iceberg::rest::auth
115 changes: 115 additions & 0 deletions src/iceberg/catalog/rest/auth/oauth2_util.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/catalog/rest/auth/oauth2_util.h"

#include <nlohmann/json.hpp>

#include "iceberg/catalog/rest/auth/auth_session.h"
#include "iceberg/catalog/rest/error_handlers.h"
#include "iceberg/catalog/rest/http_client.h"
#include "iceberg/json_serde_internal.h"
#include "iceberg/util/json_util_internal.h"
#include "iceberg/util/macros.h"

namespace iceberg::rest::auth {

namespace {

constexpr std::string_view kAccessToken = "access_token";
constexpr std::string_view kTokenType = "token_type";
constexpr std::string_view kExpiresIn = "expires_in";
constexpr std::string_view kRefreshToken = "refresh_token";
constexpr std::string_view kScope = "scope";

constexpr std::string_view kGrantType = "grant_type";
constexpr std::string_view kClientCredentials = "client_credentials";
constexpr std::string_view kClientId = "client_id";
constexpr std::string_view kClientSecret = "client_secret";

} // namespace

Status OAuthTokenResponse::Validate() const {
if (access_token.empty()) {
return ValidationFailed("OAuth2 token response missing required 'access_token'");
}
if (token_type.empty()) {
return ValidationFailed("OAuth2 token response missing required 'token_type'");
}
return {};
}

Result<OAuthTokenResponse> OAuthTokenResponseFromJsonString(const std::string& json_str) {
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(json_str));

OAuthTokenResponse response;
ICEBERG_ASSIGN_OR_RAISE(response.access_token,
GetJsonValue<std::string>(json, kAccessToken));
ICEBERG_ASSIGN_OR_RAISE(response.token_type,
GetJsonValue<std::string>(json, kTokenType));
ICEBERG_ASSIGN_OR_RAISE(response.expires_in,
GetJsonValueOrDefault<int64_t>(json, kExpiresIn, 0));
ICEBERG_ASSIGN_OR_RAISE(response.refresh_token,
GetJsonValueOrDefault<std::string>(json, kRefreshToken));
ICEBERG_ASSIGN_OR_RAISE(response.scope,
GetJsonValueOrDefault<std::string>(json, kScope));
ICEBERG_RETURN_UNEXPECTED(response.Validate());
return response;
}

Result<OAuthTokenResponse> FetchToken(HttpClient& client,
const std::string& token_endpoint,
const std::string& client_id,
const std::string& client_secret,
const std::string& scope, AuthSession& session) {
std::unordered_map<std::string, std::string> form_data{
{std::string(kGrantType), std::string(kClientCredentials)},
{std::string(kClientId), client_id},
{std::string(kClientSecret), client_secret},
};
if (!scope.empty()) {
form_data.emplace(std::string(kScope), scope);
}

ICEBERG_ASSIGN_OR_RAISE(auto response,
client.PostForm(token_endpoint, form_data, /*headers=*/{},
*DefaultErrorHandler::Instance(), session));

return OAuthTokenResponseFromJsonString(response.body());
}

Result<OAuthTokenResponse> RefreshToken(HttpClient& client,
const std::string& token_endpoint,
const std::string& client_id,
const std::string& refresh_token,
const std::string& scope, AuthSession& session) {
// TODO(lishuxu): Implement refresh_token grant type.
return NotImplemented("RefreshToken is not yet implemented");
}

Result<OAuthTokenResponse> ExchangeToken(HttpClient& client,
const std::string& token_endpoint,
const std::string& subject_token,
const std::string& subject_token_type,
const std::string& scope, AuthSession& session) {
// TODO(lishuxu): Implement token exchange (RFC 8693).
return NotImplemented("ExchangeToken is not yet implemented");
}

} // namespace iceberg::rest::auth
Loading
Loading