From 62e49dab29656966c3f58d29b7df014a0eb67407 Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Wed, 17 Jun 2026 00:23:04 -0700 Subject: [PATCH 1/4] address_sched_crash_high_partitions --- .../src/execution_plans/shuffle_reader.rs | 34 +++--- .../core/src/serde/scheduler/from_proto.rs | 17 ++- ballista/core/src/serde/scheduler/mod.rs | 26 +++- ballista/core/src/serde/scheduler/to_proto.rs | 9 +- ballista/scheduler/src/planner.rs | 2 +- .../scheduler/src/scheduler_server/mod.rs | 5 +- ballista/scheduler/src/state/aqe/mod.rs | 2 +- .../src/state/aqe/test/alter_stages.rs | 4 +- .../src/state/aqe/test/coalesce_rule.rs | 2 +- ballista/scheduler/src/state/aqe/test/mod.rs | 4 +- .../scheduler/src/state/execution_graph.rs | 15 ++- .../scheduler/src/state/execution_stage.rs | 8 +- python/Cargo.lock | 2 + q9.log | 115 ++++++++++++++++++ 14 files changed, 203 insertions(+), 42 deletions(-) create mode 100644 q9.log diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs index 8311c2a6a0..9b5dade96b 100644 --- a/ballista/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/core/src/execution_plans/shuffle_reader.rs @@ -380,7 +380,7 @@ impl ExecutionPlan for ShuffleReaderExec { let mut partition_locations = HashMap::new(); for p in &self.partition[partition] { partition_locations - .entry(p.executor_meta.id.clone()) + .entry(p.partition_location_metadata.id.clone()) .or_insert_with(Vec::new) .push(p.clone()); } @@ -747,7 +747,7 @@ async fn fetch_partition_remote( customize_endpoint: Option>, client_pool: Option>, ) -> result::Result { - let metadata = &location.executor_meta; + let metadata = &location.partition_location_metadata; let partition_id = &location.partition_id; let file_id = location.file_id; let is_sort_shuffle = location.is_sort_shuffle; @@ -817,7 +817,7 @@ fn fetch_partition_local( sort_shuffle_enabled: bool, ) -> result::Result { let path = &location.path(work_dir)?; - let metadata = &location.executor_meta; + let metadata = &location.partition_location_metadata; let partition_id = &location.partition_id; let data_path = std::path::Path::new(path); @@ -1022,7 +1022,7 @@ mod tests { stage_id, partition_id: i, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: Arc::new(ExecutorMetadata { id: "executor_1".to_string(), host: "executor_1".to_string(), port: 7070, @@ -1030,7 +1030,7 @@ mod tests { specification: ExecutorSpecification::default() .with_task_slots(1), os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), partition_stats: PartitionStats { num_rows: Some(rows), num_batches: None, @@ -1141,14 +1141,14 @@ mod tests { stage_id: input_stage_id, partition_id, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: Arc::new(ExecutorMetadata { id: "executor_1".to_string(), host: "executor_1".to_string(), port: 7070, grpc_port: 8080, specification: ExecutorSpecification::default().with_task_slots(1), os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), partition_stats: PartitionStats { num_rows: Some(1), num_batches: None, @@ -1192,14 +1192,14 @@ mod tests { stage_id: input_stage_id, partition_id, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: Arc::new(ExecutorMetadata { id: "executor_1".to_string(), host: "executor_1".to_string(), port: 7070, grpc_port: 8080, specification: ExecutorSpecification::default().with_task_slots(1), os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), partition_stats: PartitionStats { num_rows: Some(1), num_batches: None, @@ -1244,14 +1244,14 @@ mod tests { stage_id: input_stage_id, partition_id, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: Arc::new(ExecutorMetadata { id: "executor_1".to_string(), host: "executor_1".to_string(), port: 7070, grpc_port: 8080, specification: ExecutorSpecification::default().with_task_slots(1), os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), partition_stats: PartitionStats { num_rows: Some(1), num_batches: None, @@ -1296,14 +1296,14 @@ mod tests { stage_id: input_stage_id, partition_id, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: Arc::new(ExecutorMetadata { id: "executor_1".to_string(), host: "executor_1".to_string(), port: 7070, grpc_port: 8080, specification: ExecutorSpecification::default().with_task_slots(1), os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), partition_stats: Default::default(), file_id: None, is_sort_shuffle: false, @@ -1486,14 +1486,14 @@ mod tests { stage_id: 1, partition_id, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: Arc::new(ExecutorMetadata { id: format!("exec{partition_id}"), host: "localhost".to_string(), port: 50051, grpc_port: 50052, specification: ExecutorSpecification::default().with_task_slots(12), os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), partition_stats: Default::default(), file_id, is_sort_shuffle: false, @@ -1722,14 +1722,14 @@ mod tests { stage_id: 7, partition_id, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: Arc::new(ExecutorMetadata { id: format!("exec-{partition_id}"), host: "localhost".to_string(), port: 50051, grpc_port: 50052, specification: ExecutorSpecification::default(), os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), partition_stats: PartitionStats::new(Some(100), Some(1), Some(1024)), file_id: None, is_sort_shuffle: false, diff --git a/ballista/core/src/serde/scheduler/from_proto.rs b/ballista/core/src/serde/scheduler/from_proto.rs index b99cb274fb..9e76cd88d9 100644 --- a/ballista/core/src/serde/scheduler/from_proto.rs +++ b/ballista/core/src/serde/scheduler/from_proto.rs @@ -38,7 +38,7 @@ use crate::serde::protobuf::{NamedPruningMetrics, NamedRatio}; use crate::serde::scheduler::{ Action, BallistaFunctionRegistry, ExecutorData, ExecutorMetadata, ExecutorOperatingSystemSpecification, ExecutorSpecification, PartitionId, - PartitionLocation, PartitionStats, TaskDefinition, + PartitionLocation, PartitionLocationMetadata, PartitionStats, TaskDefinition, }; use crate::RuntimeProducer; @@ -108,14 +108,19 @@ impl TryInto for protobuf::PartitionLocation { ) })? .into(), - executor_meta: self - .executor_meta - .ok_or_else(|| { + partition_location_metadata: { + let m = self.executor_meta.ok_or_else(|| { BallistaError::General( "executor_meta in PartitionLocation is missing".to_owned(), ) - })? - .into(), + })?; + Arc::new(PartitionLocationMetadata { + id: m.id, + host: m.host, + port: m.port as u16, + grpc_port: m.grpc_port as u16, + }) + }, partition_stats: self .partition_stats .ok_or_else(|| { diff --git a/ballista/core/src/serde/scheduler/mod.rs b/ballista/core/src/serde/scheduler/mod.rs index eca6700d99..f27134c813 100644 --- a/ballista/core/src/serde/scheduler/mod.rs +++ b/ballista/core/src/serde/scheduler/mod.rs @@ -79,6 +79,30 @@ impl PartitionId { } } +/// Lean exec information for a shuffle partition. +#[derive(Debug, Clone)] +pub struct PartitionLocationMetadata { + /// Unique executor identifier. + pub id: String, + /// Hostname or IP address of the executor. + pub host: String, + /// Port number for data transfer. + pub port: u16, + /// Port number for gRPC communication. + pub grpc_port: u16, +} + +impl From<&ExecutorMetadata> for PartitionLocationMetadata { + fn from(m: &ExecutorMetadata) -> Self { + Self { + id: m.id.clone(), + host: m.host.clone(), + port: m.port, + grpc_port: m.grpc_port, + } + } +} + /// Location information for a shuffle partition. #[derive(Debug, Clone)] pub struct PartitionLocation { @@ -87,7 +111,7 @@ pub struct PartitionLocation { /// The partition identifier. pub partition_id: PartitionId, /// Metadata about the executor hosting this partition. - pub executor_meta: ExecutorMetadata, + pub partition_location_metadata: Arc, /// Statistics about the partition data. pub partition_stats: PartitionStats, /// shuffle file id diff --git a/ballista/core/src/serde/scheduler/to_proto.rs b/ballista/core/src/serde/scheduler/to_proto.rs index d09dbad17a..ba59a6c8e4 100644 --- a/ballista/core/src/serde/scheduler/to_proto.rs +++ b/ballista/core/src/serde/scheduler/to_proto.rs @@ -78,7 +78,14 @@ impl TryInto for PartitionLocation { Ok(protobuf::PartitionLocation { map_partition_id: self.map_partition_id as u32, partition_id: Some(self.partition_id.into()), - executor_meta: Some(self.executor_meta.into()), + executor_meta: Some(protobuf::ExecutorMetadata { + id: self.partition_location_metadata.id.clone(), + host: self.partition_location_metadata.host.clone(), + port: self.partition_location_metadata.port as u32, + grpc_port: self.partition_location_metadata.grpc_port as u32, + specification: None, + os_info: None, + }), partition_stats: Some(self.partition_stats.into()), file_id: self.file_id, is_sort_shuffle: self.is_sort_shuffle, diff --git a/ballista/scheduler/src/planner.rs b/ballista/scheduler/src/planner.rs index 04405d9fd2..72967e51b4 100644 --- a/ballista/scheduler/src/planner.rs +++ b/ballista/scheduler/src/planner.rs @@ -1048,7 +1048,7 @@ order by stage_id: 42, partition_id, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: ExecutorMetadata { id: format!("exec-{partition_id}"), host: "localhost".to_string(), port: 50050, diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs index 9c9f8c2307..a083af9972 100644 --- a/ballista/scheduler/src/scheduler_server/mod.rs +++ b/ballista/scheduler/src/scheduler_server/mod.rs @@ -551,7 +551,10 @@ mod test { assert_eq!(final_graph.output_locations().len(), 4); for output_location in final_graph.output_locations() { - assert_eq!(output_location.executor_meta.host, "localhost1".to_owned()) + assert_eq!( + output_location.partition_location_metadata.host, + "localhost1".to_owned() + ) } Ok(()) diff --git a/ballista/scheduler/src/state/aqe/mod.rs b/ballista/scheduler/src/state/aqe/mod.rs index f069714e6d..94d5fe6215 100644 --- a/ballista/scheduler/src/state/aqe/mod.rs +++ b/ballista/scheduler/src/state/aqe/mod.rs @@ -413,7 +413,7 @@ impl AdaptiveExecutionGraph { stage_output.partition_locations.iter_mut().for_each( |(_partition, locs)| { let before_len = locs.len(); - locs.retain(|loc| loc.executor_meta.id != executor_id); + locs.retain(|loc| loc.partition_location_metadata.id != executor_id); if locs.len() < before_len { match_found = true; } diff --git a/ballista/scheduler/src/state/aqe/test/alter_stages.rs b/ballista/scheduler/src/state/aqe/test/alter_stages.rs index e38d4f6436..c64e1b86e5 100644 --- a/ballista/scheduler/src/state/aqe/test/alter_stages.rs +++ b/ballista/scheduler/src/state/aqe/test/alter_stages.rs @@ -534,7 +534,7 @@ fn small_statistics_exchange() -> Vec> { stage_id: 0, partition_id: 0, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: ExecutorMetadata { id: "".to_string(), host: "".to_string(), port: 0, @@ -566,7 +566,7 @@ fn big_statistics_exchange() -> Vec> { stage_id: 0, partition_id: 0, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: ExecutorMetadata { id: "".to_string(), host: "".to_string(), port: 0, diff --git a/ballista/scheduler/src/state/aqe/test/coalesce_rule.rs b/ballista/scheduler/src/state/aqe/test/coalesce_rule.rs index 2c98003fad..876cdd1fde 100644 --- a/ballista/scheduler/src/state/aqe/test/coalesce_rule.rs +++ b/ballista/scheduler/src/state/aqe/test/coalesce_rule.rs @@ -101,7 +101,7 @@ fn partitions_with_byte_sizes( stage_id: 0, partition_id: idx, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: ExecutorMetadata { id: "".to_string(), host: "".to_string(), port: 0, diff --git a/ballista/scheduler/src/state/aqe/test/mod.rs b/ballista/scheduler/src/state/aqe/test/mod.rs index 939347359f..8a03e7b32b 100644 --- a/ballista/scheduler/src/state/aqe/test/mod.rs +++ b/ballista/scheduler/src/state/aqe/test/mod.rs @@ -47,7 +47,7 @@ pub(crate) fn mock_partitions_with_statistics() -> Vec> { stage_id: 0, partition_id: 0, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: ExecutorMetadata { id: "".to_string(), host: "".to_string(), port: 0, @@ -72,7 +72,7 @@ pub(crate) fn mock_partitions_with_statistics_no_data() -> Vec, ) -> Vec { + let partition_metadata = Arc::new(PartitionLocationMetadata::from(exec_metadata)); shuffles .into_iter() .map(|shuffle| PartitionLocation { @@ -1746,7 +1748,7 @@ pub(crate) fn partition_to_location( stage_id, partition_id: shuffle.partition_id as usize, }, - executor_meta: executor.clone(), + partition_location_metadata: Arc::clone(&partition_metadata), partition_stats: PartitionStats::new( Some(shuffle.num_rows), Some(shuffle.num_batches), @@ -1881,7 +1883,10 @@ mod test { let outputs = agg_graph.output_locations(); for location in outputs { - assert_eq!(location.executor_meta.host, "localhost2".to_owned()); + assert_eq!( + location.partition_location_metadata.host, + "localhost2".to_owned() + ); } Ok(()) diff --git a/ballista/scheduler/src/state/execution_stage.rs b/ballista/scheduler/src/state/execution_stage.rs index a9695d3b8b..8c2a28add0 100644 --- a/ballista/scheduler/src/state/execution_stage.rs +++ b/ballista/scheduler/src/state/execution_stage.rs @@ -342,12 +342,12 @@ impl UnresolvedStage { .iter_mut() .for_each(|(_partition, locs)| { locs.iter().for_each(|loc| { - if loc.executor_meta.id == executor_id { + if loc.partition_location_metadata.id == executor_id { bad_map_partitions.insert(loc.map_partition_id); } }); - locs.retain(|loc| loc.executor_meta.id != executor_id); + locs.retain(|loc| loc.partition_location_metadata.id != executor_id); }); stage_output.complete = false; Ok(bad_map_partitions) @@ -824,12 +824,12 @@ impl RunningStage { .iter_mut() .for_each(|(_partition, locs)| { locs.iter().for_each(|loc| { - if loc.executor_meta.id == executor_id { + if loc.partition_location_metadata.id == executor_id { bad_map_partitions.insert(loc.map_partition_id); } }); - locs.retain(|loc| loc.executor_meta.id != executor_id); + locs.retain(|loc| loc.partition_location_metadata.id != executor_id); }); stage_output.complete = false; Ok(bad_map_partitions) diff --git a/python/Cargo.lock b/python/Cargo.lock index 8ccc4358b7..dc1c986dc8 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -642,6 +642,7 @@ dependencies = [ "futures", "http", "insta", + "itertools", "log", "object_store", "parking_lot", @@ -652,6 +653,7 @@ dependencies = [ "tokio", "tokio-stream", "tonic", + "tower-http", "uuid", ] diff --git a/q9.log b/q9.log new file mode 100644 index 0000000000..cedc0eab41 --- /dev/null +++ b/q9.log @@ -0,0 +1,115 @@ + Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.35s + Running `target/debug/ballista-cli` +Ballista CLI v53.0.0 +[?2004h[?2026h ❯  \? for help, \q to quit [?2026l[?2026h ❯ \q [?2026l[?2026h ❯ select case when (select count(*) from store_sales where ss_quantity between 1 and 20) > 409437 + then (select avg(ss_ext_tax) from store_sales where ss_quantity between 1 and 20) + else (select avg(ss_net_paid) from store_sales where ss_quantity between 1 and 20) end bucket1, + case when (select count(*) from store_sales where ss_quantity between 21 and 40) > 4595804 + then (select avg(ss_ext_tax) from store_sales where ss_quantity between 21 and 40) + else (select avg(ss_net_paid) from store_sales where ss_quantity between 21 and 40) end bucket2, + case when (select count(*) from store_sales where ss_quantity between 41 and 60) > 1333710 + then (select avg(ss_ext_tax) from store_sales where ss_quantity between 41 and 60) + else (select avg(ss_net_paid) from store_sales where ss_quantity between 41 and 60) end bucket3, + case when (select count(*) from store_sales where ss_quantity between 61 and 80) > 2361102 + then (select avg(ss_ext_tax) from store_sales where ss_quantity between 61 and 80) + else (select avg(ss_net_paid) from store_sales where ss_quantity between 61 and 80) end bucket4, + case when (select count(*) from store_sales where ss_quantity between 81 and 100) > 1517817 + then (select avg(ss_ext_tax) from store_sales where ss_quantity between 81 and 100) + else (select avg(ss_net_paid) from store_sales where ss_quantity between 81 and 100) end bucket5 + +; [?2026l[?2004l +Collection([Diagnostic(Diagnostic { kind: Error, message: "table 'store_sales' not found", span: Some(Span(Location(1,40)..Location(1,51))), notes: [], helps: [] }, Plan("table 'datafusion.public.store_sales' not found")), Diagnostic(Diagnostic { kind: Error, message: "table 'store_sales' not found", span: Some(Span(Location(4,42)..Location(4,53))), notes: [], helps: [] }, Plan("table 'datafusion.public.store_sales' not found")), Diagnostic(Diagnostic { kind: Error, message: "table 'store_sales' not found", span: Some(Span(Location(7,42)..Location(7,53))), notes: [], helps: [] }, Plan("table 'datafusion.public.store_sales' not found")), Diagnostic(Diagnostic { kind: Error, message: "table 'store_sales' not found", span: Some(Span(Location(10,42)..Location(10,53))), notes: [], helps: [] }, Plan("table 'datafusion.public.store_sales' not found")), Diagnostic(Diagnostic { kind: Error, message: "table 'store_sales' not found", span: Some(Span(Location(13,42)..Location(13,53))), notes: [], helps: [] }, Plan("table 'datafusion.public.store_sales' not found"))]) +[?2004h[?2026h ❯  \? for help, \q to quit [?2026l[?2026h ❯ CREATE EXTERNAL TABLE store_sales STORED AS PARQUET LOCATION '/Users/bhargava/Projects/oss/datafusion/benchmarks/data/tpcds_sf1/store_sales.parquet'; [?2026l[?2004l +0 row(s) fetched. +Elapsed 0.059 seconds. + +[?2004h[?2026h ❯  \? for help, \q to quit [?2026l[?2026h ❯ CREATE EXTERNAL TABLE reason STORED AS PARQUET LOCATION '/Users/bhargava/Projects/oss/datafusion/benchmarks/data/tpcds_sf1/reason.parquet'; [?2026l[?2004l +0 row(s) fetched. +Elapsed 0.006 seconds. + +[?2004h[?2026h ❯  \? for help, \q to quit [?2026l[?2026h ❯ select case when (select count(*) from store_sales where ss_quantity between 1 and 20) > 409437 + then (select avg(ss_ext_tax) from store_sales where ss_quantity between 1 and 20) + else (select avg(ss_net_paid) from store_sales where ss_quantity between 1 and 20) end bucket1, + case when (select count(*) from store_sales where ss_quantity between 21 and 40) > 4595804 + then (select avg(ss_ext_tax) from store_sales where ss_quantity between 21 and 40) + else (select avg(ss_net_paid) from store_sales where ss_quantity between 21 and 40) end bucket2, + case when (select count(*) from store_sales where ss_quantity between 41 and 60) > 1333710 + then (select avg(ss_ext_tax) from store_sales where ss_quantity between 41 and 60) + else (select avg(ss_net_paid) from store_sales where ss_quantity between 41 and 60) end bucket3, + case when (select count(*) from store_sales where ss_quantity between 61 and 80) > 2361102 + then (select avg(ss_ext_tax) from store_sales where ss_quantity between 61 and 80) + else (select avg(ss_net_paid) from store_sales where ss_quantity between 61 and 80) end bucket4, + case when (select count(*) from store_sales where ss_quantity between 81 and 100) > 1517817 + then (select avg(ss_ext_tax) from store_sales where ss_quantity between 81 and 100) + else (select avg(ss_net_paid) from store_sales where ss_quantity between 81 and 100) end bucket5 + from reason where r_reason_sk = 1; [?2026l[?2004l +2026-06-11T06:41:53.393166Z  WARN ballista_executor::execution_loop: Executor failed to run task: PartitionId { job_id: "py8NA3j", stage_id: 16, partition_id: 0 }, error: DataFusionError(Internal("Assertion failed: col.name() == matching_name: Input field name r_reason_sk does not match with the projection expression count(*)")) +2026-06-11T06:41:53.395745Z ERROR ballista_scheduler::scheduler_server::query_stage_scheduler: Job py8NA3j running failed +2026-06-11T06:41:53.396515Z ERROR ballista_core::execution_plans::distributed_query: Job py8NA3j failed: Job failed due to stage 16 failed: Task failed due to runtime execution error: DataFusionError(Internal("Assertion failed: col.name() == matching_name: Input field name r_reason_sk does not match with the projection expression count(*)")) + +ArrowError(ExternalError(Execution("Job py8NA3j failed: Job failed due to stage 16 failed: Task failed due to runtime execution error: DataFusionError(Internal(\"Assertion failed: col.name() == matching_name: Input field name r_reason_sk does not match with the projection expression count(*)\"))\n")), Some("")) +[?2004h[?2026h ❯  \? for help, \q to quit [?2026l[?2026h ❯ [?2026l[?2026h ❯ + [?2026l[?2026h  ❯ + + [?2026l;[?2004l +Plan("No SQL statements were provided in the query string") +[?2004h[?2026h ❯  \? for help, \q to quit [?2026l[?2026h ❯ + +; [?2026l[?2026h   ❯ select case when (select count(*) from store_sales where ss_quantity between 1 and 20) > 409437 + then (select avg(ss_ext_tax) from store_sales where ss_quantity between 1 and 20) + else (select avg(ss_net_paid) from store_sales where ss_quantity between 1 and 20) end bucket1, + case when (select count(*) from store_sales where ss_quantity between 21 and 40) > 4595804 + then (select avg(ss_ext_tax) from store_sales where ss_quantity between 21 and 40) + else (select avg(ss_net_paid) from store_sales where ss_quantity between 21 and 40) end bucket2, + case when (select count(*) from store_sales where ss_quantity between 41 and 60) > 1333710 + then (select avg(ss_ext_tax) from store_sales where ss_quantity between 41 and 60) + else (select avg(ss_net_paid) from store_sales where ss_quantity between 41 and 60) end bucket3, + case when (select count(*) from store_sales where ss_quantity between 61 and 80) > 2361102 + then (select avg(ss_ext_tax) from store_sales where ss_quantity between 61 and 80) + else (select avg(ss_net_paid) from store_sales where ss_quantity between 61 and 80) end bucket4, + case when (select count(*) from store_sales where ss_quantity between 81 and 100) > 1517817 + then (select avg(ss_ext_tax) from store_sales where ss_quantity between 81 and 100) + else (select avg(ss_net_paid) from store_sales where ss_quantity between 81 and 100) end bucket5 + from reason where r_reason_sk = 1; [?2026l[?2004l +2026-06-11T07:00:31.183600Z  WARN ballista_executor::execution_loop: Executor failed to run task: PartitionId { job_id: "5zg9liN", stage_id: 16, partition_id: 0 }, error: DataFusionError(Internal("Assertion failed: col.name() == matching_name: Input field name r_reason_sk does not match with the projection expression count(*)")) +2026-06-11T07:00:31.183996Z ERROR ballista_scheduler::scheduler_server::query_stage_scheduler: Job 5zg9liN running failed +2026-06-11T07:00:31.184692Z ERROR ballista_core::execution_plans::distributed_query: Job 5zg9liN failed: Job failed due to stage 16 failed: Task failed due to runtime execution error: DataFusionError(Internal("Assertion failed: col.name() == matching_name: Input field name r_reason_sk does not match with the projection expression count(*)")) + +ArrowError(ExternalError(Execution("Job 5zg9liN failed: Job failed due to stage 16 failed: Task failed due to runtime execution error: DataFusionError(Internal(\"Assertion failed: col.name() == matching_name: Input field name r_reason_sk does not match with the projection expression count(*)\"))\n")), Some("")) +[?2004h[?2026h ❯  \? for help, \q to quit [?2026l2026-06-11T08:01:20.885457Z  WARN ballista_scheduler::scheduler_server: ACTIVE executor 6e7e4ea9-ad5e-469f-b327-829e1cd78ae5 heartbeat timed out after 180s +2026-06-11T08:01:20.892955Z  WARN ballista_scheduler::state::executor_manager: Executor is already dead, failed to connect to Executor 6e7e4ea9-ad5e-469f-b327-829e1cd78ae5 +[?2026h ❯ w [?2026lait is this AWE QE ?[?2026h ❯ wait is this AQE ? + [?2026l;[?2026h  ❯ wait is this AQE ? +; 🤔 Invalid statement: SQL error: ParserError("Expected: an SQL statement, found: wait at Line: 1, Column: 1") [?2026l[?2026h   ❯ wait is this AQE ? +; 🤔 Invalid statement: SQL error: ParserError("Expected: an SQL statement, found: wait at Line: 1, Column: 1") [?2026l[?2026h   ❯ wait is this AQE ? +; 🤔 Invalid statement: SQL error: ParserError("Expected: an SQL statement, found: wait at Line: 1, Column: 1") [?2026l[?2026h   ❯ wait is this AQE ? +; 🤔 Invalid statement: SQL error: ParserError("Expected: an SQL statement, found: wait at Line: 1, Column: 1") [?2026l[?2026h   ❯ wait is this AQE ? +; 🤔 Invalid statement: SQL error: ParserError("Expected: an SQL statement, found: wait at Line: 1, Column: 1") [?2026l[?2026h   ❯ wait is this AQE ? +; 🤔 Invalid statement: SQL error: ParserError("Expected: an SQL statement, found: wait at Line: 1, Column: 1") [?2026l[?2026h   ❯ select case when (select count(*) from store_sales where ss_quantity between 1 and 20) > 409437 + then (select avg(ss_ext_tax) from store_sales where ss_quantity between 1 and 20) + else (select avg(ss_net_paid) from store_sales where ss_quantity between 1 and 20) end bucket1, + case when (select count(*) from store_sales where ss_quantity between 21 and 40) > 4595804 + then (select avg(ss_ext_tax) from store_sales where ss_quantity between 21 and 40) + else (select avg(ss_net_paid) from store_sales where ss_quantity between 21 and 40) end bucket2, + case when (select count(*) from store_sales where ss_quantity between 41 and 60) > 1333710 + then (select avg(ss_ext_tax) from store_sales where ss_quantity between 41 and 60) + else (select avg(ss_net_paid) from store_sales where ss_quantity between 41 and 60) end bucket3, + case when (select count(*) from store_sales where ss_quantity between 61 and 80) > 2361102 + then (select avg(ss_ext_tax) from store_sales where ss_quantity between 61 and 80) + else (select avg(ss_net_paid) from store_sales where ss_quantity between 61 and 80) end bucket4, + case when (select count(*) from store_sales where ss_quantity between 81 and 100) > 1517817 + then (select avg(ss_ext_tax) from store_sales where ss_quantity between 81 and 100) + else (select avg(ss_net_paid) from store_sales where ss_quantity between 81 and 100) end bucket5 + from reason where r_reason_sk = 1; [?2026l[?2004l +^C +[?2004h[?2026h ❯ [?2026l[?2026h ❯ + [?2026l[?2026h  ❯ + + [?2026l\q[?2026h   ❯ + +\q + [?2026l[?2004l +^C +[?2004h[?2026h ❯ [?2026l/q[?2026h ❯ /q + [?2026l[?2004l +\q From 3b8041f2eea907e42d2238fbe4cdcfd893954f04 Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Wed, 17 Jun 2026 00:34:11 -0700 Subject: [PATCH 2/4] address_sched_crash_high_partitions --- .../src/execution_plans/shuffle_reader.rs | 34 +++++-------------- ballista/scheduler/src/planner.rs | 6 ++-- .../src/state/aqe/test/alter_stages.rs | 12 +++---- .../src/state/aqe/test/coalesce_rule.rs | 6 ++-- ballista/scheduler/src/state/aqe/test/mod.rs | 12 +++---- 5 files changed, 20 insertions(+), 50 deletions(-) diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs index 9b5dade96b..ec7c5a7890 100644 --- a/ballista/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/core/src/execution_plans/shuffle_reader.rs @@ -980,10 +980,7 @@ impl RecordBatchStream for CoalescedShuffleReaderStream { mod tests { use super::*; use crate::execution_plans::{ShuffleWriterExec, create_shuffle_path}; - use crate::serde::scheduler::{ - ExecutorMetadata, ExecutorOperatingSystemSpecification, ExecutorSpecification, - PartitionId, - }; + use crate::serde::scheduler::{PartitionId, PartitionLocationMetadata}; use crate::utils; use datafusion::arrow::array::{Int32Array, StringArray, UInt32Array}; use datafusion::arrow::datatypes::{DataType, Field, Schema}; @@ -1022,14 +1019,11 @@ mod tests { stage_id, partition_id: i, }, - partition_location_metadata: Arc::new(ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: "executor_1".to_string(), host: "executor_1".to_string(), port: 7070, grpc_port: 8080, - specification: ExecutorSpecification::default() - .with_task_slots(1), - os_info: ExecutorOperatingSystemSpecification::default(), }), partition_stats: PartitionStats { num_rows: Some(rows), @@ -1141,13 +1135,11 @@ mod tests { stage_id: input_stage_id, partition_id, }, - partition_location_metadata: Arc::new(ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: "executor_1".to_string(), host: "executor_1".to_string(), port: 7070, grpc_port: 8080, - specification: ExecutorSpecification::default().with_task_slots(1), - os_info: ExecutorOperatingSystemSpecification::default(), }), partition_stats: PartitionStats { num_rows: Some(1), @@ -1192,13 +1184,11 @@ mod tests { stage_id: input_stage_id, partition_id, }, - partition_location_metadata: Arc::new(ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: "executor_1".to_string(), host: "executor_1".to_string(), port: 7070, grpc_port: 8080, - specification: ExecutorSpecification::default().with_task_slots(1), - os_info: ExecutorOperatingSystemSpecification::default(), }), partition_stats: PartitionStats { num_rows: Some(1), @@ -1244,13 +1234,11 @@ mod tests { stage_id: input_stage_id, partition_id, }, - partition_location_metadata: Arc::new(ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: "executor_1".to_string(), host: "executor_1".to_string(), port: 7070, grpc_port: 8080, - specification: ExecutorSpecification::default().with_task_slots(1), - os_info: ExecutorOperatingSystemSpecification::default(), }), partition_stats: PartitionStats { num_rows: Some(1), @@ -1296,13 +1284,11 @@ mod tests { stage_id: input_stage_id, partition_id, }, - partition_location_metadata: Arc::new(ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: "executor_1".to_string(), host: "executor_1".to_string(), port: 7070, grpc_port: 8080, - specification: ExecutorSpecification::default().with_task_slots(1), - os_info: ExecutorOperatingSystemSpecification::default(), }), partition_stats: Default::default(), file_id: None, @@ -1486,13 +1472,11 @@ mod tests { stage_id: 1, partition_id, }, - partition_location_metadata: Arc::new(ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: format!("exec{partition_id}"), host: "localhost".to_string(), port: 50051, grpc_port: 50052, - specification: ExecutorSpecification::default().with_task_slots(12), - os_info: ExecutorOperatingSystemSpecification::default(), }), partition_stats: Default::default(), file_id, @@ -1722,13 +1706,11 @@ mod tests { stage_id: 7, partition_id, }, - partition_location_metadata: Arc::new(ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: format!("exec-{partition_id}"), host: "localhost".to_string(), port: 50051, grpc_port: 50052, - specification: ExecutorSpecification::default(), - os_info: ExecutorOperatingSystemSpecification::default(), }), partition_stats: PartitionStats::new(Some(100), Some(1), Some(1024)), file_id: None, diff --git a/ballista/scheduler/src/planner.rs b/ballista/scheduler/src/planner.rs index 72967e51b4..351070408d 100644 --- a/ballista/scheduler/src/planner.rs +++ b/ballista/scheduler/src/planner.rs @@ -1048,14 +1048,12 @@ order by stage_id: 42, partition_id, }, - partition_location_metadata: ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: format!("exec-{partition_id}"), host: "localhost".to_string(), port: 50050, grpc_port: 50051, - specification: ExecutorSpecification::default().with_task_slots(1), - os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), partition_stats: PartitionStats::new(Some(10), None, Some(1)), file_id: None, is_sort_shuffle: false, diff --git a/ballista/scheduler/src/state/aqe/test/alter_stages.rs b/ballista/scheduler/src/state/aqe/test/alter_stages.rs index c64e1b86e5..f081e3643f 100644 --- a/ballista/scheduler/src/state/aqe/test/alter_stages.rs +++ b/ballista/scheduler/src/state/aqe/test/alter_stages.rs @@ -534,14 +534,12 @@ fn small_statistics_exchange() -> Vec> { stage_id: 0, partition_id: 0, }, - partition_location_metadata: ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: "".to_string(), host: "".to_string(), port: 0, grpc_port: 0, - specification: ExecutorSpecification::default().with_task_slots(0), - os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), // next few properties are needed partition_stats: PartitionStats::new( Some(threshold_num_rows as u64 / 128), @@ -566,14 +564,12 @@ fn big_statistics_exchange() -> Vec> { stage_id: 0, partition_id: 0, }, - partition_location_metadata: ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: "".to_string(), host: "".to_string(), port: 0, grpc_port: 0, - specification: ExecutorSpecification::default().with_task_slots(0), - os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), // next few properties are needed partition_stats: PartitionStats::new( diff --git a/ballista/scheduler/src/state/aqe/test/coalesce_rule.rs b/ballista/scheduler/src/state/aqe/test/coalesce_rule.rs index 876cdd1fde..0f3e3c763d 100644 --- a/ballista/scheduler/src/state/aqe/test/coalesce_rule.rs +++ b/ballista/scheduler/src/state/aqe/test/coalesce_rule.rs @@ -101,14 +101,12 @@ fn partitions_with_byte_sizes( stage_id: 0, partition_id: idx, }, - partition_location_metadata: ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: "".to_string(), host: "".to_string(), port: 0, grpc_port: 0, - specification: ExecutorSpecification::default().with_task_slots(0), - os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), partition_stats: PartitionStats::new(Some(1), None, Some(bytes)), file_id: None, is_sort_shuffle: false, diff --git a/ballista/scheduler/src/state/aqe/test/mod.rs b/ballista/scheduler/src/state/aqe/test/mod.rs index 8a03e7b32b..651937d2e8 100644 --- a/ballista/scheduler/src/state/aqe/test/mod.rs +++ b/ballista/scheduler/src/state/aqe/test/mod.rs @@ -47,14 +47,12 @@ pub(crate) fn mock_partitions_with_statistics() -> Vec> { stage_id: 0, partition_id: 0, }, - partition_location_metadata: ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: "".to_string(), host: "".to_string(), port: 0, grpc_port: 0, - specification: ExecutorSpecification::default().with_task_slots(0), - os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), // next few properties are needed partition_stats: PartitionStats::new(Some(42), None, Some(10)), file_id: None, @@ -72,14 +70,12 @@ pub(crate) fn mock_partitions_with_statistics_no_data() -> Vec Date: Wed, 17 Jun 2026 00:36:14 -0700 Subject: [PATCH 3/4] address_sched_crash_high_partitions --- ballista/scheduler/src/planner.rs | 3 +-- ballista/scheduler/src/state/aqe/test/alter_stages.rs | 3 +-- ballista/scheduler/src/state/aqe/test/coalesce_rule.rs | 3 +-- ballista/scheduler/src/state/aqe/test/mod.rs | 3 +-- 4 files changed, 4 insertions(+), 8 deletions(-) diff --git a/ballista/scheduler/src/planner.rs b/ballista/scheduler/src/planner.rs index 351070408d..0a8b4007da 100644 --- a/ballista/scheduler/src/planner.rs +++ b/ballista/scheduler/src/planner.rs @@ -1031,8 +1031,7 @@ order by { use ballista_core::execution_plans::ShuffleReaderExec; use ballista_core::serde::scheduler::{ - ExecutorMetadata, ExecutorOperatingSystemSpecification, - ExecutorSpecification, PartitionId, PartitionLocation, PartitionStats, + PartitionId, PartitionLocation, PartitionLocationMetadata, PartitionStats, }; use datafusion::arrow::datatypes::{DataType, Field, Schema}; diff --git a/ballista/scheduler/src/state/aqe/test/alter_stages.rs b/ballista/scheduler/src/state/aqe/test/alter_stages.rs index f081e3643f..0375a0a1a6 100644 --- a/ballista/scheduler/src/state/aqe/test/alter_stages.rs +++ b/ballista/scheduler/src/state/aqe/test/alter_stages.rs @@ -21,8 +21,7 @@ use crate::state::aqe::test::{ mock_batch, mock_context, mock_partitions_with_statistics_no_data, }; use ballista_core::serde::scheduler::{ - ExecutorMetadata, ExecutorOperatingSystemSpecification, ExecutorSpecification, - PartitionId, PartitionLocation, PartitionStats, + PartitionId, PartitionLocation, PartitionLocationMetadata, PartitionStats, }; use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::common::stats::Precision; diff --git a/ballista/scheduler/src/state/aqe/test/coalesce_rule.rs b/ballista/scheduler/src/state/aqe/test/coalesce_rule.rs index 0f3e3c763d..a66362e784 100644 --- a/ballista/scheduler/src/state/aqe/test/coalesce_rule.rs +++ b/ballista/scheduler/src/state/aqe/test/coalesce_rule.rs @@ -29,8 +29,7 @@ use crate::state::aqe::planner::AdaptivePlanner; use crate::state::aqe::test::{mock_batch, mock_schema}; use ballista_core::extension::SessionConfigExt; use ballista_core::serde::scheduler::{ - ExecutorMetadata, ExecutorOperatingSystemSpecification, ExecutorSpecification, - PartitionId, PartitionLocation, PartitionStats, + PartitionId, PartitionLocation, PartitionLocationMetadata, PartitionStats, }; use datafusion::datasource::MemTable; use datafusion::execution::SessionStateBuilder; diff --git a/ballista/scheduler/src/state/aqe/test/mod.rs b/ballista/scheduler/src/state/aqe/test/mod.rs index 651937d2e8..f873f7b38f 100644 --- a/ballista/scheduler/src/state/aqe/test/mod.rs +++ b/ballista/scheduler/src/state/aqe/test/mod.rs @@ -27,8 +27,7 @@ mod plan_to_stages; use ballista_core::config::BALLISTA_SHUFFLE_SORT_BASED_ENABLED; use ballista_core::extension::SessionConfigExt; use ballista_core::serde::scheduler::{ - ExecutorMetadata, ExecutorOperatingSystemSpecification, ExecutorSpecification, - PartitionId, PartitionLocation, PartitionStats, + PartitionId, PartitionLocation, PartitionLocationMetadata, PartitionStats, }; use datafusion::arrow::array::{Int32Array, RecordBatch}; use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; From 90af60db761de0b2831fc053de5d1191819b213c Mon Sep 17 00:00:00 2001 From: Bhargava Vadlamani Date: Wed, 17 Jun 2026 00:39:54 -0700 Subject: [PATCH 4/4] address_sched_crash_high_partitions --- q9.log | 115 --------------------------------------------------------- 1 file changed, 115 deletions(-) delete mode 100644 q9.log diff --git a/q9.log b/q9.log deleted file mode 100644 index cedc0eab41..0000000000 --- a/q9.log +++ /dev/null @@ -1,115 +0,0 @@ - Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.35s - Running `target/debug/ballista-cli` -Ballista CLI v53.0.0 -[?2004h[?2026h ❯  \? for help, \q to quit [?2026l[?2026h ❯ \q [?2026l[?2026h ❯ select case when (select count(*) from store_sales where ss_quantity between 1 and 20) > 409437 - then (select avg(ss_ext_tax) from store_sales where ss_quantity between 1 and 20) - else (select avg(ss_net_paid) from store_sales where ss_quantity between 1 and 20) end bucket1, - case when (select count(*) from store_sales where ss_quantity between 21 and 40) > 4595804 - then (select avg(ss_ext_tax) from store_sales where ss_quantity between 21 and 40) - else (select avg(ss_net_paid) from store_sales where ss_quantity between 21 and 40) end bucket2, - case when (select count(*) from store_sales where ss_quantity between 41 and 60) > 1333710 - then (select avg(ss_ext_tax) from store_sales where ss_quantity between 41 and 60) - else (select avg(ss_net_paid) from store_sales where ss_quantity between 41 and 60) end bucket3, - case when (select count(*) from store_sales where ss_quantity between 61 and 80) > 2361102 - then (select avg(ss_ext_tax) from store_sales where ss_quantity between 61 and 80) - else (select avg(ss_net_paid) from store_sales where ss_quantity between 61 and 80) end bucket4, - case when (select count(*) from store_sales where ss_quantity between 81 and 100) > 1517817 - then (select avg(ss_ext_tax) from store_sales where ss_quantity between 81 and 100) - else (select avg(ss_net_paid) from store_sales where ss_quantity between 81 and 100) end bucket5 - -; [?2026l[?2004l -Collection([Diagnostic(Diagnostic { kind: Error, message: "table 'store_sales' not found", span: Some(Span(Location(1,40)..Location(1,51))), notes: [], helps: [] }, Plan("table 'datafusion.public.store_sales' not found")), Diagnostic(Diagnostic { kind: Error, message: "table 'store_sales' not found", span: Some(Span(Location(4,42)..Location(4,53))), notes: [], helps: [] }, Plan("table 'datafusion.public.store_sales' not found")), Diagnostic(Diagnostic { kind: Error, message: "table 'store_sales' not found", span: Some(Span(Location(7,42)..Location(7,53))), notes: [], helps: [] }, Plan("table 'datafusion.public.store_sales' not found")), Diagnostic(Diagnostic { kind: Error, message: "table 'store_sales' not found", span: Some(Span(Location(10,42)..Location(10,53))), notes: [], helps: [] }, Plan("table 'datafusion.public.store_sales' not found")), Diagnostic(Diagnostic { kind: Error, message: "table 'store_sales' not found", span: Some(Span(Location(13,42)..Location(13,53))), notes: [], helps: [] }, Plan("table 'datafusion.public.store_sales' not found"))]) -[?2004h[?2026h ❯  \? for help, \q to quit [?2026l[?2026h ❯ CREATE EXTERNAL TABLE store_sales STORED AS PARQUET LOCATION '/Users/bhargava/Projects/oss/datafusion/benchmarks/data/tpcds_sf1/store_sales.parquet'; [?2026l[?2004l -0 row(s) fetched. -Elapsed 0.059 seconds. - -[?2004h[?2026h ❯  \? for help, \q to quit [?2026l[?2026h ❯ CREATE EXTERNAL TABLE reason STORED AS PARQUET LOCATION '/Users/bhargava/Projects/oss/datafusion/benchmarks/data/tpcds_sf1/reason.parquet'; [?2026l[?2004l -0 row(s) fetched. -Elapsed 0.006 seconds. - -[?2004h[?2026h ❯  \? for help, \q to quit [?2026l[?2026h ❯ select case when (select count(*) from store_sales where ss_quantity between 1 and 20) > 409437 - then (select avg(ss_ext_tax) from store_sales where ss_quantity between 1 and 20) - else (select avg(ss_net_paid) from store_sales where ss_quantity between 1 and 20) end bucket1, - case when (select count(*) from store_sales where ss_quantity between 21 and 40) > 4595804 - then (select avg(ss_ext_tax) from store_sales where ss_quantity between 21 and 40) - else (select avg(ss_net_paid) from store_sales where ss_quantity between 21 and 40) end bucket2, - case when (select count(*) from store_sales where ss_quantity between 41 and 60) > 1333710 - then (select avg(ss_ext_tax) from store_sales where ss_quantity between 41 and 60) - else (select avg(ss_net_paid) from store_sales where ss_quantity between 41 and 60) end bucket3, - case when (select count(*) from store_sales where ss_quantity between 61 and 80) > 2361102 - then (select avg(ss_ext_tax) from store_sales where ss_quantity between 61 and 80) - else (select avg(ss_net_paid) from store_sales where ss_quantity between 61 and 80) end bucket4, - case when (select count(*) from store_sales where ss_quantity between 81 and 100) > 1517817 - then (select avg(ss_ext_tax) from store_sales where ss_quantity between 81 and 100) - else (select avg(ss_net_paid) from store_sales where ss_quantity between 81 and 100) end bucket5 - from reason where r_reason_sk = 1; [?2026l[?2004l -2026-06-11T06:41:53.393166Z  WARN ballista_executor::execution_loop: Executor failed to run task: PartitionId { job_id: "py8NA3j", stage_id: 16, partition_id: 0 }, error: DataFusionError(Internal("Assertion failed: col.name() == matching_name: Input field name r_reason_sk does not match with the projection expression count(*)")) -2026-06-11T06:41:53.395745Z ERROR ballista_scheduler::scheduler_server::query_stage_scheduler: Job py8NA3j running failed -2026-06-11T06:41:53.396515Z ERROR ballista_core::execution_plans::distributed_query: Job py8NA3j failed: Job failed due to stage 16 failed: Task failed due to runtime execution error: DataFusionError(Internal("Assertion failed: col.name() == matching_name: Input field name r_reason_sk does not match with the projection expression count(*)")) - -ArrowError(ExternalError(Execution("Job py8NA3j failed: Job failed due to stage 16 failed: Task failed due to runtime execution error: DataFusionError(Internal(\"Assertion failed: col.name() == matching_name: Input field name r_reason_sk does not match with the projection expression count(*)\"))\n")), Some("")) -[?2004h[?2026h ❯  \? for help, \q to quit [?2026l[?2026h ❯ [?2026l[?2026h ❯ - [?2026l[?2026h  ❯ - - [?2026l;[?2004l -Plan("No SQL statements were provided in the query string") -[?2004h[?2026h ❯  \? for help, \q to quit [?2026l[?2026h ❯ - -; [?2026l[?2026h   ❯ select case when (select count(*) from store_sales where ss_quantity between 1 and 20) > 409437 - then (select avg(ss_ext_tax) from store_sales where ss_quantity between 1 and 20) - else (select avg(ss_net_paid) from store_sales where ss_quantity between 1 and 20) end bucket1, - case when (select count(*) from store_sales where ss_quantity between 21 and 40) > 4595804 - then (select avg(ss_ext_tax) from store_sales where ss_quantity between 21 and 40) - else (select avg(ss_net_paid) from store_sales where ss_quantity between 21 and 40) end bucket2, - case when (select count(*) from store_sales where ss_quantity between 41 and 60) > 1333710 - then (select avg(ss_ext_tax) from store_sales where ss_quantity between 41 and 60) - else (select avg(ss_net_paid) from store_sales where ss_quantity between 41 and 60) end bucket3, - case when (select count(*) from store_sales where ss_quantity between 61 and 80) > 2361102 - then (select avg(ss_ext_tax) from store_sales where ss_quantity between 61 and 80) - else (select avg(ss_net_paid) from store_sales where ss_quantity between 61 and 80) end bucket4, - case when (select count(*) from store_sales where ss_quantity between 81 and 100) > 1517817 - then (select avg(ss_ext_tax) from store_sales where ss_quantity between 81 and 100) - else (select avg(ss_net_paid) from store_sales where ss_quantity between 81 and 100) end bucket5 - from reason where r_reason_sk = 1; [?2026l[?2004l -2026-06-11T07:00:31.183600Z  WARN ballista_executor::execution_loop: Executor failed to run task: PartitionId { job_id: "5zg9liN", stage_id: 16, partition_id: 0 }, error: DataFusionError(Internal("Assertion failed: col.name() == matching_name: Input field name r_reason_sk does not match with the projection expression count(*)")) -2026-06-11T07:00:31.183996Z ERROR ballista_scheduler::scheduler_server::query_stage_scheduler: Job 5zg9liN running failed -2026-06-11T07:00:31.184692Z ERROR ballista_core::execution_plans::distributed_query: Job 5zg9liN failed: Job failed due to stage 16 failed: Task failed due to runtime execution error: DataFusionError(Internal("Assertion failed: col.name() == matching_name: Input field name r_reason_sk does not match with the projection expression count(*)")) - -ArrowError(ExternalError(Execution("Job 5zg9liN failed: Job failed due to stage 16 failed: Task failed due to runtime execution error: DataFusionError(Internal(\"Assertion failed: col.name() == matching_name: Input field name r_reason_sk does not match with the projection expression count(*)\"))\n")), Some("")) -[?2004h[?2026h ❯  \? for help, \q to quit [?2026l2026-06-11T08:01:20.885457Z  WARN ballista_scheduler::scheduler_server: ACTIVE executor 6e7e4ea9-ad5e-469f-b327-829e1cd78ae5 heartbeat timed out after 180s -2026-06-11T08:01:20.892955Z  WARN ballista_scheduler::state::executor_manager: Executor is already dead, failed to connect to Executor 6e7e4ea9-ad5e-469f-b327-829e1cd78ae5 -[?2026h ❯ w [?2026lait is this AWE QE ?[?2026h ❯ wait is this AQE ? - [?2026l;[?2026h  ❯ wait is this AQE ? -; 🤔 Invalid statement: SQL error: ParserError("Expected: an SQL statement, found: wait at Line: 1, Column: 1") [?2026l[?2026h   ❯ wait is this AQE ? -; 🤔 Invalid statement: SQL error: ParserError("Expected: an SQL statement, found: wait at Line: 1, Column: 1") [?2026l[?2026h   ❯ wait is this AQE ? -; 🤔 Invalid statement: SQL error: ParserError("Expected: an SQL statement, found: wait at Line: 1, Column: 1") [?2026l[?2026h   ❯ wait is this AQE ? -; 🤔 Invalid statement: SQL error: ParserError("Expected: an SQL statement, found: wait at Line: 1, Column: 1") [?2026l[?2026h   ❯ wait is this AQE ? -; 🤔 Invalid statement: SQL error: ParserError("Expected: an SQL statement, found: wait at Line: 1, Column: 1") [?2026l[?2026h   ❯ wait is this AQE ? -; 🤔 Invalid statement: SQL error: ParserError("Expected: an SQL statement, found: wait at Line: 1, Column: 1") [?2026l[?2026h   ❯ select case when (select count(*) from store_sales where ss_quantity between 1 and 20) > 409437 - then (select avg(ss_ext_tax) from store_sales where ss_quantity between 1 and 20) - else (select avg(ss_net_paid) from store_sales where ss_quantity between 1 and 20) end bucket1, - case when (select count(*) from store_sales where ss_quantity between 21 and 40) > 4595804 - then (select avg(ss_ext_tax) from store_sales where ss_quantity between 21 and 40) - else (select avg(ss_net_paid) from store_sales where ss_quantity between 21 and 40) end bucket2, - case when (select count(*) from store_sales where ss_quantity between 41 and 60) > 1333710 - then (select avg(ss_ext_tax) from store_sales where ss_quantity between 41 and 60) - else (select avg(ss_net_paid) from store_sales where ss_quantity between 41 and 60) end bucket3, - case when (select count(*) from store_sales where ss_quantity between 61 and 80) > 2361102 - then (select avg(ss_ext_tax) from store_sales where ss_quantity between 61 and 80) - else (select avg(ss_net_paid) from store_sales where ss_quantity between 61 and 80) end bucket4, - case when (select count(*) from store_sales where ss_quantity between 81 and 100) > 1517817 - then (select avg(ss_ext_tax) from store_sales where ss_quantity between 81 and 100) - else (select avg(ss_net_paid) from store_sales where ss_quantity between 81 and 100) end bucket5 - from reason where r_reason_sk = 1; [?2026l[?2004l -^C -[?2004h[?2026h ❯ [?2026l[?2026h ❯ - [?2026l[?2026h  ❯ - - [?2026l\q[?2026h   ❯ - -\q - [?2026l[?2004l -^C -[?2004h[?2026h ❯ [?2026l/q[?2026h ❯ /q - [?2026l[?2004l -\q