Skip to content

Commit 0a80c0c

Browse files
feat: add support for metrics in pydatafusion
1 parent 4cd5674 commit 0a80c0c

File tree

6 files changed

+411
-4
lines changed

6 files changed

+411
-4
lines changed

python/datafusion/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
from .expr import Expr, WindowFrame
5656
from .io import read_avro, read_csv, read_json, read_parquet
5757
from .options import CsvReadOptions
58-
from .plan import ExecutionPlan, LogicalPlan
58+
from .plan import ExecutionPlan, LogicalPlan, Metric, MetricsSet, collect_metrics
5959
from .record_batch import RecordBatch, RecordBatchStream
6060
from .user_defined import (
6161
Accumulator,
@@ -85,6 +85,8 @@
8585
"Expr",
8686
"InsertOp",
8787
"LogicalPlan",
88+
"Metric",
89+
"MetricsSet",
8890
"ParquetColumnOptions",
8991
"ParquetWriterOptions",
9092
"RecordBatch",
@@ -100,6 +102,7 @@
100102
"WindowUDF",
101103
"catalog",
102104
"col",
105+
"collect_metrics",
103106
"column",
104107
"common",
105108
"configure_formatter",

python/datafusion/plan.py

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
__all__ = [
3030
"ExecutionPlan",
3131
"LogicalPlan",
32+
"Metric",
33+
"MetricsSet",
34+
"collect_metrics",
3235
]
3336

3437

@@ -151,3 +154,130 @@ def to_proto(self) -> bytes:
151154
Tables created in memory from record batches are currently not supported.
152155
"""
153156
return self._raw_plan.to_proto()
157+
158+
def execute_collect(self, ctx: SessionContext) -> list[Any]:
    """Run this physical plan to completion and gather all output batches.

    Executing the plan also populates the operator metrics, so a
    subsequent call to :meth:`metrics` on this node (or any of its
    children) can return data.

    Args:
        ctx: Session context supplying the runtime used for execution.
    """
    raw_context = ctx.ctx
    return self._raw_plan.execute_collect(raw_context)
168+
169+
def metrics(self) -> MetricsSet | None:
    """Fetch the recorded metrics for this operator, if any are available.

    Metrics are only populated once the plan has actually been run via
    :meth:`execute_collect`; before that, ``None`` is returned.
    """
    raw_set = self._raw_plan.metrics()
    return None if raw_set is None else MetricsSet(raw_set)
179+
180+
181+
class MetricsSet:
    """Execution metrics recorded by a single physical plan operator.

    Offers access to the individual :class:`Metric` values as well as
    convenience properties that aggregate common counters across all
    partitions of the operator.
    """

    def __init__(self, raw: df_internal.MetricsSet) -> None:
        """This constructor should not be called by the end user."""
        self._raw = raw

    def metrics(self) -> list[Metric]:
        """Return every individual metric contained in this set."""
        return [Metric(inner) for inner in self._raw.metrics()]

    def sum_by_name(self, name: str) -> int | None:
        """Return the sum of all metrics whose name matches ``name``."""
        return self._raw.sum_by_name(name)

    @property
    def output_rows(self) -> int | None:
        """Total ``output_rows`` summed over all partitions."""
        return self._raw.output_rows()

    @property
    def elapsed_compute(self) -> int | None:
        """Total ``elapsed_compute`` over all partitions, in nanoseconds."""
        return self._raw.elapsed_compute()

    @property
    def spill_count(self) -> int | None:
        """Total ``spill_count`` summed over all partitions."""
        return self._raw.spill_count()

    @property
    def spilled_bytes(self) -> int | None:
        """Total ``spilled_bytes`` summed over all partitions."""
        return self._raw.spilled_bytes()

    @property
    def spilled_rows(self) -> int | None:
        """Total ``spilled_rows`` summed over all partitions."""
        return self._raw.spilled_rows()

    def __repr__(self) -> str:
        """Delegate the representation to the underlying raw metrics set."""
        return repr(self._raw)

    def __str__(self) -> str:
        """Delegate the human-readable form to the underlying metrics set."""
        return str(self._raw)
232+
233+
234+
class Metric:
    """One execution metric: a name, a value, a partition, and labels."""

    def __init__(self, raw: df_internal.Metric) -> None:
        """This constructor should not be called by the end user."""
        self._raw = raw

    @property
    def name(self) -> str:
        """Name of this metric (e.g. ``output_rows``)."""
        return self._raw.name

    @property
    def value(self) -> int | None:
        """Numeric value of this metric; ``None`` for non-numeric types."""
        return self._raw.value

    @property
    def partition(self) -> int | None:
        """Partition this metric belongs to; ``None`` when it is global."""
        return self._raw.partition

    def labels(self) -> dict[str, str]:
        """Return the key/value labels attached to this metric."""
        return self._raw.labels()

    def __repr__(self) -> str:
        """Delegate the representation to the underlying raw metric."""
        return repr(self._raw)
263+
264+
265+
def collect_metrics(
    plan: ExecutionPlan,
) -> list[tuple[str, MetricsSet]]:
    """Traverse an execution plan tree and gather metrics from all operators.

    The tree is visited in depth-first pre-order. Every operator that has
    metrics available contributes one ``(operator_display_name, MetricsSet)``
    entry to the returned list.
    """
    collected: list[tuple[str, MetricsSet]] = []
    pending = [plan]
    while pending:
        node = pending.pop()
        metric_set = node.metrics()
        if metric_set is not None:
            collected.append((node.display(), metric_set))
        # Push children reversed so they are popped left-to-right,
        # matching the order a recursive pre-order walk would produce.
        pending.extend(reversed(node.children()))
    return collected

python/tests/test_plans.py

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,14 @@
1616
# under the License.
1717

1818
import pytest
19-
from datafusion import ExecutionPlan, LogicalPlan, SessionContext
19+
from datafusion import (
20+
ExecutionPlan,
21+
LogicalPlan,
22+
Metric,
23+
MetricsSet,
24+
SessionContext,
25+
collect_metrics,
26+
)
2027

2128

2229
# Note: We must use CSV because memory tables are currently not supported for
@@ -40,3 +47,97 @@ def test_logical_plan_to_proto(ctx, df) -> None:
4047
execution_plan = ExecutionPlan.from_proto(ctx, execution_plan_bytes)
4148

4249
assert str(original_execution_plan) == str(execution_plan)
50+
51+
52+
def test_execution_plan_metrics() -> None:
    """After execution, some operator in the tree must report output rows."""
    ctx = SessionContext()
    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")

    plan = df.execution_plan()
    plan.execute_collect(ctx)

    # Iteratively walk the tree looking for a node with populated metrics.
    found_metrics = False
    stack = [plan]
    while stack:
        node = stack.pop()
        ms = node.metrics()
        if ms is not None and ms.output_rows is not None and ms.output_rows > 0:
            found_metrics = True
        stack.extend(node.children())

    assert found_metrics, "Expected at least one operator with output_rows > 0"
73+
74+
75+
def test_metric_properties() -> None:
    """Individual metrics expose name, partition, and labels correctly."""
    ctx = SessionContext()
    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
    df = ctx.sql("SELECT * FROM t WHERE column1 > 1")

    plan = df.execution_plan()
    plan.execute_collect(ctx)

    for _, metric_set in collect_metrics(plan):
        for m in metric_set.metrics():
            assert isinstance(m, Metric)
            assert isinstance(m.name, str)
            assert len(m.name) > 0
            # partition is either None or an int
            assert m.partition is None or isinstance(m.partition, int)
            assert isinstance(m.labels(), dict)
        return  # only need to check one
    pytest.skip("No metrics found on any operator")
93+
94+
95+
def test_metrics_tree_walk() -> None:
    """collect_metrics finds several operators for an aggregation query."""
    ctx = SessionContext()
    ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'a'), (4, 'b')")
    df = ctx.sql("SELECT column2, COUNT(*) FROM t GROUP BY column2")

    plan = df.execution_plan()
    plan.execute_collect(ctx)

    results = collect_metrics(plan)
    # An aggregation query should have multiple operators with metrics
    assert len(results) >= 2, (
        f"Expected multiple operators with metrics, got {len(results)}: "
        f"{[name for name, _ in results]}"
    )
    for entry in results:
        name, ms = entry
        assert isinstance(name, str)
        assert isinstance(ms, MetricsSet)
113+
114+
def test_no_metrics_before_execution() -> None:
    """An unexecuted plan reports no metrics (or zero output rows)."""
    ctx = SessionContext()
    ctx.sql("CREATE TABLE t AS VALUES (1), (2), (3)")
    df = ctx.sql("SELECT * FROM t")
    # Get execution plan WITHOUT executing
    plan = df.execution_plan()
    # Before execution, metrics should be None or have zero rows
    ms = plan.metrics()
    if ms is not None:
        assert ms.output_rows in (None, 0)
125+
126+
127+
def test_metrics_repr() -> None:
    """repr() works for both metrics sets and individual metrics."""
    ctx = SessionContext()
    ctx.sql("CREATE TABLE t AS VALUES (1), (2), (3)")
    df = ctx.sql("SELECT * FROM t")

    plan = df.execution_plan()
    plan.execute_collect(ctx)

    for _, metric_set in collect_metrics(plan):
        assert isinstance(repr(metric_set), str)
        for m in metric_set.metrics():
            text = repr(m)
            assert isinstance(text, str)
            assert len(text) > 0
        return
    pytest.skip("No metrics found on any operator")

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub mod errors;
4343
pub mod expr;
4444
#[allow(clippy::borrow_deref_ref)]
4545
mod functions;
46+
pub mod metrics;
4647
mod options;
4748
pub mod physical_plan;
4849
mod pyarrow_filter_expression;
@@ -96,6 +97,8 @@ fn _internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {
9697
m.add_class::<udtf::PyTableFunction>()?;
9798
m.add_class::<config::PyConfig>()?;
9899
m.add_class::<sql::logical::PyLogicalPlan>()?;
100+
m.add_class::<metrics::PyMetricsSet>()?;
101+
m.add_class::<metrics::PyMetric>()?;
99102
m.add_class::<physical_plan::PyExecutionPlan>()?;
100103
m.add_class::<record_batch::PyRecordBatch>()?;
101104
m.add_class::<record_batch::PyRecordBatchStream>()?;

0 commit comments

Comments
 (0)