Skip to content

Commit 299fc48

Browse files
committed
add iceberg datafusion integration
1 parent 9c99f32 commit 299fc48

File tree

2 files changed

+109
-0
lines changed

2 files changed

+109
-0
lines changed

pyiceberg/table/__init__.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1494,6 +1494,51 @@ def to_polars(self) -> pl.LazyFrame:
14941494

14951495
return pl.scan_iceberg(self)
14961496

1497+
def __datafusion_table_provider__(self) -> Any:
1498+
"""Return the DataFusion table provider PyCapsule interface.
1499+
1500+
To support DataFusion features such as push down filtering, this function will return a PyCapsule
1501+
interface that conforms to the FFI Table Provider required by DataFusion. From an end user perspective
1502+
you should not need to call this function directly. Instead you can use ``register_table_provider`` in
1503+
the DataFusion SessionContext.
1504+
1505+
Returns:
1506+
A PyCapsule DataFusion TableProvider interface.
1507+
1508+
Example:
1509+
```python
1510+
from datafusion import SessionContext
1511+
from pyiceberg.catalog import load_catalog
1512+
import pyarrow as pa
1513+
catalog = load_catalog("catalog", type="in-memory")
1514+
catalog.create_namespace_if_not_exists("default")
1515+
data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
1516+
iceberg_table = catalog.create_table("default.test", schema=data.schema)
1517+
iceberg_table.append(data)
1518+
ctx = SessionContext()
1519+
ctx.register_table_provider("test", iceberg_table)
1520+
ctx.table("test").show()
1521+
```
1522+
Results in
1523+
```
1524+
DataFrame()
1525+
+---+---+
1526+
| x | y |
1527+
+---+---+
1528+
| 1 | 4 |
1529+
| 2 | 5 |
1530+
| 3 | 6 |
1531+
+---+---+
1532+
```
1533+
"""
1534+
from pyiceberg_core.datafusion import IcebergDataFusionTable
1535+
1536+
return IcebergDataFusionTable(
1537+
identifier=self.name(),
1538+
metadata_location=self.metadata_location,
1539+
file_io_properties=self.io.properties,
1540+
).__datafusion_table_provider__()
1541+
14971542

14981543
class StaticTable(Table):
14991544
"""Load a table directly from a metadata file (i.e., without using a catalog)."""

tests/table/test_datafusion.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
19+
from pathlib import Path
20+
21+
import pyarrow as pa
22+
import pytest
23+
from datafusion import SessionContext
24+
25+
from pyiceberg.catalog import Catalog, load_catalog
26+
27+
28+
@pytest.fixture(scope="session")
29+
def warehouse(tmp_path_factory: pytest.TempPathFactory) -> Path:
30+
return tmp_path_factory.mktemp("warehouse")
31+
32+
33+
@pytest.fixture(scope="session")
34+
def catalog(warehouse: Path) -> Catalog:
35+
catalog = load_catalog(
36+
"default",
37+
uri=f"sqlite:///{warehouse}/pyiceberg_catalog.db",
38+
warehouse=f"file://{warehouse}",
39+
)
40+
return catalog
41+
42+
43+
def test_datafusion_register_pyiceberg_table(catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
44+
catalog.create_namespace_if_not_exists("default")
45+
iceberg_table = catalog.create_table_if_not_exists(
46+
"default.dataset",
47+
schema=arrow_table_with_null.schema,
48+
)
49+
iceberg_table.append(arrow_table_with_null)
50+
51+
ctx = SessionContext()
52+
ctx.register_table_provider("test", iceberg_table)
53+
54+
datafusion_table = ctx.table("test")
55+
assert datafusion_table is not None
56+
57+
assert datafusion_table.to_arrow_table().to_pylist() == iceberg_table.scan().to_arrow().to_pylist()
58+
59+
from pandas.testing import assert_frame_equal
60+
61+
assert_frame_equal(
62+
datafusion_table.to_arrow_table().to_pandas(),
63+
iceberg_table.scan().to_arrow().to_pandas(),
64+
)

0 commit comments

Comments
 (0)