Skip to content
510 changes: 306 additions & 204 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions pyiceberg/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@
ADLS_CLIENT_ID = "adls.client-id"
ADLS_CLIENT_SECRET = "adls.client-secret"
ADLS_ACCOUNT_HOST = "adls.account-host"
ADLS_BLOB_STORAGE_AUTHORITY = "adls.blob-storage-authority"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment thread
NikitaMatskevich marked this conversation as resolved.
ADLS_DFS_STORAGE_AUTHORITY = "adls.dfs-storage-authority"
# Fix: the trailing commas previously made the two scheme constants 1-tuples
# ("value",) instead of plain strings, so properties.get(ADLS_*_SCHEME)
# could never match the intended "adls.*-storage-scheme" property key.
ADLS_BLOB_STORAGE_SCHEME = "adls.blob-storage-scheme"
ADLS_DFS_STORAGE_SCHEME = "adls.dfs-storage-scheme"
GCS_TOKEN = "gcs.oauth2.token"
GCS_TOKEN_EXPIRES_AT_MS = "gcs.oauth2.token-expires-at"
GCS_PROJECT_ID = "gcs.project-id"
Expand Down
38 changes: 38 additions & 0 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@
)
from pyiceberg.expressions.visitors import visit as boolean_expression_visit
from pyiceberg.io import (
ADLS_ACCOUNT_NAME,
ADLS_ACCOUNT_KEY,
ADLS_BLOB_STORAGE_AUTHORITY,
ADLS_DFS_STORAGE_AUTHORITY,
ADLS_BLOB_STORAGE_SCHEME,
ADLS_DFS_STORAGE_SCHEME,
ADLS_SAS_TOKEN,
AWS_ACCESS_KEY_ID,
AWS_REGION,
AWS_ROLE_ARN,
Expand Down Expand Up @@ -394,6 +401,9 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste
elif scheme in {"gs", "gcs"}:
return self._initialize_gcs_fs()

elif scheme in {"abfs", "abfss", "wasb", "wasbs"}:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cant find any pyarrow docs that indicates wasb and wasbs is supported.

return self._initialize_azure_fs()

elif scheme in {"file"}:
return self._initialize_local_fs()

Expand Down Expand Up @@ -475,6 +485,34 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:

return S3FileSystem(**client_kwargs)

def _initialize_azure_fs(self) -> FileSystem:
    """Construct a pyarrow ``AzureFileSystem`` from this IO's ADLS properties.

    Only properties that are present (and non-empty) are forwarded to the
    AzureFileSystem constructor; everything else is left to pyarrow defaults.
    """
    from pyarrow.fs import AzureFileSystem

    # pyiceberg property key -> AzureFileSystem constructor keyword.
    property_to_kwarg = (
        (ADLS_ACCOUNT_NAME, "account_name"),
        (ADLS_ACCOUNT_KEY, "account_key"),
        (ADLS_BLOB_STORAGE_AUTHORITY, "blob_storage_authority"),
        (ADLS_DFS_STORAGE_AUTHORITY, "dfs_storage_authority"),
        (ADLS_BLOB_STORAGE_SCHEME, "blob_storage_scheme"),
        (ADLS_DFS_STORAGE_SCHEME, "dfs_storage_scheme"),
        (ADLS_SAS_TOKEN, "sas_token"),
    )

    client_kwargs: Dict[str, str] = {
        kwarg: value
        for prop_key, kwarg in property_to_kwarg
        if (value := self.properties.get(prop_key))
    }

    return AzureFileSystem(**client_kwargs)
Comment thread
kevinjqliu marked this conversation as resolved.

def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
from pyarrow.fs import HadoopFileSystem

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pyparsing = ">=3.1.0,<4.0.0"
zstandard = ">=0.13.0,<1.0.0"
tenacity = ">=8.2.3,<10.0.0"
pyroaring = ">=1.0.0,<2.0.0"
pyarrow = { version = ">=17.0.0,<21.0.0", optional = true }
pyarrow = { version = ">=20.0.0,<21.0.0", optional = true }
Comment thread
NikitaMatskevich marked this conversation as resolved.
Outdated
pandas = { version = ">=1.0.0,<3.0.0", optional = true }
duckdb = { version = ">=0.5.0,<2.0.0", optional = true }
ray = [
Expand Down
80 changes: 59 additions & 21 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@
GCS_SERVICE_HOST,
GCS_TOKEN,
GCS_TOKEN_EXPIRES_AT_MS,
ADLS_ACCOUNT_NAME,
ADLS_ACCOUNT_KEY,
ADLS_BLOB_STORAGE_AUTHORITY,
ADLS_DFS_STORAGE_SCHEME,
ADLS_BLOB_STORAGE_SCHEME,
ADLS_DFS_STORAGE_AUTHORITY,
fsspec,
load_file_io,
)
Expand Down Expand Up @@ -348,6 +354,11 @@ def table_schema_with_all_types() -> Schema:
)


@pytest.fixture(params=["abfss", "wasbs"])
Comment thread
NikitaMatskevich marked this conversation as resolved.
Outdated
def adls_scheme(request: pytest.FixtureRequest) -> str:
    """Parametrized fixture: the Azure URI scheme under test ("abfss" or "wasbs")."""
    return request.param


@pytest.fixture(scope="session")
def pyarrow_schema_simple_without_ids() -> "pa.Schema":
import pyarrow as pa
Expand Down Expand Up @@ -2089,7 +2100,27 @@ def fsspec_fileio_gcs(request: pytest.FixtureRequest) -> FsspecFileIO:


@pytest.fixture
def pyarrow_fileio_gcs(request: pytest.FixtureRequest) -> "PyArrowFileIO":
def adls_fsspec_fileio(request: pytest.FixtureRequest) -> Generator[FsspecFileIO, None, None]:
    """Yield an ``FsspecFileIO`` wired to a local Azurite blob-storage emulator.

    Reads the emulator endpoint and credentials from the ``--adls.*`` pytest
    options, creates a "tests" container before the test, and removes it
    (and closes the client) afterwards.
    """
    from azure.storage.blob import BlobServiceClient

    azurite_url = request.config.getoption("--adls.endpoint")
    azurite_account_name = request.config.getoption("--adls.account-name")
    azurite_account_key = request.config.getoption("--adls.account-key")
    # Azurite serves over http and exposes the account at <endpoint>/<account-name>.
    azurite_connection_string = f"DefaultEndpointsProtocol=http;AccountName={azurite_account_name};AccountKey={azurite_account_key};BlobEndpoint={azurite_url}/{azurite_account_name};"
    properties = {
        "adls.connection-string": azurite_connection_string,
        "adls.account-name": azurite_account_name,
    }

    bbs = BlobServiceClient.from_connection_string(conn_str=azurite_connection_string)
    bbs.create_container("tests")
    yield fsspec.FsspecFileIO(properties=properties)
    # Teardown: runs after the test body that consumed the yielded FileIO.
    bbs.delete_container("tests")
    bbs.close()


@pytest.fixture
def pyarrow_fileio_gcs(request: pytest.FixtureRequest) -> 'PyArrowFileIO':
from pyiceberg.io.pyarrow import PyArrowFileIO

properties = {
Expand All @@ -2101,6 +2132,33 @@ def pyarrow_fileio_gcs(request: pytest.FixtureRequest) -> "PyArrowFileIO":
return PyArrowFileIO(properties=properties)


@pytest.fixture
def pyarrow_fileio_adls(request: pytest.FixtureRequest) -> Generator[Any, None, None]:
    """Yield a ``PyArrowFileIO`` configured against a local Azurite emulator.

    Splits the ``--adls.endpoint`` URL into scheme and authority so that the
    pyarrow AzureFileSystem targets the emulator for both the blob and dfs
    endpoints. Creates a "warehouse" container before the test and deletes it
    (and closes the client) afterwards.
    """
    from azure.storage.blob import BlobServiceClient
    from pyiceberg.io.pyarrow import PyArrowFileIO

    azurite_url = request.config.getoption("--adls.endpoint")
    # e.g. "http://127.0.0.1:10000" -> ("http", "127.0.0.1:10000")
    azurite_scheme, azurite_authority = azurite_url.split('://', 1)

    azurite_account_name = request.config.getoption("--adls.account-name")
    azurite_account_key = request.config.getoption("--adls.account-key")
    azurite_connection_string = f"DefaultEndpointsProtocol=http;AccountName={azurite_account_name};AccountKey={azurite_account_key};BlobEndpoint={azurite_url}/{azurite_account_name};"
    properties = {
        ADLS_ACCOUNT_NAME: azurite_account_name,
        ADLS_ACCOUNT_KEY: azurite_account_key,
        ADLS_BLOB_STORAGE_AUTHORITY: azurite_authority,
        ADLS_DFS_STORAGE_AUTHORITY: azurite_authority,
        ADLS_BLOB_STORAGE_SCHEME: azurite_scheme,
        ADLS_DFS_STORAGE_SCHEME: azurite_scheme,
    }

    bbs = BlobServiceClient.from_connection_string(conn_str=azurite_connection_string)
    bbs.create_container("warehouse")
    yield PyArrowFileIO(properties=properties)
    # Teardown: runs after the test body that consumed the yielded FileIO.
    bbs.delete_container("warehouse")
    bbs.close()


def aws_credentials() -> None:
os.environ["AWS_ACCESS_KEY_ID"] = "testing"
os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
Expand Down Expand Up @@ -2162,26 +2220,6 @@ def fixture_dynamodb(_aws_credentials: None) -> Generator[boto3.client, None, No
yield boto3.client("dynamodb", region_name="us-east-1")


@pytest.fixture
def adls_fsspec_fileio(request: pytest.FixtureRequest) -> Generator[FsspecFileIO, None, None]:
from azure.storage.blob import BlobServiceClient

azurite_url = request.config.getoption("--adls.endpoint")
azurite_account_name = request.config.getoption("--adls.account-name")
azurite_account_key = request.config.getoption("--adls.account-key")
azurite_connection_string = f"DefaultEndpointsProtocol=http;AccountName={azurite_account_name};AccountKey={azurite_account_key};BlobEndpoint={azurite_url}/{azurite_account_name};"
properties = {
"adls.connection-string": azurite_connection_string,
"adls.account-name": azurite_account_name,
}

bbs = BlobServiceClient.from_connection_string(conn_str=azurite_connection_string)
bbs.create_container("tests")
yield fsspec.FsspecFileIO(properties=properties)
bbs.delete_container("tests")
bbs.close()


@pytest.fixture(scope="session")
def empty_home_dir_path(tmp_path_factory: pytest.TempPathFactory) -> str:
home_path = str(tmp_path_factory.mktemp("home"))
Expand Down
Loading
Loading