Skip to content

Commit 17cd113

Browse files
author
JannicCutura
committed
feat: Add S3 access point support for cross-account table access
Add support for accessing Iceberg tables via S3 access points, enabling cross-account access scenarios where organizations enforce access point usage instead of direct bucket access. Changes: - Add S3_ACCESS_POINT_PREFIX config constant (s3.access-point.<bucket>) - Implement _resolve_s3_access_point() in PyArrowFileIO - Implement _resolve_s3_access_point() in FsspecFileIO - Add 12 unit tests (6 per FileIO implementation) Configuration: s3.access-point.<bucket-name> = <access-point-alias> The access point alias (format: <name>-<account-id>-s3alias) is used transparently in place of the bucket name when accessing S3 objects.
1 parent d62b360 commit 17cd113

File tree

5 files changed

+305
-40
lines changed

5 files changed

+305
-40
lines changed

pyiceberg/io/__init__.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,7 @@
3131
from abc import ABC, abstractmethod
3232
from io import SEEK_SET
3333
from types import TracebackType
34-
from typing import (
35-
Protocol,
36-
runtime_checkable,
37-
)
34+
from typing import Protocol, runtime_checkable
3835
from urllib.parse import urlparse
3936

4037
from pyiceberg.typedef import EMPTY_DICT, Properties
@@ -67,6 +64,8 @@
6764
S3_ROLE_SESSION_NAME = "s3.role-session-name"
6865
S3_FORCE_VIRTUAL_ADDRESSING = "s3.force-virtual-addressing"
6966
S3_RETRY_STRATEGY_IMPL = "s3.retry-strategy-impl"
67+
# Prefix for per-bucket access point config: s3.access-point.<bucket> = <access-point-alias>
68+
S3_ACCESS_POINT_PREFIX = "s3.access-point."
7069
HDFS_HOST = "hdfs.host"
7170
HDFS_PORT = "hdfs.port"
7271
HDFS_USER = "hdfs.user"

pyiceberg/io/fsspec.py

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
HF_ENDPOINT,
7070
HF_TOKEN,
7171
S3_ACCESS_KEY_ID,
72+
S3_ACCESS_POINT_PREFIX,
7273
S3_ANONYMOUS,
7374
S3_CONNECT_TIMEOUT,
7475
S3_ENDPOINT,
@@ -419,6 +420,29 @@ def __init__(self, properties: Properties):
419420
self._thread_locals = threading.local()
420421
super().__init__(properties=properties)
421422

423+
def _resolve_s3_access_point(self, scheme: str, bucket: str) -> str | None:
424+
"""Resolve S3 access point alias for a bucket if configured.
425+
426+
For cross-account access, S3 paths need to use access point aliases instead of bucket names.
427+
Access point aliases work like bucket names and are in the format: <name>-<account-id>-s3alias
428+
Config format: s3.access-point.<bucket-name> = <access-point-alias>
429+
430+
Args:
431+
scheme: The URI scheme (s3, s3a, s3n)
432+
bucket: The bucket name from the original URI
433+
434+
Returns:
435+
The access point alias if configured, None otherwise
436+
"""
437+
if scheme not in {"s3", "s3a", "s3n"}:
438+
return None
439+
440+
access_point_key = f"{S3_ACCESS_POINT_PREFIX}{bucket}"
441+
if access_point_alias := self.properties.get(access_point_key):
442+
logger.debug("Resolving bucket '%s' to access point alias: %s", bucket, access_point_alias)
443+
return access_point_alias
444+
return None
445+
422446
def new_input(self, location: str) -> FsspecInputFile:
423447
"""Get an FsspecInputFile instance to read bytes from the file at the given location.
424448
@@ -430,7 +454,13 @@ def new_input(self, location: str) -> FsspecInputFile:
430454
"""
431455
uri = urlparse(location)
432456
fs = self.get_fs(uri.scheme)
433-
return FsspecInputFile(location=location, fs=fs)
457+
458+
# Resolve S3 access point if configured
459+
resolved_location = location
460+
if access_point_alias := self._resolve_s3_access_point(uri.scheme, uri.netloc):
461+
resolved_location = f"{uri.scheme}://{access_point_alias}{uri.path}"
462+
463+
return FsspecInputFile(location=resolved_location, fs=fs)
434464

