Describe the bug
EXPLAIN ANALYZE reports operator metrics that are inflated by approximately the stage's partition count. The actual data flow and query results are correct — only the displayed metrics are wrong, which makes performance analysis misleading.
To Reproduce
Run any TPC-H query with EXPLAIN ANALYZE, e.g. Q3 against SF100 with --partitions 8:
EXPLAIN ANALYZE
select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority
from customer, orders, lineitem
where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey
and o_orderdate < date '1995-03-15' and l_shipdate > date '1995-03-15'
group by l_orderkey, o_orderdate, o_shippriority
order by revenue desc, o_orderdate
limit 10;
Look at stage 5's lineitem ShuffleReaderExec:
ShuffleReaderExec: partitioning: Hash([l_orderkey@0], 8), metrics=[output_rows=2.58 B, elapsed_compute=69.33s, ...]
The lineitem shuffle (stage 4) actually wrote 323M rows. The reader cannot have emitted 2.58 B rows — that is ~8 × 323M, exactly the partition-count multiplier. The same pattern shows up on every stage: each ShuffleReaderExec and the operators above it report partition_count × the true row count.
I verified the actual row counts by adding a counter inside CoalescedShuffleReaderStream::poll_next that sums every batch returned to downstream operators. With --partitions 8 the counter sums to ~320M (correct), while EXPLAIN ANALYZE reports 2.58 B (inflated by 8).
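For reference, the verification above can be sketched as a wrapper that counts every row handed downstream. This is a simplified stand-in, not the real code: it wraps a plain Iterator of batch row counts instead of the async RecordBatch stream, and `CountingReader` is a hypothetical name used only for illustration.

```rust
// Hypothetical stand-in for the counter added inside
// CoalescedShuffleReaderStream::poll_next: sum the rows of every batch
// actually returned to downstream operators.
struct CountingReader<I: Iterator<Item = usize>> {
    inner: I,
    rows_seen: usize,
}

impl<I: Iterator<Item = usize>> Iterator for CountingReader<I> {
    type Item = usize;
    fn next(&mut self) -> Option<usize> {
        let batch = self.inner.next();
        if let Some(rows) = batch {
            self.rows_seen += rows; // count rows as they are emitted
        }
        batch
    }
}

fn main() {
    // Model a stream of 100 batches of 8,192 rows each.
    let batches = std::iter::repeat(8_192usize).take(100);
    let mut reader = CountingReader { inner: batches, rows_seen: 0 };
    let emitted: usize = reader.by_ref().sum();
    assert_eq!(reader.rows_seen, emitted); // counter matches true output
    println!("{}", reader.rows_seen); // 819200
}
```

The independent counter agrees with the rows actually emitted, which is how the ~320M figure was cross-checked against the 2.58 B shown by EXPLAIN ANALYZE.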
The query results themselves are correct — the inflation is purely a display artifact of how stage metrics are aggregated.
Expected behavior
output_rows displayed in EXPLAIN ANALYZE should equal the actual rows the operator emitted, summed across that stage's tasks (so for an N-task stage where each task emits R rows, the metric should be N×R, not N²×R).
Suspected location
The aggregation in ballista/scheduler/src/state/execution_stage.rs::combine_metrics_set repeatedly pushes each new task's metric values into the existing aggregated MetricsSet and then calls aggregate_by_name(). I suspect the previously-aggregated value is being summed back in on every task update, producing the partition_count × inflation. Other Count and Time metrics on operators in the same stage (e.g. elapsed_compute, build_time, join_time) appear similarly inflated.
Additional context
- Reproduced against current main with single-executor standalone cluster
- Inflation factor = stage's task count, consistent across --partitions 2 / 4 / 8 / 16 / 32
- Final query results are unaffected — actual data flow through the operators is correct