feat: decouple partition location from executor metadata#1853
feat: decouple partition location from executor metadata#1853sandugood wants to merge 9 commits into
Conversation
milenkovicm
left a comment
There was a problem hiding this comment.
thanks for contribution @sandugood
i think this is good first step, but we need to go a bit further and deduplicate executor connectivity information, is there a need to have same peace of information in thousents of places.
when we serialise PartitionLocation can we serialise it in two strucutres, vector of partition locations and a hash map of executor id -> executor metadata. partition location could reference executor with executor id (which we can store as bytes)
For struct PartitionLocation, can we make share executor_meta behind Arc (pub executor_meta: Arc<ExecutorMetadata>)
#[derive(Debug, Clone)]
pub struct PartitionLocation {
/// The source partition ID from the map stage.
pub map_partition_id: usize,
/// The partition identifier.
pub partition_id: PartitionId,
/// Metadata about the executor hosting this partition.
pub executor_meta: Arc<ExecutorMetadata>,
/// Statistics about the partition data.
pub partition_stats: PartitionStats,
/// shuffle file id
pub file_id: Option<u64>,
/// whether this partition uses sort shuffle
pub is_sort_shuffle: bool,
}so basically when we do ser/de we can deduplicate executor meta.
Also, I'm not sure if flattening ExecutorSpecification and the other structure makes more sense than making it optional
This pr will need some effort, but it will help a lot cases where there is many partitions
thanks a lot
|
Thank you for your review and ideas @milenkovicm |
e5ac776 to
7fc0cea
Compare
|
Refactored the code:
|
|
@sandugood thank you for the PR. I was wondering if you could provide some sort of before and after benchmarks to gain more confidence here |
|
The OOM crash was reproduced by @gabotechs |
|
had a quick look, decoupling executor connection information from partition location makes sense. will try to have another look later. thanks @sandugood |
|
@sandugood , @milenkovicm I closed my PR to prioritize this approach which seems to be more promising and pragmatic |
|
I'm running TPCH (SF10) Q2 with 512 partitions cargo run --bin tpch -- benchmark ballista -p /Users/marko/TMP/tpch_data/tpch-data-sf10/ -f parquet -i 1 --port 50050 --host 127.0.0.1 -c datafusion.execution.target_partitions=512 -q 2 before PR after PR has been applied so its like 2.5x improvement which is good direction I know there are other things we have to do in order to make using even less data like proposed in #1876 using uuid instead of strings 🤔 good progress @sandugood any suggestions @coderfender ? |
There was a problem hiding this comment.
thanks @sandugood
one question for discussion, as i would like to challenge us to make that Q2 query run on default grpc configuration
Would it make sense to introduce PartitionLocationsExt (proto) or some better name which would have
message PartitionLocations {
repeated PartitionLocation partition_location = 1;
map<string, ExecutorMetadata> executor_map = 2;
}and use it in SuccessfulJob and ShuffleReaderPartition instead of adding map to those messages? this PartitionLocations would encapsulate transformation from/to proto.
so from
message ShuffleReaderPartition {
// each partition of a shuffle read can read data from multiple locations
repeated PartitionLocation location = 1;
map<string, ExecutorMetadata> executor_map = 2;
}to
message ShuffleReaderPartition {
repeated PartitionLocations locations = 1;
}executors and partition locations need each other so having a message to encapsulate them make sense in my head, not sure what you think
Furthermore ExecutorMetadata has executor_id currently string, which is duplicated information, not sure if it would make sense to have map<string, ExecutorConnection> executor_map = 2; which would just be
message ExecutorConnection {
string host = 2;
uint32 port = 3;
uint32 grpc_port = 4;
}Which could further save few bytes
IMHO this looks good, but lets make it a challenge of it if you agree
|
Yeah, I will definitely dig into that |
|
furthermore, when running Q2 with 256 partitions, executor takes around 460MB with this patch, and 550MB without (which might not be very good metrics to start with) but what i see with this PR: and without: (unless i made some kind of mess-up with test runs) Im running latest main with |
|
we might not be able to save much on top of this changes, and removing |
|
running with 1024 partitions |
|
Refactored the code and placed it inside the However, by running the same Q-2 query on TPCH SF-2 - Got: Saved 100k (comparing to previous test report) by this change, but couldn't fit it inside the default threshold. |
Which issue does this PR close?
Closes #1851.
Rationale for this change
In the current implementation there is a problem - in the
PartitionLocationthat is used in each shuffle operation (for each partition in the previous stage) there wasexecutor_metadatafield (which is ofExecutorMetadatatype) filled with unnecessary info, because it did not add up any information that could be used byShuffleReaderExecto extract partition (or i.e resume the execution from a failed stage)What changes are included in this PR?
ExecutorMetadatawas decoupled fromPartitionLocationand now it is used as a separate struct for:PartitionLocationis now exposed only to theexecutor_id,hostandport, which is sufficient for fetching needed partitions by theShuffleReaderExecThis way we can save up a lot of space and data-transfer during each shuffle operation (potentially removing possibility of
Scheduler's OOM errors and improving speed of queries with lots of partitions)Are there any user-facing changes?
Yes.
In the REST API interface
ExecutorResponsedoesnt contain nested struct withExecutor's hardware and OS info. It was flattened.Potential follow-up: