Skip to content

Commit a8623c2

Browse files
author
ShreyeshArangath
committed
merge and address comments
1 parent 30ec047 commit a8623c2

File tree

5 files changed

+170
-84
lines changed

5 files changed

+170
-84
lines changed

crates/core/src/metrics.rs

Lines changed: 42 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use std::collections::HashMap;
1919
use std::sync::Arc;
2020

21-
use datafusion::physical_plan::metrics::{MetricValue, MetricsSet, Metric};
21+
use datafusion::physical_plan::metrics::{MetricValue, MetricsSet, Metric, Timestamp};
2222
use pyo3::prelude::*;
2323

2424
#[pyclass(frozen, name = "MetricsSet", module = "datafusion")]
@@ -81,6 +81,32 @@ impl PyMetric {
8181
pub fn new(metric: Arc<Metric>) -> Self {
8282
Self { metric }
8383
}
84+
85+
fn timestamp_to_pyobject<'py>(
86+
py: Python<'py>,
87+
ts: &Timestamp,
88+
) -> PyResult<Option<Bound<'py, PyAny>>> {
89+
match ts.value() {
90+
Some(dt) => {
91+
let nanos = dt.timestamp_nanos_opt().ok_or_else(|| {
92+
PyErr::new::<pyo3::exceptions::PyOverflowError, _>(
93+
"timestamp out of range",
94+
)
95+
})?;
96+
let datetime_mod = py.import("datetime")?;
97+
let datetime_cls = datetime_mod.getattr("datetime")?;
98+
let tz_utc = datetime_mod.getattr("timezone")?.getattr("utc")?;
99+
let secs = nanos / 1_000_000_000;
100+
let micros = (nanos % 1_000_000_000) / 1_000;
101+
let result = datetime_cls.call_method1(
102+
"fromtimestamp",
103+
(secs as f64 + micros as f64 / 1_000_000.0, tz_utc),
104+
)?;
105+
Ok(Some(result))
106+
}
107+
None => Ok(None),
108+
}
109+
}
84110
}
85111

86112
#[pymethods]
@@ -90,62 +116,30 @@ impl PyMetric {
90116
self.metric.value().name().to_string()
91117
}
92118

93-
/// Returns the numeric value of this metric as a `usize`, or `None` when the
94-
/// value is not representable as an integer.
95-
///
96-
/// # Note
97-
/// `StartTimestamp` and `EndTimestamp` metrics are returned as nanoseconds
98-
/// since the Unix epoch (via `timestamp_nanos_opt`), which may overflow
99-
/// a `usize` on 32-bit platforms or return `None` if the timestamp is out
100-
/// of range. Non-numeric metric variants (unrecognised future variants)
101-
/// also return `None`.
102119
#[getter]
103-
fn value(&self) -> Option<usize> {
120+
fn value<'py>(&self, py: Python<'py>) -> PyResult<Option<Bound<'py, PyAny>>> {
104121
match self.metric.value() {
105-
MetricValue::OutputRows(c) => Some(c.value()),
106-
MetricValue::OutputBytes(c) => Some(c.value()),
107-
MetricValue::ElapsedCompute(t) => Some(t.value()),
108-
MetricValue::SpillCount(c) => Some(c.value()),
109-
MetricValue::SpilledBytes(c) => Some(c.value()),
110-
MetricValue::SpilledRows(c) => Some(c.value()),
111-
MetricValue::CurrentMemoryUsage(g) => Some(g.value()),
112-
MetricValue::Count { count, .. } => Some(count.value()),
113-
MetricValue::Gauge { gauge, .. } => Some(gauge.value()),
114-
MetricValue::Time { time, .. } => Some(time.value()),
115-
MetricValue::StartTimestamp(ts) => {
116-
ts.value().and_then(|dt| dt.timestamp_nanos_opt().map(|n| n as usize))
117-
}
118-
MetricValue::EndTimestamp(ts) => {
119-
ts.value().and_then(|dt| dt.timestamp_nanos_opt().map(|n| n as usize))
122+
MetricValue::OutputRows(c) => Ok(Some(c.value().into_pyobject(py)?.into_any())),
123+
MetricValue::OutputBytes(c) => Ok(Some(c.value().into_pyobject(py)?.into_any())),
124+
MetricValue::ElapsedCompute(t) => Ok(Some(t.value().into_pyobject(py)?.into_any())),
125+
MetricValue::SpillCount(c) => Ok(Some(c.value().into_pyobject(py)?.into_any())),
126+
MetricValue::SpilledBytes(c) => Ok(Some(c.value().into_pyobject(py)?.into_any())),
127+
MetricValue::SpilledRows(c) => Ok(Some(c.value().into_pyobject(py)?.into_any())),
128+
MetricValue::CurrentMemoryUsage(g) => Ok(Some(g.value().into_pyobject(py)?.into_any())),
129+
MetricValue::Count { count, .. } => Ok(Some(count.value().into_pyobject(py)?.into_any())),
130+
MetricValue::Gauge { gauge, .. } => Ok(Some(gauge.value().into_pyobject(py)?.into_any())),
131+
MetricValue::Time { time, .. } => Ok(Some(time.value().into_pyobject(py)?.into_any())),
132+
MetricValue::StartTimestamp(ts) | MetricValue::EndTimestamp(ts) => {
133+
Self::timestamp_to_pyobject(py, ts)
120134
}
121-
_ => None,
135+
_ => Ok(None),
122136
}
123137
}
124138