435465
def new_output(self, location: str) -> FsspecOutputFile:
436466
"""Get an FsspecOutputFile instance to write bytes to the file at the given location.
@@ -443,7 +473,13 @@ def new_output(self, location: str) -> FsspecOutputFile:
443473
"""
444474
uri = urlparse(location)
445475
fs = self.get_fs(uri.scheme)
446-
return FsspecOutputFile(location=location, fs=fs)
476+
477+
# Resolve S3 access point if configured
478+
resolved_location = location
479+
if access_point_alias := self._resolve_s3_access_point(uri.scheme, uri.netloc):
480+
resolved_location = f"{uri.scheme}://{access_point_alias}{uri.path}"
481+
482+
return FsspecOutputFile(location=resolved_location, fs=fs)
447483

448484
def delete(self, location: str | InputFile | OutputFile) -> None:
449485
"""Delete the file at the given location.
@@ -460,7 +496,13 @@ def delete(self, location: str | InputFile | OutputFile) -> None:
460496

461497
uri = urlparse(str_location)
462498
fs = self.get_fs(uri.scheme)
463-
fs.rm(str_location)
499+
500+
# Resolve S3 access point if configured
501+
resolved_location = str_location
502+
if access_point_alias := self._resolve_s3_access_point(uri.scheme, uri.netloc):
503+
resolved_location = f"{uri.scheme}://{access_point_alias}{uri.path}"
504+
505+
fs.rm(resolved_location)
464506

465507
def get_fs(self, scheme: str) -> AbstractFileSystem:
466508
"""Get a filesystem for a specific scheme, cached per thread."""

pyiceberg/io/pyarrow.py

Lines changed: 50 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,7 @@
4242
from dataclasses import dataclass
4343
from enum import Enum
4444
from functools import lru_cache, singledispatch
45-
from typing import (
46-
TYPE_CHECKING,
47-
Any,
48-
Generic,
49-
TypeVar,
50-
cast,
51-
)
45+
from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast
5246
from urllib.parse import urlparse
5347

5448
import pyarrow as pa
@@ -58,22 +52,13 @@
5852
import pyarrow.parquet as pq
5953
from pyarrow import ChunkedArray
6054
from pyarrow._s3fs import S3RetryStrategy
61-
from pyarrow.fs import (
62-
FileInfo,
63-
FileSystem,
64-
FileType,
65-
)
55+
from pyarrow.fs import FileInfo, FileSystem, FileType
6656

6757
from pyiceberg.conversions import to_bytes
6858
from pyiceberg.exceptions import ResolveError
6959
from pyiceberg.expressions import AlwaysTrue, BooleanExpression, BoundIsNaN, BoundIsNull, BoundTerm, Not, Or
7060
from pyiceberg.expressions.literals import Literal
71-
from pyiceberg.expressions.visitors import (
72-
BoundBooleanExpressionVisitor,
73-
bind,
74-
extract_field_ids,
75-
translate_column_names,
76-
)
61+
from pyiceberg.expressions.visitors import BoundBooleanExpressionVisitor, bind, extract_field_ids, translate_column_names
7762
from pyiceberg.expressions.visitors import visit as boolean_expression_visit
7863
from pyiceberg.io import (
7964
ADLS_ACCOUNT_KEY,
@@ -101,6 +86,7 @@
10186
HDFS_PORT,
10287
HDFS_USER,
10388
S3_ACCESS_KEY_ID,
89+
S3_ACCESS_POINT_PREFIX,
10490
S3_ANONYMOUS,
10591
S3_CONNECT_TIMEOUT,
10692
S3_ENDPOINT,
@@ -120,11 +106,7 @@
120106
OutputFile,
121107
OutputStream,
122108
)
123-
from pyiceberg.manifest import (
124-
DataFile,
125-
DataFileContent,
126-
FileFormat,
127-
)
109+
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
128110
from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec, partition_record_value
129111
from pyiceberg.schema import (
130112
PartnerAccessor,
@@ -607,6 +589,33 @@ def _initialize_gcs_fs(self) -> FileSystem:
607589
def _initialize_local_fs(self) -> FileSystem:
608590
return PyArrowLocalFileSystem()
609591

592+
def _resolve_s3_access_point(self, scheme: str, netloc: str, path_suffix: str) -> tuple[str, str]:
593+
"""Resolve S3 access point alias for a bucket if configured.
594+
595+
For cross-account access, S3 paths need to use access point aliases instead of bucket names.
596+
Access point aliases work like bucket names and are in the format: <name>-<account-id>-s3alias
597+
Config format: s3.access-point.<bucket-name> = <access-point-alias>
598+
599+
Args:
600+
scheme: The URI scheme (s3, s3a, s3n)
601+
netloc: The bucket name from the original URI
602+
path_suffix: The path within the bucket (without bucket name)
603+
604+
Returns:
605+
Tuple of (resolved_netloc, resolved_path) where netloc may be replaced with access point alias
606+
"""
607+
if scheme not in {"s3", "s3a", "s3n"}:
608+
return netloc, f"{netloc}{path_suffix}"
609+
610+
# Check for access point alias configuration for this bucket
611+
access_point_key = f"{S3_ACCESS_POINT_PREFIX}{netloc}"
612+
if access_point_alias := self.properties.get(access_point_key):
613+
logger.debug("Resolving bucket '%s' to access point alias: %s", netloc, access_point_alias)
614+
# Replace bucket with access point alias in the path
615+
return access_point_alias, f"{access_point_alias}{path_suffix}"
616+
617+
return netloc, f"{netloc}{path_suffix}"
618+
610619
def new_input(self, location: str) -> PyArrowFile:
611620
"""Get a PyArrowFile instance to read bytes from the file at the given location.
612621
@@ -616,11 +625,14 @@ def new_input(self, location: str) -> PyArrowFile:
616625
Returns:
617626
PyArrowFile: A PyArrowFile instance for the given location.
618627
"""
619-
scheme, netloc, path = self.parse_location(location, self.properties)
628+
scheme, netloc, _ = self.parse_location(location, self.properties)
629+
# For S3, resolve access point alias if configured
630+
uri = urlparse(location)
631+
resolved_netloc, resolved_path = self._resolve_s3_access_point(scheme, netloc, uri.path)
620632
return PyArrowFile(
621-
fs=self.fs_by_scheme(scheme, netloc),
633+
fs=self.fs_by_scheme(scheme, resolved_netloc),
622634
location=location,
623-
path=path,
635+
path=resolved_path,
624636
buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
625637
)
626638

@@ -633,11 +645,14 @@ def new_output(self, location: str) -> PyArrowFile:
633645
Returns:
634646
PyArrowFile: A PyArrowFile instance for the given location.
635647
"""
636-
scheme, netloc, path = self.parse_location(location, self.properties)
648+
scheme, netloc, _ = self.parse_location(location, self.properties)
649+
# For S3, resolve access point alias if configured
650+
uri = urlparse(location)
651+
resolved_netloc, resolved_path = self._resolve_s3_access_point(scheme, netloc, uri.path)
637652
return PyArrowFile(
638-
fs=self.fs_by_scheme(scheme, netloc),
653+
fs=self.fs_by_scheme(scheme, resolved_netloc),
639654
location=location,
640-
path=path,
655+
path=resolved_path,
641656
buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
642657
)
643658

