Skip to content

Commit 321413a

Browse files
committed
Load table credentials from credentials endpoint
1 parent a38bbe3 commit 321413a

2 files changed

Lines changed: 133 additions & 13 deletions

File tree

pyiceberg/catalog/rest/__init__.py

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from typing import (
2222
TYPE_CHECKING,
2323
Any,
24+
cast,
2425
)
2526
from urllib.parse import quote, unquote
2627

@@ -233,6 +234,7 @@ class ScanPlanningMode(Enum):
233234

234235

235236
ACCESS_DELEGATION_DEFAULT = "vended-credentials"
237+
ACCESS_DELEGATION_HEADER = "X-Iceberg-Access-Delegation"
236238
AUTHORIZATION_HEADER = "Authorization"
237239
BEARER_PREFIX = "Bearer"
238240
CATALOG_SCOPE = "catalog"
@@ -486,6 +488,40 @@ def _resolve_storage_credentials(storage_credentials: list[StorageCredential], l
486488

487489
return best_match.config if best_match else {}
488490

491+
def _should_load_credentials_from_endpoint(self) -> bool:
492+
if Capability.V1_LOAD_CREDENTIALS not in self._supported_endpoints:
493+
return False
494+
495+
access_delegation = cast(str, self._session.headers.get(ACCESS_DELEGATION_HEADER, ""))
496+
# The spec encodes access delegation as a comma-separated list of mechanisms.
497+
# Load credentials only when vended-credentials is requested.
498+
return any(delegation.strip().lower() == ACCESS_DELEGATION_DEFAULT for delegation in access_delegation.split(","))
499+
500+
def _resolve_table_credentials_from_response_or_endpoint(
501+
self, identifier_tuple: tuple[str, ...], table_response: TableResponse
502+
) -> Properties:
503+
"""Resolve credentials from the table response or load them from the credentials endpoint.
504+
505+
Inline storage-credentials from the table response take precedence. When the response
506+
does not include credentials, the catalog advertises loadCredentials, and vended
507+
credentials are requested, load credentials from the table's /credentials endpoint.
508+
"""
509+
location = table_response.metadata_location or table_response.metadata.location
510+
# Java keeps storage credentials with FileIO for path-level selection. PyIceberg resolves
511+
# them to FileIO properties here, so it needs a location to choose a matching prefix.
512+
if not location:
513+
return {}
514+
515+
credential_config = self._resolve_storage_credentials(table_response.storage_credentials, location)
516+
if table_response.storage_credentials:
517+
return credential_config
518+
519+
if not self._should_load_credentials_from_endpoint():
520+
return {}
521+
522+
credentials_response = self._load_credentials(identifier_tuple)
523+
return self._resolve_storage_credentials(credentials_response.storage_credentials, location)
524+
489525
def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str | None = None) -> FileIO:
490526
merged_properties = {**self.properties, **properties}
491527
if self._auth_manager:
@@ -826,10 +862,7 @@ def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None: # pylin
826862
session.mount(self.uri, SigV4Adapter(**self.properties))
827863