125-
/// Returns the value as a Python `datetime` for `StartTimestamp` / `EndTimestamp`
126-
/// metrics, or `None` for all other metric types.
127139
fn value_as_datetime<'py>(&self, py: Python<'py>) -> PyResult<Option<Bound<'py, PyAny>>> {
128140
match self.metric.value() {
129141
MetricValue::StartTimestamp(ts) | MetricValue::EndTimestamp(ts) => {
130-
match ts.value() {
131-
Some(dt) => {
132-
let nanos = dt.timestamp_nanos_opt()
133-
.ok_or_else(|| PyErr::new::<pyo3::exceptions::PyOverflowError, _>(
134-
"timestamp out of range"
135-
))?;
136-
let datetime_mod = py.import("datetime")?;
137-
let datetime_cls = datetime_mod.getattr("datetime")?;
138-
let tz_utc = datetime_mod.getattr("timezone")?.getattr("utc")?;
139-
let secs = nanos / 1_000_000_000;
140-
let micros = (nanos % 1_000_000_000) / 1_000;
141-
let result = datetime_cls.call_method1(
142-
"fromtimestamp",
143-
(secs as f64 + micros as f64 / 1_000_000.0, tz_utc),
144-
)?;
145-
Ok(Some(result))
146-
}
147-
None => Ok(None),
148-
}
142+
Self::timestamp_to_pyobject(py, ts)
149143
}
150144
_ => Ok(None),
151145
}

crates/core/src/physical_plan.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ impl PyExecutionPlan {
9797
Ok(Self::new(plan))
9898
}
9999

100-
/// Returns metrics for this plan node after execution, or None if unavailable.
101100
pub fn metrics(&self) -> Option<PyMetricsSet> {
102101
self.plan.metrics().map(PyMetricsSet::new)
103102
}

docs/source/user-guide/dataframe/execution-metrics.rst

Lines changed: 59 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,27 @@ Execution is triggered by any of the terminal operations:
5353
- :py:meth:`~datafusion.DataFrame.collect`
5454
- :py:meth:`~datafusion.DataFrame.collect_partitioned`
5555
- :py:meth:`~datafusion.DataFrame.execute_stream`
56+
(metrics are available once the stream has been fully consumed)
5657
- :py:meth:`~datafusion.DataFrame.execute_stream_partitioned`
58+
(metrics are available once all partition streams have been fully consumed)
5759

5860
Calling :py:meth:`~datafusion.ExecutionPlan.collect_metrics` before execution
59-
will return entries with empty (or ``None``) metric sets because the operators
60-
have not run yet.
61+
returns an empty list or entries whose values are ``None`` / ``0``.
62+
63+
.. note::
64+
65+
**display() does not populate metrics.**
66+
When a DataFrame is displayed in a notebook (e.g. via ``display(df)`` or
67+
automatic ``repr`` output), DataFusion runs a *limited* internal execution
68+
to fetch preview rows. This internal execution does **not** cache the
69+
physical plan used, so :py:meth:`~datafusion.ExecutionPlan.collect_metrics`
70+
will not reflect the display execution. To access metrics you must call
71+
one of the terminal operations listed above.
72+
73+
If you call :py:meth:`~datafusion.DataFrame.collect` (or another terminal
74+
operation) multiple times on the same DataFrame, each call creates a fresh
75+
physical plan. Metrics from :py:meth:`~datafusion.DataFrame.execution_plan`
76+
always reflect the **most recent** execution.
6177

6278
Reading the Physical Plan Tree
6379
--------------------------------
@@ -72,6 +88,27 @@ The ``operator_name`` string returned by
7288
the node, for example ``"FilterExec: column1@0 > 1"``. This is the same string
7389
you would see when calling ``plan.display()``.
7490

