Skip to content

[WIP]feat(aqe): port Spark's OptimizeSkewedJoin#1779

Draft
wirybeaver wants to merge 4 commits into
apache:mainfrom
wirybeaver:optimize-skewed-join
Draft

[WIP]feat(aqe): port Spark's OptimizeSkewedJoin#1779
wirybeaver wants to merge 4 commits into
apache:mainfrom
wirybeaver:optimize-skewed-join

Conversation

@wirybeaver

@wirybeaver wirybeaver commented May 26, 2026

Copy link
Copy Markdown

Summary

Ports Spark's OptimizeSkewedJoin AQE rule into Ballista. When a shuffle partition is disproportionately large (skewed), the rule splits it into smaller sub-ranges and replicates the matching partition on the other side of the join, so the work that was a single straggler task becomes N parallel tasks.

Replaces the file-list sharding approach (#1718), which had to bail on any HashPartitioned / SinglePartition consumer — effectively all joins and FinalPartitioned aggregates.

Design

Spark Ballista
OptimizeSkewedJoin OptimizeSkewedJoinRule in state/aqe/optimizer_rule/
MapOutputStatistics ExchangeExec::shuffle_partitions() + partition_stats.num_bytes()
PartialReducerPartitionSpec(reducerIdx, [startMap, endMap)) SkewJoinShard { upstream_idx, start_map_idx, end_map_idx }
Cartesian pair-up across legs Pre-computed Vec<SkewJoinShard> of length N on both legs

Rule is opt-in: SET ballista.planner.skew_join.enabled = true. Defaults match Spark (factor=5.0, threshold=256 MiB, advisory=64 MiB).

Current limitations

  • v1 only handles single-join stages. Bails when a stage subtree has more than one binary join. Per-join iteration is future work.
  • Mutually exclusive with DataFusion's dynamic-filter pushdown. The skew rewrite breaks the hash-colocation invariant that dynamic filters assume. A guard enforces SET datafusion.optimizer.enable_dynamic_filter_pushdown = false.
  • Split granularity is per upstream task, not per row group. Single-file tables produce 1 upstream task per partition — nothing to subdivide. Tables must span multiple files. (Spark doesn't have this limitation because it splits large files into multiple scan tasks automatically.)

End-to-end verification

Setup

cd /home/user/optimize-skewed-join
cargo build --release -p ballista-scheduler -p ballista-executor -p ballista-cli

# Scheduler with AQE debug logging
RUST_LOG=info,ballista_scheduler::state::aqe=debug \
  ./target/release/ballista-scheduler --bind-port 50050 > /tmp/scheduler.log 2>&1 &
sleep 3

# Executor
RUST_LOG=info \
  ./target/release/ballista-executor \
    --scheduler-host localhost --scheduler-port 50050 \
    --bind-port 50051 --concurrent-tasks 4 > /tmp/executor.log 2>&1 &

Generate synthetic skewed data — 20 files, ~90% of rows have key=0:

mkdir -p /tmp/skewed_dir
for i in $(seq 0 19); do
  duckdb -c "
    COPY (
      SELECT (random() < 0.9)::int AS key, range AS val
      FROM range(50000)
    ) TO '/tmp/skewed_dir/part_${i}.parquet' (FORMAT parquet);
  "
done

duckdb -c "
  COPY (SELECT range AS key, range AS val FROM range(100))
  TO '/tmp/dim.parquet' (FORMAT parquet);
"

Note: ballista-cli must be invoked with --host localhost --port 50050. Without these flags it runs queries in-process and the scheduler is never contacted.

Baseline (rule OFF)

cat > /tmp/run-baseline.sql << 'EOSQL'
SET ballista.planner.adaptive.enabled = true;
SET ballista.planner.skew_join.enabled = false;
SET datafusion.optimizer.enable_dynamic_filter_pushdown = false;

CREATE EXTERNAL TABLE fact STORED AS PARQUET LOCATION '/tmp/skewed_dir/';
CREATE EXTERNAL TABLE dim  STORED AS PARQUET LOCATION '/tmp/dim.parquet';

SELECT dim.val, count(*) FROM fact JOIN dim ON fact.key = dim.key GROUP BY dim.val;
EOSQL

./target/release/ballista-cli --host localhost --port 50050 \
  -f /tmp/run-baseline.sql > /tmp/baseline.out 2>&1

Rewritten (rule ON)

cat > /tmp/run-rewritten.sql << 'EOSQL'
SET ballista.planner.adaptive.enabled = true;
SET ballista.planner.skew_join.enabled = true;
SET ballista.planner.skew_join.skewed_partition_factor = 1.5;
SET ballista.planner.skew_join.skewed_partition_threshold_bytes = 1024;
SET ballista.planner.skew_join.advisory_partition_bytes = 1024;
SET datafusion.optimizer.enable_dynamic_filter_pushdown = false;

CREATE EXTERNAL TABLE fact STORED AS PARQUET LOCATION '/tmp/skewed_dir/';
CREATE EXTERNAL TABLE dim  STORED AS PARQUET LOCATION '/tmp/dim.parquet';

SELECT dim.val, count(*) FROM fact JOIN dim ON fact.key = dim.key GROUP BY dim.val;
EOSQL

./target/release/ballista-cli --host localhost --port 50050 \
  -f /tmp/run-rewritten.sql > /tmp/rewritten.out 2>&1

Verify

# Results match
diff <(grep -E '^\|' /tmp/baseline.out | sort) \
     <(grep -E '^\|' /tmp/rewritten.out | sort)

# Rule attached
grep -a 'attaching SkewJoinPlan' /tmp/scheduler.log

Results

attaching SkewJoinPlan: M=16 K'=34
  left_skewed=[…, true, …, true, …]   (partitions 3 and 5)
  left_sizes=[…, 10817136, …, 1207824, …]
  right_skewed=all false               (dim table — 100 rows)
  • Correctness: rows identical between baseline and rewritten.
  • Skew detection: 2 of 16 partitions flagged (10.8 MB and 1.2 MB vs near-zero for the rest).
  • Split: 34 downstream join tasks instead of 16. Non-skewed side replicated for each sub-range.
  • Mutual-exclusion guard: verified separately — enable_dynamic_filter_pushdown=true triggers bail as expected.

Supersedes #1718.

🤖 Generated with Claude Code

wirybeaver and others added 4 commits May 26, 2026 22:43
Lays the foundation for porting Spark's OptimizeSkewedJoin AQE rule. This
commit adds only data — no rule, no plan-tree integration, no behavior
change. Subsequent commits wire the carrier into ShuffleReaderExec / the
adapter (C2) and add the rule itself (C3).

- `BallistaConfig` knobs (all opt-in via `skew_join.enabled=false`):
  - `skew_join.skewed_partition_factor` (5.0) — Spark's
    `skewedPartitionFactor`
  - `skew_join.skewed_partition_threshold_bytes` (256 MiB) — Spark's
    `skewedPartitionThresholdInBytes`
  - `skew_join.advisory_partition_bytes` (64 MiB) — target sub-shard size
    for the bin-packer; decoupled from `coalesce.target_partition_bytes`
    so the two rules tune independently
  - `skew_join.small_partition_factor` (0.2) — tail-merge factor from
    Spark's `splitSizeListByTargetSize` legacy
- `SessionConfigExt` accessor pairs for each knob (mirrors the existing
  coalesce-config accessor pattern).
- `SkewJoinPlan` / `SkewJoinShard` carrier types in shuffle_reader.rs,
  mirroring `CoalescePlan` / `PartitionGroup`. `SkewJoinShard` encodes
  Spark's `PartialReducerPartitionSpec(reducerIdx, startMapIdx,
  endMapIdx)` semantics. Plan attaches to both legs of a join's
  alignment group; `shards.len() == K'` matches across legs so the
  planner builds matched join inputs by zipping the two legs.

`cargo check --workspace` clean.
…rExec / adapter

Adds the plumbing layer that lets a future rule attach a per-stage skew-join
decision and have the adapter materialize it into a shuffle reader with
per-mapper sub-range slicing. Still no rule and no production behavior
change — `set_skew_join` is only called by the new unit tests.

- `ExchangeExec`: new `skew_join` slot (mirror of the existing `coalesce`
  slot). `set_skew_join` / `skew_join()` accessors, threaded through
  `with_new_children`, and rendered in EXPLAIN as `skew_join=K' of M`.
- `ShuffleReaderExec`: new `skew_join: Option<SkewJoinPlan>` field plus a
  `try_new_skew_join` constructor mirroring `try_new_coalesced`. Debug
  asserts on K'-shape invariants. Threaded through `with_work_dir`,
  `with_client_pool`, `with_new_children`, and EXPLAIN.
- `BallistaAdapter::transform_children`: extended from 2-arm to 4-arm
  match on `(coalesce, skew_join)`:
  - `(Some, Some)` → exec_err — mutually exclusive by construction
  - `(Some, None)` → existing coalesce path
  - `(None, Some(sp))` → slice each upstream's `Vec<PartitionLocation>`
    to the shard's `[start_map_idx, end_map_idx)` window, preserve hash
    partitioning width at K' (the eventual `is_skew_join` flag on the
    join op in C4 is what relaxes "same key in one partition")
  - `(None, None)` → unchanged

