Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions mkdocs/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,94 @@ Specific headers defined by the RESTCatalog spec include:
| ------------------------------------ | ------------------------------------- | -------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `header.X-Iceberg-Access-Delegation` | `{vended-credentials,remote-signing}` | `vended-credentials` | Signal to the server that the client supports delegated access via a comma-separated list of access mechanisms. The server may choose to supply access via any or none of the requested mechanisms |

#### Authentication in RESTCatalog

The RESTCatalog supports pluggable authentication via the `auth` configuration block. This allows you to specify which how the access token will be fetched and managed for use with the HTTP requests to the RESTCatalog server. The authentication method is selected by setting the `auth.type` property, and additional configuration can be provided as needed for each method.

##### Supported Authentication Types

- `noop`: No authentication (no Authorization header sent).
- `basic`: HTTP Basic authentication.
- `oauth2`: OAuth2 client credentials flow.
- `legacyoauth2`: Legacy OAuth2 client credentials flow (Deprecated and will be removed in PyIceberg 1.0.0)
- `custom`: Custom authentication manager (requires `auth.impl`).

##### Configuration Properties

The `auth` block is structured as follows:

```yaml
catalog:
default:
type: rest
uri: http://rest-catalog/ws/
auth:
type: <auth_type>
<auth_type>:
# Type-specific configuration
impl: <custom_class_path> # Only for custom auth
```

**Property Reference:**

| Property | Required | Description |
|------------------|----------|-------------------------------------------------------------------------------------------------|
| `auth.type` | Yes | The authentication type to use (`noop`, `basic`, `oauth2`, or `custom`). |
| `auth.impl` | Conditionally | The fully qualified class path for a custom AuthManager. Required if `auth.type` is `custom`. |
| `auth.basic` | If type is `basic` | Block containing `username` and `password` for HTTP Basic authentication. |
| `auth.oauth2` | If type is `oauth2` | Block containing OAuth2 configuration (see below). |
| `auth.custom` | If type is `custom` | Block containing configuration for the custom AuthManager. |

##### Examples

**No Authentication:**

```yaml
auth:
type: noop
```

**Basic Authentication:**

```yaml
auth:
type: basic
basic:
username: myuser
password: mypass
```

**OAuth2 Authentication:**

```yaml
auth:
type: oauth2
oauth2:
client_id: my-client-id
client_secret: my-client-secret
token_url: https://auth.example.com/oauth/token
scope: read
refresh_margin: 60 # (optional) seconds before expiry to refresh
expires_in: 3600 # (optional) fallback if server does not provide
```

**Custom Authentication:**

```yaml
auth:
type: custom
impl: mypackage.module.MyAuthManager
custom:
property1: value1
property2: value2
```

##### Notes

- If `auth.type` is `custom`, you **must** specify `auth.impl` with the full class path to your custom AuthManager.
- If `auth.type` is not `custom`, specifying `auth.impl` is not allowed.
- The configuration block under each type (e.g., `basic`, `oauth2`, `custom`) is passed as keyword arguments to the corresponding AuthManager.

### SQL Catalog

The SQL catalog requires a database for its backend. PyIceberg supports PostgreSQL and SQLite through psycopg2. The database connection has to be configured using the `uri` property. The init_catalog_tables is optional and defaults to True. If it is set to False, the catalog tables will not be created when the SQLCatalog is initialized. See SQLAlchemy's [documentation for URL format](https://docs.sqlalchemy.org/en/20/core/engines.html#backend-specific-urls):
Expand Down
20 changes: 19 additions & 1 deletion pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ class IdentifierKind(Enum):
SIGV4_SERVICE = "rest.signing-name"
OAUTH2_SERVER_URI = "oauth2-server-uri"
SNAPSHOT_LOADING_MODE = "snapshot-loading-mode"
AUTH = "auth"
CUSTOM = "custom"

NAMESPACE_SEPARATOR = b"\x1f".decode(UTF8)

Expand Down Expand Up @@ -247,7 +249,23 @@ def _create_session(self) -> Session:
elif ssl_client_cert := ssl_client.get(CERT):
session.cert = ssl_client_cert

session.auth = AuthManagerAdapter(self._create_legacy_oauth2_auth_manager(session))
if auth_config := self.properties.get(AUTH):
auth_type = auth_config.get("type")
if auth_type is None:
raise ValueError("auth.type must be defined")
auth_type_config = auth_config.get(auth_type, {})
auth_impl = auth_config.get("impl")

if auth_type is CUSTOM and not auth_impl:
raise ValueError("auth.impl must be specified when using custom auth.type")

if auth_type is not CUSTOM and auth_impl:
raise ValueError("auth.impl can only be specified when using custom auth.type")

session.auth = AuthManagerAdapter(AuthManagerFactory.create(auth_impl or auth_type, auth_type_config))
else:
session.auth = AuthManagerAdapter(self._create_legacy_oauth2_auth_manager(session))