91+
Aggregated vs Per-Partition Metrics
92+
------------------------------------
93+
94+
DataFusion executes each operator across one or more **partitions** in
95+
parallel. The :py:class:`~datafusion.MetricsSet` convenience properties
96+
(``output_rows``, ``elapsed_compute``, etc.) automatically **sum** the named
97+
metric across all partitions, giving a single aggregate value.
98+
99+
To inspect individual partitions — for example to detect data skew where one
100+
partition processes far more rows than others — iterate over the raw
101+
:py:class:`~datafusion.Metric` objects:
102+
103+
.. code-block:: python
104+
105+
for metric in metrics_set.metrics():
106+
print(f" partition={metric.partition} {metric.name}={metric.value}")
107+
108+
The ``partition`` property is a 0-based index (``0``, ``1``, …) identifying
109+
which partition produced this metric. It is ``None`` for metrics that
110+
apply globally (not tied to a specific partition).
111+
75112
Available Metrics
76113
-----------------
77114

@@ -87,15 +124,19 @@ The following metrics are directly accessible as properties on
87124
* - ``output_rows``
88125
- Number of rows emitted by the operator (summed across partitions).
89126
* - ``elapsed_compute``
90-
- CPU time in nanoseconds spent inside the operator's execute loop
91-
(summed across partitions).
127+
- CPU time **in nanoseconds** spent inside the operator's
128+
compute loop, excluding I/O wait. Useful for identifying which
129+
operators are most expensive (summed across partitions).
92130
* - ``spill_count``
93-
- Number of spill-to-disk events due to memory pressure (summed across
131+
- Number of spill-to-disk events triggered by memory pressure. This is
132+
a unitless count of events, not a measure of data volume (summed across
94133
partitions).
95134
* - ``spilled_bytes``
96-
- Total bytes written to disk during spills (summed across partitions).
135+
- Total bytes written to disk during spill events (summed across
136+
partitions).
97137
* - ``spilled_rows``
98-
- Total rows written to disk during spills (summed across partitions).
138+
- Total rows written to disk during spill events (summed across
139+
partitions).
99140

100141
Any metric not listed above can be accessed via
101142
:py:meth:`~datafusion.MetricsSet.sum_by_name`, or by iterating over the raw
@@ -106,17 +147,24 @@ Labels
106147
------
107148

108149
A :py:class:`~datafusion.Metric` may carry *labels*: key/value pairs that
109-
provide additional context. For example, some operators tag their output
110-
metrics with an ``output_type`` label to distinguish between intermediate and
111-
final output:
150+
provide additional context. Labels are operator-specific; most metrics have
151+
an empty label dict.
152+
153+
Some operators tag their metrics with labels to distinguish variants. For
154+
example, an ``AggregateExec`` may record separate ``output_rows`` metrics
155+
for intermediate and final output:
112156

113157
.. code-block:: python
114158
115159
for metric in metrics_set.metrics():
116160
print(metric.name, metric.labels())
117161
# output_rows {'output_type': 'final'}
162+
# output_rows {'output_type': 'intermediate'}
118163
119-
Labels are operator-specific; most metrics have no labels.
164+
When summing by name (via :py:attr:`~datafusion.MetricsSet.output_rows` or
165+
:py:meth:`~datafusion.MetricsSet.sum_by_name`), **all** metrics with that
166+
name are summed regardless of labels. To filter by label, iterate over the
167+
raw :py:class:`~datafusion.Metric` objects directly.
120168

121169
End-to-End Example
122170
------------------

python/datafusion/plan.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -287,20 +287,23 @@ def name(self) -> str:
287287
return self._raw.name
288288

289289
@property
290-
def value(self) -> int | None:
291-
"""The numeric value of this metric, or ``None`` when not representable.
292-
293-
``None`` is returned for metric types whose value has not yet been set
294-
(e.g. ``StartTimestamp`` / ``EndTimestamp`` before the operator runs)
295-
and for any metric variant whose value cannot be expressed as an integer.
296-
Timestamp metrics, when available, are returned as nanoseconds since the
297-
Unix epoch.
290+
def value(self) -> int | datetime.datetime | None:
291+
"""The value of this metric.
292+
293+
Returns an ``int`` for counters, gauges, and time-based metrics
294+
(nanoseconds), a :py:class:`~datetime.datetime` (UTC) for
295+
``start_timestamp`` / ``end_timestamp`` metrics, or ``None``
296+
when the value has not been set or is not representable.
298297
"""
299298
return self._raw.value
300299

301300
@property
302301
def value_as_datetime(self) -> datetime.datetime | None:
303-
"""The value as a UTC datetime for timestamp metrics, or ``None``."""
302+
"""The value as a UTC :py:class:`~datetime.datetime` for timestamp metrics.
303+
304+
Returns ``None`` for all non-timestamp metrics and for timestamp
305+
metrics whose value has not been set (e.g. before execution).
306+
"""
304307
return self._raw.value_as_datetime()
305308

306309
@property

0 commit comments

Comments
 (0)