Skip to content

Commit 0f80215

Browse files
committed
Load table credentials from credentials endpoint
1 parent 64d39b6 commit 0f80215

2 files changed

Lines changed: 133 additions & 13 deletions

File tree

pyiceberg/catalog/rest/__init__.py

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from typing import (
2222
TYPE_CHECKING,
2323
Any,
24+
cast,
2425
)
2526
from urllib.parse import quote, unquote
2627

@@ -233,6 +234,7 @@ class ScanPlanningMode(Enum):
233234

234235

235236
ACCESS_DELEGATION_DEFAULT = "vended-credentials"
237+
ACCESS_DELEGATION_HEADER = "X-Iceberg-Access-Delegation"
236238
AUTHORIZATION_HEADER = "Authorization"
237239
BEARER_PREFIX = "Bearer"
238240
CATALOG_SCOPE = "catalog"
@@ -486,6 +488,40 @@ def _resolve_storage_credentials(storage_credentials: list[StorageCredential], l
486488

487489
return best_match.config if best_match else {}
488490

491+
def _should_load_credentials_from_endpoint(self) -> bool:
492+
if Capability.V1_LOAD_CREDENTIALS not in self._supported_endpoints:
493+
return False
494+
495+
access_delegation = cast(str, self._session.headers.get(ACCESS_DELEGATION_HEADER, ""))
496+
# The spec encodes access delegation as a comma-separated list of mechanisms.
497+
# Load credentials only when vended-credentials is requested.
498+
return any(delegation.strip().lower() == ACCESS_DELEGATION_DEFAULT for delegation in access_delegation.split(","))
499+
500+
def _resolve_table_credentials_from_response_or_endpoint(
501+
self, identifier_tuple: tuple[str, ...], table_response: TableResponse
502+
) -> Properties:
503+
"""Resolve credentials from the table response or load them from the credentials endpoint.
504+
505+
Inline storage-credentials from the table response take precedence. When the response
506+
does not include credentials, the catalog advertises loadCredentials, and vended
507+
credentials are requested, load credentials from the table's /credentials endpoint.
508+
"""
509+
location = table_response.metadata_location or table_response.metadata.location
510+
# Java keeps storage credentials with FileIO for path-level selection. PyIceberg resolves
511+
# them to FileIO properties here, so it needs a location to choose a matching prefix.
512+
if not location:
513+
return {}
514+
515+
credential_config = self._resolve_storage_credentials(table_response.storage_credentials, location)
516+
if table_response.storage_credentials:
517+
return credential_config
518+
519+
if not self._should_load_credentials_from_endpoint():
520+
return {}
521+
522+
credentials_response = self._load_credentials(identifier_tuple)
523+
return self._resolve_storage_credentials(credentials_response.storage_credentials, location)
524+
489525
def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str | None = None) -> FileIO:
490526
merged_properties = {**self.properties, **properties}
491527
if self._auth_manager:
@@ -852,10 +888,7 @@ def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None: # pylin
852888
session.mount(self.uri, SigV4Adapter(**self.properties))
853889