828864
def _response_to_table(self, identifier_tuple: tuple[str, ...], table_response: TableResponse) -> Table:
829-
# Per Iceberg spec: storage-credentials take precedence over config
830-
credential_config = self._resolve_storage_credentials(
831-
table_response.storage_credentials, table_response.metadata_location
832-
)
865+
credential_config = self._resolve_table_credentials_from_response_or_endpoint(identifier_tuple, table_response)
833866
return Table(
834867
identifier=identifier_tuple,
835868
metadata_location=table_response.metadata_location, # type: ignore
@@ -878,7 +911,7 @@ def _config_headers(self, session: Session) -> None:
878911
session.headers["Content-type"] = "application/json"
879912
session.headers["User-Agent"] = f"PyIceberg/{__version__}"
880913
session.headers["X-Client-Version"] = f"PyIceberg {__version__}"
881-
session.headers.setdefault("X-Iceberg-Access-Delegation", ACCESS_DELEGATION_DEFAULT)
914+
session.headers.setdefault(ACCESS_DELEGATION_HEADER, ACCESS_DELEGATION_DEFAULT)
882915

883916
def _create_table(
884917
self,

tests/catalog/test_rest.py

Lines changed: 95 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@
104104
Capability.V1_DELETE_TABLE,
105105
Capability.V1_RENAME_TABLE,
106106
Capability.V1_REGISTER_TABLE,
107-
Capability.V1_LOAD_CREDENTIALS,
108107
Capability.V1_LIST_VIEWS,
109108
Capability.V1_LOAD_VIEW,
110109
Capability.V1_VIEW_EXISTS,
@@ -1452,10 +1451,19 @@ def test_load_table_200_loading_mode(
14521451

14531452

14541453
def test_load_table_honor_access_delegation(
1455-
rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any]
1454+
requests_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any]
14561455
) -> None:
1456+
requests_mock.get(
1457+
f"{TEST_URI}v1/config",
1458+
json={
1459+
"defaults": {},
1460+
"overrides": {},
1461+
"endpoints": [str(endpoint) for endpoint in [*TEST_SUPPORTED_ENDPOINTS, Capability.V1_LOAD_CREDENTIALS]],
1462+
},
1463+
status_code=200,
1464+
)
14571465
test_headers_with_remote_signing = {**TEST_HEADERS, "X-Iceberg-Access-Delegation": "remote-signing"}
1458-
rest_mock.get(
1466+
requests_mock.get(
14591467
f"{TEST_URI}v1/namespaces/fokko/tables/table",
14601468
json=example_table_metadata_with_snapshot_v1_rest_json,
14611469
status_code=200,
@@ -3110,9 +3118,20 @@ def test_resolve_storage_credentials_empty() -> None:
31103118
assert RestCatalog._resolve_storage_credentials([], None) == {}
31113119

31123120

3113-
def test_load_table_with_storage_credentials(rest_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]) -> None:
3121+
def test_load_table_with_storage_credentials(
3122+
requests_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]
3123+
) -> None:
3124+
requests_mock.get(
3125+
f"{TEST_URI}v1/config",
3126+
json={
3127+
"defaults": {},
3128+
"overrides": {},
3129+
"endpoints": [str(endpoint) for endpoint in [*TEST_SUPPORTED_ENDPOINTS, Capability.V1_LOAD_CREDENTIALS]],
3130+
},
3131+
status_code=200,
3132+
)
31143133
metadata_location = "s3://warehouse/database/table/metadata/00001.metadata.json"
3115-
rest_mock.get(
3134+
requests_mock.get(
31163135
f"{TEST_URI}v1/namespaces/fokko/tables/table",
31173136
json={
31183137
"metadata-location": metadata_location,
@@ -3142,10 +3161,78 @@ def test_load_table_with_storage_credentials(rest_mock: Mocker, example_table_me
31423161
assert table.io.properties["s3.access-key-id"] == "vended-key"
31433162
assert table.io.properties["s3.secret-access-key"] == "vended-secret"
31443163
assert table.io.properties["s3.session-token"] == "vended-token"
3164+
assert len(requests_mock.request_history) == 2
31453165

31463166

3147-
def test_load_credentials_with_longest_prefix(rest_mock: Mocker) -> None:
3148-
rest_mock.get(
3167+
def test_load_table_loads_credentials_when_endpoint_supported(
3168+
requests_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]
3169+
) -> None:
3170+
requests_mock.get(
3171+
f"{TEST_URI}v1/config",
3172+
json={
3173+
"defaults": {},
3174+
"overrides": {},
3175+
"endpoints": [str(endpoint) for endpoint in [*TEST_SUPPORTED_ENDPOINTS, Capability.V1_LOAD_CREDENTIALS]],
3176+
},
3177+
status_code=200,
3178+
)
3179+
metadata_location = "s3://warehouse/database/table/metadata/00001.metadata.json"
3180+
requests_mock.get(
3181+
f"{TEST_URI}v1/namespaces/fokko/tables/table",
3182+
json={
3183+
"metadata-location": metadata_location,
3184+
"metadata": example_table_metadata_with_snapshot_v1,
3185+
"config": {
3186+
"s3.access-key-id": "from-config",
3187+
"s3.secret-access-key": "from-config-secret",
3188+
},
3189+
},
3190+
status_code=200,
3191+
request_headers=TEST_HEADERS,
3192+
)
3193+
requests_mock.get(
3194+
f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials",
3195+
json={
3196+
"storage-credentials": [
3197+
{
3198+
"prefix": "s3://warehouse/database/",
3199+
"config": {"s3.access-key-id": "short-prefix-key"},
3200+
},
3201+
{
3202+
"prefix": "s3://warehouse/database/table",
3203+
"config": {
3204+
"s3.access-key-id": "long-prefix-key",
3205+
"s3.secret-access-key": "long-prefix-secret",
3206+
},
3207+
},
3208+
],
3209+
},
3210+
status_code=200,
3211+
request_headers=TEST_HEADERS,
3212+
)
3213+
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
3214+
table = catalog.load_table(("fokko", "table"))
3215+
3216+
assert table.io.properties["s3.access-key-id"] == "long-prefix-key"
3217+
assert table.io.properties["s3.secret-access-key"] == "long-prefix-secret"
3218+
assert [request.url for request in requests_mock.request_history] == [
3219+
f"{TEST_URI}v1/config",
3220+
f"{TEST_URI}v1/namespaces/fokko/tables/table",
3221+
f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials",
3222+
]
3223+
3224+
3225+
def test_load_credentials_with_longest_prefix(requests_mock: Mocker) -> None:
3226+
requests_mock.get(
3227+
f"{TEST_URI}v1/config",
3228+
json={
3229+
"defaults": {},
3230+
"overrides": {},
3231+
"endpoints": [str(endpoint) for endpoint in [*TEST_SUPPORTED_ENDPOINTS, Capability.V1_LOAD_CREDENTIALS]],
3232+
},
3233+
status_code=200,
3234+
)
3235+
requests_mock.get(
31493236
f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials",
31503237
json={
31513238
"storage-credentials": [
@@ -3173,7 +3260,7 @@ def test_load_credentials_with_longest_prefix(rest_mock: Mocker) -> None:
31733260
)
31743261

31753262
assert credentials == {"s3.access-key-id": "long-prefix-key", "s3.secret-access-key": "long-prefix-secret"}
3176-
assert rest_mock.last_request.url == f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials"
3263+
assert requests_mock.last_request.url == f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials"
31773264

31783265

31793266
def test_load_table_without_storage_credentials(

0 commit comments

Comments
 (0)