# Set HTTP headers
self._config_headers(session)

Expand Down
103 changes: 103 additions & 0 deletions pyiceberg/catalog/rest/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

import base64
import importlib
import threading
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Type

import requests
from requests import HTTPError, PreparedRequest, Session
from requests.auth import AuthBase

Expand All @@ -42,11 +45,15 @@ def auth_header(self) -> Optional[str]:


class NoopAuthManager(AuthManager):
"""Auth Manager implementation with no auth."""

def auth_header(self) -> Optional[str]:
return None


class BasicAuthManager(AuthManager):
"""AuthManager implementation that supports basic password auth."""

def __init__(self, username: str, password: str):
credentials = f"{username}:{password}"
self._token = base64.b64encode(credentials.encode()).decode()
Expand All @@ -56,6 +63,12 @@ def auth_header(self) -> str:


class LegacyOAuth2AuthManager(AuthManager):
"""Legacy OAuth2 AuthManager implementation.

This class exists for backward compatibility, and will be removed in
PyIceberg 1.0.0 in favor of OAuth2AuthManager.
"""

_session: Session
_auth_url: Optional[str]
_token: Optional[str]
Expand Down Expand Up @@ -109,6 +122,95 @@ def auth_header(self) -> str:
return f"Bearer {self._token}"


class OAuth2TokenProvider:
"""Thread-safe OAuth2 token provider with token refresh support."""

client_id: str
client_secret: str
token_url: str
scope: Optional[str]
refresh_margin: int
expires_in: Optional[int]

_token: Optional[str]
_expires_at: int
_lock: threading.Lock

def __init__(
self,
client_id: str,
client_secret: str,
token_url: str,
scope: Optional[str] = None,
refresh_margin: int = 60,
expires_in: Optional[int] = None,
):
self.client_id = client_id
self.client_secret = client_secret
self.token_url = token_url
self.scope = scope
self.refresh_margin = refresh_margin
self.expires_in = expires_in

self._token = None
self._expires_at = 0
self._lock = threading.Lock()

def _refresh_token(self) -> None:
data = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret,
}
if self.scope:
data["scope"] = self.scope

response = requests.post(self.token_url, data=data)
response.raise_for_status()
result = response.json()

self._token = result["access_token"]
expires_in = result.get("expires_in", self.expires_in)
if expires_in is None:
raise ValueError(
"The expiration time of the Token must be provided by the Server in the Access Token Response in `expired_in` field, or by the PyIceberg Client."
)
self._expires_at = time.time() + expires_in - self.refresh_margin

def get_token(self) -> str:
with self._lock:
if not self._token or time.time() >= self._expires_at:
self._refresh_token()
if self._token is None:
raise ValueError("Authorization token is None after refresh")
return self._token


class OAuth2AuthManager(AuthManager):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have any tests for LegacyOAuth2AuthManager? do we want OAuth2AuthManager to be feature parity in this first release?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i dont see credential, resource, and audience

| credential | client_id:client_secret | Credential to use for OAuth2 credential flow when initializing the catalog |
| scope | openid offline corpds:ds:profile | Desired scope of the requested security token (default : catalog) |
| resource | rest_catalog.iceberg.com | URI for the target resource or service |
| audience | rest_catalog | Logical name of target resource or service |

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are using client_id and client_secret instead in the current implementation, as opposed to credential. This is also currently in draft mode, and I intend to review OAuth2 spec a little bit more in depth and other industry standard implementations before finalizing the implementation.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I researched the IETF RFCs on OAuth closely, and my understanding is that resource and audience parameters were introduced in subsequent RFCs as supplemental parameters. What do you think about merging this version of the OAuth2AuthManager that closely follows the initial RFC6749 and introducing those as followup PRs?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make sense! thanks for looking into it

"""Auth Manager implementation that supports OAuth2 as defined in IETF RFC6749."""

def __init__(
self,
client_id: str,
client_secret: str,
token_url: str,
scope: Optional[str] = None,
refresh_margin: int = 60,
expires_in: Optional[int] = None,
):
self.token_provider = OAuth2TokenProvider(
client_id,
client_secret,
token_url,
scope,
refresh_margin,
expires_in,
)

def auth_header(self) -> str:
return f"Bearer {self.token_provider.get_token()}"


class AuthManagerAdapter(AuthBase):
"""A `requests.auth.AuthBase` adapter that integrates an `AuthManager` into a `requests.Session` to automatically attach the appropriate Authorization header to every request.

Expand Down Expand Up @@ -187,3 +289,4 @@ def create(cls, class_or_name: str, config: Dict[str, Any]) -> AuthManager:
AuthManagerFactory.register("noop", NoopAuthManager)
AuthManagerFactory.register("basic", BasicAuthManager)
AuthManagerFactory.register("legacyoauth2", LegacyOAuth2AuthManager)
AuthManagerFactory.register("oauth2", OAuth2AuthManager)
Loading