Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 18 additions & 36 deletions ballista/core/src/execution_plans/shuffle_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ impl ExecutionPlan for ShuffleReaderExec {
let mut partition_locations = HashMap::new();
for p in &self.partition[partition] {
partition_locations
.entry(p.executor_meta.id.clone())
.entry(p.partition_location_metadata.id.clone())
.or_insert_with(Vec::new)
.push(p.clone());
}
Expand Down Expand Up @@ -747,7 +747,7 @@ async fn fetch_partition_remote(
customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
client_pool: Option<Arc<dyn BallistaClientPool>>,
) -> result::Result<SendableRecordBatchStream, BallistaError> {
let metadata = &location.executor_meta;
let metadata = &location.partition_location_metadata;
let partition_id = &location.partition_id;
let file_id = location.file_id;
let is_sort_shuffle = location.is_sort_shuffle;
Expand Down Expand Up @@ -817,7 +817,7 @@ fn fetch_partition_local(
sort_shuffle_enabled: bool,
) -> result::Result<SendableRecordBatchStream, BallistaError> {
let path = &location.path(work_dir)?;
let metadata = &location.executor_meta;
let metadata = &location.partition_location_metadata;
let partition_id = &location.partition_id;
let data_path = std::path::Path::new(path);

Expand Down Expand Up @@ -980,10 +980,7 @@ impl RecordBatchStream for CoalescedShuffleReaderStream {
mod tests {
use super::*;
use crate::execution_plans::{ShuffleWriterExec, create_shuffle_path};
use crate::serde::scheduler::{
ExecutorMetadata, ExecutorOperatingSystemSpecification, ExecutorSpecification,
PartitionId,
};
use crate::serde::scheduler::{PartitionId, PartitionLocationMetadata};
use crate::utils;
use datafusion::arrow::array::{Int32Array, StringArray, UInt32Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
Expand Down Expand Up @@ -1022,15 +1019,12 @@ mod tests {
stage_id,
partition_id: i,
},
executor_meta: ExecutorMetadata {
partition_location_metadata: Arc::new(PartitionLocationMetadata {
id: "executor_1".to_string(),
host: "executor_1".to_string(),
port: 7070,
grpc_port: 8080,
specification: ExecutorSpecification::default()
.with_task_slots(1),
os_info: ExecutorOperatingSystemSpecification::default(),
},
}),
partition_stats: PartitionStats {
num_rows: Some(rows),
num_batches: None,
Expand Down Expand Up @@ -1141,14 +1135,12 @@ mod tests {
stage_id: input_stage_id,
partition_id,
},
executor_meta: ExecutorMetadata {
partition_location_metadata: Arc::new(PartitionLocationMetadata {
id: "executor_1".to_string(),
host: "executor_1".to_string(),
port: 7070,
grpc_port: 8080,
specification: ExecutorSpecification::default().with_task_slots(1),
os_info: ExecutorOperatingSystemSpecification::default(),
},
}),
partition_stats: PartitionStats {
num_rows: Some(1),
num_batches: None,
Expand Down Expand Up @@ -1192,14 +1184,12 @@ mod tests {
stage_id: input_stage_id,
partition_id,
},
executor_meta: ExecutorMetadata {
partition_location_metadata: Arc::new(PartitionLocationMetadata {
id: "executor_1".to_string(),
host: "executor_1".to_string(),
port: 7070,
grpc_port: 8080,
specification: ExecutorSpecification::default().with_task_slots(1),
os_info: ExecutorOperatingSystemSpecification::default(),
},
}),
partition_stats: PartitionStats {
num_rows: Some(1),
num_batches: None,
Expand Down Expand Up @@ -1244,14 +1234,12 @@ mod tests {
stage_id: input_stage_id,
partition_id,
},
executor_meta: ExecutorMetadata {
partition_location_metadata: Arc::new(PartitionLocationMetadata {
id: "executor_1".to_string(),
host: "executor_1".to_string(),
port: 7070,
grpc_port: 8080,
specification: ExecutorSpecification::default().with_task_slots(1),
os_info: ExecutorOperatingSystemSpecification::default(),
},
}),
partition_stats: PartitionStats {
num_rows: Some(1),
num_batches: None,
Expand Down Expand Up @@ -1296,14 +1284,12 @@ mod tests {
stage_id: input_stage_id,
partition_id,
},
executor_meta: ExecutorMetadata {
partition_location_metadata: Arc::new(PartitionLocationMetadata {
id: "executor_1".to_string(),
host: "executor_1".to_string(),
port: 7070,
grpc_port: 8080,
specification: ExecutorSpecification::default().with_task_slots(1),
os_info: ExecutorOperatingSystemSpecification::default(),
},
}),
partition_stats: Default::default(),
file_id: None,
is_sort_shuffle: false,
Expand Down Expand Up @@ -1487,14 +1473,12 @@ mod tests {
stage_id: 1,
partition_id,
},
executor_meta: ExecutorMetadata {
partition_location_metadata: Arc::new(PartitionLocationMetadata {
id: format!("exec{partition_id}"),
host: "localhost".to_string(),
port: 50051,
grpc_port: 50052,
specification: ExecutorSpecification::default().with_task_slots(12),
os_info: ExecutorOperatingSystemSpecification::default(),
},
}),
partition_stats: Default::default(),
file_id,
is_sort_shuffle: false,
Expand Down Expand Up @@ -1723,14 +1707,12 @@ mod tests {
stage_id: 7,
partition_id,
},
executor_meta: ExecutorMetadata {
partition_location_metadata: Arc::new(PartitionLocationMetadata {
id: format!("exec-{partition_id}"),
host: "localhost".to_string(),
port: 50051,
grpc_port: 50052,
specification: ExecutorSpecification::default(),
os_info: ExecutorOperatingSystemSpecification::default(),
},
}),
partition_stats: PartitionStats::new(Some(100), Some(1), Some(1024)),
file_id: None,
is_sort_shuffle: false,
Expand Down
17 changes: 11 additions & 6 deletions ballista/core/src/serde/scheduler/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::serde::protobuf::{NamedPruningMetrics, NamedRatio};
use crate::serde::scheduler::{
Action, BallistaFunctionRegistry, ExecutorData, ExecutorMetadata,
ExecutorOperatingSystemSpecification, ExecutorSpecification, PartitionId,
PartitionLocation, PartitionStats, TaskDefinition,
PartitionLocation, PartitionLocationMetadata, PartitionStats, TaskDefinition,
};

use crate::serde::{BallistaCodec, protobuf};
Expand Down Expand Up @@ -108,14 +108,19 @@ impl TryInto<PartitionLocation> for protobuf::PartitionLocation {
)
})?
.into(),
executor_meta: self
.executor_meta
.ok_or_else(|| {
partition_location_metadata: {
let m = self.executor_meta.ok_or_else(|| {
BallistaError::General(
"executor_meta in PartitionLocation is missing".to_owned(),
)
})?
.into(),
})?;
Arc::new(PartitionLocationMetadata {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my idea was to have single instance of partition location and then Arc-ed behind shared pointers. At the moment, we have instance of executor metadata per partition location which for scenarios where we have just a few executor and many partitions may produce a lot of data (id, host ...). also when serialised we would serialize executor metadata for each of partition, which is hitting gprc issues. not sure if we can somehow reduce executor metadata instance count and its serialization

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be okay to create a followup PR to handle the second part and port all common info in ShuffleReaderExec proto level and not PartitionLocation ?

id: m.id,
host: m.host,
port: m.port as u16,
grpc_port: m.grpc_port as u16,
})
},
partition_stats: self
.partition_stats
.ok_or_else(|| {
Expand Down
26 changes: 25 additions & 1 deletion ballista/core/src/serde/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,30 @@ impl PartitionId {
}
}

/// Lean exec information for a shuffle partition.
#[derive(Debug, Clone)]
pub struct PartitionLocationMetadata {
/// Unique executor identifier.
pub id: String,
/// Hostname or IP address of the executor.
pub host: String,
/// Port number for data transfer.
pub port: u16,
/// Port number for gRPC communication.
pub grpc_port: u16,
}

impl From<&ExecutorMetadata> for PartitionLocationMetadata {
fn from(m: &ExecutorMetadata) -> Self {
Self {
id: m.id.clone(),
host: m.host.clone(),
port: m.port,
grpc_port: m.grpc_port,
}
}
}

/// Location information for a shuffle partition.
#[derive(Debug, Clone)]
pub struct PartitionLocation {
Expand All @@ -88,7 +112,7 @@ pub struct PartitionLocation {
/// The partition identifier.
pub partition_id: PartitionId,
/// Metadata about the executor hosting this partition.
pub executor_meta: ExecutorMetadata,
pub partition_location_metadata: Arc<PartitionLocationMetadata>,
/// Statistics about the partition data.
pub partition_stats: PartitionStats,
/// shuffle file id
Expand Down
9 changes: 8 additions & 1 deletion ballista/core/src/serde/scheduler/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,14 @@ impl TryInto<protobuf::PartitionLocation> for PartitionLocation {
Ok(protobuf::PartitionLocation {
map_partition_id: self.map_partition_id as u32,
partition_id: Some(self.partition_id.into()),
executor_meta: Some(self.executor_meta.into()),
executor_meta: Some(protobuf::ExecutorMetadata {
id: self.partition_location_metadata.id.clone(),
host: self.partition_location_metadata.host.clone(),
port: self.partition_location_metadata.port as u32,
grpc_port: self.partition_location_metadata.grpc_port as u32,
specification: None,
os_info: None,
}),
partition_stats: Some(self.partition_stats.into()),
file_id: self.file_id,
is_sort_shuffle: self.is_sort_shuffle,
Expand Down
9 changes: 3 additions & 6 deletions ballista/scheduler/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1135,8 +1135,7 @@ order by
{
use ballista_core::execution_plans::ShuffleReaderExec;
use ballista_core::serde::scheduler::{
ExecutorMetadata, ExecutorOperatingSystemSpecification,
ExecutorSpecification, PartitionId, PartitionLocation, PartitionStats,
PartitionId, PartitionLocation, PartitionLocationMetadata, PartitionStats,
};
use datafusion::arrow::datatypes::{DataType, Field, Schema};

Expand All @@ -1152,14 +1151,12 @@ order by
stage_id: 42,
partition_id,
},
executor_meta: ExecutorMetadata {
partition_location_metadata: Arc::new(PartitionLocationMetadata {
id: format!("exec-{partition_id}"),
host: "localhost".to_string(),
port: 50050,
grpc_port: 50051,
specification: ExecutorSpecification::default().with_task_slots(1),
os_info: ExecutorOperatingSystemSpecification::default(),
},
}),
partition_stats: PartitionStats::new(Some(10), None, Some(1)),
file_id: None,
is_sort_shuffle: false,
Expand Down
5 changes: 4 additions & 1 deletion ballista/scheduler/src/scheduler_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,10 @@ mod test {
assert_eq!(final_graph.output_locations().len(), 4);

for output_location in final_graph.output_locations() {
assert_eq!(output_location.executor_meta.host, "localhost1".to_owned())
assert_eq!(
output_location.partition_location_metadata.host,
"localhost1".to_owned()
)
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion ballista/scheduler/src/state/aqe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ impl AdaptiveExecutionGraph {
stage_output.partition_locations.iter_mut().for_each(
|(_partition, locs)| {
let before_len = locs.len();
locs.retain(|loc| loc.executor_meta.id != executor_id);
locs.retain(|loc| loc.partition_location_metadata.id != executor_id);
if locs.len() < before_len {
match_found = true;
}
Expand Down
15 changes: 5 additions & 10 deletions ballista/scheduler/src/state/aqe/test/alter_stages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ use crate::state::aqe::test::{
mock_batch, mock_context, mock_partitions_with_statistics_no_data,
};
use ballista_core::serde::scheduler::{
ExecutorMetadata, ExecutorOperatingSystemSpecification, ExecutorSpecification,
PartitionId, PartitionLocation, PartitionStats,
PartitionId, PartitionLocation, PartitionLocationMetadata, PartitionStats,
};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::stats::Precision;
Expand Down Expand Up @@ -519,14 +518,12 @@ fn small_statistics_exchange() -> Vec<Vec<PartitionLocation>> {
stage_id: 0,
partition_id: 0,
},
executor_meta: ExecutorMetadata {
partition_location_metadata: Arc::new(PartitionLocationMetadata {
id: "".to_string(),
host: "".to_string(),
port: 0,
grpc_port: 0,
specification: ExecutorSpecification::default().with_task_slots(0),
os_info: ExecutorOperatingSystemSpecification::default(),
},
}),
// next few properties are needed
partition_stats: PartitionStats::new(
Some(threshold_num_rows as u64 / 128),
Expand All @@ -551,14 +548,12 @@ fn big_statistics_exchange() -> Vec<Vec<PartitionLocation>> {
stage_id: 0,
partition_id: 0,
},
executor_meta: ExecutorMetadata {
partition_location_metadata: Arc::new(PartitionLocationMetadata {
id: "".to_string(),
host: "".to_string(),
port: 0,
grpc_port: 0,
specification: ExecutorSpecification::default().with_task_slots(0),
os_info: ExecutorOperatingSystemSpecification::default(),
},
}),

// next few properties are needed
partition_stats: PartitionStats::new(
Expand Down
9 changes: 3 additions & 6 deletions ballista/scheduler/src/state/aqe/test/coalesce_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ use crate::state::aqe::planner::AdaptivePlanner;
use crate::state::aqe::test::{mock_batch, mock_schema};
use ballista_core::extension::SessionConfigExt;
use ballista_core::serde::scheduler::{
ExecutorMetadata, ExecutorOperatingSystemSpecification, ExecutorSpecification,
PartitionId, PartitionLocation, PartitionStats,
PartitionId, PartitionLocation, PartitionLocationMetadata, PartitionStats,
};
use datafusion::datasource::MemTable;
use datafusion::execution::SessionStateBuilder;
Expand Down Expand Up @@ -101,14 +100,12 @@ fn partitions_with_byte_sizes(
stage_id: 0,
partition_id: idx,
},
executor_meta: ExecutorMetadata {
partition_location_metadata: Arc::new(PartitionLocationMetadata {
id: "".to_string(),
host: "".to_string(),
port: 0,
grpc_port: 0,
specification: ExecutorSpecification::default().with_task_slots(0),
os_info: ExecutorOperatingSystemSpecification::default(),
},
}),
partition_stats: PartitionStats::new(Some(1), None, Some(bytes)),
file_id: None,
is_sort_shuffle: false,
Expand Down
Loading
Loading