Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyiceberg/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ def delete(self, location: Union[str, InputFile, OutputFile]) -> None:
"s3a": [ARROW_FILE_IO, FSSPEC_FILE_IO],
"s3n": [ARROW_FILE_IO, FSSPEC_FILE_IO],
"gs": [ARROW_FILE_IO],
"file": [ARROW_FILE_IO],
"file": [ARROW_FILE_IO, FSSPEC_FILE_IO],
"hdfs": [ARROW_FILE_IO],
"abfs": [FSSPEC_FILE_IO],
"abfss": [FSSPEC_FILE_IO],
Expand Down
3 changes: 2 additions & 1 deletion pyiceberg/io/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def s3v4_rest_signer(properties: Properties, request: AWSRequest, **_: Any) -> A


def _file(_: Properties) -> LocalFileSystem:
    """Return a local filesystem; `auto_mkdir=True` creates missing parent directories on write."""
    local_fs = LocalFileSystem(auto_mkdir=True)
    return local_fs


def _s3(properties: Properties) -> AbstractFileSystem:
Expand Down Expand Up @@ -173,6 +173,7 @@ def _adlfs(properties: Properties) -> AbstractFileSystem:


SCHEME_TO_FS = {
"": _file,
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pyarrow defaults the scheme to `file` when no scheme is present.

return "file", uri.netloc, os.path.abspath(location)

we essentially do the same here

"file": _file,
"s3": _s3,
"s3a": _s3,
Expand Down
11 changes: 8 additions & 3 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@
T = TypeVar("T")


class PyArrowLocalFileSystem(pyarrow.fs.LocalFileSystem):
    """LocalFileSystem that pre-creates parent directories before writing."""

    def open_output_stream(self, path: str, *args: Any, **kwargs: Any) -> pyarrow.NativeFile:
        """Open *path* for writing, creating any missing parent directories first.

        pyarrow's LocalFileSystem requires the parent directory to exist before
        an output stream can be opened, so we create it (recursively) up front.
        """
        parent_dir = os.path.dirname(path)
        self.create_dir(parent_dir, recursive=True)
        return super().open_output_stream(path, *args, **kwargs)


class PyArrowFile(InputFile, OutputFile):
"""A combined InputFile and OutputFile implementation that uses a pyarrow filesystem to generate pyarrow.lib.NativeFile instances.

Expand Down Expand Up @@ -378,9 +385,7 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSyste

return GcsFileSystem(**gcs_kwargs)
elif scheme == "file":
from pyarrow.fs import LocalFileSystem

return LocalFileSystem()
return PyArrowLocalFileSystem()
else:
raise ValueError(f"Unrecognized filesystem type in URI: {scheme}")

Expand Down
62 changes: 60 additions & 2 deletions tests/catalog/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from pathlib import Path
from typing import Generator, List

import pyarrow as pa
import pytest
from pytest import TempPathFactory
from pytest_lazyfixture import lazy_fixture
Expand All @@ -35,7 +36,10 @@
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io import FSSPEC_FILE_IO, PY_IO_IMPL
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.table.snapshots import Operation
from pyiceberg.table.sorting import (
NullOrder,
SortDirection,
Expand Down Expand Up @@ -80,7 +84,7 @@ def catalog_memory(warehouse: Path) -> Generator[SqlCatalog, None, None]:
@pytest.fixture(scope="module")
def catalog_sqlite(warehouse: Path) -> Generator[SqlCatalog, None, None]:
props = {
"uri": "sqlite:////tmp/sql-catalog.db",
"uri": f"sqlite:////{warehouse}/sql-catalog.db",
"warehouse": f"file://{warehouse}",
}
catalog = SqlCatalog("test_sql_catalog", **props)
Expand All @@ -92,7 +96,7 @@ def catalog_sqlite(warehouse: Path) -> Generator[SqlCatalog, None, None]:
@pytest.fixture(scope="module")
def catalog_sqlite_without_rowcount(warehouse: Path) -> Generator[SqlCatalog, None, None]:
props = {
"uri": "sqlite:////tmp/sql-catalog.db",
"uri": f"sqlite:////{warehouse}/sql-catalog.db",
"warehouse": f"file://{warehouse}",
}
catalog = SqlCatalog("test_sql_catalog", **props)
Expand All @@ -102,6 +106,19 @@ def catalog_sqlite_without_rowcount(warehouse: Path) -> Generator[SqlCatalog, No
catalog.destroy_tables()


@pytest.fixture(scope="module")
def catalog_sqlite_fsspec(warehouse: Path) -> Generator[SqlCatalog, None, None]:
    """Module-scoped SQLite-backed catalog configured to use the fsspec FileIO."""
    catalog = SqlCatalog(
        "test_sql_catalog",
        **{
            "uri": f"sqlite:////{warehouse}/sql-catalog.db",
            "warehouse": f"file://{warehouse}",
            PY_IO_IMPL: FSSPEC_FILE_IO,
        },
    )
    catalog.create_tables()
    yield catalog
    catalog.destroy_tables()


def test_creation_with_no_uri() -> None:
    """SqlCatalog requires a `uri` property; constructing one without it must raise."""
    with pytest.raises(NoSuchPropertyException):
        # `not_uri` deliberately mismatches the expected `uri` key.
        SqlCatalog("test_ddb_catalog", not_uri="unused")
Expand Down Expand Up @@ -722,6 +739,47 @@ def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema, random_i
assert new_schema.find_field("b").field_type == IntegerType()


@pytest.mark.parametrize(
    'catalog',
    [
        lazy_fixture('catalog_memory'),
        lazy_fixture('catalog_sqlite'),
        lazy_fixture('catalog_sqlite_without_rowcount'),
        lazy_fixture('catalog_sqlite_fsspec'),
    ],
)
def test_append_table(catalog: SqlCatalog, table_schema_simple: Schema, random_identifier: Identifier) -> None:
    """Appending an Arrow table creates a single APPEND snapshot and the data round-trips."""
    database_name, _table_name = random_identifier
    catalog.create_namespace(database_name)
    table = catalog.create_table(random_identifier, table_schema_simple)

    arrow_table = pa.Table.from_pydict(
        {"foo": ["a"], "bar": [1], "baz": [True]},
        schema=schema_to_pyarrow(table_schema_simple),
    )

    table.append(arrow_table)

    # Exactly one snapshot exists, it is current, and it is the root (no parent).
    snapshots = table.metadata.snapshots
    assert len(snapshots) == 1
    snapshot = snapshots[0]
    assert snapshot.snapshot_id == table.metadata.current_snapshot_id
    assert snapshot.parent_snapshot_id is None
    assert snapshot.sequence_number == 1

    # The snapshot summary records an APPEND of a single file with a single record.
    summary = snapshot.summary
    assert summary is not None
    assert summary.operation == Operation.APPEND
    for key in ('added-data-files', 'added-records', 'total-data-files', 'total-records'):
        assert summary[key] == '1'

    # Reading the table back yields exactly what was appended.
    assert arrow_table == table.scan().to_arrow()


@pytest.mark.parametrize(
'catalog',
[
Expand Down