EXPLAIN ANALYZE row counts and elapsed_compute inflated by partition count #1635

@andygrove

Description

Describe the bug

EXPLAIN ANALYZE reports operator metrics that are inflated by approximately the stage's partition count. The actual data flow and query results are correct — only the displayed metrics are wrong, which makes performance analysis misleading.

To Reproduce

Run any TPC-H query with EXPLAIN ANALYZE, e.g. Q3 against SF100 with --partitions 8:

EXPLAIN ANALYZE
select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, o_orderdate, o_shippriority
from customer, orders, lineitem
where c_mktsegment = 'BUILDING' and c_custkey = o_custkey and l_orderkey = o_orderkey
  and o_orderdate < date '1995-03-15' and l_shipdate > date '1995-03-15'
group by l_orderkey, o_orderdate, o_shippriority
order by revenue desc, o_orderdate
limit 10;

Look at stage 5's lineitem ShuffleReaderExec:

ShuffleReaderExec: partitioning: Hash([l_orderkey@0], 8), metrics=[output_rows=2.58 B, elapsed_compute=69.33s, ...]

The lineitem shuffle (stage 4) actually wrote 323M rows. The reader cannot have emitted 2.58 B rows — that is ~8 × 323M, exactly the partition-count multiplier. The same pattern shows up in every stage: each ShuffleReaderExec and the operators above it report partition_count × the true row count.

I verified the actual row counts by adding a counter inside CoalescedShuffleReaderStream::poll_next that sums every batch returned to downstream operators. With --partitions 8 the counter sums to ~320M (correct), while EXPLAIN ANALYZE reports 2.58 B (inflated by 8).

The query results themselves are correct — the inflation is purely a display artifact of how stage metrics are aggregated.

Expected behavior

output_rows displayed in EXPLAIN ANALYZE should equal the actual rows the operator emitted, summed across that stage's tasks (so for an N-task stage where each task emits R rows, the metric should be N×R, not N²×R).
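To make the N×R vs N²×R arithmetic concrete with this report's numbers, here is a toy check in plain Rust — not Ballista code; the per-task row count R is invented so that N×R ≈ 323M, and the "each task report carries the stage-wide total" mechanism is only one hypothetical way to land on the observed factor:

```rust
/// True stage total for N tasks each emitting R rows:
/// what the output_rows metric should show.
fn true_total(n: u64, r: u64) -> u64 {
    n * r
}

/// Hypothetical inflated total: each of the N task reports already
/// carries the stage-wide total, so summing them multiplies by N again.
fn inflated_total(n: u64, r: u64) -> u64 {
    n * true_total(n, r)
}

fn main() {
    let n = 8; // --partitions 8
    let r = 40_375_000; // invented per-task rows so that N x R = 323M
    println!("expected:  {}", true_total(n, r)); // 323_000_000
    println!("displayed: {}", inflated_total(n, r)); // 2_584_000_000 ~= 2.58 B
    assert_eq!(inflated_total(n, r) / true_total(n, r), n); // factor = task count
}
```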

Suspected location

The aggregation in ballista/scheduler/src/state/execution_stage.rs::combine_metrics_set repeatedly pushes each new task's metric values into the existing aggregated MetricsSet and then calls aggregate_by_name(). I suspect the previously aggregated value is summed back in on every task update, which would produce inflation on the order of the partition count. Other Count and Time metrics on operators in the same stage (e.g. elapsed_compute, build_time, join_time) appear similarly inflated.
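To illustrate the suspected failure mode, here is a toy model in plain Rust — no Ballista or DataFusion types; `Metrics` and `aggregate_by_name` are simplified stand-ins I made up — contrasting a fold that aggregates the raw per-task sets exactly once with a fold that keeps the aggregated entry inside the set it later re-aggregates:

```rust
use std::collections::HashMap;

/// Toy stand-in for a MetricsSet: metric name -> count.
type Metrics = HashMap<&'static str, u64>;

/// Toy stand-in for aggregate_by_name: sum entries by metric name.
fn aggregate_by_name(sets: &[Metrics]) -> Metrics {
    let mut out = Metrics::new();
    for s in sets {
        for (&k, &v) in s {
            *out.entry(k).or_insert(0) += v;
        }
    }
    out
}

fn task_metrics(rows: u64) -> Metrics {
    Metrics::from([("output_rows", rows)])
}

fn main() {
    let tasks: Vec<Metrics> = (0..8).map(|_| task_metrics(1_000)).collect();

    // Correct fold: keep only the raw per-task sets; aggregate once.
    let correct = aggregate_by_name(&tasks);
    assert_eq!(correct["output_rows"], 8_000); // N x R

    // Suspected buggy fold: the aggregated entry is pushed back into the
    // stored set, so every later aggregate_by_name() sums it in again.
    // (This toy over-counts by a different factor than the exact xN seen
    // in the report; the real factor depends on when updates arrive.)
    let mut stored: Vec<Metrics> = Vec::new();
    let mut shown: u64 = 0;
    for t in &tasks {
        stored.push(t.clone());
        let agg = aggregate_by_name(&stored);
        shown = agg["output_rows"];
        stored.push(agg); // BUG: previous totals get re-counted next round
    }
    assert!(shown > correct["output_rows"]); // inflated display value
    println!("correct={} shown={}", correct["output_rows"], shown);
}
```

The correct shape is to retain only per-task metric sets in stage state and recompute the aggregate from scratch (or fold each task in exactly once); the buggy shape mixes already-aggregated values into the inputs of the next aggregation.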

Additional context

  • Reproduced against current main with single-executor standalone cluster
  • Inflation factor = stage's task count, consistent across --partitions 2 / 4 / 8 / 16 / 32
  • Final query results are unaffected — actual data flow through the operators is correct
