diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs index 556ccc809..99021a64d 100644 --- a/ballista/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/core/src/execution_plans/shuffle_reader.rs @@ -380,7 +380,7 @@ impl ExecutionPlan for ShuffleReaderExec { let mut partition_locations = HashMap::new(); for p in &self.partition[partition] { partition_locations - .entry(p.executor_meta.id.clone()) + .entry(p.partition_location_metadata.id.clone()) .or_insert_with(Vec::new) .push(p.clone()); } @@ -747,7 +747,7 @@ async fn fetch_partition_remote( customize_endpoint: Option>, client_pool: Option>, ) -> result::Result { - let metadata = &location.executor_meta; + let metadata = &location.partition_location_metadata; let partition_id = &location.partition_id; let file_id = location.file_id; let is_sort_shuffle = location.is_sort_shuffle; @@ -817,7 +817,7 @@ fn fetch_partition_local( sort_shuffle_enabled: bool, ) -> result::Result { let path = &location.path(work_dir)?; - let metadata = &location.executor_meta; + let metadata = &location.partition_location_metadata; let partition_id = &location.partition_id; let data_path = std::path::Path::new(path); @@ -980,10 +980,7 @@ impl RecordBatchStream for CoalescedShuffleReaderStream { mod tests { use super::*; use crate::execution_plans::{ShuffleWriterExec, create_shuffle_path}; - use crate::serde::scheduler::{ - ExecutorMetadata, ExecutorOperatingSystemSpecification, ExecutorSpecification, - PartitionId, - }; + use crate::serde::scheduler::{PartitionId, PartitionLocationMetadata}; use crate::utils; use datafusion::arrow::array::{Int32Array, StringArray, UInt32Array}; use datafusion::arrow::datatypes::{DataType, Field, Schema}; @@ -1022,15 +1019,12 @@ mod tests { stage_id, partition_id: i, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: "executor_1".to_string(), host: "executor_1".to_string(), port: 7070, grpc_port: 8080, - specification: ExecutorSpecification::default() - .with_task_slots(1), - os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), partition_stats: PartitionStats { num_rows: Some(rows), num_batches: None, @@ -1141,14 +1135,12 @@ mod tests { stage_id: input_stage_id, partition_id, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: "executor_1".to_string(), host: "executor_1".to_string(), port: 7070, grpc_port: 8080, - specification: ExecutorSpecification::default().with_task_slots(1), - os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), partition_stats: PartitionStats { num_rows: Some(1), num_batches: None, @@ -1192,14 +1184,12 @@ mod tests { stage_id: input_stage_id, partition_id, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: "executor_1".to_string(), host: "executor_1".to_string(), port: 7070, grpc_port: 8080, - specification: ExecutorSpecification::default().with_task_slots(1), - os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), partition_stats: PartitionStats { num_rows: Some(1), num_batches: None, @@ -1244,14 +1234,12 @@ mod tests { stage_id: input_stage_id, partition_id, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: "executor_1".to_string(), host: "executor_1".to_string(), port: 7070, grpc_port: 8080, - specification: ExecutorSpecification::default().with_task_slots(1), - os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), partition_stats: PartitionStats { num_rows: Some(1), num_batches: None, @@ -1296,14 +1284,12 @@ mod tests { stage_id: input_stage_id, partition_id, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: "executor_1".to_string(), host: "executor_1".to_string(), port: 7070, grpc_port: 8080, - specification: ExecutorSpecification::default().with_task_slots(1), - os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), partition_stats: Default::default(), file_id: None, is_sort_shuffle: false, @@ -1487,14 +1473,12 @@ mod tests { stage_id: 1, partition_id, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: format!("exec{partition_id}"), host: "localhost".to_string(), port: 50051, grpc_port: 50052, - specification: ExecutorSpecification::default().with_task_slots(12), - os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), partition_stats: Default::default(), file_id, is_sort_shuffle: false, @@ -1723,14 +1707,12 @@ mod tests { stage_id: 7, partition_id, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: format!("exec-{partition_id}"), host: "localhost".to_string(), port: 50051, grpc_port: 50052, - specification: ExecutorSpecification::default(), - os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), partition_stats: PartitionStats::new(Some(100), Some(1), Some(1024)), file_id: None, is_sort_shuffle: false, diff --git a/ballista/core/src/serde/scheduler/from_proto.rs b/ballista/core/src/serde/scheduler/from_proto.rs index ba8b9b33a..6bb156e41 100644 --- a/ballista/core/src/serde/scheduler/from_proto.rs +++ b/ballista/core/src/serde/scheduler/from_proto.rs @@ -38,7 +38,7 @@ use crate::serde::protobuf::{NamedPruningMetrics, NamedRatio}; use crate::serde::scheduler::{ Action, BallistaFunctionRegistry, ExecutorData, ExecutorMetadata, ExecutorOperatingSystemSpecification, ExecutorSpecification, PartitionId, - PartitionLocation, PartitionStats, TaskDefinition, + PartitionLocation, PartitionLocationMetadata, PartitionStats, TaskDefinition, }; use crate::serde::{BallistaCodec, protobuf}; @@ -108,14 +108,19 @@ impl TryInto for protobuf::PartitionLocation { ) })? .into(), - executor_meta: self - .executor_meta - .ok_or_else(|| { + partition_location_metadata: { + let m = self.executor_meta.ok_or_else(|| { BallistaError::General( "executor_meta in PartitionLocation is missing".to_owned(), ) - })? - .into(), + })?; + Arc::new(PartitionLocationMetadata { + id: m.id, + host: m.host, + port: m.port as u16, + grpc_port: m.grpc_port as u16, + }) + }, partition_stats: self .partition_stats .ok_or_else(|| { diff --git a/ballista/core/src/serde/scheduler/mod.rs b/ballista/core/src/serde/scheduler/mod.rs index cb357e003..b5d12b1e3 100644 --- a/ballista/core/src/serde/scheduler/mod.rs +++ b/ballista/core/src/serde/scheduler/mod.rs @@ -80,6 +80,30 @@ impl PartitionId { } } +/// Lean exec information for a shuffle partition. +#[derive(Debug, Clone)] +pub struct PartitionLocationMetadata { + /// Unique executor identifier. + pub id: String, + /// Hostname or IP address of the executor. + pub host: String, + /// Port number for data transfer. + pub port: u16, + /// Port number for gRPC communication. + pub grpc_port: u16, +} + +impl From<&ExecutorMetadata> for PartitionLocationMetadata { + fn from(m: &ExecutorMetadata) -> Self { + Self { + id: m.id.clone(), + host: m.host.clone(), + port: m.port, + grpc_port: m.grpc_port, + } + } +} + /// Location information for a shuffle partition. #[derive(Debug, Clone)] pub struct PartitionLocation { @@ -88,7 +112,7 @@ pub struct PartitionLocation { /// The partition identifier. pub partition_id: PartitionId, /// Metadata about the executor hosting this partition. - pub executor_meta: ExecutorMetadata, + pub partition_location_metadata: Arc, /// Statistics about the partition data. pub partition_stats: PartitionStats, /// shuffle file id diff --git a/ballista/core/src/serde/scheduler/to_proto.rs b/ballista/core/src/serde/scheduler/to_proto.rs index 2f804a8ce..6c73b0f95 100644 --- a/ballista/core/src/serde/scheduler/to_proto.rs +++ b/ballista/core/src/serde/scheduler/to_proto.rs @@ -78,7 +78,14 @@ impl TryInto for PartitionLocation { Ok(protobuf::PartitionLocation { map_partition_id: self.map_partition_id as u32, partition_id: Some(self.partition_id.into()), - executor_meta: Some(self.executor_meta.into()), + executor_meta: Some(protobuf::ExecutorMetadata { + id: self.partition_location_metadata.id.clone(), + host: self.partition_location_metadata.host.clone(), + port: self.partition_location_metadata.port as u32, + grpc_port: self.partition_location_metadata.grpc_port as u32, + specification: None, + os_info: None, + }), partition_stats: Some(self.partition_stats.into()), file_id: self.file_id, is_sort_shuffle: self.is_sort_shuffle, diff --git a/ballista/scheduler/src/planner.rs b/ballista/scheduler/src/planner.rs index f24a7ab39..e51b09b85 100644 --- a/ballista/scheduler/src/planner.rs +++ b/ballista/scheduler/src/planner.rs @@ -1135,8 +1135,7 @@ order by { use ballista_core::execution_plans::ShuffleReaderExec; use ballista_core::serde::scheduler::{ - ExecutorMetadata, ExecutorOperatingSystemSpecification, - ExecutorSpecification, PartitionId, PartitionLocation, PartitionStats, + PartitionId, PartitionLocation, PartitionLocationMetadata, PartitionStats, }; use datafusion::arrow::datatypes::{DataType, Field, Schema}; @@ -1152,14 +1151,12 @@ order by stage_id: 42, partition_id, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: format!("exec-{partition_id}"), host: "localhost".to_string(), port: 50050, grpc_port: 50051, - specification: ExecutorSpecification::default().with_task_slots(1), - os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), partition_stats: PartitionStats::new(Some(10), None, Some(1)), file_id: None, is_sort_shuffle: false, diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs index 17b955a98..200165e77 100644 --- a/ballista/scheduler/src/scheduler_server/mod.rs +++ b/ballista/scheduler/src/scheduler_server/mod.rs @@ -551,7 +551,10 @@ mod test { assert_eq!(final_graph.output_locations().len(), 4); for output_location in final_graph.output_locations() { - assert_eq!(output_location.executor_meta.host, "localhost1".to_owned()) + assert_eq!( + output_location.partition_location_metadata.host, + "localhost1".to_owned() + ) } Ok(()) diff --git a/ballista/scheduler/src/state/aqe/mod.rs b/ballista/scheduler/src/state/aqe/mod.rs index 74cedb1f4..d9c836021 100644 --- a/ballista/scheduler/src/state/aqe/mod.rs +++ b/ballista/scheduler/src/state/aqe/mod.rs @@ -414,7 +414,7 @@ impl AdaptiveExecutionGraph { stage_output.partition_locations.iter_mut().for_each( |(_partition, locs)| { let before_len = locs.len(); - locs.retain(|loc| loc.executor_meta.id != executor_id); + locs.retain(|loc| loc.partition_location_metadata.id != executor_id); if locs.len() < before_len { match_found = true; } diff --git a/ballista/scheduler/src/state/aqe/test/alter_stages.rs b/ballista/scheduler/src/state/aqe/test/alter_stages.rs index fcf48e9cf..7d8a56b2d 100644 --- a/ballista/scheduler/src/state/aqe/test/alter_stages.rs +++ b/ballista/scheduler/src/state/aqe/test/alter_stages.rs @@ -21,8 +21,7 @@ use crate::state::aqe::test::{ mock_batch, mock_context, mock_partitions_with_statistics_no_data, }; use ballista_core::serde::scheduler::{ - ExecutorMetadata, ExecutorOperatingSystemSpecification, ExecutorSpecification, - PartitionId, PartitionLocation, PartitionStats, + PartitionId, PartitionLocation, PartitionLocationMetadata, PartitionStats, }; use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::common::stats::Precision; @@ -519,14 +518,12 @@ fn small_statistics_exchange() -> Vec> { stage_id: 0, partition_id: 0, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: "".to_string(), host: "".to_string(), port: 0, grpc_port: 0, - specification: ExecutorSpecification::default().with_task_slots(0), - os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), // next few properties are needed partition_stats: PartitionStats::new( Some(threshold_num_rows as u64 / 128), @@ -551,14 +548,12 @@ fn big_statistics_exchange() -> Vec> { stage_id: 0, partition_id: 0, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: "".to_string(), host: "".to_string(), port: 0, grpc_port: 0, - specification: ExecutorSpecification::default().with_task_slots(0), - os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), // next few properties are needed partition_stats: PartitionStats::new( diff --git a/ballista/scheduler/src/state/aqe/test/coalesce_rule.rs b/ballista/scheduler/src/state/aqe/test/coalesce_rule.rs index 665eee88f..1cd688604 100644 --- a/ballista/scheduler/src/state/aqe/test/coalesce_rule.rs +++ b/ballista/scheduler/src/state/aqe/test/coalesce_rule.rs @@ -29,8 +29,7 @@ use crate::state::aqe::planner::AdaptivePlanner; use crate::state::aqe::test::{mock_batch, mock_schema}; use ballista_core::extension::SessionConfigExt; use ballista_core::serde::scheduler::{ - ExecutorMetadata, ExecutorOperatingSystemSpecification, ExecutorSpecification, - PartitionId, PartitionLocation, PartitionStats, + PartitionId, PartitionLocation, PartitionLocationMetadata, PartitionStats, }; use datafusion::datasource::MemTable; use datafusion::execution::SessionStateBuilder; @@ -101,14 +100,12 @@ fn partitions_with_byte_sizes( stage_id: 0, partition_id: idx, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: "".to_string(), host: "".to_string(), port: 0, grpc_port: 0, - specification: ExecutorSpecification::default().with_task_slots(0), - os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), partition_stats: PartitionStats::new(Some(1), None, Some(bytes)), file_id: None, is_sort_shuffle: false, diff --git a/ballista/scheduler/src/state/aqe/test/mod.rs b/ballista/scheduler/src/state/aqe/test/mod.rs index c1237f216..5e5aef700 100644 --- a/ballista/scheduler/src/state/aqe/test/mod.rs +++ b/ballista/scheduler/src/state/aqe/test/mod.rs @@ -27,8 +27,7 @@ mod plan_to_stages; use ballista_core::config::BALLISTA_SHUFFLE_SORT_BASED_ENABLED; use ballista_core::extension::SessionConfigExt; use ballista_core::serde::scheduler::{ - ExecutorMetadata, ExecutorOperatingSystemSpecification, ExecutorSpecification, - PartitionId, PartitionLocation, PartitionStats, + PartitionId, PartitionLocation, PartitionLocationMetadata, PartitionStats, }; use datafusion::arrow::array::{Int32Array, RecordBatch}; use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; @@ -47,14 +46,12 @@ pub(crate) fn mock_partitions_with_statistics() -> Vec> { stage_id: 0, partition_id: 0, }, - executor_meta: ExecutorMetadata { + partition_location_metadata: Arc::new(PartitionLocationMetadata { id: "".to_string(), host: "".to_string(), port: 0, grpc_port: 0, - specification: ExecutorSpecification::default().with_task_slots(0), - os_info: ExecutorOperatingSystemSpecification::default(), - }, + }), // next few properties are needed partition_stats: PartitionStats::new(Some(42), None, Some(10)), file_id: None, @@ -72,14 +69,12 @@ pub(crate) fn mock_partitions_with_statistics_no_data() -> Vec, ) -> Vec { + let partition_metadata = Arc::new(PartitionLocationMetadata::from(exec_metadata)); shuffles .into_iter() .map(|shuffle| PartitionLocation { @@ -1747,7 +1749,7 @@ pub(crate) fn partition_to_location( stage_id, partition_id: shuffle.partition_id as usize, }, - executor_meta: executor.clone(), + partition_location_metadata: Arc::clone(&partition_metadata), partition_stats: PartitionStats::new( Some(shuffle.num_rows), Some(shuffle.num_batches), @@ -1882,7 +1884,10 @@ mod test { let outputs = agg_graph.output_locations(); for location in outputs { - assert_eq!(location.executor_meta.host, "localhost2".to_owned()); + assert_eq!( + location.partition_location_metadata.host, + "localhost2".to_owned() + ); } Ok(()) diff --git a/ballista/scheduler/src/state/execution_stage.rs b/ballista/scheduler/src/state/execution_stage.rs index a9695d3b8..8c2a28add 100644 --- a/ballista/scheduler/src/state/execution_stage.rs +++ b/ballista/scheduler/src/state/execution_stage.rs @@ -342,12 +342,12 @@ impl UnresolvedStage { .iter_mut() .for_each(|(_partition, locs)| { locs.iter().for_each(|loc| { - if loc.executor_meta.id == executor_id { + if loc.partition_location_metadata.id == executor_id { bad_map_partitions.insert(loc.map_partition_id); } }); - locs.retain(|loc| loc.executor_meta.id != executor_id); + locs.retain(|loc| loc.partition_location_metadata.id != executor_id); }); stage_output.complete = false; Ok(bad_map_partitions) @@ -824,12 +824,12 @@ impl RunningStage { .iter_mut() .for_each(|(_partition, locs)| { locs.iter().for_each(|loc| { - if loc.executor_meta.id == executor_id { + if loc.partition_location_metadata.id == executor_id { bad_map_partitions.insert(loc.map_partition_id); } }); - locs.retain(|loc| loc.executor_meta.id != executor_id); + locs.retain(|loc| loc.partition_location_metadata.id != executor_id); }); stage_output.complete = false; Ok(bad_map_partitions)