@@ -655,11 +670,14 @@ def delete(self, location: str | InputFile | OutputFile) -> None:
655670
an AWS error code 15.
656671
"""
657672
str_location = location.location if isinstance(location, (InputFile, OutputFile)) else location
658-
scheme, netloc, path = self.parse_location(str_location, self.properties)
659-
fs = self.fs_by_scheme(scheme, netloc)
673+
scheme, netloc, _ = self.parse_location(str_location, self.properties)
674+
# For S3, resolve access point alias if configured
675+
uri = urlparse(str_location)
676+
resolved_netloc, resolved_path = self._resolve_s3_access_point(scheme, netloc, uri.path)
677+
fs = self.fs_by_scheme(scheme, resolved_netloc)
660678

661679
try:
662-
fs.delete_file(path)
680+
fs.delete_file(resolved_path)
663681
except FileNotFoundError:
664682
raise
665683
except PermissionError:

tests/io/test_fsspec.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,107 @@ def test_fsspec_unified_session_properties() -> None:
391391
)
392392

393393

394+
def test_fsspec_s3_access_point_resolution_with_config() -> None:
395+
"""Test that S3 bucket names are resolved to access point aliases when configured."""
396+
from pyiceberg.io import S3_ACCESS_POINT_PREFIX
397+
398+
bucket_name = "my-bucket"
399+
access_point_alias = "my-access-point-abc123-s3alias"
400+
properties = {
401+
f"{S3_ACCESS_POINT_PREFIX}{bucket_name}": access_point_alias,
402+
}
403+
404+
fileio = FsspecFileIO(properties=properties)
405+
406+
# Test _resolve_s3_access_point directly
407+
result = fileio._resolve_s3_access_point("s3", bucket_name)
408+
409+
assert result == access_point_alias
410+
411+
412+
def test_fsspec_s3_access_point_resolution_without_config() -> None:
413+
"""Test that S3 bucket names return None when no access point is configured."""
414+
bucket_name = "my-bucket"
415+
fileio = FsspecFileIO(properties={})
416+
417+
result = fileio._resolve_s3_access_point("s3", bucket_name)
418+
419+
assert result is None
420+
421+
422+
def test_fsspec_s3_access_point_resolution_non_s3_scheme() -> None:
423+
"""Test that non-S3 schemes are not affected by access point configuration."""
424+
from pyiceberg.io import S3_ACCESS_POINT_PREFIX
425+
426+
bucket_name = "my-bucket"
427+
access_point_alias = "my-access-point-abc123-s3alias"
428+
properties = {
429+
f"{S3_ACCESS_POINT_PREFIX}{bucket_name}": access_point_alias,
430+
}
431+
432+
fileio = FsspecFileIO(properties=properties)
433+
434+
# Test with non-S3 scheme (should return None)
435+
result = fileio._resolve_s3_access_point("gs", bucket_name)
436+
437+
assert result is None
438+
439+
440+
def test_fsspec_s3_access_point_resolution_s3a_scheme() -> None:
441+
"""Test that s3a scheme also resolves access points."""
442+
from pyiceberg.io import S3_ACCESS_POINT_PREFIX
443+
444+
bucket_name = "my-bucket"
445+
access_point_alias = "my-access-point-abc123-s3alias"
446+
properties = {
447+
f"{S3_ACCESS_POINT_PREFIX}{bucket_name}": access_point_alias,
448+
}
449+
450+
fileio = FsspecFileIO(properties=properties)
451+
452+
result = fileio._resolve_s3_access_point("s3a", bucket_name)
453+
454+
assert result == access_point_alias
455+
456+
457+
def test_fsspec_s3_access_point_new_input_uses_resolved_location() -> None:
458+
"""Test that new_input uses the resolved access point location."""
459+
from pyiceberg.io import S3_ACCESS_POINT_PREFIX
460+
461+
bucket_name = "my-bucket"
462+
access_point_alias = "my-access-point-abc123-s3alias"
463+
properties = {
464+
f"{S3_ACCESS_POINT_PREFIX}{bucket_name}": access_point_alias,
465+
"s3.region": "us-east-1",
466+
}
467+
468+
with mock.patch("s3fs.S3FileSystem"):
469+
fileio = FsspecFileIO(properties=properties)
470+
input_file = fileio.new_input(f"s3://{bucket_name}/path/to/file.parquet")
471+
472+
# The location should be rewritten to use the access point alias
473+
assert input_file.location == f"s3://{access_point_alias}/path/to/file.parquet"
474+
475+
476+
def test_fsspec_s3_access_point_new_output_uses_resolved_location() -> None:
477+
"""Test that new_output uses the resolved access point location."""
478+
from pyiceberg.io import S3_ACCESS_POINT_PREFIX
479+
480+
bucket_name = "my-bucket"
481+
access_point_alias = "my-access-point-abc123-s3alias"
482+
properties = {
483+
f"{S3_ACCESS_POINT_PREFIX}{bucket_name}": access_point_alias,
484+
"s3.region": "us-east-1",
485+
}
486+
487+
with mock.patch("s3fs.S3FileSystem"):
488+
fileio = FsspecFileIO(properties=properties)
489+
output_file = fileio.new_output(f"s3://{bucket_name}/path/to/file.parquet")
490+
491+
# The location should be rewritten to use the access point alias
492+
assert output_file.location == f"s3://{access_point_alias}/path/to/file.parquet"
493+
494+
394495
@pytest.mark.adls
395496
def test_fsspec_new_input_file_adls(adls_fsspec_fileio: FsspecFileIO) -> None:
396497
"""Test creating a new input file from an fsspec file-io"""

0 commit comments

Comments
 (0)