Commit e6fe55b

Initialize daft-paimon connector
1 parent c960676 commit e6fe55b

16 files changed

Lines changed: 3170 additions & 1 deletion

docs/content/pypaimon/daft.md

Lines changed: 231 additions & 0 deletions
@@ -0,0 +1,231 @@
---
title: "Daft"
weight: 4
type: docs
aliases:
- /pypaimon/daft.html
---

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Daft

[Daft](https://www.getdaft.io/) is a distributed DataFrame engine for Python.

The integration requires `daft`, which the `daft` extra installs:

```bash
pip install "pypaimon[daft]"
```

`pypaimon.daft` exposes a top-level `read_paimon` / `write_paimon` API that
takes a table identifier and catalog options directly.

## Read

### `read_paimon` (recommended)

```python
from pypaimon.daft import read_paimon

df = read_paimon(
    "database_name.table_name",
    catalog_options={"warehouse": "/path/to/warehouse"},
)

df.show()
```

`read_paimon` opens its own catalog and resolves the table in a single call.

The returned DataFrame is lazy. Standard Daft filter, projection, and limit
operations are pushed down into the Paimon scan automatically via Daft's
DataSource protocol:

```python
import daft
from pypaimon.daft import read_paimon

df = read_paimon(
    "database_name.table_name",
    catalog_options={"warehouse": "/path/to/warehouse"},
)

# Filter pushdown (partition pruning + file-level skipping)
df = df.where(daft.col("dt") == "2024-01-01")

# Projection pushdown (only requested columns are read from disk)
df = df.select("id", "name")

# Limit pushdown
df = df.limit(100)

df.show()
```

**Time travel:**

```python
# Read a specific snapshot.
df = read_paimon(
    "database_name.table_name",
    catalog_options={"warehouse": "/path/to/warehouse"},
    snapshot_id=42,
)

# Read a tagged snapshot.
df = read_paimon(
    "database_name.table_name",
    catalog_options={"warehouse": "/path/to/warehouse"},
    tag_name="release-2026-04",
)
```

`snapshot_id` and `tag_name` are mutually exclusive.
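
Because the two options cannot be combined, callers that accept both from configuration may want to check for a conflict before calling `read_paimon`. The following is a minimal caller-side sketch; `time_travel_kwargs` is a hypothetical helper, not part of `pypaimon`, and the choice of `ValueError` is an assumption (this page does not say how `read_paimon` itself reports a conflict).

```python
from typing import Any, Dict, Optional


def time_travel_kwargs(
    snapshot_id: Optional[int] = None,
    tag_name: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: build the time-travel keyword arguments.

    Raises ValueError when both options are given, since the docs
    describe them as mutually exclusive.
    """
    if snapshot_id is not None and tag_name is not None:
        raise ValueError("snapshot_id and tag_name are mutually exclusive")

    kwargs: Dict[str, Any] = {}
    if snapshot_id is not None:
        kwargs["snapshot_id"] = snapshot_id
    if tag_name is not None:
        kwargs["tag_name"] = tag_name
    return kwargs


# Intended use (requires pypaimon[daft], so it is left commented out):
# df = read_paimon(
#     "database_name.table_name",
#     catalog_options={"warehouse": "/path/to/warehouse"},
#     **time_travel_kwargs(snapshot_id=42),
# )
```

Returning an empty dict when neither option is set keeps the call site identical for the "latest snapshot" case.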

**Parameters:**
- `table_identifier`: full table name, e.g. `"db_name.table_name"`.
- `catalog_options`: kwargs forwarded to `CatalogFactory.create()`,
  e.g. `{"warehouse": "/path/to/warehouse"}`.
- `snapshot_id`: optional snapshot id to time-travel to. Mutually
  exclusive with `tag_name`.
- `tag_name`: optional tag name to time-travel to. Mutually
  exclusive with `snapshot_id`.
- `io_config`: optional Daft `IOConfig` for accessing object storage.
  If `None`, it is inferred from the catalog options.

For tables on object stores, credentials are inferred from the catalog options
automatically, or you can pass an explicit `IOConfig`:

```python
from daft.io import IOConfig, S3Config
from pypaimon.daft import read_paimon

# Credentials inferred from the catalog options.
df = read_paimon(
    "my_db.my_table",
    catalog_options={
        "warehouse": "s3://my-bucket/warehouse",
        "fs.s3.accessKeyId": "...",
        "fs.s3.accessKeySecret": "...",
    },
)
df.show()

# Alternatively, pass an explicit IOConfig.
df = read_paimon(
    "my_db.my_table",
    catalog_options={"warehouse": "s3://my-bucket/warehouse"},
    io_config=IOConfig(s3=S3Config(key_id="...", access_key="...")),
)
```

**Features:**
- Append-only tables with Parquet format use Daft's native high-performance Parquet reader.
- Primary-key tables that require LSM merge fall back to pypaimon's built-in reader.
- Partition pruning, predicate pushdown, projection pushdown, and limit pushdown are all supported.

## Write

### `write_paimon` (recommended)

```python
import daft
from pypaimon.daft import write_paimon

df = daft.from_pydict({
    "id": [1, 2, 3],
    "name": ["alice", "bob", "charlie"],
    "dt": ["2024-01-01", "2024-01-01", "2024-01-01"],
})

write_paimon(
    df,
    "database_name.table_name",
    catalog_options={"warehouse": "/path/to/warehouse"},
)
```

`write_paimon` opens its own catalog, resolves the table, and commits the
write through Daft's DataSink API.

**Overwrite mode:**

```python
write_paimon(
    df,
    "database_name.table_name",
    catalog_options={"warehouse": "/path/to/warehouse"},
    mode="overwrite",
)
```

**Parameters:**
- `df`: the Daft DataFrame to write.
- `table_identifier`: full table name, e.g. `"db_name.table_name"`.
- `catalog_options`: kwargs forwarded to `CatalogFactory.create()`.
- `mode`: write mode, either `"append"` (default) or `"overwrite"`.

## Catalog Abstraction

Paimon catalogs can integrate with Daft's unified `Catalog` / `Table` interfaces:

```python
import pypaimon
from pypaimon.daft import PaimonCatalog

inner = pypaimon.CatalogFactory.create({"warehouse": "/path/to/warehouse"})
catalog = PaimonCatalog(inner, name="my_paimon")

# Browse
catalog.list_namespaces()
catalog.list_tables()

# Read / write through catalog
table = catalog.get_table("my_db.my_table")
df = table.read()
table.append(df)
table.overwrite(df)
```

You can also wrap a single table directly:

```python
from pypaimon.daft import PaimonTable

# `inner` is the pypaimon catalog created in the previous example.
inner_table = inner.get_table("my_db.my_table")
table = PaimonTable(inner_table)
df = table.read()
```

### Creating Tables

```python
import daft
from daft.io.partitioning import PartitionField

# `catalog` is the PaimonCatalog created in the example above.
schema = daft.from_pydict({"id": [1], "name": ["a"], "dt": ["2024-01-01"]}).schema()
dt_field = schema["dt"]
partition_fields = [PartitionField.create(dt_field)]

table = catalog.create_table(
    "my_db.new_table",
    schema,
    partition_fields=partition_fields,
)

# Primary-key table
table = catalog.create_table(
    "my_db.pk_table",
    schema,
    properties={"primary_keys": ["id", "dt"]},
    partition_fields=partition_fields,
)
```
Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from pypaimon.daft.daft_paimon import read_paimon, write_paimon

__all__ = ["read_paimon", "write_paimon", "PaimonCatalog", "PaimonTable"]


def __getattr__(name):
    # Module-level __getattr__ (PEP 562): import the catalog classes on first
    # access and cache them in globals() so later lookups bypass this hook.
    if name in ("PaimonCatalog", "PaimonTable"):
        from pypaimon.daft.daft_catalog import PaimonCatalog, PaimonTable

        globals()["PaimonCatalog"] = PaimonCatalog
        globals()["PaimonTable"] = PaimonTable
        return globals()[name]
    raise AttributeError(f"module 'pypaimon.daft' has no attribute {name!r}")
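
The lazy-export pattern in this module can be tried in isolation. The sketch below is an illustration only: it builds a throwaway module object named `lazy_demo` and a stand-in class `Heavy` (both hypothetical names, unrelated to `pypaimon`) and shows that the hook runs once, after which the cached attribute is served directly from the module namespace.

```python
import types

# Throwaway module used only for this demonstration (hypothetical name).
demo = types.ModuleType("lazy_demo")
load_count = {"n": 0}  # counts how often the lazy hook actually runs


def _lazy_getattr(name):
    """Module-level __getattr__ hook, called only when normal lookup fails."""
    if name == "Heavy":
        load_count["n"] += 1

        class Heavy:
            """Stand-in for a class whose import is expensive."""

        # Cache in the module namespace so the hook is not hit again.
        demo.__dict__["Heavy"] = Heavy
        return Heavy
    raise AttributeError(f"module 'lazy_demo' has no attribute {name!r}")


# Assigning the hook stores it in the module's __dict__, where PEP 562 looks.
demo.__getattr__ = _lazy_getattr

first = demo.Heavy   # triggers the hook
second = demo.Heavy  # served from the module namespace
print(first is second, load_count["n"])
```

Listing the lazy names in `__all__`, as the module above does, keeps `from pypaimon.daft import *` working, because a star-import resolves each listed name through the same attribute lookup.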
Lines changed: 86 additions & 0 deletions
@@ -0,0 +1,86 @@
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""Utilities for deserializing Paimon BlobDescriptor bytes into FileReference arrays."""

from __future__ import annotations

import struct

import pyarrow as pa

FILE_PHYSICAL_TYPE = pa.struct(
    [
        pa.field("url", pa.large_utf8()),
        pa.field("io_config", pa.large_binary()),
        pa.field("offset", pa.int64()),
        pa.field("length", pa.int64()),
    ]
)


def _deserialize_one(data: bytes) -> tuple[str, int, int]:
    """Deserialize a single BlobDescriptor -> (url, offset, length)."""
    pos = 0
    version = data[pos]  # first byte is the format version
    pos += 1

    if version > 1:
        pos += 8  # skip magic

    # URI length: little-endian unsigned 32-bit integer.
    uri_len = struct.unpack_from("<I", data, pos)[0]
    pos += 4

    uri = data[pos:pos + uri_len].decode("utf-8")
    pos += uri_len

    # Offset and length: little-endian signed 64-bit integers.
    offset = struct.unpack_from("<q", data, pos)[0]
    pos += 8

    length = struct.unpack_from("<q", data, pos)[0]
    return uri, offset, length


def blob_column_to_file_array(column: pa.Array) -> pa.Array:
    """Convert a large_binary column of serialized BlobDescriptors to a FileReference-compatible struct."""
    urls: list[str | None] = []
    offsets: list[int | None] = []
    lengths: list[int | None] = []

    for value in column:
        if value is None or not value.is_valid:
            # Null descriptor: emit nulls in every child field.
            urls.append(None)
            offsets.append(None)
            lengths.append(None)
        else:
            raw = value.as_py()
            uri, off, length = _deserialize_one(raw)
            urls.append(uri)
            offsets.append(off)
            lengths.append(length)

    n = len(urls)
    return pa.StructArray.from_arrays(
        [
            pa.array(urls, type=pa.large_utf8()),
            pa.nulls(n, type=pa.large_binary()),  # io_config is left null
            pa.array(offsets, type=pa.int64()),
            pa.array(lengths, type=pa.int64()),
        ],
        names=["url", "io_config", "offset", "length"],
    )
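
The byte layout that `_deserialize_one` reads can be checked with a round trip that needs only the standard library. This is a sketch under stated assumptions: `serialize_descriptor` is a hypothetical writer inferred from the parser above, not Paimon's actual serializer, and the eight zero bytes used as the "magic" for versions above 1 are a placeholder because the real value does not appear in this commit. `parse_descriptor` restates the parser's logic so the block runs without `pyarrow`.

```python
import struct
from typing import Tuple


def serialize_descriptor(uri: str, offset: int, length: int, version: int = 1) -> bytes:
    """Hypothetical writer mirroring the layout the parser expects."""
    encoded = uri.encode("utf-8")
    out = bytes([version])
    if version > 1:
        out += b"\x00" * 8  # placeholder magic (assumption, real value unknown)
    out += struct.pack("<I", len(encoded))    # URI length, little-endian uint32
    out += encoded                            # URI bytes, UTF-8
    out += struct.pack("<qq", offset, length)  # offset and length, little-endian int64
    return out


def parse_descriptor(data: bytes) -> Tuple[str, int, int]:
    """Same steps as `_deserialize_one`, restated for a standalone demo."""
    pos = 1
    if data[0] > 1:
        pos += 8  # skip magic
    (uri_len,) = struct.unpack_from("<I", data, pos)
    pos += 4
    uri = data[pos:pos + uri_len].decode("utf-8")
    pos += uri_len
    offset, length = struct.unpack_from("<qq", data, pos)
    return uri, offset, length


payload = serialize_descriptor("s3://bucket/blob.bin", offset=128, length=4096)
print(parse_descriptor(payload))
```

A truncated payload makes `struct.unpack_from` raise `struct.error`, so callers that handle untrusted bytes may want to catch that exception around the parse.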
