From 3dfcdd43ad191484a4e5bb1f79125111362a6543 Mon Sep 17 00:00:00 2001 From: xuanyili Date: Tue, 26 May 2026 22:43:16 +0000 Subject: [PATCH 1/4] feat(aqe): config knobs and carrier types for OptimizeSkewedJoin port MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lays the foundation for porting Spark's OptimizeSkewedJoin AQE rule. This commit adds only data — no rule, no plan-tree integration, no behavior change. Subsequent commits wire the carrier into ShuffleReaderExec / the adapter (C2) and add the rule itself (C3). - `BallistaConfig` knobs (all opt-in via `skew_join.enabled=false`): - `skew_join.skewed_partition_factor` (5.0) — Spark's `skewedPartitionFactor` - `skew_join.skewed_partition_threshold_bytes` (256 MiB) — Spark's `skewedPartitionThresholdInBytes` - `skew_join.advisory_partition_bytes` (64 MiB) — target sub-shard size for the bin-packer; decoupled from `coalesce.target_partition_bytes` so the two rules tune independently - `skew_join.small_partition_factor` (0.2) — tail-merge factor from Spark's `splitSizeListByTargetSize` legacy - `SessionConfigExt` accessor pairs for each knob (mirrors the existing coalesce-config accessor pattern). - `SkewJoinPlan` / `SkewJoinShard` carrier types in shuffle_reader.rs, mirroring `CoalescePlan` / `PartitionGroup`. `SkewJoinShard` encodes Spark's `PartialReducerPartitionSpec(reducerIdx, startMapIdx, endMapIdx)` semantics. Plan attaches to both legs of a join's alignment group; `shards.len() == K'` matches across legs so the planner builds matched join inputs by zipping the two legs. `cargo check --workspace` clean. --- ballista/core/src/config.rs | 95 ++++++++++++ ballista/core/src/execution_plans/mod.rs | 4 +- .../src/execution_plans/shuffle_reader.rs | 38 +++++ ballista/core/src/extension.rs | 139 +++++++++++++++++- 4 files changed, 274 insertions(+), 2 deletions(-) diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs index fbd0de7082..b8bc56f301 100644 --- a/ballista/core/src/config.rs +++ b/ballista/core/src/config.rs @@ -101,6 +101,31 @@ pub const BALLISTA_COALESCE_SMALL_PARTITION_FACTOR: &str = pub const BALLISTA_COALESCE_MERGED_PARTITION_FACTOR: &str = "ballista.planner.coalesce.merged_partition_factor"; +/// Configuration key to enable AQE optimize-skewed-join rule (Spark's +/// `OptimizeSkewedJoin`). Disabled by default — opt in for workloads where +/// per-key skew creates straggler tasks on join inputs. +pub const BALLISTA_SKEW_JOIN_ENABLED: &str = "ballista.planner.skew_join.enabled"; +/// Multiplier over the per-side median below which a partition is not +/// considered skewed. Mirrors Spark's +/// `spark.sql.adaptive.skewJoin.skewedPartitionFactor`. +pub const BALLISTA_SKEW_JOIN_SKEWED_PARTITION_FACTOR: &str = + "ballista.planner.skew_join.skewed_partition_factor"; +/// Absolute byte threshold below which a partition is never split, even when +/// the ratio-to-median qualifies. Mirrors Spark's +/// `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes`. +pub const BALLISTA_SKEW_JOIN_SKEWED_PARTITION_THRESHOLD_BYTES: &str = + "ballista.planner.skew_join.skewed_partition_threshold_bytes"; +/// Advisory target sub-shard byte size used by the bin-packer when splitting +/// a skewed partition's per-mapper outputs. Decoupled from the coalesce +/// target so the two rules can be tuned independently. +pub const BALLISTA_SKEW_JOIN_ADVISORY_PARTITION_BYTES: &str = + "ballista.planner.skew_join.advisory_partition_bytes"; +/// Tail-merge factor: a trailing sub-shard whose size is below +/// `small_partition_factor * advisory` folds back into its predecessor +/// (Spark's `splitSizeListByTargetSize` legacy). +pub const BALLISTA_SKEW_JOIN_SMALL_PARTITION_FACTOR: &str = + "ballista.planner.skew_join.small_partition_factor"; + /// Result type for configuration parsing operations. pub type ParseResult = result::Result; use std::sync::LazyLock; @@ -220,6 +245,47 @@ static CONFIG_ENTRIES: LazyLock> = LazyLock::new(|| DataType::Float64, Some("1.2".to_string()), ), + ConfigEntry::new( + BALLISTA_SKEW_JOIN_ENABLED.to_string(), + "Enables the AQE optimize-skewed-join rule (Spark's \ + OptimizeSkewedJoin). Disabled by default — opt in for workloads \ + where per-key skew creates straggler tasks on join inputs." + .to_string(), + DataType::Boolean, + Some(false.to_string()), + ), + ConfigEntry::new( + BALLISTA_SKEW_JOIN_SKEWED_PARTITION_FACTOR.to_string(), + "Multiplier over per-side median below which a partition is not \ + considered skewed (Spark's skewedPartitionFactor)." + .to_string(), + DataType::Float64, + Some("5.0".to_string()), + ), + ConfigEntry::new( + BALLISTA_SKEW_JOIN_SKEWED_PARTITION_THRESHOLD_BYTES.to_string(), + "Absolute byte threshold below which a partition is never split \ + (Spark's skewedPartitionThresholdInBytes)." + .to_string(), + DataType::UInt64, + Some((256 * 1024 * 1024_usize).to_string()), + ), + ConfigEntry::new( + BALLISTA_SKEW_JOIN_ADVISORY_PARTITION_BYTES.to_string(), + "Advisory target sub-shard byte size used by the bin-packer when \ + splitting a skewed partition." + .to_string(), + DataType::UInt64, + Some((64 * 1024 * 1024_usize).to_string()), + ), + ConfigEntry::new( + BALLISTA_SKEW_JOIN_SMALL_PARTITION_FACTOR.to_string(), + "Tail-merge factor for the skew-join sub-shard bin-packer \ + (Spark legacy)." + .to_string(), + DataType::Float64, + Some("0.2".to_string()), + ), ]; entries .into_iter() @@ -450,6 +516,35 @@ impl BallistaConfig { self.get_float_setting(BALLISTA_COALESCE_MERGED_PARTITION_FACTOR) } + /// Returns whether the AQE optimize-skewed-join rule is enabled. + pub fn skew_join_enabled(&self) -> bool { + self.get_bool_setting(BALLISTA_SKEW_JOIN_ENABLED) + } + + /// Returns the skewed-partition factor (Spark's + /// `skewedPartitionFactor`). + pub fn skew_join_skewed_partition_factor(&self) -> f64 { + self.get_float_setting(BALLISTA_SKEW_JOIN_SKEWED_PARTITION_FACTOR) + } + + /// Returns the absolute skewed-partition byte threshold + /// (Spark's `skewedPartitionThresholdInBytes`). + pub fn skew_join_skewed_partition_threshold_bytes(&self) -> u64 { + self.get_usize_setting(BALLISTA_SKEW_JOIN_SKEWED_PARTITION_THRESHOLD_BYTES) + as u64 + } + + /// Returns the advisory target sub-shard byte size used by the bin-packer + /// when splitting a skewed partition. + pub fn skew_join_advisory_partition_bytes(&self) -> u64 { + self.get_usize_setting(BALLISTA_SKEW_JOIN_ADVISORY_PARTITION_BYTES) as u64 + } + + /// Returns the tail-merge factor for the skew-join sub-shard bin-packer. + pub fn skew_join_small_partition_factor(&self) -> f64 { + self.get_float_setting(BALLISTA_SKEW_JOIN_SMALL_PARTITION_FACTOR) + } + /// Should client employ pull or push job tracking strategy pub fn client_pull(&self) -> bool { self.get_bool_setting(BALLISTA_CLIENT_PULL) diff --git a/ballista/core/src/execution_plans/mod.rs b/ballista/core/src/execution_plans/mod.rs index ae46fad687..8e31fe41d9 100644 --- a/ballista/core/src/execution_plans/mod.rs +++ b/ballista/core/src/execution_plans/mod.rs @@ -31,7 +31,9 @@ use std::path::{Path, PathBuf}; use datafusion::common::exec_err; pub use distributed_explain_analyze::DistributedExplainAnalyzeExec; pub use distributed_query::DistributedQueryExec; -pub use shuffle_reader::{CoalescePlan, PartitionGroup, ShuffleReaderExec}; +pub use shuffle_reader::{ + CoalescePlan, PartitionGroup, ShuffleReaderExec, SkewJoinPlan, SkewJoinShard, +}; pub use shuffle_reader::{stats_for_partition, stats_for_partitions}; pub use shuffle_writer::DEFAULT_SHUFFLE_CHANNEL_CAPACITY; pub use shuffle_writer::ShuffleWriterExec; diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs index fff5783503..fc8b4fbae1 100644 --- a/ballista/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/core/src/execution_plans/shuffle_reader.rs @@ -93,6 +93,44 @@ pub struct PartitionGroup { pub upstream_indices: Vec, } +/// Skew-join plan attached to the two leaf `ShuffleReaderExec`s of a join +/// stage by the AQE `OptimizeSkewedJoinRule`. +/// +/// Spark analog: the rewrite that replaces both legs' `AQEShuffleReadExec` +/// with a paired list of `PartialReducerPartitionSpec`s. K' (the post-rewrite +/// partition count) equals `shards.len()` and matches across legs by +/// construction — the planner builds matched join inputs by zipping the two +/// legs' shard lists. +/// +/// Note: `Default` is intentionally NOT derived. Callers must construct +/// explicitly to keep "absent skew-join rewrite" (`Option::None`) semantically +/// distinct from "empty plan". +#[derive(Debug, Clone, PartialEq)] +pub struct SkewJoinPlan { + /// Original upstream partition count (M) before the rewrite. Same value + /// on both legs of the join (alignment invariant; enforced by the rule). + pub upstream_partition_count: u32, + /// Per-output-partition shard descriptors. Length K' is the same on both + /// legs after rewrite. + pub shards: Vec, +} + +/// One output partition's read window for the skew-join rewrite. +/// +/// Spark analog: `PartialReducerPartitionSpec(reducerIndex, startMapIndex, +/// endMapIndex, dataSize)`. A non-split (passthrough) shard uses +/// `start_map_idx = 0` and `end_map_idx = num_maps_for_idx`, equivalent to +/// reading the full mapper-output range for that reducer. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SkewJoinShard { + /// Upstream partition (= original reducer index `i`) this shard reads from. + pub upstream_idx: u32, + /// Inclusive lower bound of the upstream mapper-output index range to read. + pub start_map_idx: u32, + /// Exclusive upper bound of the upstream mapper-output index range to read. + pub end_map_idx: u32, +} + /// ShuffleReaderExec reads partitions that have already been materialized by a ShuffleWriterExec /// being executed by an executor #[derive(Debug, Clone)] diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs index a9da57c786..95694bc3cc 100644 --- a/ballista/core/src/extension.rs +++ b/ballista/core/src/extension.rs @@ -21,7 +21,10 @@ use crate::config::{ BALLISTA_COALESCE_MERGED_PARTITION_FACTOR, BALLISTA_COALESCE_SMALL_PARTITION_FACTOR, BALLISTA_COALESCE_TARGET_PARTITION_BYTES, BALLISTA_JOB_NAME, BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ, BALLISTA_SHUFFLE_READER_MAX_REQUESTS, - BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT, BALLISTA_STANDALONE_PARALLELISM, + BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT, BALLISTA_SKEW_JOIN_ADVISORY_PARTITION_BYTES, + BALLISTA_SKEW_JOIN_ENABLED, BALLISTA_SKEW_JOIN_SKEWED_PARTITION_FACTOR, + BALLISTA_SKEW_JOIN_SKEWED_PARTITION_THRESHOLD_BYTES, + BALLISTA_SKEW_JOIN_SMALL_PARTITION_FACTOR, BALLISTA_STANDALONE_PARALLELISM, BallistaConfig, }; use crate::planner::BallistaQueryPlanner; @@ -267,6 +270,37 @@ pub trait SessionConfigExt { fn ballista_coalesce_merged_partition_factor(&self) -> f64; /// Sets the merged-partition early-flush factor (Spark legacy). fn with_ballista_coalesce_merged_partition_factor(self, factor: f64) -> Self; + + /// Returns whether the AQE optimize-skewed-join rule is enabled. + fn ballista_skew_join_enabled(&self) -> bool; + /// Sets whether the AQE optimize-skewed-join rule is enabled. + fn with_ballista_skew_join_enabled(self, enabled: bool) -> Self; + + /// Returns the skewed-partition factor (Spark's + /// `skewedPartitionFactor`). + fn ballista_skew_join_skewed_partition_factor(&self) -> f64; + /// Sets the skewed-partition factor. + fn with_ballista_skew_join_skewed_partition_factor(self, factor: f64) -> Self; + + /// Returns the absolute skewed-partition byte threshold + /// (Spark's `skewedPartitionThresholdInBytes`). + fn ballista_skew_join_skewed_partition_threshold_bytes(&self) -> u64; + /// Sets the absolute skewed-partition byte threshold. + fn with_ballista_skew_join_skewed_partition_threshold_bytes( + self, + bytes: u64, + ) -> Self; + + /// Returns the advisory target sub-shard byte size used by the bin-packer + /// when splitting a skewed partition. + fn ballista_skew_join_advisory_partition_bytes(&self) -> u64; + /// Sets the advisory target sub-shard byte size. + fn with_ballista_skew_join_advisory_partition_bytes(self, bytes: u64) -> Self; + + /// Returns the tail-merge factor for the skew-join sub-shard bin-packer. + fn ballista_skew_join_small_partition_factor(&self) -> f64; + /// Sets the tail-merge factor for the skew-join sub-shard bin-packer. + fn with_ballista_skew_join_small_partition_factor(self, factor: f64) -> Self; } /// [SessionConfigHelperExt] is set of [SessionConfig] extension methods @@ -682,6 +716,109 @@ impl SessionConfigExt for SessionConfig { .set_str(BALLISTA_COALESCE_MERGED_PARTITION_FACTOR, &s) } } + + fn ballista_skew_join_enabled(&self) -> bool { + self.options() + .extensions + .get::() + .map(|c| c.skew_join_enabled()) + .unwrap_or_else(|| BallistaConfig::default().skew_join_enabled()) + } + + fn with_ballista_skew_join_enabled(self, enabled: bool) -> Self { + if self.options().extensions.get::().is_some() { + self.set_bool(BALLISTA_SKEW_JOIN_ENABLED, enabled) + } else { + self.with_option_extension(BallistaConfig::default()) + .set_bool(BALLISTA_SKEW_JOIN_ENABLED, enabled) + } + } + + fn ballista_skew_join_skewed_partition_factor(&self) -> f64 { + self.options() + .extensions + .get::() + .map(|c| c.skew_join_skewed_partition_factor()) + .unwrap_or_else(|| { + BallistaConfig::default().skew_join_skewed_partition_factor() + }) + } + + fn with_ballista_skew_join_skewed_partition_factor(self, factor: f64) -> Self { + let s = factor.to_string(); + if self.options().extensions.get::().is_some() { + self.set_str(BALLISTA_SKEW_JOIN_SKEWED_PARTITION_FACTOR, &s) + } else { + self.with_option_extension(BallistaConfig::default()) + .set_str(BALLISTA_SKEW_JOIN_SKEWED_PARTITION_FACTOR, &s) + } + } + + fn ballista_skew_join_skewed_partition_threshold_bytes(&self) -> u64 { + self.options() + .extensions + .get::() + .map(|c| c.skew_join_skewed_partition_threshold_bytes()) + .unwrap_or_else(|| { + BallistaConfig::default().skew_join_skewed_partition_threshold_bytes() + }) + } + + fn with_ballista_skew_join_skewed_partition_threshold_bytes( + self, + bytes: u64, + ) -> Self { + if self.options().extensions.get::().is_some() { + self.set_usize( + BALLISTA_SKEW_JOIN_SKEWED_PARTITION_THRESHOLD_BYTES, + bytes as usize, + ) + } else { + self.with_option_extension(BallistaConfig::default()).set_usize( + BALLISTA_SKEW_JOIN_SKEWED_PARTITION_THRESHOLD_BYTES, + bytes as usize, + ) + } + } + + fn ballista_skew_join_advisory_partition_bytes(&self) -> u64 { + self.options() + .extensions + .get::() + .map(|c| c.skew_join_advisory_partition_bytes()) + .unwrap_or_else(|| { + BallistaConfig::default().skew_join_advisory_partition_bytes() + }) + } + + fn with_ballista_skew_join_advisory_partition_bytes(self, bytes: u64) -> Self { + if self.options().extensions.get::().is_some() { + self.set_usize(BALLISTA_SKEW_JOIN_ADVISORY_PARTITION_BYTES, bytes as usize) + } else { + self.with_option_extension(BallistaConfig::default()) + .set_usize(BALLISTA_SKEW_JOIN_ADVISORY_PARTITION_BYTES, bytes as usize) + } + } + + fn ballista_skew_join_small_partition_factor(&self) -> f64 { + self.options() + .extensions + .get::() + .map(|c| c.skew_join_small_partition_factor()) + .unwrap_or_else(|| { + BallistaConfig::default().skew_join_small_partition_factor() + }) + } + + fn with_ballista_skew_join_small_partition_factor(self, factor: f64) -> Self { + let s = factor.to_string(); + if self.options().extensions.get::().is_some() { + self.set_str(BALLISTA_SKEW_JOIN_SMALL_PARTITION_FACTOR, &s) + } else { + self.with_option_extension(BallistaConfig::default()) + .set_str(BALLISTA_SKEW_JOIN_SMALL_PARTITION_FACTOR, &s) + } + } } impl SessionConfigHelperExt for SessionConfig { From a70a1011a75cabb2717ef9fd7c6306e36ee51cb4 Mon Sep 17 00:00:00 2001 From: xuanyili Date: Tue, 26 May 2026 22:55:22 +0000 Subject: [PATCH 2/4] feat(aqe): wire SkewJoinPlan carrier into ExchangeExec / ShuffleReaderExec / adapter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the plumbing layer that lets a future rule attach a per-stage skew-join decision and have the adapter materialize it into a shuffle reader with per-mapper sub-range slicing. Still no rule and no production behavior change — `set_skew_join` is only called by the new unit tests. - `ExchangeExec`: new `skew_join` slot (mirror of the existing `coalesce` slot). `set_skew_join` / `skew_join()` accessors, threaded through `with_new_children`, and rendered in EXPLAIN as `skew_join=K' of M`. - `ShuffleReaderExec`: new `skew_join: Option` field plus a `try_new_skew_join` constructor mirroring `try_new_coalesced`. Debug asserts on K'-shape invariants. Threaded through `with_work_dir`, `with_client_pool`, `with_new_children`, and EXPLAIN. - `BallistaAdapter::transform_children`: extended from 2-arm to 4-arm match on `(coalesce, skew_join)`: - `(Some, Some)` → exec_err — mutually exclusive by construction - `(Some, None)` → existing coalesce path - `(None, Some(sp))` → slice each upstream's `Vec` to the shard's `[start_map_idx, end_map_idx)` window, preserve hash partitioning width at K' (the eventual `is_skew_join` flag on the join op in C4 is what relaxes "same key in one partition") - `(None, None)` → unchanged Tests in `adapter.rs::tests`: - `adapter_slices_skew_join_shards_by_map_range`: builds a 3×4 synthetic upstream, splits upstream 0 two-way, upstream 1 four-way, leaves upstream 2 as passthrough; asserts K'=7 with the exact `(upstream_partition_id, map_partition_id)` pairs per output shard. - `adapter_errors_when_both_coalesce_and_skew_join_set`: regression guard for the mutual-exclusion invariant. - `adapter_guards_out_of_bounds_skew_join_indices`: out-of-bounds `upstream_idx` errors clearly; `end_map_idx > inner.len()` clamps; `start >= end` yields an empty shard. `cargo check --workspace` clean; full AQE suite (52 tests) passes. --- .../src/execution_plans/shuffle_reader.rs | 76 +++++ ballista/scheduler/src/state/aqe/adapter.rs | 293 +++++++++++++++++- .../scheduler/src/state/aqe/execution_plan.rs | 41 ++- 3 files changed, 406 insertions(+), 4 deletions(-) diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs index fc8b4fbae1..0cd5d67aeb 100644 --- a/ballista/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/core/src/execution_plans/shuffle_reader.rs @@ -155,6 +155,14 @@ pub struct ShuffleReaderExec { /// is responsible for pre-concatenating the M-shape upstream /// `Vec>` into K-shape before invoking `try_new_coalesced`. pub coalesce: Option, + /// Optional skew-join metadata produced by `OptimizeSkewedJoinRule`. + /// `None` means the reader is not part of a skew-join rewrite. When `Some`, + /// `partition.len()` equals `skew_join.shards.len()` (= K', the post-rewrite + /// partition count); the rule/adapter is responsible for slicing each + /// upstream `Vec` to the corresponding shard's + /// `[start_map_idx, end_map_idx)` window before invoking `try_new_skew_join`. + /// Mutually exclusive with `coalesce` by construction. + pub skew_join: Option, /// Execution metrics metrics: ExecutionPlanMetricsSet, properties: Arc, @@ -185,6 +193,7 @@ impl ShuffleReaderExec { broadcast: false, upstream_partition_count, coalesce: None, + skew_join: None, metrics: ExecutionPlanMetricsSet::new(), properties, work_dir: None, // to be updated at the executor side @@ -215,6 +224,7 @@ impl ShuffleReaderExec { broadcast: true, upstream_partition_count, coalesce: None, + skew_join: None, metrics: ExecutionPlanMetricsSet::new(), properties, work_dir: None, // to be updated at the executor side @@ -266,6 +276,61 @@ impl ShuffleReaderExec { broadcast: false, upstream_partition_count, coalesce: Some(coalesce), + skew_join: None, + metrics: ExecutionPlanMetricsSet::new(), + properties, + work_dir: None, // to be updated at the executor side + client_pool: None, // to be updated at the executor side + }) + } + + /// Create a new skew-join ShuffleReaderExec. + /// + /// `partition` MUST be the K'-shape `Vec>` produced + /// by the adapter: each output index `idx` in `0..K'` holds the + /// PartitionLocations of `skew_join.shards[idx].upstream_idx` sliced to + /// the `[start_map_idx, end_map_idx)` window for that shard. Passthrough + /// (non-split) shards take the full upstream location list. + /// `partitioning.partition_count()` MUST equal `K'`. + /// + /// Spark analog: `ShuffledRowRDD` reading from a `PartialReducerPartitionSpec` + /// list — each downstream task fetches blocks only from the mapper indices + /// in its shard's window. + /// + /// In debug builds this constructor asserts the shape invariants to catch + /// rule/adapter-side mistakes early. + pub fn try_new_skew_join( + stage_id: usize, + partition: Vec>, + skew_join: SkewJoinPlan, + schema: SchemaRef, + partitioning: Partitioning, + ) -> Result { + debug_assert_eq!( + partition.len(), + skew_join.shards.len(), + "K'-shape partition vector length must equal skew_join.shards.len()", + ); + debug_assert_eq!( + partitioning.partition_count(), + skew_join.shards.len(), + "partitioning.partition_count() must equal skew_join.shards.len() (= K')", + ); + let upstream_partition_count = skew_join.upstream_partition_count as usize; + let properties = Arc::new(PlanProperties::new( + datafusion::physical_expr::EquivalenceProperties::new(schema.clone()), + partitioning, + datafusion::physical_plan::execution_plan::EmissionType::Incremental, + datafusion::physical_plan::execution_plan::Boundedness::Bounded, + )); + Ok(Self { + stage_id, + schema, + partition, + broadcast: false, + upstream_partition_count, + coalesce: None, + skew_join: Some(skew_join), metrics: ExecutionPlanMetricsSet::new(), properties, work_dir: None, // to be updated at the executor side @@ -282,6 +347,7 @@ impl ShuffleReaderExec { broadcast: self.broadcast, upstream_partition_count: self.upstream_partition_count, coalesce: self.coalesce.clone(), + skew_join: self.skew_join.clone(), metrics: self.metrics.clone(), properties: self.properties.clone(), work_dir: Some(work_dir), @@ -297,6 +363,7 @@ impl ShuffleReaderExec { broadcast: self.broadcast, upstream_partition_count: self.upstream_partition_count, coalesce: self.coalesce.clone(), + skew_join: self.skew_join.clone(), metrics: self.metrics.clone(), properties: self.properties.clone(), work_dir: self.work_dir.clone(), @@ -333,6 +400,14 @@ impl DisplayAs for ShuffleReaderExec { c.upstream_partition_count, )?; } + if let Some(sj) = &self.skew_join { + write!( + f, + ", skew_join: {} of {}", + sj.shards.len(), + sj.upstream_partition_count, + )?; + } Ok(()) } } @@ -375,6 +450,7 @@ impl ExecutionPlan for ShuffleReaderExec { broadcast: self.broadcast, upstream_partition_count: self.upstream_partition_count, coalesce: self.coalesce.clone(), + skew_join: self.skew_join.clone(), metrics: ExecutionPlanMetricsSet::new(), properties: self.properties.clone(), work_dir: self.work_dir.clone(), diff --git a/ballista/scheduler/src/state/aqe/adapter.rs b/ballista/scheduler/src/state/aqe/adapter.rs index dae2a2944e..75f78054a4 100644 --- a/ballista/scheduler/src/state/aqe/adapter.rs +++ b/ballista/scheduler/src/state/aqe/adapter.rs @@ -19,6 +19,7 @@ use crate::planner::create_shuffle_writer_with_config; use crate::state::aqe::execution_plan::{AdaptiveDatafusionExec, ExchangeExec}; use crate::state::aqe::planner::AdaptiveStageInfo; use ballista_core::execution_plans::ShuffleReaderExec; +use ballista_core::serde::scheduler::PartitionLocation; use datafusion::common::exec_err; use datafusion::config::ConfigOptions; use datafusion::error::DataFusionError; @@ -59,8 +60,17 @@ impl BallistaAdapter { self.inputs.push(stage_id); let partitioning = exchange.properties().partitioning.clone(); - let reader = match exchange.coalesce() { - Some(cp) => { + let reader = match (exchange.coalesce(), exchange.skew_join()) { + (Some(_), Some(_)) => { + // Mutually exclusive by construction — the rules + // short-circuit when the other slot is already set. If + // we ever see both, that's a rule bug; surface it loudly. + return exec_err!( + "ExchangeExec has both coalesce and skew_join decisions set; \ + these are mutually exclusive" + ); + } + (Some(cp), None) => { // Concatenate M-shape locations into K-shape per CoalescePlan.groups. let k_shape: Vec> = cp .groups @@ -89,7 +99,52 @@ impl BallistaAdapter { new_partitioning, )? } - None => ShuffleReaderExec::try_new( + (None, Some(sp)) => { + // One entry per shard; slice each upstream partition's + // PartitionLocation list to the shard's [start_map_idx, + // end_map_idx) window. Passthrough shards (full mapper + // range) take the whole inner Vec. + let mut k_shape: Vec> = + Vec::with_capacity(sp.shards.len()); + for shard in &sp.shards { + let inner = partitions + .get(shard.upstream_idx as usize) + .ok_or_else(|| { + DataFusionError::Execution(format!( + "SkewJoinPlan references upstream_idx {} but \ + only {} upstream partitions exist", + shard.upstream_idx, + partitions.len(), + )) + })?; + let start = shard.start_map_idx as usize; + let end = (shard.end_map_idx as usize).min(inner.len()); + let assigned: Vec<_> = if start < end { + inner[start..end].to_vec() + } else { + Vec::new() + }; + k_shape.push(assigned); + } + // Preserve the hash partitioning width at K'; the + // OptimizeSkewedJoinRule's join-side fix (`is_skew_join` + // in C4) is what relaxes the "same key in one partition" + // invariant downstream. + let new_partitioning = match &partitioning { + Partitioning::Hash(keys, _m) => { + Partitioning::Hash(keys.clone(), sp.shards.len()) + } + _ => Partitioning::UnknownPartitioning(sp.shards.len()), + }; + ShuffleReaderExec::try_new_skew_join( + stage_id, + k_shape, + (*sp).clone(), + schema, + new_partitioning, + )? + } + (None, None) => ShuffleReaderExec::try_new( stage_id, partitions, schema, @@ -167,3 +222,235 @@ impl BallistaAdapter { } } } + +#[cfg(test)] +mod tests { + use super::*; + use ballista_core::execution_plans::{ + ShuffleReaderExec, SkewJoinPlan, SkewJoinShard, + }; + use ballista_core::serde::scheduler::{ + ExecutorMetadata, ExecutorOperatingSystemSpecification, ExecutorSpecification, + PartitionId, PartitionLocation, PartitionStats, + }; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use datafusion::physical_plan::empty::EmptyExec; + use parking_lot::Mutex; + use std::sync::atomic::AtomicI64; + + // Build a synthetic PartitionLocation tagged with (upstream_partition, + // map_partition) so the assertions below can verify which mapper output + // each shard ended up reading. + fn synthetic_loc(partition_id: usize, map_partition_id: usize) -> PartitionLocation { + PartitionLocation { + map_partition_id, + partition_id: PartitionId { + job_id: "test_job".to_string(), + stage_id: 0, + partition_id, + }, + executor_meta: ExecutorMetadata { + id: "".to_string(), + host: "".to_string(), + port: 0, + grpc_port: 0, + specification: ExecutorSpecification::default().with_task_slots(0), + os_info: ExecutorOperatingSystemSpecification::default(), + }, + partition_stats: PartitionStats::new(Some(0), None, Some(0)), + file_id: None, + is_sort_shuffle: false, + } + } + + // Builds an `ExchangeExec` whose `shuffle_partitions` is already resolved + // to `partitions` and whose `stage_id` is set. The input is a trivial + // `EmptyExec` because the adapter never descends past the Exchange itself + // when transforming a leaf shuffle. + fn synthetic_exchange( + partitions: Vec>, + m: usize, + ) -> Arc { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let empty = Arc::new(EmptyExec::new(schema)); + let stage_id = Arc::new(AtomicI64::new(7)); + let shuffle_parts = Arc::new(Mutex::new(Some(partitions))); + let exchange = ExchangeExec::new_with_details( + empty, + Some(Partitioning::Hash(vec![], m)), + 1, + stage_id, + shuffle_parts, + ); + Arc::new(exchange) + } + + /// The adapter, when given an Exchange with a `SkewJoinPlan` attached, + /// produces a `ShuffleReaderExec` whose `partition` vector has one entry + /// per shard, each holding the per-mapper slice + /// `[start_map_idx, end_map_idx)` of the named upstream partition. + #[test] + fn adapter_slices_skew_join_shards_by_map_range() { + // 3 upstream partitions, each with 4 mapper outputs. The + // `map_partition_id` tag is `[0..4)` within each upstream so the + // assertions below can recover which mapper a shard read. + let partitions: Vec> = (0..3) + .map(|p| (0..4).map(|m| synthetic_loc(p, m)).collect()) + .collect(); + + // Mix of split shapes: + // upstream 0 → 2-way split ([0..2), [2..4)) + // upstream 1 → 4-way split ([0..1), [1..2), [2..3), [3..4)) + // upstream 2 → passthrough ([0..4)) + // K' = 2 + 4 + 1 = 7. + let skew_join = Arc::new(SkewJoinPlan { + upstream_partition_count: 3, + shards: vec![ + SkewJoinShard { upstream_idx: 0, start_map_idx: 0, end_map_idx: 2 }, + SkewJoinShard { upstream_idx: 0, start_map_idx: 2, end_map_idx: 4 }, + SkewJoinShard { upstream_idx: 1, start_map_idx: 0, end_map_idx: 1 }, + SkewJoinShard { upstream_idx: 1, start_map_idx: 1, end_map_idx: 2 }, + SkewJoinShard { upstream_idx: 1, start_map_idx: 2, end_map_idx: 3 }, + SkewJoinShard { upstream_idx: 1, start_map_idx: 3, end_map_idx: 4 }, + SkewJoinShard { upstream_idx: 2, start_map_idx: 0, end_map_idx: 4 }, + ], + }); + + let exchange = synthetic_exchange(partitions, 3); + exchange.set_skew_join(skew_join.clone()); + + let mut adapter = BallistaAdapter::default(); + let transformed = adapter + .transform_children(exchange as Arc) + .expect("transform_children should succeed"); + let reader_arc = transformed.data; + let reader = reader_arc + .as_any() + .downcast_ref::() + .expect("transform_children must produce ShuffleReaderExec"); + + assert!(reader.skew_join.is_some(), "skew_join slot must be threaded"); + assert!(reader.coalesce.is_none(), "coalesce slot must be empty"); + + // K' = 7 — one output partition per shard. + assert_eq!(reader.partition.len(), 7); + assert_eq!(reader.properties().partitioning.partition_count(), 7); + + // Each output partition's (upstream_partition_id, map_partition_id) + // pairs must match the requested [start_map_idx, end_map_idx) slice + // of the named upstream. + let actual: Vec> = reader + .partition + .iter() + .map(|inner| { + inner + .iter() + .map(|l| (l.partition_id.partition_id, l.map_partition_id)) + .collect() + }) + .collect(); + let expected: Vec> = vec![ + vec![(0, 0), (0, 1)], // upstream 0 [0..2) + vec![(0, 2), (0, 3)], // upstream 0 [2..4) + vec![(1, 0)], // upstream 1 [0..1) + vec![(1, 1)], // upstream 1 [1..2) + vec![(1, 2)], // upstream 1 [2..3) + vec![(1, 3)], // upstream 1 [3..4) + vec![(2, 0), (2, 1), (2, 2), (2, 3)], // upstream 2 [0..4) + ]; + assert_eq!(actual, expected); + } + + /// When both slots are set the adapter must error rather than silently + /// picking one — this catches rule bugs (the rules themselves + /// short-circuit when the other slot is set, so a violation here means + /// something went wrong upstream of the adapter). + #[test] + fn adapter_errors_when_both_coalesce_and_skew_join_set() { + use ballista_core::execution_plans::{CoalescePlan, PartitionGroup}; + + let partitions = vec![vec![synthetic_loc(0, 0)]]; + let exchange = synthetic_exchange(partitions, 1); + exchange.set_coalesce(Arc::new(CoalescePlan { + upstream_partition_count: 1, + groups: vec![PartitionGroup { upstream_indices: vec![0] }], + })); + exchange.set_skew_join(Arc::new(SkewJoinPlan { + upstream_partition_count: 1, + shards: vec![SkewJoinShard { + upstream_idx: 0, + start_map_idx: 0, + end_map_idx: 1, + }], + })); + + let mut adapter = BallistaAdapter::default(); + let err = adapter + .transform_children(exchange as Arc) + .expect_err("must reject both-slots-set"); + let msg = err.to_string(); + assert!( + msg.contains("mutually exclusive"), + "error should explain the mutual-exclusion invariant, got: {msg}" + ); + } + + /// Out-of-bounds `start_map_idx` clamps to an empty shard rather than + /// panicking; out-of-bounds `upstream_idx` returns a clear error. + #[test] + fn adapter_guards_out_of_bounds_skew_join_indices() { + // upstream_idx beyond the M=2 partitions → error. + let partitions: Vec> = (0..2) + .map(|p| vec![synthetic_loc(p, 0), synthetic_loc(p, 1)]) + .collect(); + let exchange = synthetic_exchange(partitions, 2); + exchange.set_skew_join(Arc::new(SkewJoinPlan { + upstream_partition_count: 2, + shards: vec![SkewJoinShard { + upstream_idx: 5, // > M-1 + start_map_idx: 0, + end_map_idx: 1, + }], + })); + let mut adapter = BallistaAdapter::default(); + let err = adapter + .transform_children(exchange as Arc) + .expect_err("must reject upstream_idx > M-1"); + assert!( + err.to_string().contains("upstream_idx 5"), + "error should name the offending idx, got: {err}" + ); + + // end_map_idx past inner.len() clamps; start past end yields an + // empty shard — both safe. + let partitions: Vec> = + vec![vec![synthetic_loc(0, 0), synthetic_loc(0, 1)]]; + let exchange = synthetic_exchange(partitions, 1); + exchange.set_skew_join(Arc::new(SkewJoinPlan { + upstream_partition_count: 1, + shards: vec![ + SkewJoinShard { + upstream_idx: 0, + start_map_idx: 0, + end_map_idx: 99, // > inner.len()=2 + }, + SkewJoinShard { + upstream_idx: 0, + start_map_idx: 5, // > end + end_map_idx: 5, + }, + ], + })); + let mut adapter = BallistaAdapter::default(); + let transformed = adapter + .transform_children(exchange as Arc) + .expect("clamping path should not error"); + let reader = transformed + .data + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(reader.partition[0].len(), 2, "end clamps to inner.len()"); + assert_eq!(reader.partition[1].len(), 0, "start>=end yields empty shard"); + } +} diff --git a/ballista/scheduler/src/state/aqe/execution_plan.rs b/ballista/scheduler/src/state/aqe/execution_plan.rs index ac37328f61..a98e077c34 100644 --- a/ballista/scheduler/src/state/aqe/execution_plan.rs +++ b/ballista/scheduler/src/state/aqe/execution_plan.rs @@ -34,7 +34,7 @@ //! shuffle metadata. use ballista_core::execution_plans::{ - CoalescePlan, stats_for_partition, stats_for_partitions, + CoalescePlan, SkewJoinPlan, stats_for_partition, stats_for_partitions, }; use ballista_core::serde::scheduler::PartitionLocation; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; @@ -96,6 +96,20 @@ pub struct ExchangeExec { /// transform-rebuilt parent chains. Same pattern as `shuffle_partitions`. coalesce: Arc>>>, + /// Per-stage skew-join decision attached to this Exchange by + /// `OptimizeSkewedJoinRule` before adapter conversion. + /// + /// `None` means: no skew-join rewrite applied; the adapter falls back to + /// `try_new` or `try_new_coalesced` depending on the `coalesce` slot. + /// `Some(sp)` means: build the SR with `try_new_skew_join(sp)` so the + /// reader exposes K' = `sp.shards.len()` partitions, each backed by a + /// per-mapper sub-range of one upstream partition. + /// + /// Mutually exclusive with `coalesce`: the rule short-circuits on any + /// leaf that already carries the other decision, and the adapter errors + /// loudly if both somehow end up set. + skew_join: Arc>>>, + /// this disables stage from running even it would be suitable to run. /// /// the main reason for this property this is to allow rules to override @@ -152,6 +166,7 @@ impl ExchangeExec { shuffle_partitions: stage_partitions, partitioning, coalesce: Arc::new(Mutex::new(None)), + skew_join: Arc::new(Mutex::new(None)), inactive_stage: false, } } @@ -227,6 +242,20 @@ impl ExchangeExec { pub fn coalesce(&self) -> Option> { self.coalesce.lock().clone() } + + /// Attaches a `SkewJoinPlan` to this Exchange. The adapter consumes the + /// plan when converting Exchange → ShuffleReader: a Some value triggers + /// `try_new_skew_join` (K'-partition reader, per-mapper sub-range + /// slicing). Mutually exclusive with `coalesce` — setting both is a rule + /// bug. Idempotent overwrite. + pub fn set_skew_join(&self, sp: Arc) { + self.skew_join.lock().replace(sp); + } + + /// Returns the attached `SkewJoinPlan`, if `set_skew_join` was called. + pub fn skew_join(&self) -> Option> { + self.skew_join.lock().clone() + } } impl DisplayAs for ExchangeExec { @@ -258,6 +287,14 @@ impl DisplayAs for ExchangeExec { cp.upstream_partition_count, )?; } + if let Some(sp) = self.skew_join.lock().as_ref() { + write!( + f, + ", skew_join={} of {}", + sp.shards.len(), + sp.upstream_partition_count, + )?; + } Ok(()) } DisplayFormatType::TreeRender => { @@ -323,6 +360,8 @@ impl ExecutionPlan for ExchangeExec { // Carry the coalesce slot so a transform-rebuilt parent chain // doesn't lose the rule's decision. new_exec.coalesce = self.coalesce.clone(); + // Same reasoning for the skew-join slot. + new_exec.skew_join = self.skew_join.clone(); Ok(Arc::new(new_exec)) } else { From bb79edb4016c7ec5d5a079a539ed779868e63fe3 Mon Sep 17 00:00:00 2001 From: xuanyili Date: Tue, 26 May 2026 23:12:17 +0000 Subject: [PATCH 3/4] =?UTF-8?q?feat(aqe):=20OptimizeSkewedJoinRule=20?= =?UTF-8?q?=E2=80=94=20split=20skewed=20join=20inputs,=20replicate=20match?= =?UTF-8?q?ing=20side?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ports Spark's OptimizeSkewedJoin into Ballista's AQE pipeline. When one side of a binary join has a partition whose bytes exceed BOTH factor * median(per-side sizes) AND an absolute threshold, the rule splits that side's per-mapper byte vector into contiguous [start_map_idx, end_map_idx) ranges (bin-packed near advisory_partition_bytes via the coalesce module's split_size_list_by_target_size helper) and cartesian-pairs them against the other side's ranges. Both sides' resulting paired SkewJoinPlans land on the two leaf ExchangeExecs via the carrier slots from C2; the adapter consumes them to build K'-partition ShuffleReaderExecs with per-mapper sub-range slicing. Apply Spark's per-join-type split-side allowlist (Inner: both, Left/LeftSemi/ LeftAnti/LeftMark: left only, Right: right only, Full: neither). v1 handles exactly one binary join per stage subtree; multi-join stages bail (Spark iterates per-join, a follow-up can do the same). Default off via ballista.planner.skew_join.enabled. Wired after CoalescePartitionsRule in planner.rs::actionable_stages() — the two carriers are mutually exclusive, so running coalesce first lets skew-join idempotently short-circuit on already-claimed leaves. Algorithm helpers (skew_join/algorithm.rs): is_skewed dual-guard detection, map_ranges_for_upstream bin-packer (reuses coalesce's helper), pair_shards cartesian product, robust_median. 11 unit tests cover detection edges, bin-pack output, and the four cartesian shapes (split×split, split×passthrough, passthrough×split, passthrough×passthrough). Integration tests (state/aqe/test/skew_join_rule.rs): 6 end-to-end cases through AdaptivePlanner — single-side skew on SMJ and HashJoin(Partitioned), disabled, below-threshold guard, uniform-bytes guard, and a final-stage shuffle-reader check that K'=7 partitions flow through the adapter into the runnable plan. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- ballista/scheduler/src/state/aqe/mod.rs | 1 + .../src/state/aqe/optimizer_rule/mod.rs | 2 + .../optimizer_rule/optimize_skewed_join.rs | 377 +++++++++++++++ ballista/scheduler/src/state/aqe/planner.rs | 8 +- .../src/state/aqe/skew_join/algorithm.rs | 332 ++++++++++++++ .../scheduler/src/state/aqe/skew_join/mod.rs | 31 ++ ballista/scheduler/src/state/aqe/test/mod.rs | 2 + .../src/state/aqe/test/skew_join_rule.rs | 429 ++++++++++++++++++ 8 files changed, 1180 insertions(+), 2 deletions(-) create mode 100644 ballista/scheduler/src/state/aqe/optimizer_rule/optimize_skewed_join.rs create mode 100644 ballista/scheduler/src/state/aqe/skew_join/algorithm.rs create mode 100644 ballista/scheduler/src/state/aqe/skew_join/mod.rs create mode 100644 ballista/scheduler/src/state/aqe/test/skew_join_rule.rs diff --git a/ballista/scheduler/src/state/aqe/mod.rs b/ballista/scheduler/src/state/aqe/mod.rs index cd531e499a..10d33dcdb5 100644 --- a/ballista/scheduler/src/state/aqe/mod.rs +++ b/ballista/scheduler/src/state/aqe/mod.rs @@ -57,6 +57,7 @@ pub(crate) mod coalesce; pub(crate) mod execution_plan; pub mod optimizer_rule; pub mod planner; +pub(crate) mod skew_join; #[cfg(test)] mod test; diff --git a/ballista/scheduler/src/state/aqe/optimizer_rule/mod.rs b/ballista/scheduler/src/state/aqe/optimizer_rule/mod.rs index 348158bb43..27d0c65b89 100644 --- a/ballista/scheduler/src/state/aqe/optimizer_rule/mod.rs +++ b/ballista/scheduler/src/state/aqe/optimizer_rule/mod.rs @@ -18,9 +18,11 @@ pub mod coalesce_partitions; pub mod datafusion_patch; pub mod distributed_exchange; +pub mod optimize_skewed_join; pub mod propagate_empty; pub use coalesce_partitions::*; pub use datafusion_patch::*; pub use distributed_exchange::*; +pub use optimize_skewed_join::*; pub use propagate_empty::*; diff --git a/ballista/scheduler/src/state/aqe/optimizer_rule/optimize_skewed_join.rs b/ballista/scheduler/src/state/aqe/optimizer_rule/optimize_skewed_join.rs new file mode 100644 index 0000000000..1d2e009c19 --- /dev/null +++ b/ballista/scheduler/src/state/aqe/optimizer_rule/optimize_skewed_join.rs @@ -0,0 +1,377 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! AQE rule that splits skewed inputs of a partitioned hash / sort-merge join +//! and replicates the matching partition on the other side. +//! +//! [`OptimizeSkewedJoinRule`] runs once per `replan_stages()` pass on a stage +//! subtree whose root is either an [`ExchangeExec`] (intermediate stage) or +//! an [`AdaptiveDatafusionExec`] (final stage). Port of Spark's +//! `OptimizeSkewedJoin` AQE rule. +//! +//! # Mechanism +//! +//! 1. **Find** the binary join in the stage subtree +//! (`HashJoinExec(Partitioned)` or `SortMergeJoinExec`). v1 handles +//! stages with exactly one binary join; multi-join stages bail. +//! 2. **Apply** Spark's per-join-type split-side allowlist. `Full` is +//! always skipped — splitting either side drops unmatched rows. +//! 3. **Descend** each side of the join through "passthrough" wrappers +//! (`SortExec`, `ProjectionExec`, `FilterExec`, …) to the unique leaf +//! [`ExchangeExec`]. Bails when a side has zero or more than one. +//! 4. **Detect skew** per side, per upstream index: a partition is skewed +//! iff its bytes exceed BOTH `factor * median(per-side sizes)` AND an +//! absolute `threshold_bytes`. +//! 5. **Split** each skewed upstream's per-mapper byte vector into +//! contiguous `[start_map_idx, end_map_idx)` ranges via the coalesce +//! module's `split_size_list_by_target_size`. Non-skewed upstreams stay +//! as one passthrough range covering all mappers. +//! 6. **Cartesian-pair** the per-upstream left/right ranges: every left +//! range pairs with every right range, producing `|L| × |R|` downstream +//! tasks per upstream. When only one side is split N-way, the other +//! side is read N times (Spark's "replicate the matching partition"). +//! 7. **Attach** the two paired `SkewJoinPlan`s — one per leg — onto the +//! two leaf Exchanges. The adapter consumes them at conversion time. +//! +//! # Correctness +//! +//! Each (left-shard-j, right-shard-j) pair is a downstream task. By +//! construction every (left-row, right-row) join pair is produced exactly +//! once: +//! +//! - When only the left is split N-way, the right bucket is replicated N +//! times. Each right row appears in N tasks but is joined against a +//! disjoint slice of left rows each time, so it pairs with each matching +//! left row exactly once. +//! - For `LeftOuter`: every left row lives in exactly one shard; that +//! shard's task does its own left-outer against the full right replica, +//! so unmatched left rows are NULL-extended exactly once. +//! - For `RightOuter`: mirror argument on the right side. +//! - `Full` is excluded — symmetric replication would over-emit unmatched +//! rows on whichever side gets replicated, and Spark also doesn't handle +//! it here. +//! +//! # Why bail on multi-join stages +//! +//! Two joins sharing an Exchange in the same subtree would need a single +//! skew decision applied consistently across both joins' requirements — +//! beyond the scope of this v1 port. Spark handles it via a separate +//! per-join evaluation pass. +//! +//! # Default off +//! +//! `ballista.planner.skew_join.enabled = false`. The rule is an opt-in +//! trade — splitting skewed partitions adds scheduling overhead and +//! re-reads the replicated side's shuffle blocks, but eliminates the +//! straggler that comes from one fat join key. Workloads with no skew get +//! no benefit, only the overhead of the safety walk. + +use std::sync::Arc; + +use ballista_core::config::BallistaConfig; +use ballista_core::execution_plans::SkewJoinPlan; +use ballista_core::serde::scheduler::PartitionLocation; +use datafusion::common::JoinType; +use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion}; +use datafusion::config::ConfigOptions; +use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec}; +use log::debug; + +use crate::state::aqe::execution_plan::{AdaptiveDatafusionExec, ExchangeExec}; +use crate::state::aqe::skew_join::{is_skewed, map_ranges_for_upstream, pair_shards}; + +/// AQE rule that attaches paired `SkewJoinPlan`s to the two leaf +/// `ExchangeExec`s of a binary join when one (or both) sides have a skewed +/// partition. +/// +/// See module docs for design intent. +#[derive(Debug, Default)] +pub struct OptimizeSkewedJoinRule; + +impl PhysicalOptimizerRule for OptimizeSkewedJoinRule { + fn optimize( + &self, + plan: Arc, + config: &ConfigOptions, + ) -> datafusion::common::Result> { + let bc = config + .extensions + .get::() + .cloned() + .unwrap_or_default(); + if !bc.skew_join_enabled() { + return Ok(plan); + } + let factor = bc.skew_join_skewed_partition_factor(); + let threshold_bytes = bc.skew_join_skewed_partition_threshold_bytes(); + let advisory_bytes = bc.skew_join_advisory_partition_bytes(); + let small_factor = bc.skew_join_small_partition_factor(); + + debug!( + "[skew-join-rule] fire: factor={factor} threshold_bytes={threshold_bytes} \ + advisory_bytes={advisory_bytes} small_factor={small_factor}", + ); + + // Two root kinds, same outcome — same convention as CoalescePartitionsRule. + let input = if let Some(ex) = plan.as_any().downcast_ref::() { + ex.input().clone() + } else if let Some(adp) = plan.as_any().downcast_ref::() { + adp.input().clone() + } else { + debug!( + "[skew-join-rule] root is neither ExchangeExec nor AdaptiveDatafusionExec; bail" + ); + return Ok(plan); + }; + + // Find the binary join. v1 requires exactly one; multi-join stages + // bail (Spark iterates per-join — a future pass can do the same). + let mut joins: Vec> = Vec::new(); + input.apply(|node| { + if join_type_of(node).is_some() { + joins.push(node.clone()); + } + Ok(TreeNodeRecursion::Continue) + })?; + if joins.len() != 1 { + debug!( + "[skew-join-rule] subtree has {} binary joins (need exactly 1); bail", + joins.len() + ); + return Ok(plan); + } + let join = &joins[0]; + let jt = join_type_of(join).expect("filtered above to Some"); + + let (can_split_left, can_split_right) = split_allowlist(jt); + if !can_split_left && !can_split_right { + debug!("[skew-join-rule] join_type={jt:?} disallows splitting either side; bail"); + return Ok(plan); + } + + // Descend each leg to the unique ExchangeExec leaf. + let join_children = join.children(); + debug_assert_eq!( + join_children.len(), + 2, + "binary joins (HashJoinExec/SortMergeJoinExec) must have exactly 2 children", + ); + let Some(left_leaf_arc) = single_exchange_leaf(join_children[0])? else { + debug!("[skew-join-rule] left side has no unique ExchangeExec leaf; bail"); + return Ok(plan); + }; + let Some(right_leaf_arc) = single_exchange_leaf(join_children[1])? else { + debug!("[skew-join-rule] right side has no unique ExchangeExec leaf; bail"); + return Ok(plan); + }; + let left_leaf = as_exchange(&left_leaf_arc); + let right_leaf = as_exchange(&right_leaf_arc); + + // Idempotence guard: don't fight a coalesce or a prior skew-join pass. + if left_leaf.coalesce().is_some() + || left_leaf.skew_join().is_some() + || right_leaf.coalesce().is_some() + || right_leaf.skew_join().is_some() + { + debug!("[skew-join-rule] at least one leaf already has coalesce/skew_join set; bail"); + return Ok(plan); + } + + // Both leaves must be resolved with matching M. Heterogeneous M + // between legs is a Q22-style edge case the rule isn't designed + // for — coalesce bails on it too. + let Some(left_parts) = left_leaf.shuffle_partitions() else { + debug!("[skew-join-rule] left leaf unresolved; bail (will rerun next pass)"); + return Ok(plan); + }; + let Some(right_parts) = right_leaf.shuffle_partitions() else { + debug!("[skew-join-rule] right leaf unresolved; bail (will rerun next pass)"); + return Ok(plan); + }; + let m = left_parts.len(); + if right_parts.len() != m { + debug!( + "[skew-join-rule] heterogeneous M (left={} right={}); bail", + left_parts.len(), + right_parts.len() + ); + return Ok(plan); + } + + // Per-side per-upstream byte totals. + let left_sizes: Vec = left_parts.iter().map(|locs| sum_bytes(locs)).collect(); + let right_sizes: Vec = + right_parts.iter().map(|locs| sum_bytes(locs)).collect(); + + // Per-side per-upstream skew decisions, gated by the allowlist. + let left_skewed: Vec = left_sizes + .iter() + .map(|&b| can_split_left && is_skewed(b, &left_sizes, factor, threshold_bytes)) + .collect(); + let right_skewed: Vec = right_sizes + .iter() + .map(|&b| { + can_split_right && is_skewed(b, &right_sizes, factor, threshold_bytes) + }) + .collect(); + + if left_skewed.iter().all(|&b| !b) && right_skewed.iter().all(|&b| !b) { + debug!("[skew-join-rule] no upstream qualifies as skewed; bail"); + return Ok(plan); + } + + // Build per-upstream sub-ranges for each side. Skewed → bin-pack + // per-mapper bytes near advisory_bytes; non-skewed → passthrough. + let left_ranges = build_ranges( + &left_parts, + &left_skewed, + advisory_bytes, + small_factor, + ); + let right_ranges = build_ranges( + &right_parts, + &right_skewed, + advisory_bytes, + small_factor, + ); + + let (left_shards, right_shards) = pair_shards(&left_ranges, &right_ranges); + let k_prime = left_shards.len(); + debug_assert_eq!(k_prime, right_shards.len()); + + // No fan-out means no benefit, only overhead — bail. + if k_prime <= m { + debug!("[skew-join-rule] K'={k_prime} <= M={m}; no benefit, bail"); + return Ok(plan); + } + + debug!( + "[skew-join-rule] attaching SkewJoinPlan: M={m} K'={k_prime} \ + (skew bits left={left_skewed:?} right={right_skewed:?})" + ); + left_leaf.set_skew_join(Arc::new(SkewJoinPlan { + upstream_partition_count: m as u32, + shards: left_shards, + })); + right_leaf.set_skew_join(Arc::new(SkewJoinPlan { + upstream_partition_count: m as u32, + shards: right_shards, + })); + Ok(plan) + } + + fn name(&self) -> &str { + "OptimizeSkewedJoinRule" + } + + fn schema_check(&self) -> bool { + false + } +} + +/// Returns the join type if `node` is a binary join eligible for the skew +/// rewrite. Broadcast hash joins (`PartitionMode::CollectLeft`) and +/// pre-broadcast `Auto` modes are excluded — they have no per-side shuffle +/// the rule could split. +fn join_type_of(node: &Arc) -> Option { + if let Some(h) = node.as_any().downcast_ref::() { + return match h.partition_mode() { + PartitionMode::Partitioned => Some(*h.join_type()), + _ => None, + }; + } + if let Some(s) = node.as_any().downcast_ref::() { + return Some(s.join_type()); + } + None +} + +/// Spark's per-join-type split-side allowlist: `(can_split_left, can_split_right)`. +/// +/// Source: `OptimizeSkewedJoin.scala` — `canSplitLeftSide` / `canSplitRightSide`. +/// `Full` returns `(false, false)`. Right-side semi/anti/mark variants fall +/// into the conservative catch-all and are not split. +fn split_allowlist(jt: JoinType) -> (bool, bool) { + match jt { + JoinType::Inner => (true, true), + JoinType::Left => (true, false), + JoinType::Right => (false, true), + JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => (true, false), + _ => (false, false), + } +} + +/// Walks `root` and returns the unique descendant `ExchangeExec` Arc, or +/// `None` if zero or more than one is present. Stops descending at every +/// Exchange (the upstream stage's compute belongs to a different subtree). +fn single_exchange_leaf( + root: &Arc, +) -> datafusion::common::Result>> { + let mut exchanges: Vec> = Vec::new(); + root.apply(|node| { + if node.as_any().is::() { + exchanges.push(node.clone()); + return Ok(TreeNodeRecursion::Jump); + } + Ok(TreeNodeRecursion::Continue) + })?; + Ok(if exchanges.len() == 1 { + Some(exchanges.remove(0)) + } else { + None + }) +} + +/// Same one-liner used by the coalesce rule. The cast is infallible because +/// the caller filtered to `ExchangeExec` already. +fn as_exchange(arc: &Arc) -> &ExchangeExec { + arc.as_any() + .downcast_ref::() + .expect("filtered to ExchangeExec above") +} + +/// Build the `(start_map_idx, end_map_idx)` ranges per upstream for one side. +fn build_ranges( + parts: &[Vec], + skewed: &[bool], + advisory_bytes: u64, + small_factor: f64, +) -> Vec> { + parts + .iter() + .zip(skewed.iter()) + .map(|(locs, &is_skewed)| { + if is_skewed { + let per_mapper: Vec = locs + .iter() + .map(|loc| loc.partition_stats.num_bytes().unwrap_or(0)) + .collect(); + map_ranges_for_upstream(&per_mapper, advisory_bytes, small_factor) + } else { + vec![(0, locs.len() as u32)] + } + }) + .collect() +} + +fn sum_bytes(locs: &[PartitionLocation]) -> u64 { + locs.iter() + .filter_map(|l| l.partition_stats.num_bytes()) + .sum() +} diff --git a/ballista/scheduler/src/state/aqe/planner.rs b/ballista/scheduler/src/state/aqe/planner.rs index 03361dc3c5..df5cad6ae6 100644 --- a/ballista/scheduler/src/state/aqe/planner.rs +++ b/ballista/scheduler/src/state/aqe/planner.rs @@ -17,8 +17,8 @@ use crate::state::aqe::adapter::BallistaAdapter; use crate::state::aqe::execution_plan::{AdaptiveDatafusionExec, ExchangeExec}; use crate::state::aqe::optimizer_rule::{ - CoalescePartitionsRule, DistributedExchangeRule, PropagateEmptyExecRule, - WarnOnDuplicateExecRule, + CoalescePartitionsRule, DistributedExchangeRule, OptimizeSkewedJoinRule, + PropagateEmptyExecRule, WarnOnDuplicateExecRule, }; use crate::state::execution_stage::StageOutput; @@ -305,6 +305,10 @@ impl AdaptivePlanner { // that would arise if the rule walked the entire residual // plan in `default_optimizers()`. let plan = CoalescePartitionsRule.optimize(plan, config)?; + // OptimizeSkewedJoinRule must run AFTER coalesce so it + // can short-circuit on leaves the coalesce rule already + // claimed (the two carriers are mutually exclusive). + let plan = OptimizeSkewedJoinRule.optimize(plan, config)?; BallistaAdapter::adapt_to_ballista( plan, self.job_name.as_str(), diff --git a/ballista/scheduler/src/state/aqe/skew_join/algorithm.rs b/ballista/scheduler/src/state/aqe/skew_join/algorithm.rs new file mode 100644 index 0000000000..2aeb2fbb18 --- /dev/null +++ b/ballista/scheduler/src/state/aqe/skew_join/algorithm.rs @@ -0,0 +1,332 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Per-upstream skew detection and shard-pairing for `OptimizeSkewedJoinRule`. +//! +//! Mirrors Spark's `OptimizeSkewedJoin` + `ShufflePartitionsUtil` +//! (`createSkewPartitionSpecs`, `splitSizeListByTargetSize`): +//! +//! 1. A side's partition `i` is skewed iff its total bytes exceed BOTH +//! `factor * median(per-side sizes)` AND an absolute `threshold_bytes`. +//! 2. When a side is skewed on partition `i`, that side's mapper-output +//! byte vector is greedy-packed into contiguous mapper-index groups whose +//! cumulative bytes ≈ `advisory_bytes` (the same bin-packer the coalesce +//! rule uses). When the side is not skewed, it stays as one passthrough +//! range `[0, num_mappers)`. +//! 3. Per-upstream `i` the left and right ranges are cartesian-paired: every +//! left range pairs with every right range, producing `|L| × |R|` +//! downstream tasks for that upstream index. When neither side is split +//! `|L| × |R| = 1` (passthrough). When one side is split N-way and the +//! other isn't, that's an N×1 = N-way fan-out where the non-split side +//! is read N times (Spark's "replicate the matching partition" mechanic). +//! +//! Float arithmetic for the skew check is intentional — every comparison +//! casts `u64 → f64` so `factor * median` matches Spark's reference +//! semantics exactly. + +use ballista_core::execution_plans::SkewJoinShard; + +use crate::state::aqe::coalesce::split_size_list_by_target_size; + +/// Is per-side partition byte total `bytes` skewed? +/// +/// Skewed iff `bytes > factor * median(sizes)` AND `bytes > threshold_bytes`. +/// The dual guard prevents fan-out of trivially small partitions (Spark's +/// `skewedPartitionThresholdInBytes` and `skewedPartitionFactor`). +/// +/// Caller passes the side's full size vector (used for the median) — that +/// side is checked independently of the other side. +pub fn is_skewed( + bytes: u64, + sizes: &[u64], + factor: f64, + threshold_bytes: u64, +) -> bool { + if bytes < threshold_bytes { + return false; + } + let med = robust_median(sizes) as f64; + (bytes as f64) > factor * med +} + +/// Bin-pack a per-mapper byte vector into contiguous map-index ranges whose +/// cumulative bytes target `advisory_bytes`. +/// +/// Reuses the coalesce rule's `split_size_list_by_target_size` so the +/// skewed-join sub-shard packer matches the coalesce packer exactly +/// (Spark uses the same helper for both). +/// +/// `merged_partition_factor` is hard-coded to `1.0` — coalesce raises it +/// above 1.0 to fold neighboring small partitions together on flush, but for +/// skew sub-sharding we *want* small sub-shards near the advisory size, +/// so the neighbor-merge refinement is disabled. The `small_partition_factor` +/// passed through still controls tail-merge. +/// +/// Returns a `Vec<(start_map_idx, end_map_idx)>` (exclusive end) of length +/// ≥ 1. When `per_mapper_bytes` is empty, returns `vec![(0, 0)]` to keep the +/// downstream pairing logic correctness (one passthrough shard for an empty +/// upstream is benign — the adapter produces an empty read). +pub fn map_ranges_for_upstream( + per_mapper_bytes: &[u64], + advisory_bytes: u64, + small_partition_factor: f64, +) -> Vec<(u32, u32)> { + // merged_partition_factor=1.0 disables the coalesce-style neighbor merge. + let starts = + split_size_list_by_target_size(per_mapper_bytes, advisory_bytes, small_partition_factor, 1.0); + let total = per_mapper_bytes.len(); + starts + .iter() + .enumerate() + .map(|(k, &start)| { + let end = starts.get(k + 1).copied().unwrap_or(total); + (start as u32, end as u32) + }) + .collect() +} + +/// Cartesian-pair per-upstream left/right map-ranges into the matched K' +/// shard lists for both sides of the join. +/// +/// For each upstream index `i` and each `(left_range, right_range)` pair, two +/// `SkewJoinShard`s are appended — one to each output list. The two output +/// lists have the same length (= K') and index-aligned entries so the +/// adapter can build matched join inputs by zipping them. +/// +/// `left_ranges_per_upstream` and `right_ranges_per_upstream` must have the +/// same outer length (= M, the upstream partition count). When a side is +/// not split on upstream `i`, callers pass `vec![(0, num_mappers_i)]` — one +/// passthrough range. The cartesian product correctly handles split×split +/// (full N×M), split×passthrough (N×1 = N), passthrough×split (1×M = M), and +/// passthrough×passthrough (1×1 = 1) shapes per upstream. +pub fn pair_shards( + left_ranges_per_upstream: &[Vec<(u32, u32)>], + right_ranges_per_upstream: &[Vec<(u32, u32)>], +) -> (Vec, Vec) { + debug_assert_eq!( + left_ranges_per_upstream.len(), + right_ranges_per_upstream.len(), + "left and right per-upstream range lists must have equal length (= M)", + ); + + let mut left_shards = Vec::new(); + let mut right_shards = Vec::new(); + for (i, (l_ranges, r_ranges)) in left_ranges_per_upstream + .iter() + .zip(right_ranges_per_upstream.iter()) + .enumerate() + { + let i = i as u32; + for &(ls, le) in l_ranges { + for &(rs, re) in r_ranges { + left_shards.push(SkewJoinShard { + upstream_idx: i, + start_map_idx: ls, + end_map_idx: le, + }); + right_shards.push(SkewJoinShard { + upstream_idx: i, + start_map_idx: rs, + end_map_idx: re, + }); + } + } + } + (left_shards, right_shards) +} + +/// Median of a byte-size slice, robust against the single-outlier case the +/// skew check is designed to catch. +/// +/// Returns 0 for empty input — combined with the `threshold_bytes` guard in +/// `is_skewed`, an all-zero or empty `sizes` slice never reports skewed. +fn robust_median(values: &[u64]) -> u64 { + if values.is_empty() { + return 0; + } + let mut sorted: Vec = values.to_vec(); + sorted.sort_unstable(); + sorted[sorted.len() / 2] +} + +#[cfg(test)] +mod tests { + use super::*; + + // Match Spark defaults so the tests double as documentation of the + // production configuration semantics. + const FACTOR: f64 = 5.0; + const THRESHOLD: u64 = 256 * 1024 * 1024; + const ADVISORY: u64 = 64 * 1024 * 1024; + const SMALL: f64 = 0.2; + + #[test] + fn is_skewed_requires_both_factor_and_threshold() { + // 8 partitions: seven small, one large. Median = small. + let sizes = vec![10 * 1024 * 1024; 7] + .into_iter() + .chain(std::iter::once(2 * 1024 * 1024 * 1024)) + .collect::>(); + + // The large one qualifies on both ratio (2 GB / 10 MB = 200× > 5) + // and threshold (2 GB > 256 MB) → skewed. + assert!(is_skewed(2 * 1024 * 1024 * 1024, &sizes, FACTOR, THRESHOLD)); + + // A small one fails the threshold even though everything's small — + // ratio is 1× which fails factor anyway. + assert!(!is_skewed(10 * 1024 * 1024, &sizes, FACTOR, THRESHOLD)); + } + + #[test] + fn is_skewed_fails_threshold_with_huge_ratio() { + // Median = 10 KB; outlier = 100 MB. Ratio is 10000× (well over 5), + // but 100 MB < threshold (256 MB) — bail. Prevents fan-out of + // trivially small "skewed" partitions. + let sizes = vec![10 * 1024, 10 * 1024, 10 * 1024, 100 * 1024 * 1024]; + assert!(!is_skewed(100 * 1024 * 1024, &sizes, FACTOR, THRESHOLD)); + } + + #[test] + fn is_skewed_fails_factor_with_uniform_sizes() { + // All equal — ratio is exactly 1, fails factor of 5. + let sizes = vec![1024 * 1024 * 1024; 8]; + assert!(!is_skewed(1024 * 1024 * 1024, &sizes, FACTOR, THRESHOLD)); + } + + #[test] + fn is_skewed_handles_zero_median() { + // Lots of empty partitions; one moderately large. + // Median = 0, so factor check is bytes > 0 (trivially yes for the + // outlier). Threshold still applies — must be > 256 MB. + let sizes = vec![0, 0, 0, 0, 0, 0, 0, 512 * 1024 * 1024]; + assert!(is_skewed(512 * 1024 * 1024, &sizes, FACTOR, THRESHOLD)); + // But a 100 MB outlier with the same zero-median fails threshold. + assert!(!is_skewed(100 * 1024 * 1024, &sizes, FACTOR, THRESHOLD)); + } + + #[test] + fn map_ranges_passthrough_when_total_under_advisory() { + // 4 mappers, each 10 MB → 40 MB total < 64 MB target. The bin-packer + // never flushes; one passthrough range covers all mappers. + let per_mapper = vec![10 * 1024 * 1024; 4]; + let ranges = map_ranges_for_upstream(&per_mapper, ADVISORY, SMALL); + assert_eq!(ranges, vec![(0, 4)]); + } + + #[test] + fn map_ranges_splits_at_advisory_boundary() { + // 8 mappers, each 50 MB → 400 MB total. Advisory = 64 MB. + // Bin-pack: 50 fills; +50=100 > 64 flush at idx 1; restart; +50=100 > 64 + // flush at idx 2; etc. → boundaries [0,1,2,3,4,5,6,7] → 8 single-mapper shards. + let per_mapper = vec![50 * 1024 * 1024; 8]; + let ranges = map_ranges_for_upstream(&per_mapper, ADVISORY, SMALL); + assert_eq!( + ranges, + vec![(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7), (7, 8)] + ); + } + + #[test] + fn map_ranges_empty_input_yields_single_zero_range() { + // Documents the guard: empty per_mapper still returns one range so + // pair_shards' zipped iteration shape is preserved. + let ranges = map_ranges_for_upstream(&[], ADVISORY, SMALL); + assert_eq!(ranges, vec![(0, 0)]); + } + + #[test] + fn pair_shards_cartesian_when_only_left_split() { + // M = 1 upstream. Left split 3-way, right passthrough. + // Expected K' = 3×1 = 3 — same range on left repeated against the + // full right replica. + let left = vec![vec![(0, 2), (2, 4), (4, 6)]]; + let right = vec![vec![(0, 6)]]; + let (l, r) = pair_shards(&left, &right); + assert_eq!( + l, + vec![ + SkewJoinShard { upstream_idx: 0, start_map_idx: 0, end_map_idx: 2 }, + SkewJoinShard { upstream_idx: 0, start_map_idx: 2, end_map_idx: 4 }, + SkewJoinShard { upstream_idx: 0, start_map_idx: 4, end_map_idx: 6 }, + ] + ); + assert_eq!( + r, + vec![ + SkewJoinShard { upstream_idx: 0, start_map_idx: 0, end_map_idx: 6 }, + SkewJoinShard { upstream_idx: 0, start_map_idx: 0, end_map_idx: 6 }, + SkewJoinShard { upstream_idx: 0, start_map_idx: 0, end_map_idx: 6 }, + ] + ); + } + + #[test] + fn pair_shards_cartesian_when_both_split() { + // M = 1 upstream. Left split 2-way, right split 3-way. + // Expected K' = 2×3 = 6 — full cartesian. + let left = vec![vec![(0, 4), (4, 8)]]; + let right = vec![vec![(0, 2), (2, 5), (5, 8)]]; + let (l, r) = pair_shards(&left, &right); + assert_eq!(l.len(), 6); + assert_eq!(r.len(), 6); + // Layout: for each left range, pair with each right range in order. + assert_eq!(l[0].end_map_idx, 4); assert_eq!(r[0].end_map_idx, 2); + assert_eq!(l[1].end_map_idx, 4); assert_eq!(r[1].end_map_idx, 5); + assert_eq!(l[2].end_map_idx, 4); assert_eq!(r[2].end_map_idx, 8); + assert_eq!(l[3].end_map_idx, 8); assert_eq!(r[3].end_map_idx, 2); + assert_eq!(l[4].end_map_idx, 8); assert_eq!(r[4].end_map_idx, 5); + assert_eq!(l[5].end_map_idx, 8); assert_eq!(r[5].end_map_idx, 8); + } + + #[test] + fn pair_shards_passthrough_passthrough_yields_one_pair() { + // Neither side split — one (passthrough, passthrough) pair, just + // like the no-skew baseline. + let left = vec![vec![(0, 4)]]; + let right = vec![vec![(0, 4)]]; + let (l, r) = pair_shards(&left, &right); + assert_eq!(l.len(), 1); + assert_eq!(r.len(), 1); + assert_eq!(l[0].start_map_idx, 0); + assert_eq!(l[0].end_map_idx, 4); + } + + #[test] + fn pair_shards_multi_upstream_accumulates() { + // M = 3 upstreams. Mixed shapes: + // upstream 0: left split 2, right passthrough → 2 pairs + // upstream 1: both passthrough → 1 pair + // upstream 2: right split 3, left passthrough → 3 pairs + // Total K' = 6. + let left = vec![ + vec![(0, 2), (2, 4)], + vec![(0, 4)], + vec![(0, 4)], + ]; + let right = vec![ + vec![(0, 4)], + vec![(0, 4)], + vec![(0, 1), (1, 2), (2, 4)], + ]; + let (l, r) = pair_shards(&left, &right); + assert_eq!(l.len(), 6); + assert_eq!(r.len(), 6); + // upstream_idx packed in expected sequence. + let upstream_ids: Vec = l.iter().map(|s| s.upstream_idx).collect(); + assert_eq!(upstream_ids, vec![0, 0, 1, 2, 2, 2]); + } +} diff --git a/ballista/scheduler/src/state/aqe/skew_join/mod.rs b/ballista/scheduler/src/state/aqe/skew_join/mod.rs new file mode 100644 index 0000000000..9947a373ac --- /dev/null +++ b/ballista/scheduler/src/state/aqe/skew_join/mod.rs @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! AQE optimize-skewed-join helpers (pure-CPU). +//! +//! This submodule packages the per-upstream skew detection and shard pairing +//! helpers `OptimizeSkewedJoinRule` consumes: +//! +//! - [`is_skewed`] — per-side detection (factor × median AND absolute threshold). +//! - [`map_ranges_for_upstream`] — bin-pack one upstream partition's per-mapper +//! byte sizes into `(start_map_idx, end_map_idx)` ranges via the shared +//! `split_size_list_by_target_size` helper from the coalesce module. +//! - [`pair_shards`] — cartesian-product the per-upstream left/right ranges +//! into the matched K' shard lists Spark's `OptimizeSkewedJoin` produces. + +pub(crate) mod algorithm; +pub(crate) use algorithm::*; diff --git a/ballista/scheduler/src/state/aqe/test/mod.rs b/ballista/scheduler/src/state/aqe/test/mod.rs index 831f25c1c4..27d104050d 100644 --- a/ballista/scheduler/src/state/aqe/test/mod.rs +++ b/ballista/scheduler/src/state/aqe/test/mod.rs @@ -21,6 +21,8 @@ mod alter_stages; mod coalesce_rule; /// Tests if plan is going to be split to stages correctly mod plan_to_stages; +/// Functional tests for the OptimizeSkewedJoinRule end-to-end through the planner +mod skew_join_rule; use ballista_core::config::BALLISTA_SHUFFLE_SORT_BASED_ENABLED; use ballista_core::extension::SessionConfigExt; diff --git a/ballista/scheduler/src/state/aqe/test/skew_join_rule.rs b/ballista/scheduler/src/state/aqe/test/skew_join_rule.rs new file mode 100644 index 0000000000..3c4c98007f --- /dev/null +++ b/ballista/scheduler/src/state/aqe/test/skew_join_rule.rs @@ -0,0 +1,429 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Functional tests for [`OptimizeSkewedJoinRule`]: drive a hash- or +//! sort-merge join through `AdaptivePlanner`, finalize the upstream stages +//! with synthetic per-partition byte stats (one fat partition), then snapshot +//! the displayed plan tree so the rule's effect on the two leaf +//! `ExchangeExec`s is visible at the `skew_join=K' of M` field. +//! +//! Tests trace from inputs — small synthetic byte sizes are paired with +//! correspondingly small `skew_join` thresholds so the bin-pack outcome is +//! hand-traceable through `split_size_list_by_target_size`. + +use crate::assert_plan; +use crate::state::aqe::planner::AdaptivePlanner; +use crate::state::aqe::test::{mock_batch, mock_schema}; +use ballista_core::extension::SessionConfigExt; +use ballista_core::serde::scheduler::{ + ExecutorMetadata, ExecutorOperatingSystemSpecification, ExecutorSpecification, + PartitionId, PartitionLocation, PartitionStats, +}; +use datafusion::datasource::MemTable; +use datafusion::execution::SessionStateBuilder; +use datafusion::prelude::{SessionConfig, SessionContext}; +use std::sync::Arc; + +/// Build a session context with the Ballista extension and the skew-join +/// knobs forced to test-scale values. +/// +/// Thresholds are tiny so we can drive the rule with hand-sized byte counts: +/// - `factor = 5.0` (Spark default) +/// - `threshold_bytes = 100` (vs 256 MiB in prod) — anything >100 may skew +/// - `advisory_bytes = 50` (vs 64 MiB in prod) — sub-shard target ≈ 50 +/// - `small_factor = 0.2` (Spark default) +fn skew_join_context(target_partitions: usize, enabled: bool) -> SessionContext { + let config = SessionConfig::new_with_ballista() + .with_target_partitions(target_partitions) + .with_round_robin_repartition(false) + .with_ballista_skew_join_enabled(enabled) + .with_ballista_skew_join_skewed_partition_factor(5.0) + .with_ballista_skew_join_skewed_partition_threshold_bytes(100) + .with_ballista_skew_join_advisory_partition_bytes(50) + .with_ballista_skew_join_small_partition_factor(0.2); + + let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .build(); + + SessionContext::new_with_state(state) +} + +/// Register a MemTable with `n_partitions` partitions. Multi-partition +/// sources force `EnforceDistribution` to insert a hash repartition before +/// the join, which is what gives us `ExchangeExec` leaves to attach to. +fn register_partitioned_table( + ctx: &SessionContext, + name: &str, + n_partitions: usize, +) -> datafusion::error::Result<()> { + let data = (0..n_partitions) + .map(|_| Ok(vec![mock_batch()?])) + .collect::>>()?; + let table = MemTable::try_new(mock_schema(), data)?; + ctx.register_table(name, Arc::new(table))?; + Ok(()) +} + +/// Build a `Vec>` of length `per_mapper_bytes.len()`. +/// Each upstream partition `i` gets one mapper-output row of size +/// `per_mapper_bytes[i]`. +fn passthrough_partitions(per_mapper_bytes: &[u64]) -> Vec> { + per_mapper_bytes + .iter() + .enumerate() + .map(|(idx, &bytes)| vec![synthetic_loc(idx, 0, bytes)]) + .collect() +} + +/// Build a `Vec>` where upstream `skewed_idx` has +/// `n_mappers_at_skew` mapper outputs (so the bin-packer can carve the +/// skewed upstream into sub-ranges), and every other upstream has one +/// mapper-output row at `non_skew_bytes`. +/// +/// The skewed upstream's per-mapper bytes are uniform at +/// `skewed_per_mapper_bytes`, so the sub-shard count is deterministic: +/// `ceil(total / advisory_bytes)` give-or-take the tail-merge rule. +fn partitions_with_one_skewed( + m: usize, + skewed_idx: usize, + n_mappers_at_skew: usize, + skewed_per_mapper_bytes: u64, + non_skew_bytes: u64, +) -> Vec> { + (0..m) + .map(|i| { + if i == skewed_idx { + (0..n_mappers_at_skew) + .map(|m_idx| synthetic_loc(i, m_idx, skewed_per_mapper_bytes)) + .collect() + } else { + vec![synthetic_loc(i, 0, non_skew_bytes)] + } + }) + .collect() +} + +fn synthetic_loc( + upstream_idx: usize, + map_partition_id: usize, + bytes: u64, +) -> PartitionLocation { + PartitionLocation { + map_partition_id, + partition_id: PartitionId { + job_id: "".to_string(), + stage_id: 0, + partition_id: upstream_idx, + }, + executor_meta: ExecutorMetadata { + id: "".to_string(), + host: "".to_string(), + port: 0, + grpc_port: 0, + specification: ExecutorSpecification::default().with_task_slots(0), + os_info: ExecutorOperatingSystemSpecification::default(), + }, + partition_stats: PartitionStats::new(Some(1), None, Some(bytes)), + file_id: None, + is_sort_shuffle: false, + } +} + +/// Happy path: one side has a fat upstream partition. The other partitions +/// (and all of the other side's partitions) are well below threshold, so the +/// only skew is on left upstream 0. +/// +/// Trace: M=4. Right (uniform `[10; 4]`, median=10) — none qualifies as +/// skewed (factor*median=50, threshold=100, none over 100). Left +/// `[600, 10, 10, 10]`, median=10 — only upstream 0 qualifies (600 > 50 AND +/// 600 > 100). Skewed upstream has 4 mappers @ 150 bytes each (total 600); +/// bin-pack at advisory=50: every mapper overshoots and flushes → 4 ranges +/// `[(0,1),(1,2),(2,3),(3,4)]`. Right upstream 0 stays passthrough `(0,1)`. +/// Per-upstream cartesian: left u0 = 4×1 = 4 pairs, u1/u2/u3 = 1×1 = 1 each. +/// K' = 4 + 3 = 7. Since K' > M (7 > 4), the rule attaches. +/// +/// Both leaves get a `SkewJoinPlan { upstream_partition_count: 4, shards: 7 }`. +#[tokio::test] +async fn should_attach_skew_join_when_left_upstream_is_skewed() +-> datafusion::error::Result<()> { + let ctx = skew_join_context(4, true); + register_partitioned_table(&ctx, "t1", 4)?; + register_partitioned_table(&ctx, "t2", 4)?; + + let plan = ctx + .sql("select t1.a, t2.b from t1 join t2 on t1.c = t2.c") + .await? + .create_physical_plan() + .await?; + let mut planner = + AdaptivePlanner::try_new(ctx.state().config(), plan, "test_job".to_string())?; + + let stages = planner.runnable_stages()?.unwrap(); + assert_eq!(2, stages.len()); + + // Left (stage 0): upstream 0 is fat — 4 mappers @ 150 bytes (= 600 + // total) while the other 3 upstreams are at 10 bytes each. Right: uniform 10. + planner + .finalise_stage_internal(0, partitions_with_one_skewed(4, 0, 4, 150, 10))?; + planner.finalise_stage_internal(1, passthrough_partitions(&[10; 4]))?; + + // Surface the join stage so OptimizeSkewedJoinRule fires and attaches. + let _ = planner.runnable_stages()?; + + assert_plan!(planner.current_plan(), @ " + AdaptiveDatafusionExec: is_final=true, plan_id=2, stage_id=2, stage_resolved=false + ProjectionExec: expr=[a@0 as a, b@2 as b] + SortMergeJoinExec: join_type=Inner, on=[(c@1, c@1)] + SortExec: expr=[c@1 ASC], preserve_partitioning=[true] + ExchangeExec: partitioning=Hash([c@1], 4), plan_id=0, stage_id=0, stage_resolved=true, skew_join=7 of 4 + DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1] + SortExec: expr=[c@1 ASC], preserve_partitioning=[true] + ExchangeExec: partitioning=Hash([c@1], 4), plan_id=1, stage_id=1, stage_resolved=true, skew_join=7 of 4 + DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1] + "); + + Ok(()) +} + +/// Disabled path: even with one fat partition, the rule short-circuits +/// at the first statement of `optimize()` and the plan flows through +/// untouched. Neither leaf gets `skew_join=`. +#[tokio::test] +async fn should_skip_skew_join_when_rule_disabled() -> datafusion::error::Result<()> { + let ctx = skew_join_context(4, false); + register_partitioned_table(&ctx, "t1", 4)?; + register_partitioned_table(&ctx, "t2", 4)?; + + let plan = ctx + .sql("select t1.a, t2.b from t1 join t2 on t1.c = t2.c") + .await? + .create_physical_plan() + .await?; + let mut planner = + AdaptivePlanner::try_new(ctx.state().config(), plan, "test_job".to_string())?; + + let _ = planner.runnable_stages()?.unwrap(); + planner + .finalise_stage_internal(0, partitions_with_one_skewed(4, 0, 4, 150, 10))?; + planner.finalise_stage_internal(1, passthrough_partitions(&[10; 4]))?; + + let _ = planner.runnable_stages()?; + + assert_plan!(planner.current_plan(), @ " + AdaptiveDatafusionExec: is_final=true, plan_id=2, stage_id=2, stage_resolved=false + ProjectionExec: expr=[a@0 as a, b@2 as b] + SortMergeJoinExec: join_type=Inner, on=[(c@1, c@1)] + SortExec: expr=[c@1 ASC], preserve_partitioning=[true] + ExchangeExec: partitioning=Hash([c@1], 4), plan_id=0, stage_id=0, stage_resolved=true + DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1] + SortExec: expr=[c@1 ASC], preserve_partitioning=[true] + ExchangeExec: partitioning=Hash([c@1], 4), plan_id=1, stage_id=1, stage_resolved=true + DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1] + "); + + Ok(()) +} + +/// Below-threshold path: ratio meets `factor` but absolute bytes don't meet +/// the threshold (the dual guard). Rule bails — no skew_join attached. +/// +/// Left `[80, 5, 5, 5]`: median=5, ratio=16× (well over 5.0) BUT +/// 80 < threshold=100. is_skewed returns false. Right uniform 5 → no skew. +/// Bail with "no upstream qualifies as skewed". +#[tokio::test] +async fn should_skip_skew_join_when_below_threshold() -> datafusion::error::Result<()> { + let ctx = skew_join_context(4, true); + register_partitioned_table(&ctx, "t1", 4)?; + register_partitioned_table(&ctx, "t2", 4)?; + + let plan = ctx + .sql("select t1.a, t2.b from t1 join t2 on t1.c = t2.c") + .await? + .create_physical_plan() + .await?; + let mut planner = + AdaptivePlanner::try_new(ctx.state().config(), plan, "test_job".to_string())?; + + let _ = planner.runnable_stages()?.unwrap(); + planner.finalise_stage_internal(0, passthrough_partitions(&[80, 5, 5, 5]))?; + planner.finalise_stage_internal(1, passthrough_partitions(&[5; 4]))?; + + let _ = planner.runnable_stages()?; + + assert_plan!(planner.current_plan(), @ " + AdaptiveDatafusionExec: is_final=true, plan_id=2, stage_id=2, stage_resolved=false + ProjectionExec: expr=[a@0 as a, b@2 as b] + SortMergeJoinExec: join_type=Inner, on=[(c@1, c@1)] + SortExec: expr=[c@1 ASC], preserve_partitioning=[true] + ExchangeExec: partitioning=Hash([c@1], 4), plan_id=0, stage_id=0, stage_resolved=true + DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1] + SortExec: expr=[c@1 ASC], preserve_partitioning=[true] + ExchangeExec: partitioning=Hash([c@1], 4), plan_id=1, stage_id=1, stage_resolved=true + DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1] + "); + + Ok(()) +} + +/// Uniform-bytes path: no upstream qualifies because all are at the same +/// size, so the ratio check (factor × median = same as each value) trivially +/// fails. Bail. +#[tokio::test] +async fn should_skip_skew_join_when_bytes_are_uniform() -> datafusion::error::Result<()> +{ + let ctx = skew_join_context(4, true); + register_partitioned_table(&ctx, "t1", 4)?; + register_partitioned_table(&ctx, "t2", 4)?; + + let plan = ctx + .sql("select t1.a, t2.b from t1 join t2 on t1.c = t2.c") + .await? + .create_physical_plan() + .await?; + let mut planner = + AdaptivePlanner::try_new(ctx.state().config(), plan, "test_job".to_string())?; + + let _ = planner.runnable_stages()?.unwrap(); + // Each upstream is well above threshold=100, but every value is identical + // so median = each value, ratio is 1×, fails factor=5. + planner.finalise_stage_internal(0, passthrough_partitions(&[500; 4]))?; + planner.finalise_stage_internal(1, passthrough_partitions(&[500; 4]))?; + + let _ = planner.runnable_stages()?; + + assert_plan!(planner.current_plan(), @ " + AdaptiveDatafusionExec: is_final=true, plan_id=2, stage_id=2, stage_resolved=false + ProjectionExec: expr=[a@0 as a, b@2 as b] + SortMergeJoinExec: join_type=Inner, on=[(c@1, c@1)] + SortExec: expr=[c@1 ASC], preserve_partitioning=[true] + ExchangeExec: partitioning=Hash([c@1], 4), plan_id=0, stage_id=0, stage_resolved=true + DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1] + SortExec: expr=[c@1 ASC], preserve_partitioning=[true] + ExchangeExec: partitioning=Hash([c@1], 4), plan_id=1, stage_id=1, stage_resolved=true + DataSourceExec: partitions=4, partition_sizes=[1, 1, 1, 1] + "); + + Ok(()) +} + +/// Sort-merge → hash join: same rule on the other join shape. Force hash +/// join (`prefer_hash_join=true`) and check the rule still fires on the leaf +/// Exchanges below the HashJoinExec. +#[tokio::test] +async fn should_attach_skew_join_on_hash_join() -> datafusion::error::Result<()> { + let config = SessionConfig::new_with_ballista() + .with_target_partitions(4) + .with_round_robin_repartition(false) + .with_ballista_skew_join_enabled(true) + .with_ballista_skew_join_skewed_partition_factor(5.0) + .with_ballista_skew_join_skewed_partition_threshold_bytes(100) + .with_ballista_skew_join_advisory_partition_bytes(50) + .with_ballista_skew_join_small_partition_factor(0.2) + .set_bool("datafusion.optimizer.prefer_hash_join", true); + + let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .build(); + let ctx = SessionContext::new_with_state(state); + register_partitioned_table(&ctx, "t1", 4)?; + register_partitioned_table(&ctx, "t2", 4)?; + + let plan = ctx + .sql("select t1.a, t2.b from t1 join t2 on t1.c = t2.c") + .await? + .create_physical_plan() + .await?; + let mut planner = + AdaptivePlanner::try_new(ctx.state().config(), plan, "test_job".to_string())?; + + let _ = planner.runnable_stages()?.unwrap(); + planner + .finalise_stage_internal(0, partitions_with_one_skewed(4, 0, 4, 150, 10))?; + planner.finalise_stage_internal(1, passthrough_partitions(&[10; 4]))?; + + let _ = planner.runnable_stages()?; + + // Both leaves get skew_join=7 of 4 — same K' as the SMJ case because + // the upstream byte distribution and the rule's bin-packing are + // identical. Join op above just differs. + let display = + datafusion::physical_plan::displayable(planner.current_plan()).indent(true); + let rendered = display.to_string(); + assert!( + rendered.contains("HashJoinExec: mode=Partitioned"), + "expected HashJoinExec(Partitioned) in plan, got:\n{rendered}" + ); + assert_eq!( + rendered.matches("skew_join=7 of 4").count(), + 2, + "both leaf ExchangeExecs should carry skew_join=7 of 4, got:\n{rendered}" + ); + + Ok(()) +} + +/// End-to-end: after the rule attaches `skew_join=K' of M` to both leaves, +/// the adapter must build both downstream `ShuffleReaderExec`s with K' +/// partitions. The next runnable stage's plan tree is the proof. +#[tokio::test] +async fn shuffle_readers_use_skew_join_kprime_when_rule_fires() +-> datafusion::error::Result<()> { + let ctx = skew_join_context(4, true); + register_partitioned_table(&ctx, "t1", 4)?; + register_partitioned_table(&ctx, "t2", 4)?; + + let plan = ctx + .sql("select t1.a, t2.b from t1 join t2 on t1.c = t2.c") + .await? + .create_physical_plan() + .await?; + let mut planner = + AdaptivePlanner::try_new(ctx.state().config(), plan, "test_job".to_string())?; + + let stages = planner.runnable_stages()?.unwrap(); + assert_eq!(2, stages.len()); + + planner + .finalise_stage_internal(0, partitions_with_one_skewed(4, 0, 4, 150, 10))?; + planner.finalise_stage_internal(1, passthrough_partitions(&[10; 4]))?; + + let stages = planner.runnable_stages()?.unwrap(); + assert_eq!(1, stages.len()); + + // The join stage's ShuffleReaderExecs now both expose K'=7 partitions. + // The skew-join carrier's decision has flowed through the adapter into + // the runnable plan. + let rendered = + datafusion::physical_plan::displayable(stages[0].plan.as_ref()) + .indent(true) + .to_string(); + assert_eq!( + rendered.matches("ShuffleReaderExec: partitioning: Hash([c@1], 7)").count(), + 2, + "both readers should expose K'=7 partitions, got:\n{rendered}" + ); + assert_eq!( + rendered.matches("skew_join: 7 of 4").count(), + 2, + "both readers should report the skew_join=7 of 4 source, got:\n{rendered}" + ); + + Ok(()) +} From be8fbd4ee236d452f7eb64a0e931cceeac8c30ee Mon Sep 17 00:00:00 2001 From: xuanyili Date: Wed, 27 May 2026 06:53:01 +0000 Subject: [PATCH 4/4] =?UTF-8?q?feat(aqe):=20correctness=20guards=20for=20s?= =?UTF-8?q?kew=5Fjoin=20=E2=80=94=20UnknownPartitioning=20+=20dynamic-filt?= =?UTF-8?q?er=20mutual=20exclusion?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After investigating whether DataFusion's SortMergeJoinExec needs an is_skew_join flag analogous to Spark's, the answer is: no flag is needed on the join itself (DF's SMJ has no global per-key invariant the rewrite violates). But two correctness gaps in the adapter / rule pipeline did surface, and this commit closes both. B-fix (adapter, always-on): the skew_join adapter arm now returns Partitioning::UnknownPartitioning(K') instead of Hash(keys, K'). Splitting a hash bucket across multiple downstream partitions means the same key now lives in multiple partitions, so the data is not truly hash-partitioned at K' — claiming Hash would mislead downstream operators (other joins, hash aggregates, DataFusion's per-partition dynamic-filter routing) into trust they shouldn't have. UnknownPartitioning is the honest declaration; within the current stage the join above still works because each task receives a properly-paired (left-shard, right-shard) bundle. D-fix (rule, mutual exclusion): DataFusion 53.1's HashJoin dynamic filter pushdown builds a CASE expression keyed on hash(join_keys) % K' that routes each probe row to one partition's bounds. The skew rewrite intentionally violates that hash-co-location invariant, so the routed CASE would filter out probe rows whose matches live in a different partition → silent wrong results. The default for optimizer.enable_join_dynamic_filter_pushdown in DF 53.1 is true, so this is a today-problem, not future-proofing. Picked mutual exclusion (option 3 of 4) for v1: when the DF option is on, the rule refuses to fire with a clear log line. Users opt into one or the other, not both. Alternatives — documentation only (rejected, DF default is on), plan-mutation to clear dynamic filters (more invasive), and upstream DF change to make per-partition CASE skew-aware (tracked at ~/mydocs/datafusion/aqe-tasks/11-skew-compatible-hash-join-dynamic-filter.md as a follow-up) — are all documented in the rule's source comment. Tests: existing five fire/non-fire skew_join tests pre-set enable_join_dynamic_filter_pushdown=false (via a comment in skew_join_context); one new test should_bail_when_dynamic_filter_pushdown_enabled covers the mutual-exclusion bail path. The shuffle-reader integration test now expects UnknownPartitioning(7) instead of Hash([c@1], 7), with a docstring explaining why the rewrite must declare unknown partitioning. All 70 AQE tests pass (63 prior + 6 from C3 + 1 new in C4). Co-Authored-By: Claude Sonnet 4.6 (1M context) --- ballista/scheduler/src/state/aqe/adapter.rs | 30 ++++--- .../optimizer_rule/optimize_skewed_join.rs | 40 +++++++++ .../src/state/aqe/test/skew_join_rule.rs | 82 +++++++++++++++++-- 3 files changed, 136 insertions(+), 16 deletions(-) diff --git a/ballista/scheduler/src/state/aqe/adapter.rs b/ballista/scheduler/src/state/aqe/adapter.rs index 75f78054a4..2906127f4f 100644 --- a/ballista/scheduler/src/state/aqe/adapter.rs +++ b/ballista/scheduler/src/state/aqe/adapter.rs @@ -126,16 +126,26 @@ impl BallistaAdapter { }; k_shape.push(assigned); } - // Preserve the hash partitioning width at K'; the - // OptimizeSkewedJoinRule's join-side fix (`is_skew_join` - // in C4) is what relaxes the "same key in one partition" - // invariant downstream. - let new_partitioning = match &partitioning { - Partitioning::Hash(keys, _m) => { - Partitioning::Hash(keys.clone(), sp.shards.len()) - } - _ => Partitioning::UnknownPartitioning(sp.shards.len()), - }; + // Always degrade to UnknownPartitioning at K' for the + // skew-rewritten reader. + // + // The skewed-side rewrite splits one hash bucket across + // multiple downstream partitions (by mapper index), so + // the SAME join key now appears in multiple partitions. + // The data is no longer truly hash-partitioned on the + // join key — claiming Hash(K') would be a lie that + // downstream operators (other joins, hash aggregates, + // DataFusion's per-partition dynamic-filter routing) + // would trust and produce wrong results from. + // + // UnknownPartitioning(K') is the honest declaration: + // "K' partitions, no hash invariant". Inside the + // current stage the join above this reader still works + // because each task receives a properly-paired + // (left-shard, right-shard) bundle — co-location holds + // per-task even though it doesn't hold globally. + let new_partitioning = + Partitioning::UnknownPartitioning(sp.shards.len()); ShuffleReaderExec::try_new_skew_join( stage_id, k_shape, diff --git a/ballista/scheduler/src/state/aqe/optimizer_rule/optimize_skewed_join.rs b/ballista/scheduler/src/state/aqe/optimizer_rule/optimize_skewed_join.rs index 1d2e009c19..66232a4216 100644 --- a/ballista/scheduler/src/state/aqe/optimizer_rule/optimize_skewed_join.rs +++ b/ballista/scheduler/src/state/aqe/optimizer_rule/optimize_skewed_join.rs @@ -118,6 +118,46 @@ impl PhysicalOptimizerRule for OptimizeSkewedJoinRule { if !bc.skew_join_enabled() { return Ok(plan); } + + // Mutual-exclusion guard with DataFusion's HashJoin dynamic-filter + // pushdown. The dynamic filter builds a CASE expression keyed on + // `hash(join_keys) % K'` that routes each probe row to one + // partition's bounds. The skew rewrite intentionally violates the + // hash-co-location invariant that routing assumes (the same key + // now lives in multiple partitions of the split side), so the + // routed CASE would filter out probe rows whose matches live in + // a different partition than `hash(key) % K'` selects → silent + // wrong results. + // + // The default for `enable_join_dynamic_filter_pushdown` in DF + // 53.1 is `true`, so users wanting skew rewrite must explicitly + // set it to `false`. Picked mutual exclusion (option 3 below) + // for v1. + // + // Alternatives considered for future work: + // - **Plan mutation**: walk the subtree and rebuild any + // HashJoinExec with `.with_dynamic_filter(None)` before + // attaching skew_join. Targeted but more invasive — requires + // re-running `with_new_children` up the chain and reasoning + // about Arc identity for the carrier slots. + // - **Upstream DF fix**: add a "skew-compatible" fallback in + // `shared_bounds.rs` that uses union bounds instead of + // partition-routed bounds. Once landed, this guard becomes a + // `pass is_hash_co_located=false` call instead of a bail. + // + // Placed before the plan walk so we don't pay traversal cost + // when the user has both flags on (a real misconfiguration the + // log explicitly calls out). + if config.optimizer.enable_join_dynamic_filter_pushdown { + debug!( + "[skew-join-rule] optimizer.enable_join_dynamic_filter_pushdown=true; \ + mutually exclusive with skew_join rewrite (DataFusion's per-partition \ + dynamic-filter routing is incompatible with split shards). \ + Bail. Disable one or the other to proceed." + ); + return Ok(plan); + } + let factor = bc.skew_join_skewed_partition_factor(); let threshold_bytes = bc.skew_join_skewed_partition_threshold_bytes(); let advisory_bytes = bc.skew_join_advisory_partition_bytes(); diff --git a/ballista/scheduler/src/state/aqe/test/skew_join_rule.rs b/ballista/scheduler/src/state/aqe/test/skew_join_rule.rs index 3c4c98007f..838d7cb8c8 100644 --- a/ballista/scheduler/src/state/aqe/test/skew_join_rule.rs +++ b/ballista/scheduler/src/state/aqe/test/skew_join_rule.rs @@ -46,6 +46,10 @@ use std::sync::Arc; /// - `threshold_bytes = 100` (vs 256 MiB in prod) — anything >100 may skew /// - `advisory_bytes = 50` (vs 64 MiB in prod) — sub-shard target ≈ 50 /// - `small_factor = 0.2` (Spark default) +/// +/// Also disables `optimizer.enable_join_dynamic_filter_pushdown` — the rule +/// enforces mutual exclusion with that option (see optimize_skewed_join.rs +/// for the rationale). Tests that want the rule to fire must keep it off. fn skew_join_context(target_partitions: usize, enabled: bool) -> SessionContext { let config = SessionConfig::new_with_ballista() .with_target_partitions(target_partitions) @@ -54,7 +58,8 @@ fn skew_join_context(target_partitions: usize, enabled: bool) -> SessionContext .with_ballista_skew_join_skewed_partition_factor(5.0) .with_ballista_skew_join_skewed_partition_threshold_bytes(100) .with_ballista_skew_join_advisory_partition_bytes(50) - .with_ballista_skew_join_small_partition_factor(0.2); + .with_ballista_skew_join_small_partition_factor(0.2) + .set_bool("datafusion.optimizer.enable_join_dynamic_filter_pushdown", false); let state = SessionStateBuilder::new() .with_config(config) @@ -335,7 +340,8 @@ async fn should_attach_skew_join_on_hash_join() -> datafusion::error::Result<()> .with_ballista_skew_join_skewed_partition_threshold_bytes(100) .with_ballista_skew_join_advisory_partition_bytes(50) .with_ballista_skew_join_small_partition_factor(0.2) - .set_bool("datafusion.optimizer.prefer_hash_join", true); + .set_bool("datafusion.optimizer.prefer_hash_join", true) + .set_bool("datafusion.optimizer.enable_join_dynamic_filter_pushdown", false); let state = SessionStateBuilder::new() .with_config(config) @@ -407,17 +413,21 @@ async fn shuffle_readers_use_skew_join_kprime_when_rule_fires() let stages = planner.runnable_stages()?.unwrap(); assert_eq!(1, stages.len()); - // The join stage's ShuffleReaderExecs now both expose K'=7 partitions. - // The skew-join carrier's decision has flowed through the adapter into + // The join stage's ShuffleReaderExecs now both expose K'=7 partitions, + // declared as UnknownPartitioning — the skew rewrite is honest about + // having broken the per-partition hash invariant. (Inside the stage, + // each task still receives properly-paired (left-shard, right-shard) + // input bundles, so the join above the reader still works.) The + // skew-join carrier's decision has flowed through the adapter into // the runnable plan. let rendered = datafusion::physical_plan::displayable(stages[0].plan.as_ref()) .indent(true) .to_string(); assert_eq!( - rendered.matches("ShuffleReaderExec: partitioning: Hash([c@1], 7)").count(), + rendered.matches("ShuffleReaderExec: partitioning: UnknownPartitioning(7)").count(), 2, - "both readers should expose K'=7 partitions, got:\n{rendered}" + "both readers should expose K'=7 UnknownPartitioning, got:\n{rendered}" ); assert_eq!( rendered.matches("skew_join: 7 of 4").count(), @@ -427,3 +437,63 @@ async fn shuffle_readers_use_skew_join_kprime_when_rule_fires() Ok(()) } + +/// Mutual exclusion with `optimizer.enable_join_dynamic_filter_pushdown`: +/// even when skew_join.enabled=true AND the upstream byte distribution +/// would otherwise trigger a rewrite, the rule bails because DataFusion +/// 53.1's per-partition dynamic-filter routing is incompatible with the +/// split shards (see optimize_skewed_join.rs for the rationale). Neither +/// leaf gets a skew_join attached. +#[tokio::test] +async fn should_bail_when_dynamic_filter_pushdown_enabled() +-> datafusion::error::Result<()> { + // Same setup as the happy-path test, but with the DF dynamic filter + // option flipped back on. Skew distribution alone would otherwise + // trigger the rule. + let config = SessionConfig::new_with_ballista() + .with_target_partitions(4) + .with_round_robin_repartition(false) + .with_ballista_skew_join_enabled(true) + .with_ballista_skew_join_skewed_partition_factor(5.0) + .with_ballista_skew_join_skewed_partition_threshold_bytes(100) + .with_ballista_skew_join_advisory_partition_bytes(50) + .with_ballista_skew_join_small_partition_factor(0.2) + .set_bool("datafusion.optimizer.enable_join_dynamic_filter_pushdown", true); + + let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .build(); + let ctx = SessionContext::new_with_state(state); + register_partitioned_table(&ctx, "t1", 4)?; + register_partitioned_table(&ctx, "t2", 4)?; + + let plan = ctx + .sql("select t1.a, t2.b from t1 join t2 on t1.c = t2.c") + .await? + .create_physical_plan() + .await?; + let mut planner = + AdaptivePlanner::try_new(ctx.state().config(), plan, "test_job".to_string())?; + + let _ = planner.runnable_stages()?.unwrap(); + planner + .finalise_stage_internal(0, partitions_with_one_skewed(4, 0, 4, 150, 10))?; + planner.finalise_stage_internal(1, passthrough_partitions(&[10; 4]))?; + + let _ = planner.runnable_stages()?; + + // Neither leaf carries skew_join=… — the mutual-exclusion guard fired. + let rendered = + datafusion::physical_plan::displayable(planner.current_plan()) + .indent(true) + .to_string(); + assert_eq!( + rendered.matches("skew_join=").count(), + 0, + "rule must not attach skew_join when dynamic filter pushdown is on, got:\n{rendered}" + ); + + Ok(()) +} +