Skip to content

Commit 996cd58

Browse files
committed
Load table credentials from credentials endpoint
1 parent 64d39b6 commit 996cd58

2 files changed

Lines changed: 104 additions & 18 deletions

File tree

pyiceberg/catalog/rest/__init__.py

Lines changed: 31 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -233,6 +233,7 @@ class ScanPlanningMode(Enum):
233233

234234

235235
ACCESS_DELEGATION_DEFAULT = "vended-credentials"
236+
ACCESS_DELEGATION_HEADER = "X-Iceberg-Access-Delegation"
236237
AUTHORIZATION_HEADER = "Authorization"
237238
BEARER_PREFIX = "Bearer"
238239
CATALOG_SCOPE = "catalog"
@@ -486,6 +487,34 @@ def _resolve_storage_credentials(storage_credentials: list[StorageCredential], l
486487

487488
return best_match.config if best_match else {}
488489

490+
def _should_load_credentials_from_endpoint(self) -> bool:
491+
if Capability.V1_LOAD_CREDENTIALS not in self._supported_endpoints:
492+
return False
493+
494+
access_delegation = self._session.headers.get(ACCESS_DELEGATION_HEADER, "")
495+
if isinstance(access_delegation, bytes):
496+
access_delegation = access_delegation.decode()
497+
498+
return any(delegation.strip().lower() == ACCESS_DELEGATION_DEFAULT for delegation in access_delegation.split(","))
499+
500+
def _resolve_or_load_table_credentials(self, identifier_tuple: tuple[str, ...], table_response: TableResponse) -> Properties:
501+
"""Resolve credentials from the table response or load them from the credentials endpoint.
502+
503+
Inline storage-credentials from the table response take precedence. When the response
504+
does not include credentials, the catalog advertises loadCredentials, and vended
505+
credentials are requested, load credentials from the table's /credentials endpoint.
506+
"""
507+
location = table_response.metadata_location or table_response.metadata.location
508+
credential_config = self._resolve_storage_credentials(table_response.storage_credentials, location)
509+
if table_response.storage_credentials or not location:
510+
return credential_config
511+
512+
if not self._should_load_credentials_from_endpoint():
513+
return {}
514+
515+
credentials_response = self._load_credentials(identifier_tuple)
516+
return self._resolve_storage_credentials(credentials_response.storage_credentials, location)
517+
489518
def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str | None = None) -> FileIO:
490519
merged_properties = {**self.properties, **properties}
491520
if self._auth_manager:
@@ -852,10 +881,7 @@ def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None: # pylin
852881
session.mount(self.uri, SigV4Adapter(**self.properties))
853882

854883
def _response_to_table(self, identifier_tuple: tuple[str, ...], table_response: TableResponse) -> Table:
855-
# Per Iceberg spec: storage-credentials take precedence over config
856-
credential_config = self._resolve_storage_credentials(
857-
table_response.storage_credentials, table_response.metadata_location
858-
)
884+
credential_config = self._resolve_or_load_table_credentials(identifier_tuple, table_response)
859885
return Table(
860886
identifier=identifier_tuple,
861887
metadata_location=table_response.metadata_location, # type: ignore
@@ -904,7 +930,7 @@ def _config_headers(self, session: Session) -> None:
904930
session.headers["Content-type"] = "application/json"
905931
session.headers["User-Agent"] = f"PyIceberg/{__version__}"
906932
session.headers["X-Client-Version"] = f"PyIceberg {__version__}"
907-
session.headers.setdefault("X-Iceberg-Access-Delegation", ACCESS_DELEGATION_DEFAULT)
933+
session.headers.setdefault(ACCESS_DELEGATION_HEADER, ACCESS_DELEGATION_DEFAULT)
908934

909935
def _create_table(
910936
self,

tests/catalog/test_rest.py

Lines changed: 73 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -104,7 +104,6 @@
104104
Capability.V1_DELETE_TABLE,
105105
Capability.V1_RENAME_TABLE,
106106
Capability.V1_REGISTER_TABLE,
107-
Capability.V1_LOAD_CREDENTIALS,
108107
Capability.V1_LIST_VIEWS,
109108
Capability.V1_LOAD_VIEW,
110109
Capability.V1_VIEW_EXISTS,
@@ -162,17 +161,22 @@ def example_view_metadata_rest_json(example_view_metadata_v1: dict[str, Any]) ->
162161
}
163162

164163

164+
def _mock_config_response(requests_mock: Mocker, endpoints: list[Endpoint] | None = None) -> None:
165+
endpoints = TEST_SUPPORTED_ENDPOINTS if endpoints is None else endpoints
166+
requests_mock.get(
167+
f"{TEST_URI}v1/config",
168+
json={"defaults": {}, "overrides": {}, "endpoints": [str(endpoint) for endpoint in endpoints]},
169+
status_code=200,
170+
)
171+
172+
165173
@pytest.fixture
166174
def rest_mock(requests_mock: Mocker) -> Mocker:
167175
"""Takes the default requests_mock and adds the config endpoint to it
168176
169177
This endpoint is called when initializing the rest catalog
170178
"""
171-
requests_mock.get(
172-
f"{TEST_URI}v1/config",
173-
json={"defaults": {}, "overrides": {}, "endpoints": [str(endpoint) for endpoint in TEST_SUPPORTED_ENDPOINTS]},
174-
status_code=200,
175-
)
179+
_mock_config_response(requests_mock)
176180
return requests_mock
177181

178182

@@ -1452,10 +1456,11 @@ def test_load_table_200_loading_mode(
14521456

14531457

14541458
def test_load_table_honor_access_delegation(
1455-
rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any]
1459+
requests_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any]
14561460
) -> None:
1461+
_mock_config_response(requests_mock, [*TEST_SUPPORTED_ENDPOINTS, Capability.V1_LOAD_CREDENTIALS])
14571462
test_headers_with_remote_signing = {**TEST_HEADERS, "X-Iceberg-Access-Delegation": "remote-signing"}
1458-
rest_mock.get(
1463+
requests_mock.get(
14591464
f"{TEST_URI}v1/namespaces/fokko/tables/table",
14601465
json=example_table_metadata_with_snapshot_v1_rest_json,
14611466
status_code=200,
@@ -3114,9 +3119,12 @@ def test_resolve_storage_credentials_empty() -> None:
31143119
assert RestCatalog._resolve_storage_credentials([], None) == {}
31153120

31163121

3117-
def test_load_table_with_storage_credentials(rest_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]) -> None:
3122+
def test_load_table_with_storage_credentials(
3123+
requests_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]
3124+
) -> None:
3125+
_mock_config_response(requests_mock, [*TEST_SUPPORTED_ENDPOINTS, Capability.V1_LOAD_CREDENTIALS])
31183126
metadata_location = "s3://warehouse/database/table/metadata/00001.metadata.json"
3119-
rest_mock.get(
3127+
requests_mock.get(
31203128
f"{TEST_URI}v1/namespaces/fokko/tables/table",
31213129
json={
31223130
"metadata-location": metadata_location,
@@ -3146,10 +3154,62 @@ def test_load_table_with_storage_credentials(rest_mock: Mocker, example_table_me
31463154
assert table.io.properties["s3.access-key-id"] == "vended-key"
31473155
assert table.io.properties["s3.secret-access-key"] == "vended-secret"
31483156
assert table.io.properties["s3.session-token"] == "vended-token"
3157+
assert len(requests_mock.request_history) == 2
31493158

31503159

3151-
def test_load_credentials_with_longest_prefix(rest_mock: Mocker) -> None:
3152-
rest_mock.get(
3160+
def test_load_table_loads_credentials_when_endpoint_supported(
3161+
requests_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]
3162+
) -> None:
3163+
_mock_config_response(requests_mock, [*TEST_SUPPORTED_ENDPOINTS, Capability.V1_LOAD_CREDENTIALS])
3164+
metadata_location = "s3://warehouse/database/table/metadata/00001.metadata.json"
3165+
requests_mock.get(
3166+
f"{TEST_URI}v1/namespaces/fokko/tables/table",
3167+
json={
3168+
"metadata-location": metadata_location,
3169+
"metadata": example_table_metadata_with_snapshot_v1,
3170+
"config": {
3171+
"s3.access-key-id": "from-config",
3172+
"s3.secret-access-key": "from-config-secret",
3173+
},
3174+
},
3175+
status_code=200,
3176+
request_headers=TEST_HEADERS,
3177+
)
3178+
requests_mock.get(
3179+
f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials",
3180+
json={
3181+
"storage-credentials": [
3182+
{
3183+
"prefix": "s3://warehouse/database/",
3184+
"config": {"s3.access-key-id": "short-prefix-key"},
3185+
},
3186+
{
3187+
"prefix": "s3://warehouse/database/table",
3188+
"config": {
3189+
"s3.access-key-id": "long-prefix-key",
3190+
"s3.secret-access-key": "long-prefix-secret",
3191+
},
3192+
},
3193+
],
3194+
},
3195+
status_code=200,
3196+
request_headers=TEST_HEADERS,
3197+
)
3198+
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
3199+
table = catalog.load_table(("fokko", "table"))
3200+
3201+
assert table.io.properties["s3.access-key-id"] == "long-prefix-key"
3202+
assert table.io.properties["s3.secret-access-key"] == "long-prefix-secret"
3203+
assert [request.url for request in requests_mock.request_history] == [
3204+
f"{TEST_URI}v1/config",
3205+
f"{TEST_URI}v1/namespaces/fokko/tables/table",
3206+
f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials",
3207+
]
3208+
3209+
3210+
def test_load_credentials_with_longest_prefix(requests_mock: Mocker) -> None:
3211+
_mock_config_response(requests_mock, [*TEST_SUPPORTED_ENDPOINTS, Capability.V1_LOAD_CREDENTIALS])
3212+
requests_mock.get(
31533213
f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials",
31543214
json={
31553215
"storage-credentials": [
@@ -3177,7 +3237,7 @@ def test_load_credentials_with_longest_prefix(rest_mock: Mocker) -> None:
31773237
)
31783238

31793239
assert credentials == {"s3.access-key-id": "long-prefix-key", "s3.secret-access-key": "long-prefix-secret"}
3180-
assert rest_mock.last_request.url == f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials"
3240+
assert requests_mock.last_request.url == f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials"
31813241

31823242

31833243
def test_load_table_without_storage_credentials(

0 commit comments

Comments
 (0)