Skip to content

Commit 69a6e5b

Browse files
committed
Add REST load credentials support
1 parent 55887b4 commit 69a6e5b

2 files changed

Lines changed: 81 additions & 0 deletions

File tree

pyiceberg/catalog/rest/__init__.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ class Endpoints:
147147
create_table: str = "namespaces/{namespace}/tables"
148148
register_table: str = "namespaces/{namespace}/register"
149149
load_table: str = "namespaces/{namespace}/tables/{table}"
150+
load_credentials: str = "namespaces/{namespace}/tables/{table}/credentials"
150151
update_table: str = "namespaces/{namespace}/tables/{table}"
151152
drop_table: str = "namespaces/{namespace}/tables/{table}"
152153
table_exists: str = "namespaces/{namespace}/tables/{table}"
@@ -181,6 +182,7 @@ class Capability:
181182
V1_DELETE_TABLE = Endpoint(http_method=HttpMethod.DELETE, path=f"{API_PREFIX}/{Endpoints.drop_table}")
182183
V1_RENAME_TABLE = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.rename_table}")
183184
V1_REGISTER_TABLE = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.register_table}")
185+
V1_LOAD_CREDENTIALS = Endpoint(http_method=HttpMethod.GET, path=f"{API_PREFIX}/{Endpoints.load_credentials}")
184186

185187
V1_LIST_VIEWS = Endpoint(http_method=HttpMethod.GET, path=f"{API_PREFIX}/{Endpoints.list_views}")
186188
V1_LOAD_VIEW = Endpoint(http_method=HttpMethod.GET, path=f"{API_PREFIX}/{Endpoints.load_view}")
@@ -293,6 +295,10 @@ class TableResponse(IcebergBaseModel):
293295
storage_credentials: list[StorageCredential] = Field(alias="storage-credentials", default_factory=list)
294296

295297

298+
class LoadCredentialsResponse(IcebergBaseModel):
299+
storage_credentials: list[StorageCredential] = Field(alias="storage-credentials")
300+
301+
296302
class ViewResponse(IcebergBaseModel):
297303
metadata_location: str | None = Field(alias="metadata-location", default=None)
298304
metadata: ViewMetadata
@@ -545,6 +551,43 @@ def _fetch_scan_tasks(self, identifier: str | Identifier, plan_task: str) -> Sca
545551

546552
return ScanTasks.model_validate_json(response.text)
547553

554+
@retry(**_RETRY_ARGS)
555+
def _load_credentials(
556+
self,
557+
identifier: str | Identifier,
558+
plan_id: str | None = None,
559+
referenced_by: str | None = None,
560+
) -> LoadCredentialsResponse:
561+
"""Load raw vended storage credentials for a table."""
562+
self._check_endpoint(Capability.V1_LOAD_CREDENTIALS)
563+
params: dict[str, str] = {}
564+
if plan_id is not None:
565+
params["planId"] = plan_id
566+
if referenced_by is not None:
567+
params["referenced-by"] = referenced_by
568+
569+
response = self._session.get(
570+
self.url(Endpoints.load_credentials, prefixed=True, **self._split_identifier_for_path(identifier)),
571+
params=params,
572+
)
573+
try:
574+
response.raise_for_status()
575+
except HTTPError as exc:
576+
_handle_non_200_response(exc, {404: NoSuchTableError})
577+
578+
return LoadCredentialsResponse.model_validate_json(response.text)
579+
580+
def load_credentials(
581+
self,
582+
identifier: str | Identifier,
583+
location: str,
584+
plan_id: str | None = None,
585+
referenced_by: str | None = None,
586+
) -> Properties:
587+
"""Load vended storage credentials and return the best match for a location."""
588+
credentials_response = self._load_credentials(identifier, plan_id=plan_id, referenced_by=referenced_by)
589+
return self._resolve_storage_credentials(credentials_response.storage_credentials, location)
590+
548591
def plan_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) -> list[FileScanTask]:
549592
"""Plan a table scan and return FileScanTasks.
550593

tests/catalog/test_rest.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
Capability.V1_DELETE_TABLE,
105105
Capability.V1_RENAME_TABLE,
106106
Capability.V1_REGISTER_TABLE,
107+
Capability.V1_LOAD_CREDENTIALS,
107108
Capability.V1_LIST_VIEWS,
108109
Capability.V1_LOAD_VIEW,
109110
Capability.V1_VIEW_EXISTS,
@@ -3147,6 +3148,43 @@ def test_load_table_with_storage_credentials(rest_mock: Mocker, example_table_me
31473148
assert table.io.properties["s3.session-token"] == "vended-token"
31483149

31493150

3151+
def test_load_credentials_with_longest_prefix_and_query_params(rest_mock: Mocker) -> None:
3152+
rest_mock.get(
3153+
f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials",
3154+
json={
3155+
"storage-credentials": [
3156+
{
3157+
"prefix": "s3://warehouse/database/",
3158+
"config": {"s3.access-key-id": "short-prefix-key"},
3159+
},
3160+
{
3161+
"prefix": "s3://warehouse/database/table",
3162+
"config": {
3163+
"s3.access-key-id": "long-prefix-key",
3164+
"s3.secret-access-key": "long-prefix-secret",
3165+
},
3166+
},
3167+
],
3168+
},
3169+
status_code=200,
3170+
request_headers=TEST_HEADERS,
3171+
)
3172+
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
3173+
3174+
credentials = catalog.load_credentials(
3175+
("fokko", "table"),
3176+
"s3://warehouse/database/table/data/file.parquet",
3177+
plan_id="plan-123",
3178+
referenced_by="prod.view,prod.inner_view",
3179+
)
3180+
3181+
assert credentials == {"s3.access-key-id": "long-prefix-key", "s3.secret-access-key": "long-prefix-secret"}
3182+
assert (
3183+
rest_mock.last_request.url == f"{TEST_URI}v1/namespaces/fokko/tables/table/credentials?planId=plan-123"
3184+
"&referenced-by=prod.view%2Cprod.inner_view"
3185+
)
3186+
3187+
31503188
def test_load_table_without_storage_credentials(
31513189
rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any]
31523190
) -> None:

0 commit comments

Comments
 (0)