Skip to content

Commit 9cb70c8

Browse files
committed
Add REST load credentials support
1 parent 55887b4 commit 9cb70c8

2 files changed

Lines changed: 88 additions & 0 deletions

File tree

pyiceberg/catalog/rest/__init__.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from __future__ import annotations
1818

1919
from collections import deque
20+
from collections.abc import Sequence
2021
from enum import Enum
2122
from typing import (
2223
TYPE_CHECKING,
@@ -147,6 +148,7 @@ class Endpoints:
147148
create_table: str = "namespaces/{namespace}/tables"
148149
register_table: str = "namespaces/{namespace}/register"
149150
load_table: str = "namespaces/{namespace}/tables/{table}"
151+
load_credentials: str = "namespaces/{namespace}/tables/{table}/credentials"
150152
update_table: str = "namespaces/{namespace}/tables/{table}"
151153
drop_table: str = "namespaces/{namespace}/tables/{table}"
152154
table_exists: str = "namespaces/{namespace}/tables/{table}"
@@ -181,6 +183,7 @@ class Capability:
181183
V1_DELETE_TABLE = Endpoint(http_method=HttpMethod.DELETE, path=f"{API_PREFIX}/{Endpoints.drop_table}")
182184
V1_RENAME_TABLE = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.rename_table}")
183185
V1_REGISTER_TABLE = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.register_table}")
186+
V1_LOAD_CREDENTIALS = Endpoint(http_method=HttpMethod.GET, path=f"{API_PREFIX}/{Endpoints.load_credentials}")
184187

185188
V1_LIST_VIEWS = Endpoint(http_method=HttpMethod.GET, path=f"{API_PREFIX}/{Endpoints.list_views}")
186189
V1_LOAD_VIEW = Endpoint(http_method=HttpMethod.GET, path=f"{API_PREFIX}/{Endpoints.load_view}")
@@ -293,6 +296,10 @@ class TableResponse(IcebergBaseModel):
293296
storage_credentials: list[StorageCredential] = Field(alias="storage-credentials", default_factory=list)
294297

295298

299+
class LoadCredentialsResponse(IcebergBaseModel):
300+
storage_credentials: list[StorageCredential] = Field(alias="storage-credentials")
301+
302+
296303
class ViewResponse(IcebergBaseModel):
297304
metadata_location: str | None = Field(alias="metadata-location", default=None)
298305
metadata: ViewMetadata
@@ -480,6 +487,12 @@ def _resolve_storage_credentials(storage_credentials: list[StorageCredential], l
480487

481488
return best_match.config if best_match else {}
482489

490+
@staticmethod
491+
def _format_referenced_by(referenced_by: str | Sequence[str]) -> str:
492+
if isinstance(referenced_by, str):
493+
return referenced_by
494+
return ",".join(referenced_by)
495+
483496
def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str | None = None) -> FileIO:
484497
merged_properties = {**self.properties, **properties}
485498
if self._auth_manager:
@@ -545,6 +558,43 @@ def _fetch_scan_tasks(self, identifier: str | Identifier, plan_task: str) -> Sca
545558

546559
return ScanTasks.model_validate_json(response.text)
547560

561+
@retry(**_RETRY_ARGS)
562+
def _load_credentials(
563+
self,
564+
identifier: str | Identifier,
565+
plan_id: str | None = None,
566+
referenced_by: str | Sequence[str] | None = None,
567+
) -> LoadCredentialsResponse:
568+
"""Load raw vended storage credentials for a table."""
569+
self._check_endpoint(Capability.V1_LOAD_CREDENTIALS)
570+
params: dict[str, str] = {}
571+
if plan_id is not None:
572+
params["planId"] = plan_id
573+
if referenced_by is not None:
574+
params["referenced-by"] = self._format_referenced_by(referenced_by)
575+
576+
response = self._session.get(
577+
self.url(Endpoints.load_credentials, prefixed=True, **self._split_identifier_for_path(identifier)),
578+
params=params,
579+
)
580+
try:
581+
response.raise_for_status()
582+
except HTTPError as exc:
583+
_handle_non_200_response(exc, {404: NoSuchTableError})
584+
585+
return LoadCredentialsResponse.model_validate_json(response.text)
586+
587+
def load_credentials(
588+
self,
589+
identifier: str | Identifier,
590+
location: str,
591+
plan_id: str | None = None,
592+
referenced_by: str | Sequence[str] | None = None,
593+
) -> Properties:
594+
"""Load vended storage credentials and return the best match for a location."""
595+
credentials_response = self._load_credentials(identifier, plan_id=plan_id, referenced_by=referenced_by)
596+
return self._resolve_storage_credentials(credentials_response.storage_credentials, location)
597+
548598
def plan_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) -> list[FileScanTask]:
549599
"""Plan a table scan and return FileScanTasks.
550600

tests/catalog/test_rest.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
Capability.V1_DELETE_TABLE,
105105
Capability.V1_RENAME_TABLE,
106106
Capability.V1_REGISTER_TABLE,
107+
Capability.V1_LOAD_CREDENTIALS,
107108
Capability.V1_LIST_VIEWS,
108109
Capability.V1_LOAD_VIEW,
109110
Capability.V1_VIEW_EXISTS,
@@ -3147,6 +3148,43 @@ def test_load_table_with_storage_credentials(rest_mock: Mocker, example_table_me
31473148
assert table.io.properties["s3.session-token"] == "vended-token"
31483149

31493150

3151+
def test_load_credentials_with_longest_prefix_and_query_params(rest_mock: Mocker) -> None:
3152+
rest_mock.get(
3153+
f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials",
3154+
json={
3155+
"storage-credentials": [
3156+
{
3157+
"prefix": "s3://warehouse/database/",
3158+
"config": {"s3.access-key-id": "short-prefix-key"},
3159+
},
3160+
{
3161+
"prefix": "s3://warehouse/database/table",
3162+
"config": {
3163+
"s3.access-key-id": "long-prefix-key",
3164+
"s3.secret-access-key": "long-prefix-secret",
3165+
},
3166+
},
3167+
],
3168+
},
3169+
status_code=200,
3170+
request_headers=TEST_HEADERS,
3171+
)
3172+
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
3173+
3174+
credentials = catalog.load_credentials(
3175+
("fokko", "table"),
3176+
"s3://warehouse/database/table/data/file.parquet",
3177+
plan_id="plan-123",
3178+
referenced_by=["prod.view", "prod.inner_view"],
3179+
)
3180+
3181+
assert credentials == {"s3.access-key-id": "long-prefix-key", "s3.secret-access-key": "long-prefix-secret"}
3182+
assert (
3183+
rest_mock.last_request.url == f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials?planId=plan-123"
3184+
"&referenced-by=prod.view%2Cprod.inner_view"
3185+
)
3186+
3187+
31503188
def test_load_table_without_storage_credentials(
31513189
rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any]
31523190
) -> None:

0 commit comments

Comments
 (0)