Skip to content

Commit 25ddb2f

Browse files
committed
add iceberg datafusion integration
1 parent f507dbd commit 25ddb2f

File tree

2 files changed

+109
-0
lines changed

2 files changed

+109
-0
lines changed

pyiceberg/table/__init__.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1428,6 +1428,51 @@ def to_polars(self) -> pl.LazyFrame:
14281428

14291429
return pl.scan_iceberg(self)
14301430

1431+
def __datafusion_table_provider__(self) -> Any:
1432+
"""Return the DataFusion table provider PyCapsule interface.
1433+
1434+
To support DataFusion features such as push down filtering, this function will return a PyCapsule
1435+
interface that conforms to the FFI Table Provider required by DataFusion. From an end user perspective
1436+
you should not need to call this function directly. Instead you can use ``register_table_provider`` in
1437+
the DataFusion SessionContext.
1438+
1439+
Returns:
1440+
A PyCapsule DataFusion TableProvider interface.
1441+
1442+
Example:
1443+
```python
1444+
from datafusion import SessionContext
1445+
from pyiceberg.catalog import load_catalog
1446+
import pyarrow as pa
1447+
catalog = load_catalog("catalog", type="in-memory")
1448+
catalog.create_namespace_if_not_exists("default")
1449+
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
1450+
iceberg_table = catalog.create_table("default.test", schema=data.schema)
1451+
iceberg_table.append(data)
1452+
ctx = SessionContext()
1453+
ctx.register_table_provider("test", iceberg_table)
1454+
ctx.table("test").show()
1455+
```
1456+
Results in
1457+
```
1458+
DataFrame()
1459+
+---+---+
1460+
| x | y |
1461+
+---+---+
1462+
| 1 | 4 |
1463+
| 2 | 5 |
1464+
| 3 | 6 |
1465+
+---+---+
1466+
```
1467+
"""
1468+
from pyiceberg_core.datafusion import IcebergDataFusionTable
1469+
1470+
return IcebergDataFusionTable(
1471+
identifier=self.name(),
1472+
metadata_location=self.metadata_location,
1473+
file_io_properties=self.io.properties,
1474+
).__datafusion_table_provider__()
1475+
14311476

14321477
class StaticTable(Table):
14331478
"""Load a table directly from a metadata file (i.e., without using a catalog)."""

tests/table/test_datafusion.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
19+
from pathlib import Path
20+
21+
import pyarrow as pa
22+
import pytest
23+
from datafusion import SessionContext
24+
25+
from pyiceberg.catalog import Catalog, load_catalog
26+
27+
28+
@pytest.fixture(scope="session")
29+
def warehouse(tmp_path_factory: pytest.TempPathFactory) -> Path:
30+
return tmp_path_factory.mktemp("warehouse")
31+
32+
33+
@pytest.fixture(scope="session")
34+
def catalog(warehouse: Path) -> Catalog:
35+
catalog = load_catalog(
36+
"default",
37+
uri=f"sqlite:///{warehouse}/pyiceberg_catalog.db",
38+
warehouse=f"file://{warehouse}",
39+
)
40+
return catalog
41+
42+
43+
def test_datafusion_register_pyiceberg_table(catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
44+
catalog.create_namespace_if_not_exists("default")
45+
iceberg_table = catalog.create_table_if_not_exists(
46+
"default.dataset",
47+
schema=arrow_table_with_null.schema,
48+
)
49+
iceberg_table.append(arrow_table_with_null)
50+
51+
ctx = SessionContext()
52+
ctx.register_table_provider("test", iceberg_table)
53+
54+
datafusion_table = ctx.table("test")
55+
assert datafusion_table is not None
56+
57+
assert datafusion_table.to_arrow_table().to_pylist() == iceberg_table.scan().to_arrow().to_pylist()
58+
59+
from pandas.testing import assert_frame_equal
60+
61+
assert_frame_equal(
62+
datafusion_table.to_arrow_table().to_pandas(),
63+
iceberg_table.scan().to_arrow().to_pandas(),
64+
)

0 commit comments

Comments
 (0)