854890
def _response_to_table(self, identifier_tuple: tuple[str, ...], table_response: TableResponse) -> Table:
855-
# Per Iceberg spec: storage-credentials take precedence over config
856-
credential_config = self._resolve_storage_credentials(
857-
table_response.storage_credentials, table_response.metadata_location
858-
)
891+
credential_config = self._resolve_table_credentials_from_response_or_endpoint(identifier_tuple, table_response)
859892
return Table(
860893
identifier=identifier_tuple,
861894
metadata_location=table_response.metadata_location, # type: ignore
@@ -904,7 +937,7 @@ def _config_headers(self, session: Session) -> None:
904937
session.headers["Content-type"] = "application/json"
905938
session.headers["User-Agent"] = f"PyIceberg/{__version__}"
906939
session.headers["X-Client-Version"] = f"PyIceberg {__version__}"
907-
session.headers.setdefault("X-Iceberg-Access-Delegation", ACCESS_DELEGATION_DEFAULT)
940+
session.headers.setdefault(ACCESS_DELEGATION_HEADER, ACCESS_DELEGATION_DEFAULT)
908941

909942
def _create_table(
910943
self,

tests/catalog/test_rest.py

Lines changed: 95 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@
104104
Capability.V1_DELETE_TABLE,
105105
Capability.V1_RENAME_TABLE,
106106
Capability.V1_REGISTER_TABLE,
107-
Capability.V1_LOAD_CREDENTIALS,
108107
Capability.V1_LIST_VIEWS,
109108
Capability.V1_LOAD_VIEW,
110109
Capability.V1_VIEW_EXISTS,
@@ -1452,10 +1451,19 @@ def test_load_table_200_loading_mode(
14521451

14531452

14541453
def test_load_table_honor_access_delegation(
1455-
rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any]
1454+
requests_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any]
14561455
) -> None:
1456+
requests_mock.get(
1457+
f"{TEST_URI}v1/config",
1458+
json={
1459+
"defaults": {},
1460+
"overrides": {},
1461+
"endpoints": [str(endpoint) for endpoint in [*TEST_SUPPORTED_ENDPOINTS, Capability.V1_LOAD_CREDENTIALS]],
1462+
},
1463+
status_code=200,
1464+
)
14571465
test_headers_with_remote_signing = {**TEST_HEADERS, "X-Iceberg-Access-Delegation": "remote-signing"}
1458-
rest_mock.get(
1466+
requests_mock.get(
14591467
f"{TEST_URI}v1/namespaces/fokko/tables/table",
14601468
json=example_table_metadata_with_snapshot_v1_rest_json,
14611469
status_code=200,
@@ -3114,9 +3122,20 @@ def test_resolve_storage_credentials_empty() -> None:
31143122
assert RestCatalog._resolve_storage_credentials([], None) == {}
31153123

31163124

3117-
def test_load_table_with_storage_credentials(rest_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]) -> None:
3125+
def test_load_table_with_storage_credentials(
3126+
requests_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]
3127+
) -> None:
3128+
requests_mock.get(
3129+
f"{TEST_URI}v1/config",
3130+
json={
3131+
"defaults": {},
3132+
"overrides": {},
3133+
"endpoints": [str(endpoint) for endpoint in [*TEST_SUPPORTED_ENDPOINTS, Capability.V1_LOAD_CREDENTIALS]],
3134+
},
3135+
status_code=200,
3136+
)
31183137
metadata_location = "s3://warehouse/database/table/metadata/00001.metadata.json"
3119-
rest_mock.get(
3138+
requests_mock.get(
31203139
f"{TEST_URI}v1/namespaces/fokko/tables/table",
31213140
json={
31223141
"metadata-location": metadata_location,
@@ -3146,10 +3165,78 @@ def test_load_table_with_storage_credentials(rest_mock: Mocker, example_table_me
31463165
assert table.io.properties["s3.access-key-id"] == "vended-key"
31473166
assert table.io.properties["s3.secret-access-key"] == "vended-secret"
31483167
assert table.io.properties["s3.session-token"] == "vended-token"
3168+
assert len(requests_mock.request_history) == 2
31493169

31503170

3151-
def test_load_credentials_with_longest_prefix(rest_mock: Mocker) -> None:
3152-
rest_mock.get(
3171+
def test_load_table_loads_credentials_when_endpoint_supported(
3172+
requests_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]
3173+
) -> None:
3174+
requests_mock.get(
3175+
f"{TEST_URI}v1/config",
3176+
json={
3177+
"defaults": {},
3178+
"overrides": {},
3179+
"endpoints": [str(endpoint) for endpoint in [*TEST_SUPPORTED_ENDPOINTS, Capability.V1_LOAD_CREDENTIALS]],
3180+
},
3181+
status_code=200,
3182+
)
3183+
metadata_location = "s3://warehouse/database/table/metadata/00001.metadata.json"
3184+
requests_mock.get(
3185+
f"{TEST_URI}v1/namespaces/fokko/tables/table",
3186+
json={
3187+
"metadata-location": metadata_location,
3188+
"metadata": example_table_metadata_with_snapshot_v1,
3189+
"config": {
3190+
"s3.access-key-id": "from-config",
3191+
"s3.secret-access-key": "from-config-secret",
3192+
},
3193+
},
3194+
status_code=200,
3195+
request_headers=TEST_HEADERS,
3196+
)
3197+
requests_mock.get(
3198+
f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials",
3199+
json={
3200+
"storage-credentials": [
3201+
{
3202+
"prefix": "s3://warehouse/database/",
3203+
"config": {"s3.access-key-id": "short-prefix-key"},
3204+
},
3205+
{
3206+
"prefix": "s3://warehouse/database/table",
3207+
"config": {
3208+
"s3.access-key-id": "long-prefix-key",
3209+
"s3.secret-access-key": "long-prefix-secret",
3210+
},
3211+
},
3212+
],
3213+
},
3214+
status_code=200,
3215+
request_headers=TEST_HEADERS,
3216+
)
3217+
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
3218+
table = catalog.load_table(("fokko", "table"))
3219+
3220+
assert table.io.properties["s3.access-key-id"] == "long-prefix-key"
3221+
assert table.io.properties["s3.secret-access-key"] == "long-prefix-secret"
3222+
assert [request.url for request in requests_mock.request_history] == [
3223+
f"{TEST_URI}v1/config",
3224+
f"{TEST_URI}v1/namespaces/fokko/tables/table",
3225+
f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials",
3226+
]
3227+
3228+
3229+
def test_load_credentials_with_longest_prefix(requests_mock: Mocker) -> None:
3230+
requests_mock.get(
3231+
f"{TEST_URI}v1/config",
3232+
json={
3233+
"defaults": {},
3234+
"overrides": {},
3235+
"endpoints": [str(endpoint) for endpoint in [*TEST_SUPPORTED_ENDPOINTS, Capability.V1_LOAD_CREDENTIALS]],
3236+
},
3237+
status_code=200,
3238+
)
3239+
requests_mock.get(
31533240
f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials",
31543241
json={
31553242
"storage-credentials": [
@@ -3177,7 +3264,7 @@ def test_load_credentials_with_longest_prefix(rest_mock: Mocker) -> None:
31773264
)
31783265

31793266
assert credentials == {"s3.access-key-id": "long-prefix-key", "s3.secret-access-key": "long-prefix-secret"}
3180-
assert rest_mock.last_request.url == f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials"
3267+
assert requests_mock.last_request.url == f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials"
31813268

31823269

31833270
def test_load_table_without_storage_credentials(

0 commit comments

Comments
 (0)