Tests in `adapter.rs::tests`:
- `adapter_slices_skew_join_shards_by_map_range`: builds a 3×4 synthetic
  upstream, splits upstream 0 two-way, upstream 1 four-way, leaves
  upstream 2 as passthrough; asserts K'=7 with the exact
  `(upstream_partition_id, map_partition_id)` pairs per output shard.
- `adapter_errors_when_both_coalesce_and_skew_join_set`: regression
  guard for the mutual-exclusion invariant.
- `adapter_guards_out_of_bounds_skew_join_indices`: out-of-bounds
  `upstream_idx` errors clearly; `end_map_idx > inner.len()` clamps;
  `start >= end` yields an empty shard.

`cargo check --workspace` clean; full AQE suite (52 tests) passes.
…te matching side

Ports Spark's OptimizeSkewedJoin into Ballista's AQE pipeline. When one side
of a binary join has a partition whose bytes exceed BOTH
factor * median(per-side sizes) AND an absolute threshold, the rule splits
that side's per-mapper byte vector into contiguous [start_map_idx, end_map_idx)
ranges (bin-packed near advisory_partition_bytes via the coalesce module's
split_size_list_by_target_size helper) and cartesian-pairs them against the
other side's ranges. Both sides' resulting paired SkewJoinPlans land on the
two leaf ExchangeExecs via the carrier slots from C2; the adapter consumes
them to build K'-partition ShuffleReaderExecs with per-mapper sub-range
slicing.

