Skip to content

Commit f9a2d80

Browse files
feat: add metrics collection for execution plans
Adds ExecutionPlan.metrics() and ExecutionPlan.collect_metrics() for accessing query execution metrics after running queries.

- New Metric and MetricsSet classes expose individual metrics and aggregations
- ExecutionPlan.execute_collect() populates metrics during execution
- ExecutionPlan.collect_metrics() walks the plan tree to gather all metrics
- Proper None handling for timestamp conversions in Rust
- Concise docstrings and test assertions

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 4cd5674 commit f9a2d80

File tree

6 files changed

+372
-4
lines changed

6 files changed

+372
-4
lines changed

python/datafusion/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
from .expr import Expr, WindowFrame
5656
from .io import read_avro, read_csv, read_json, read_parquet
5757
from .options import CsvReadOptions
58-
from .plan import ExecutionPlan, LogicalPlan
58+
from .plan import ExecutionPlan, LogicalPlan, Metric, MetricsSet
5959
from .record_batch import RecordBatch, RecordBatchStream
6060
from .user_defined import (
6161
Accumulator,
@@ -85,6 +85,8 @@
8585
"Expr",
8686
"InsertOp",
8787
"LogicalPlan",
88+
"Metric",
89+
"MetricsSet",
8890
"ParquetColumnOptions",
8991
"ParquetWriterOptions",
9092
"RecordBatch",

python/datafusion/plan.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
__all__ = [
3030
"ExecutionPlan",
3131
"LogicalPlan",
32+
"Metric",
33+
"MetricsSet",
3234
]
3335

3436

@@ -151,3 +153,111 @@ def to_proto(self) -> bytes:
151153
Tables created in memory from record batches are currently not supported.
152154
"""
153155
return self._raw_plan.to_proto()
156+
157+
def execute_collect(self, ctx: SessionContext) -> list[Any]:
    """Run this plan to completion and return the collected results.

    The call is delegated to the wrapped raw plan, which receives the raw
    context stored on ``ctx.ctx``. Executing through this method is what
    populates the metrics on this plan.
    """
    run = self._raw_plan.execute_collect
    return run(ctx.ctx)
160+
161+
def metrics(self) -> MetricsSet | None:
    """Return the metrics of this plan node after execution.

    The raw plan is asked for its metrics; a ``None`` answer is passed
    through unchanged, anything else is wrapped in a ``MetricsSet``.
    """
    raw_metrics = self._raw_plan.metrics()
    return None if raw_metrics is None else MetricsSet(raw_metrics)
167+
168+
def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
    """Gather the metrics of every operator in the plan tree.

    The tree is traversed in pre-order starting at this node. Each node
    whose ``metrics()`` is not ``None`` contributes one
    ``(node.display(), metrics)`` tuple; nodes without metrics are left out.

    Returns a list of (operator_name, MetricsSet) tuples.
    """

    def _preorder(node: ExecutionPlan):
        # Hand out the node first so it is inspected before its children
        # are even requested, then descend into each child in order.
        yield node
        for child in node.children():
            yield from _preorder(child)

    collected: list[tuple[str, MetricsSet]] = []
    for node in _preorder(self):
        node_metrics = node.metrics()
        if node_metrics is None:
            continue
        collected.append((node.display(), node_metrics))
    return collected
184+
185+
186+
class MetricsSet:
    """Metrics recorded for one operator of an execution plan.

    Wraps a raw metrics object and exposes the individual metric entries
    as well as convenience aggregations summed across partitions.
    """

    def __init__(self, raw: df_internal.MetricsSet) -> None:
        """This constructor should not be called by the end user."""
        self._raw = raw

    def metrics(self) -> list[Metric]:
        """Return every individual metric in this set, each wrapped as ``Metric``."""
        return list(map(Metric, self._raw.metrics()))

    @property
    def output_rows(self) -> int | None:
        """Total of ``output_rows`` over all partitions (may be ``None``)."""
        total = self._raw.output_rows()
        return total

    @property
    def elapsed_compute(self) -> int | None:
        """Total of ``elapsed_compute`` over all partitions, in nanoseconds."""
        total = self._raw.elapsed_compute()
        return total

    @property
    def spill_count(self) -> int | None:
        """Total of ``spill_count`` over all partitions (may be ``None``)."""
        total = self._raw.spill_count()
        return total

    @property
    def spilled_bytes(self) -> int | None:
        """Total of ``spilled_bytes`` over all partitions (may be ``None``)."""
        total = self._raw.spilled_bytes()
        return total

    @property
    def spilled_rows(self) -> int | None:
        """Total of ``spilled_rows`` over all partitions (may be ``None``)."""
        total = self._raw.spilled_rows()
        return total

    def sum_by_name(self, name: str) -> int | None:
        """Return the summed value of the metrics called ``name``."""
        return self._raw.sum_by_name(name)

    def __repr__(self) -> str:
        """Return the representation of the wrapped raw metrics set."""
        wrapped = self._raw
        return repr(wrapped)
233+
234+
235+
class Metric:
    """One execution metric: its name, value, partition, and labels."""

    def __init__(self, raw: df_internal.Metric) -> None:
        """This constructor should not be called by the end user."""
        self._raw = raw

    @property
    def name(self) -> str:
        """Name under which this metric is reported (e.g. ``output_rows``)."""
        metric_name = self._raw.name
        return metric_name

    @property
    def value(self) -> int | None:
        """Numeric value of this metric; ``None`` for non-numeric types."""
        metric_value = self._raw.value
        return metric_value

    @property
    def partition(self) -> int | None:
        """Partition this metric belongs to; ``None`` if it is global."""
        metric_partition = self._raw.partition
        return metric_partition

    def labels(self) -> dict[str, str]:
        """Return the labels attached to this metric."""
        return self._raw.labels()

    def __repr__(self) -> str:
        """Return the representation of the wrapped raw metric."""
        wrapped = self._raw
        return repr(wrapped)

python/tests/test_plans.py

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,13 @@
1616
# under the License.
1717

1818
import pytest
19-
from datafusion import ExecutionPlan, LogicalPlan, SessionContext
19+
from datafusion import (
20+
ExecutionPlan,
21+
LogicalPlan,
22+
Metric,
23+
MetricsSet,
24+
SessionContext,
25+
)
2026

2127

2228
# Note: We must use CSV because memory tables are currently not supported for
@@ -40,3 +46,87 @@ def test_logical_plan_to_proto(ctx, df) -> None:
4046
execution_plan = ExecutionPlan.from_proto(ctx, execution_plan_bytes)
4147

4248
assert str(original_execution_plan) == str(execution_plan)
49+
50+
51+
def test_execution_plan_metrics() -> None:
    """After execution, some operator in the plan reports positive output rows."""
    session = SessionContext()
    session.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
    filtered = session.sql("SELECT * FROM t WHERE column1 > 1")

    root = filtered.execution_plan()
    root.execute_collect(session)

    def _reports_rows(node) -> bool:
        node_metrics = node.metrics()
        here = (
            node_metrics is not None
            and node_metrics.output_rows is not None
            and node_metrics.output_rows > 0
        )
        # A list (not a generator) so that every child subtree is walked,
        # even once a match has already been found.
        below = [_reports_rows(child) for child in node.children()]
        return here or any(below)

    assert _reports_rows(root)
71+
72+
73+
def test_metric_properties() -> None:
    """Every metric exposed after execution has well-formed properties.

    Runs a filtered query, then checks each metric of each operator: it is
    a ``Metric``, has a non-empty string name, a partition that is ``None``
    or an ``int``, and labels returned as a ``dict``. The test is skipped
    only when the executed plan exposes no metrics at all.
    """
    ctx = SessionContext()
    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")

    plan = df.execution_plan()
    plan.execute_collect(ctx)

    # Count instead of returning early, so that all metrics are validated
    # rather than just the first one encountered.
    checked = 0
    for _, ms in plan.collect_metrics():
        for metric in ms.metrics():
            assert isinstance(metric, Metric)
            assert isinstance(metric.name, str)
            assert len(metric.name) > 0
            assert metric.partition is None or isinstance(metric.partition, int)
            assert isinstance(metric.labels(), dict)
            checked += 1

    if checked == 0:
        pytest.skip("No metrics found")
90+
91+
92+
def test_metrics_tree_walk() -> None:
    """``collect_metrics`` yields (str, MetricsSet) pairs for at least two operators."""
    session = SessionContext()
    session.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'a'), (4, 'b')")
    grouped = session.sql("SELECT column2, COUNT(*) FROM t GROUP BY column2")

    root = grouped.execution_plan()
    root.execute_collect(session)

    per_operator = root.collect_metrics()
    assert len(per_operator) >= 2
    for entry in per_operator:
        operator_name, operator_metrics = entry
        assert isinstance(operator_name, str)
        assert isinstance(operator_metrics, MetricsSet)
105+
106+
107+
def test_no_metrics_before_execution() -> None:
    """A plan that has not been executed reports no output rows."""
    session = SessionContext()
    session.sql("CREATE TABLE t AS VALUES (1), (2), (3)")
    unexecuted = session.sql("SELECT * FROM t").execution_plan()

    node_metrics = unexecuted.metrics()
    if node_metrics is None:
        # No metrics object at all is an acceptable "nothing recorded" state.
        return
    assert node_metrics.output_rows is None or node_metrics.output_rows == 0
114+
115+
116+
def test_metrics_repr() -> None:
    """``repr`` works for every metrics set and every metric after execution.

    Each ``MetricsSet`` must produce a string representation, and each of
    its metrics must produce a non-empty one. The test is skipped only when
    the executed plan exposes no individual metrics at all.
    """
    ctx = SessionContext()
    ctx.sql("CREATE TABLE t AS VALUES (1), (2), (3)")
    df = ctx.sql("SELECT * FROM t")

    plan = df.execution_plan()
    plan.execute_collect(ctx)

    # Count instead of returning early, so that every operator and metric
    # is exercised rather than just the first one encountered.
    metrics_seen = 0
    for _, ms in plan.collect_metrics():
        r = repr(ms)
        assert isinstance(r, str)
        for metric in ms.metrics():
            mr = repr(metric)
            assert isinstance(mr, str)
            assert len(mr) > 0
            metrics_seen += 1

    if metrics_seen == 0:
        pytest.skip("No metrics found")

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub mod errors;
4343
pub mod expr;
4444
#[allow(clippy::borrow_deref_ref)]
4545
mod functions;
46+
pub mod metrics;
4647
mod options;
4748
pub mod physical_plan;
4849
mod pyarrow_filter_expression;
@@ -96,6 +97,8 @@ fn _internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {
9697
m.add_class::<udtf::PyTableFunction>()?;
9798
m.add_class::<config::PyConfig>()?;
9899
m.add_class::<sql::logical::PyLogicalPlan>()?;
100+
m.add_class::<metrics::PyMetricsSet>()?;
101+
m.add_class::<metrics::PyMetric>()?;
99102
m.add_class::<physical_plan::PyExecutionPlan>()?;
100103
m.add_class::<record_batch::PyRecordBatch>()?;
101104
m.add_class::<record_batch::PyRecordBatchStream>()?;

0 commit comments

Comments
 (0)