Skip to content
4 changes: 4 additions & 0 deletions pyiceberg/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@
# Property keys used to configure ADLS (Azure Data Lake Storage) file systems.
ADLS_CLIENT_ID = "adls.client-id"
ADLS_CLIENT_SECRET = "adls.client-secret"
ADLS_ACCOUNT_HOST = "adls.account-host"
# Authority (host[:port]) of the Blob endpoint; passed through to
# pyarrow's AzureFileSystem as ``blob_storage_authority``.
ADLS_BLOB_STORAGE_AUTHORITY = "adls.blob-storage-authority"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment thread
NikitaMatskevich marked this conversation as resolved.
# Authority (host[:port]) of the DFS endpoint; maps to AzureFileSystem's
# ``dfs_storage_authority`` keyword.
ADLS_DFS_STORAGE_AUTHORITY = "adls.dfs-storage-authority"
# URI schemes (e.g. "http"/"https") for the Blob and DFS endpoints; map to
# AzureFileSystem's ``blob_storage_scheme`` / ``dfs_storage_scheme`` keywords.
ADLS_BLOB_STORAGE_SCHEME = "adls.blob-storage-scheme"
ADLS_DFS_STORAGE_SCHEME = "adls.dfs-storage-scheme"
# Property keys used to configure GCS (Google Cloud Storage) file systems.
GCS_TOKEN = "gcs.oauth2.token"
GCS_TOKEN_EXPIRES_AT_MS = "gcs.oauth2.token-expires-at"  # expiry timestamp (ms, per the key name)
GCS_PROJECT_ID = "gcs.project-id"
Expand Down
47 changes: 47 additions & 0 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@
)
from pyiceberg.expressions.visitors import visit as boolean_expression_visit
from pyiceberg.io import (
ADLS_ACCOUNT_KEY,
ADLS_ACCOUNT_NAME,
ADLS_BLOB_STORAGE_AUTHORITY,
ADLS_BLOB_STORAGE_SCHEME,
ADLS_DFS_STORAGE_AUTHORITY,
ADLS_DFS_STORAGE_SCHEME,
ADLS_SAS_TOKEN,
AWS_ACCESS_KEY_ID,
AWS_REGION,
AWS_ROLE_ARN,
Expand Down Expand Up @@ -197,6 +204,7 @@
MAP_VALUE_NAME = "value"  # field name used for map value entries in Arrow schemas
DOC = "doc"
# Timezone identifiers treated as equivalent to UTC.
UTC_ALIASES = {"UTC", "+00:00", "Etc/UTC", "Z"}
# pyarrow.fs.AzureFileSystem is only available from this pyarrow version on;
# checked before constructing an Azure file system.
MIN_PYARROW_VERSION_SUPPORTING_AZURE_FS = "20.0.0"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: inline this at the function level

Copy link
Copy Markdown
Contributor Author

@NikitaMatskevich NikitaMatskevich Jun 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then I would have to keep the version used in the tests in sync with this one... I can do that if it's OK with you.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's fine; we can just use

        if version.parse(pyarrow.__version__) < version.parse("20.0.0"):

This is technically a "public" variable and I don't want users to be able to import it.


T = TypeVar("T")

Expand Down Expand Up @@ -394,6 +402,9 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste
elif scheme in {"gs", "gcs"}:
return self._initialize_gcs_fs()

elif scheme in {"abfs", "abfss", "wasb", "wasbs"}:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't find any pyarrow docs indicating that wasb and wasbs are supported.

return self._initialize_azure_fs()

elif scheme in {"file"}:
return self._initialize_local_fs()

Expand Down Expand Up @@ -475,6 +486,42 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:

return S3FileSystem(**client_kwargs)

def _initialize_azure_fs(self) -> FileSystem:
    """Build a pyarrow ``AzureFileSystem`` from the ADLS properties on this IO.

    Only properties that are set (and non-empty) are forwarded to the
    AzureFileSystem constructor.

    Raises:
        ImportError: If the installed pyarrow predates AzureFileSystem support.
    """
    from packaging import version

    installed_version = version.parse(pyarrow.__version__)
    if installed_version < version.parse(MIN_PYARROW_VERSION_SUPPORTING_AZURE_FS):
        raise ImportError(
            f"pyarrow version >= {MIN_PYARROW_VERSION_SUPPORTING_AZURE_FS} required for AzureFileSystem support, "
            f"but found version {pyarrow.__version__}."
        )

    from pyarrow.fs import AzureFileSystem

    # Map each supported ADLS property key onto the matching AzureFileSystem
    # constructor keyword; unset or empty properties are simply omitted.
    property_to_kwarg = {
        ADLS_ACCOUNT_NAME: "account_name",
        ADLS_ACCOUNT_KEY: "account_key",
        ADLS_BLOB_STORAGE_AUTHORITY: "blob_storage_authority",
        ADLS_DFS_STORAGE_AUTHORITY: "dfs_storage_authority",
        ADLS_BLOB_STORAGE_SCHEME: "blob_storage_scheme",
        ADLS_DFS_STORAGE_SCHEME: "dfs_storage_scheme",
        ADLS_SAS_TOKEN: "sas_token",
    }

    client_kwargs: Dict[str, str] = {
        kwarg: value
        for prop, kwarg in property_to_kwarg.items()
        if (value := self.properties.get(prop))
    }

    return AzureFileSystem(**client_kwargs)
Comment thread
kevinjqliu marked this conversation as resolved.

def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
from pyarrow.fs import HadoopFileSystem

Expand Down
79 changes: 59 additions & 20 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@
from pyiceberg.catalog.noop import NoopCatalog
from pyiceberg.expressions import BoundReference
from pyiceberg.io import (
ADLS_ACCOUNT_KEY,
ADLS_ACCOUNT_NAME,
ADLS_BLOB_STORAGE_AUTHORITY,
ADLS_BLOB_STORAGE_SCHEME,
ADLS_DFS_STORAGE_AUTHORITY,
ADLS_DFS_STORAGE_SCHEME,
GCS_PROJECT_ID,
GCS_SERVICE_HOST,
GCS_TOKEN,
Expand Down Expand Up @@ -348,6 +354,11 @@ def table_schema_with_all_types() -> Schema:
)


@pytest.fixture(params=["abfs", "abfss", "wasb", "wasbs"])
def adls_scheme(request: pytest.FixtureRequest) -> str:
    """Yield each supported ADLS/Azure URI scheme, one per parametrized run."""
    scheme: str = request.param
    return scheme


@pytest.fixture(scope="session")
def pyarrow_schema_simple_without_ids() -> "pa.Schema":
import pyarrow as pa
Expand Down Expand Up @@ -2088,6 +2099,26 @@ def fsspec_fileio_gcs(request: pytest.FixtureRequest) -> FsspecFileIO:
return fsspec.FsspecFileIO(properties=properties)


@pytest.fixture
def adls_fsspec_fileio(request: pytest.FixtureRequest) -> Generator[FsspecFileIO, None, None]:
    """Yield an fsspec-backed FileIO wired to the local Azurite emulator.

    Creates a ``tests`` container before the test and always removes it
    afterwards — the cleanup runs in a ``finally`` block so a failing test
    cannot leak the container (a bare ``yield`` would skip the teardown when
    pytest throws the test failure into the generator).
    """
    from azure.storage.blob import BlobServiceClient

    azurite_url = request.config.getoption("--adls.endpoint")
    azurite_account_name = request.config.getoption("--adls.account-name")
    azurite_account_key = request.config.getoption("--adls.account-key")
    azurite_connection_string = f"DefaultEndpointsProtocol=http;AccountName={azurite_account_name};AccountKey={azurite_account_key};BlobEndpoint={azurite_url}/{azurite_account_name};"
    properties = {
        "adls.connection-string": azurite_connection_string,
        "adls.account-name": azurite_account_name,
    }

    bbs = BlobServiceClient.from_connection_string(conn_str=azurite_connection_string)
    bbs.create_container("tests")
    try:
        yield fsspec.FsspecFileIO(properties=properties)
    finally:
        bbs.delete_container("tests")
        bbs.close()


@pytest.fixture
def pyarrow_fileio_gcs(request: pytest.FixtureRequest) -> "PyArrowFileIO":
from pyiceberg.io.pyarrow import PyArrowFileIO
Expand All @@ -2101,6 +2132,34 @@ def pyarrow_fileio_gcs(request: pytest.FixtureRequest) -> "PyArrowFileIO":
return PyArrowFileIO(properties=properties)


@pytest.fixture
def pyarrow_fileio_adls(request: pytest.FixtureRequest) -> Generator[Any, None, None]:
    """Yield a PyArrowFileIO configured against the local Azurite emulator.

    Derives the storage authority and scheme from the ``--adls.endpoint``
    option so pyarrow's AzureFileSystem talks to Azurite rather than the real
    Azure endpoints. Creates a ``warehouse`` container for the duration of
    the test and always removes it afterwards — the teardown runs in a
    ``finally`` block so a failing test cannot leak the container.
    """
    from azure.storage.blob import BlobServiceClient

    from pyiceberg.io.pyarrow import PyArrowFileIO

    azurite_url = request.config.getoption("--adls.endpoint")
    # e.g. "http://localhost:10000" -> scheme "http", authority "localhost:10000"
    azurite_scheme, azurite_authority = azurite_url.split("://", 1)

    azurite_account_name = request.config.getoption("--adls.account-name")
    azurite_account_key = request.config.getoption("--adls.account-key")
    azurite_connection_string = f"DefaultEndpointsProtocol=http;AccountName={azurite_account_name};AccountKey={azurite_account_key};BlobEndpoint={azurite_url}/{azurite_account_name};"
    properties = {
        ADLS_ACCOUNT_NAME: azurite_account_name,
        ADLS_ACCOUNT_KEY: azurite_account_key,
        ADLS_BLOB_STORAGE_AUTHORITY: azurite_authority,
        ADLS_DFS_STORAGE_AUTHORITY: azurite_authority,
        ADLS_BLOB_STORAGE_SCHEME: azurite_scheme,
        ADLS_DFS_STORAGE_SCHEME: azurite_scheme,
    }

    bbs = BlobServiceClient.from_connection_string(conn_str=azurite_connection_string)
    bbs.create_container("warehouse")
    try:
        yield PyArrowFileIO(properties=properties)
    finally:
        bbs.delete_container("warehouse")
        bbs.close()


def aws_credentials() -> None:
os.environ["AWS_ACCESS_KEY_ID"] = "testing"
os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
Expand Down Expand Up @@ -2162,26 +2221,6 @@ def fixture_dynamodb(_aws_credentials: None) -> Generator[boto3.client, None, No
yield boto3.client("dynamodb", region_name="us-east-1")


@pytest.fixture
def adls_fsspec_fileio(request: pytest.FixtureRequest) -> Generator[FsspecFileIO, None, None]:
    """Yield an fsspec-backed FileIO wired to the local Azurite emulator.

    Creates a ``tests`` container before the test and deletes it after.

    NOTE(review): an identical fixture definition appears earlier in this
    file; one of the two copies should be removed. Also note the teardown
    after ``yield`` is skipped if the consuming test raises.
    """
    from azure.storage.blob import BlobServiceClient

    azurite_url = request.config.getoption("--adls.endpoint")
    azurite_account_name = request.config.getoption("--adls.account-name")
    azurite_account_key = request.config.getoption("--adls.account-key")
    azurite_connection_string = f"DefaultEndpointsProtocol=http;AccountName={azurite_account_name};AccountKey={azurite_account_key};BlobEndpoint={azurite_url}/{azurite_account_name};"
    properties = {
        "adls.connection-string": azurite_connection_string,
        "adls.account-name": azurite_account_name,
    }

    bbs = BlobServiceClient.from_connection_string(conn_str=azurite_connection_string)
    bbs.create_container("tests")
    yield fsspec.FsspecFileIO(properties=properties)
    bbs.delete_container("tests")
    bbs.close()


@pytest.fixture(scope="session")
def empty_home_dir_path(tmp_path_factory: pytest.TempPathFactory) -> str:
home_path = str(tmp_path_factory.mktemp("home"))
Expand Down
Loading