Apply Spark's per-join-type split-side allowlist (Inner: both, Left/LeftSemi/
LeftAnti/LeftMark: left only, Right: right only, Full: neither). v1 handles
exactly one binary join per stage subtree; multi-join stages bail (Spark
iterates per-join, a follow-up can do the same). Default off via
ballista.planner.skew_join.enabled.

Wired after CoalescePartitionsRule in planner.rs::actionable_stages() — the
two carriers are mutually exclusive, so running coalesce first lets skew-join
idempotently short-circuit on already-claimed leaves.

Algorithm helpers (skew_join/algorithm.rs): is_skewed dual-guard detection,
map_ranges_for_upstream bin-packer (reuses coalesce's helper), pair_shards
cartesian product, robust_median. 11 unit tests cover detection edges,
bin-pack output, and the four cartesian shapes (split×split,
split×passthrough, passthrough×split, passthrough×passthrough).

Integration tests (state/aqe/test/skew_join_rule.rs): 6 end-to-end cases
through AdaptivePlanner — single-side skew on SMJ and HashJoin(Partitioned),
disabled, below-threshold guard, uniform-bytes guard, and a final-stage
shuffle-reader check that K'=7 partitions flow through the adapter into
the runnable plan.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…ynamic-filter mutual exclusion

After investigating whether DataFusion's SortMergeJoinExec needs an
is_skew_join flag analogous to Spark's, the answer is: no flag is needed
on the join itself (DF's SMJ has no global per-key invariant the rewrite
violates). But two correctness gaps in the adapter / rule pipeline did
surface, and this commit closes both.

B-fix (adapter, always-on): the skew_join adapter arm now returns
Partitioning::UnknownPartitioning(K') instead of Hash(keys, K'). Splitting
a hash bucket across multiple downstream partitions means the same key now
lives in multiple partitions, so the data is not truly hash-partitioned at
K' — claiming Hash would mislead downstream operators (other joins, hash
aggregates, DataFusion's per-partition dynamic-filter routing) into trust
they shouldn't have. UnknownPartitioning is the honest declaration; within
the current stage the join above still works because each task receives a
properly-paired (left-shard, right-shard) bundle.

D-fix (rule, mutual exclusion): DataFusion 53.1's HashJoin dynamic filter
pushdown builds a CASE expression keyed on hash(join_keys) % K' that routes
each probe row to one partition's bounds. The skew rewrite intentionally
violates that hash-co-location invariant, so the routed CASE would filter
out probe rows whose matches live in a different partition → silent wrong
results. The default for optimizer.enable_join_dynamic_filter_pushdown in
DF 53.1 is true, so this is a today-problem, not future-proofing.

Picked mutual exclusion (option 3 of 4) for v1: when the DF option is on,
the rule refuses to fire with a clear log line. Users opt into one or the
other, not both. Alternatives — documentation only (rejected, DF default
is on), plan-mutation to clear dynamic filters (more invasive), and
upstream DF change to make per-partition CASE skew-aware (tracked at
~/mydocs/datafusion/aqe-tasks/11-skew-compatible-hash-join-dynamic-filter.md
as a follow-up) — are all documented in the rule's source comment.

Tests: existing five fire/non-fire skew_join tests pre-set
enable_join_dynamic_filter_pushdown=false (via a comment in
skew_join_context); one new test should_bail_when_dynamic_filter_pushdown_enabled
covers the mutual-exclusion bail path. The shuffle-reader integration test
now expects UnknownPartitioning(7) instead of Hash([c@1], 7), with a
docstring explaining why the rewrite must declare unknown partitioning.

All 70 AQE tests pass (63 prior + 6 from C3 + 1 new in C4).

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
@martin-g

martin-g commented May 27, 2026

Copy link
Copy Markdown
Member

@wirybeaver Please resolve the conflicts!
Update: Sorry, I didn't notice that it is still a draft!

@wirybeaver wirybeaver changed the title feat(aqe): port Spark's OptimizeSkewedJoin [WIP]feat(aqe): port Spark's OptimizeSkewedJoin May 31, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants