
Commit 0bfabfd

revert branch UNPICK
1 parent 9a4709d

15 files changed: 811 additions & 685 deletions


Cargo.lock

Lines changed: 258 additions & 276 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 5 additions & 5 deletions

@@ -17,7 +17,7 @@

 [package]
 name = "datafusion-python"
-version = "49.0.1"
+version = "49.0.0"
 homepage = "https://datafusion.apache.org/python"
 repository = "https://github.com/apache/datafusion-python"
 authors = ["Apache DataFusion <dev@datafusion.apache.org>"]

@@ -39,10 +39,10 @@ pyo3 = { version = "0.24", features = ["extension-module", "abi3", "abi3-py39"]
 pyo3-async-runtimes = { version = "0.24", features = ["tokio-runtime"]}
 pyo3-log = "0.12.4"
 arrow = { version = "55.1.0", features = ["pyarrow"] }
-datafusion = { version = "49.0.1", features = ["avro", "unicode_expressions"] }
-datafusion-substrait = { version = "49.0.1", optional = true }
-datafusion-proto = { version = "49.0.1" }
-datafusion-ffi = { version = "49.0.1" }
+datafusion = { version = "49.0.2", features = ["avro", "unicode_expressions"] }
+datafusion-substrait = { version = "49.0.2", optional = true }
+datafusion-proto = { version = "49.0.2" }
+datafusion-ffi = { version = "49.0.2" }
 prost = "0.13.1" # keep in line with `datafusion-substrait`
 uuid = { version = "1.18", features = ["v4"] }
 mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
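
Editorial note, not part of the commit: this revert leaves the Python package version at 49.0.0 while the underlying DataFusion crates are pinned at 49.0.2. A minimal sketch for checking which binding version is installed in a given environment, assuming the package exposes `__version__` as current releases do:

    import datafusion

    # Prints the installed Python binding version (e.g. "49.0.0"); the Rust
    # crate versions pinned above are a separate, internal detail.
    print(datafusion.__version__)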

README.md

Lines changed: 8 additions & 0 deletions

@@ -42,6 +42,10 @@ DataFusion's Python bindings can be used as a foundation for building new data s
 - Serialize and deserialize query plans in Substrait format.
 - Experimental support for transpiling SQL queries to DataFrame calls with Polars, Pandas, and cuDF.

+For tips on tuning parallelism, see
+[Maximizing CPU Usage](docs/source/user-guide/configuration.rst#maximizing-cpu-usage)
+in the configuration guide.
+
 ## Example Usage

 The following example demonstrates running a SQL query against a Parquet file using DataFusion, storing the results

@@ -227,6 +231,8 @@ and for `uv run` commands the additional parameter `--no-project`
 ```bash
 # fetch this repo
 git clone git@github.com:apache/datafusion-python.git
+# cd to the repo root
+cd datafusion-python/
 # create the virtual enviornment
 uv sync --dev --no-install-package datafusion
 # activate the environment

@@ -238,6 +244,8 @@ Bootstrap (`pip`):
 ```bash
 # fetch this repo
 git clone git@github.com:apache/datafusion-python.git
+# cd to the repo root
+cd datafusion-python/
 # prepare development environment (used to build wheel / install in development)
 python3 -m venv .venv
 # activate the venv
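
Editorial aside, not part of this commit: once either bootstrap path above has built and installed the bindings into the active environment, a quick smoke test is to run a trivial query. A minimal sketch, assuming the build and install succeeded:

    from datafusion import SessionContext

    # Trivial query to confirm the freshly built bindings import and run.
    ctx = SessionContext()
    ctx.sql("SELECT 1 AS one").show()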

benchmarks/max_cpu_usage.py

Lines changed: 107 additions & 0 deletions

@@ -0,0 +1,107 @@ (new file)

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Benchmark script showing how to maximize CPU usage.

This script demonstrates one example of tuning DataFusion for improved parallelism
and CPU utilization. It uses synthetic in-memory data and performs simple aggregation
operations to showcase the impact of partitioning configuration.

IMPORTANT: This is a simplified example designed to illustrate partitioning concepts.
Actual performance in your applications may vary significantly based on many factors:

- Type of table providers (Parquet files, CSV, databases, etc.)
- I/O operations and storage characteristics (local disk, network, cloud storage)
- Query complexity and operation types (joins, window functions, complex expressions)
- Data distribution and size characteristics
- Memory available and hardware specifications
- Network latency for distributed data sources

It is strongly recommended that you create similar benchmarks tailored to your specific:
- Hardware configuration
- Data sources and formats
- Typical query patterns and workloads
- Performance requirements

This will give you more accurate insights into how DataFusion configuration options
will affect your particular use case.
"""

from __future__ import annotations

import argparse
import multiprocessing
import time

import pyarrow as pa
from datafusion import SessionConfig, SessionContext, col
from datafusion import functions as f


def main(num_rows: int, partitions: int) -> None:
    """Run a simple aggregation after repartitioning.

    This function demonstrates basic partitioning concepts using synthetic data.
    Real-world performance will depend on your specific data sources, query types,
    and system configuration.
    """
    # Create some example data (synthetic in-memory data for demonstration)
    # Note: Real applications typically work with files, databases, or other
    # data sources that have different I/O and distribution characteristics
    array = pa.array(range(num_rows))
    batch = pa.record_batch([array], names=["a"])

    # Configure the session to use a higher target partition count and
    # enable automatic repartitioning.
    config = (
        SessionConfig()
        .with_target_partitions(partitions)
        .with_repartition_joins(enabled=True)
        .with_repartition_aggregations(enabled=True)
        .with_repartition_windows(enabled=True)
    )
    ctx = SessionContext(config)

    # Register the input data and repartition manually to ensure that all
    # partitions are used.
    df = ctx.create_dataframe([[batch]]).repartition(partitions)

    start = time.time()
    df = df.aggregate([], [f.sum(col("a"))])
    df.collect()
    end = time.time()

    print(
        f"Processed {num_rows} rows using {partitions} partitions in {end - start:.3f}s"
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--rows",
        type=int,
        default=1_000_000,
        help="Number of rows in the generated dataset",
    )
    parser.add_argument(
        "--partitions",
        type=int,
        default=multiprocessing.cpu_count(),
        help="Target number of partitions to use",
    )
    args = parser.parse_args()
    main(args.rows, args.partitions)
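
Editorial aside, not part of the commit: besides timing the query, the effect of these settings can be inspected by printing the plan with DataFrame.explain(). A minimal sketch along the lines of the script above:

    import pyarrow as pa
    from datafusion import SessionConfig, SessionContext, col
    from datafusion import functions as f

    # Small synthetic input, same shape as in the script.
    batch = pa.record_batch([pa.array(range(1_000))], names=["a"])

    config = SessionConfig().with_target_partitions(4)
    ctx = SessionContext(config)

    # Build the same aggregation and print the logical and physical plans;
    # the physical plan shows how work is split across partitions.
    df = ctx.create_dataframe([[batch]]).repartition(4)
    df = df.aggregate([], [f.sum(col("a"))])
    df.explain()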

dev/changelog/49.0.0.md

Lines changed: 61 additions & 0 deletions

@@ -0,0 +1,61 @@ (new file)

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Apache DataFusion Python 49.0.0 Changelog

This release consists of 16 commits from 7 contributors. See credits at the end of this changelog for more information.

**Fixed bugs:**

- fix(build): Include build.rs in published crates [#1199](https://github.com/apache/datafusion-python/pull/1199) (colinmarc)

**Other:**

- 48.0.0 Release [#1175](https://github.com/apache/datafusion-python/pull/1175) (timsaucer)
- Update CI rules [#1188](https://github.com/apache/datafusion-python/pull/1188) (timsaucer)
- Fix Python UDAF Accumulator Interface example to Properly Handle State and Updates with List[Array] Types [#1192](https://github.com/apache/datafusion-python/pull/1192) (kosiew)
- chore: Upgrade datafusion to version 49 [#1200](https://github.com/apache/datafusion-python/pull/1200) (nuno-faria)
- Update how to dev instructions [#1179](https://github.com/apache/datafusion-python/pull/1179) (ntjohnson1)
- build(deps): bump object_store from 0.12.2 to 0.12.3 [#1189](https://github.com/apache/datafusion-python/pull/1189) (dependabot[bot])
- build(deps): bump uuid from 1.17.0 to 1.18.0 [#1202](https://github.com/apache/datafusion-python/pull/1202) (dependabot[bot])
- build(deps): bump async-trait from 0.1.88 to 0.1.89 [#1203](https://github.com/apache/datafusion-python/pull/1203) (dependabot[bot])
- build(deps): bump slab from 0.4.10 to 0.4.11 [#1205](https://github.com/apache/datafusion-python/pull/1205) (dependabot[bot])
- Improved window and aggregate function signature [#1187](https://github.com/apache/datafusion-python/pull/1187) (timsaucer)
- Optional improvements in verification instructions [#1183](https://github.com/apache/datafusion-python/pull/1183) (paleolimbot)
- Improve `show()` output for empty DataFrames [#1208](https://github.com/apache/datafusion-python/pull/1208) (kosiew)
- build(deps): bump actions/download-artifact from 4 to 5 [#1201](https://github.com/apache/datafusion-python/pull/1201) (dependabot[bot])
- build(deps): bump url from 2.5.4 to 2.5.7 [#1210](https://github.com/apache/datafusion-python/pull/1210) (dependabot[bot])
- build(deps): bump actions/checkout from 4 to 5 [#1204](https://github.com/apache/datafusion-python/pull/1204) (dependabot[bot])

## Credits

Thank you to everyone who contributed to this release. Here is a breakdown of commits (PRs merged) per contributor.

```
    7 dependabot[bot]
    3 Tim Saucer
    2 kosiew
    1 Colin Marc
    1 Dewey Dunnington
    1 Nick
    1 Nuno Faria
```

Thank you also to everyone who contributed in other ways such as filing issues, reviewing PRs, and providing feedback on this release.

docs/source/user-guide/configuration.rst

Lines changed: 136 additions & 1 deletion

@@ -46,6 +46,141 @@ a :py:class:`~datafusion.context.SessionConfig` and :py:class:`~datafusion.conte
     ctx = SessionContext(config, runtime)
     print(ctx)

+Maximizing CPU Usage
+--------------------

-You can read more about available :py:class:`~datafusion.context.SessionConfig` options in the `rust DataFusion Configuration guide <https://arrow.apache.org/datafusion/user-guide/configs.html>`_,
+DataFusion uses partitions to parallelize work. For small queries the
+default configuration (number of CPU cores) is often sufficient, but to
+fully utilize available hardware you can tune how many partitions are
+created and when DataFusion will repartition data automatically.
+
+Configure a ``SessionContext`` with a higher partition count:
+
+.. code-block:: python
+
+    from datafusion import SessionConfig, SessionContext
+
+    # allow up to 16 concurrent partitions
+    config = SessionConfig().with_target_partitions(16)
+    ctx = SessionContext(config)
+
+Automatic repartitioning for joins, aggregations, window functions and
+other operations can be enabled to increase parallelism:
+
+.. code-block:: python
+
+    config = (
+        SessionConfig()
+        .with_target_partitions(16)
+        .with_repartition_joins(True)
+        .with_repartition_aggregations(True)
+        .with_repartition_windows(True)
+    )
+
+Manual repartitioning is available on DataFrames when you need precise
+control:
+
+.. code-block:: python
+
+    from datafusion import col
+
+    df = ctx.read_parquet("data.parquet")
+
+    # Evenly divide into 16 partitions
+    df = df.repartition(16)
+
+    # Or partition by the hash of a column
+    df = df.repartition_by_hash(col("a"), num=16)
+
+    result = df.collect()
+
+
+Benchmark Example
+^^^^^^^^^^^^^^^^^
+
+The repository includes a benchmark script that demonstrates how to maximize CPU usage
+with DataFusion. The :code:`benchmarks/max_cpu_usage.py` script shows a practical example
+of configuring DataFusion for optimal parallelism.
+
+You can run the benchmark script to see the impact of different configuration settings:
+
+.. code-block:: bash
+
+    # Run with default settings (uses all CPU cores)
+    python benchmarks/max_cpu_usage.py
+
+    # Run with specific number of rows and partitions
+    python benchmarks/max_cpu_usage.py --rows 5000000 --partitions 16
+
+    # See all available options
+    python benchmarks/max_cpu_usage.py --help
+
+Here's an example showing the performance difference between single and multiple partitions:
+
+.. code-block:: bash
+
+    # Single partition - slower processing
+    $ python benchmarks/max_cpu_usage.py --rows=10000000 --partitions 1
+    Processed 10000000 rows using 1 partitions in 0.107s
+
+    # Multiple partitions - faster processing
+    $ python benchmarks/max_cpu_usage.py --rows=10000000 --partitions 10
+    Processed 10000000 rows using 10 partitions in 0.038s
+
+This example demonstrates nearly 3x performance improvement (0.107s vs 0.038s) when using
+10 partitions instead of 1, showcasing how proper partitioning can significantly improve
+CPU utilization and query performance.
+
+The script demonstrates several key optimization techniques:
+
+1. **Higher target partition count**: Uses :code:`with_target_partitions()` to set the number of concurrent partitions
+2. **Automatic repartitioning**: Enables repartitioning for joins, aggregations, and window functions
+3. **Manual repartitioning**: Uses :code:`repartition()` to ensure all partitions are utilized
+4. **CPU-intensive operations**: Performs aggregations that can benefit from parallelization
+
+The benchmark creates synthetic data and measures the time taken to perform a sum aggregation
+across the specified number of partitions. This helps you understand how partition configuration
+affects performance on your specific hardware.
+
+Important Considerations
+""""""""""""""""""""""""
+
+The provided benchmark script demonstrates partitioning concepts using synthetic in-memory data
+and simple aggregation operations. While useful for understanding basic configuration principles,
+actual performance in production environments may vary significantly based on numerous factors:
+
+**Data Sources and I/O Characteristics:**
+
+- **Table providers**: Performance differs greatly between Parquet files, CSV files, databases, and cloud storage
+- **Storage type**: Local SSD, network-attached storage, and cloud storage have vastly different characteristics
+- **Network latency**: Remote data sources introduce additional latency considerations
+- **File sizes and distribution**: Large files may benefit differently from partitioning than many small files
+
+**Query and Workload Characteristics:**
+
+- **Operation complexity**: Simple aggregations versus complex joins, window functions, or nested queries
+- **Data distribution**: Skewed data may not partition evenly, affecting parallel efficiency
+- **Memory usage**: Large datasets may require different memory management strategies
+- **Concurrent workloads**: Multiple queries running simultaneously affect resource allocation
+
+**Hardware and Environment Factors:**
+
+- **CPU architecture**: Different processors have varying parallel processing capabilities
+- **Available memory**: Limited RAM may require different optimization strategies
+- **System load**: Other applications competing for resources affect DataFusion performance
+
+**Recommendations for Production Use:**
+
+To optimize DataFusion for your specific use case, it is strongly recommended to:
+
+1. **Create custom benchmarks** using your actual data sources, formats, and query patterns
+2. **Test with representative data volumes** that match your production workloads
+3. **Measure end-to-end performance** including data loading, processing, and result handling
+4. **Evaluate different configuration combinations** for your specific hardware and workload
+5. **Monitor resource utilization** (CPU, memory, I/O) to identify bottlenecks in your environment
+
+This approach will provide more accurate insights into how DataFusion configuration options
+will impact your particular applications and infrastructure.
+
+For more information about available :py:class:`~datafusion.context.SessionConfig` options, see the `rust DataFusion Configuration guide <https://arrow.apache.org/datafusion/user-guide/configs.html>`_,
 and about :code:`RuntimeEnvBuilder` options in the rust `online API documentation <https://docs.rs/datafusion/latest/datafusion/execution/runtime_env/struct.RuntimeEnvBuilder.html>`_.
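
Editorial note, not part of the commit: the "Recommendations for Production Use" list added above suggests building custom benchmarks against your own data. A minimal sketch of what that could look like, where the Parquet path and column names (data/events.parquet, user_id, event_id) are purely placeholders:

    import time

    from datafusion import SessionConfig, SessionContext, col
    from datafusion import functions as f

    PARQUET_PATH = "data/events.parquet"  # placeholder: point at your own data

    for partitions in (1, 4, 8, 16):
        config = (
            SessionConfig()
            .with_target_partitions(partitions)
            .with_repartition_aggregations(True)
        )
        ctx = SessionContext(config)

        # A representative query: group-by count over placeholder columns.
        df = ctx.read_parquet(PARQUET_PATH).aggregate(
            [col("user_id")], [f.count(col("event_id"))]
        )

        start = time.time()
        df.collect()
        print(f"{partitions:>2} partitions: {time.time() - start:.3f}s")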

docs/source/user-guide/dataframe/index.rst

Lines changed: 1 addition & 22 deletions

@@ -145,31 +145,10 @@ To materialize the results of your DataFrame operations:

     # Display results
     df.show()  # Print tabular format to console
-
+
     # Count rows
     count = df.count()

-PyArrow Streaming
------------------
-
-DataFusion DataFrames implement the ``__arrow_c_stream__`` protocol, enabling
-zero-copy streaming into libraries like `PyArrow <https://arrow.apache.org/>`_.
-Earlier versions eagerly converted the entire DataFrame when exporting to
-PyArrow, which could exhaust memory on large datasets. With streaming, batches
-are produced lazily so you can process arbitrarily large results without
-out-of-memory errors.
-
-.. code-block:: python
-
-    import pyarrow as pa
-
-    # Create a PyArrow RecordBatchReader without materializing all batches
-    reader = pa.RecordBatchReader._import_from_c(df.__arrow_c_stream__())
-    for batch in reader:
-        ...  # process each batch as it is produced
-
-See :doc:`../io/arrow` for additional details on the Arrow interface.
-
 HTML Rendering
 --------------
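Editorial aside, not part of the diff: the removed example relied on the private pa.RecordBatchReader._import_from_c helper. If lazy streaming of a DataFrame into PyArrow is still needed, a hedged sketch using the public API, assuming pyarrow 15 or newer where RecordBatchReader.from_stream accepts any object implementing __arrow_c_stream__:

    import pyarrow as pa
    from datafusion import SessionContext

    ctx = SessionContext()
    df = ctx.sql("SELECT 1 AS a")

    # Batches are pulled lazily from the DataFrame's Arrow C stream rather
    # than being materialized up front.
    reader = pa.RecordBatchReader.from_stream(df)
    for batch in reader:
        ...  # process each batch as it is produced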
