
feat: decouple partition location from executor metadata #1853

Open
sandugood wants to merge 9 commits into
apache:main from
sandugood:feat/executor-info-decouple

@sandugood

@sandugood sandugood commented Jun 9, 2026

Contributor

Which issue does this PR close?

Closes #1851.

Rationale for this change

In the current implementation, the PartitionLocation used in each shuffle operation (one per partition of the previous stage) carries an executor_metadata field of type ExecutorMetadata. Most of that field is unnecessary: it adds no information that ShuffleReaderExec can use to fetch a partition (or, e.g., to resume execution from a failed stage).

What changes are included in this PR?

ExecutorMetadata was decoupled from PartitionLocation and is now used as a separate struct for:

  • registering an executor
  • heartbeats
  • REST API info fetch

PartitionLocation now carries only the executor_id, host and port, which is sufficient for ShuffleReaderExec to fetch the partitions it needs.
This saves a lot of space and data transfer during each shuffle operation (potentially removing the risk of scheduler OOM errors and speeding up queries with many partitions).

Are there any user-facing changes?

Yes.
In the REST API, ExecutorResponse no longer contains a nested struct with the executor's hardware and OS info; it has been flattened.

Potential follow-up:

  • check the TUI REST API integration

@sandugood sandugood changed the title feat: decouple partition's location from executor's metadata feat: decouple partition location from executor metadata Jun 9, 2026

@milenkovicm milenkovicm left a comment

Contributor

thanks for the contribution @sandugood

I think this is a good first step, but we need to go a bit further and deduplicate executor connectivity information. Is there a need to have the same piece of information in thousands of places?

When we serialise PartitionLocation, can we serialise it into two structures: a vector of partition locations and a hash map of executor id -> executor metadata? A partition location could reference its executor by executor id (which we can store as bytes).

For struct PartitionLocation, can we share executor_meta behind an Arc (pub executor_meta: Arc<ExecutorMetadata>)?

#[derive(Debug, Clone)]
pub struct PartitionLocation {
    /// The source partition ID from the map stage.
    pub map_partition_id: usize,
    /// The partition identifier.
    pub partition_id: PartitionId,
    /// Metadata about the executor hosting this partition.
    pub executor_meta: Arc<ExecutorMetadata>,
    /// Statistics about the partition data.
    pub partition_stats: PartitionStats,
    /// shuffle file id
    pub file_id: Option<u64>,
    /// whether this partition uses sort shuffle
    pub is_sort_shuffle: bool,
}

So basically, when we do ser/de we can deduplicate the executor meta.

Also, I'm not sure whether flattening ExecutorSpecification and the other structure makes more sense than making them optional.

This PR will need some effort, but it will help a lot in cases where there are many partitions.
Thanks a lot
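The deduplication idea above can be sketched roughly as follows. This is only an illustration, not the code in this PR: `ExecutorMetadata` and `PartitionLocation` are trimmed-down stand-ins for the Ballista structs, and `WireLocation` and `dedup_for_wire` are hypothetical names introduced for the example.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical, trimmed-down stand-in for Ballista's `ExecutorMetadata`.
#[derive(Debug, Clone, PartialEq)]
pub struct ExecutorMetadata {
    pub id: String,
    pub host: String,
    pub port: u16,
}

/// In-memory location: many locations share one `Arc<ExecutorMetadata>`.
#[derive(Debug, Clone)]
pub struct PartitionLocation {
    pub map_partition_id: usize,
    pub executor_meta: Arc<ExecutorMetadata>,
}

/// Hypothetical wire form: refers to the executor by id only.
#[derive(Debug, Clone, PartialEq)]
pub struct WireLocation {
    pub map_partition_id: usize,
    pub executor_id: String,
}

/// Split locations into a compact list plus one metadata entry per executor.
pub fn dedup_for_wire(
    locations: &[PartitionLocation],
) -> (Vec<WireLocation>, HashMap<String, ExecutorMetadata>) {
    let mut executors = HashMap::new();
    let mut wire = Vec::with_capacity(locations.len());
    for loc in locations {
        // Store the full metadata only the first time an executor id is seen.
        executors
            .entry(loc.executor_meta.id.clone())
            .or_insert_with(|| (*loc.executor_meta).clone());
        wire.push(WireLocation {
            map_partition_id: loc.map_partition_id,
            executor_id: loc.executor_meta.id.clone(),
        });
    }
    (wire, executors)
}
```

With 1000 locations that all live on one executor, the map ends up with a single entry, so the full metadata is written once instead of 1000 times.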

Comment thread ballista/core/proto/ballista.proto Outdated
Comment thread ballista/core/proto/ballista.proto
@sandugood

Contributor Author

Thank you for your review and ideas @milenkovicm
Going to tackle it

@sandugood sandugood marked this pull request as draft June 10, 2026 12:01
@sandugood sandugood changed the title feat: decouple partition location from executor metadata [WIP] feat: decouple partition location from executor metadata Jun 10, 2026
@sandugood sandugood force-pushed the feat/executor-info-decouple branch from e5ac776 to 7fc0cea Compare June 16, 2026 09:54
@sandugood

Contributor Author

Refactored the code:

  1. In the .proto spec we now send the PartitionLocation along with a map of ExecutorMetadata (used by ShuffleReaderPartition), so we can transfer a Vec<> of locations and look up each executor's metadata by its id.
  2. On the native side, added ExecutorMetadata to PartitionLocation behind an Arc<> to avoid heavy copying.
  3. Made parts of ExecutorMetadata in the .proto spec optional.
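As a rough sketch of the decoding direction described in points 1 and 2: each executor from the map is wrapped in an `Arc` once, and every location then clones only the pointer. The types here are simplified stand-ins, and `WireLocation` and `rehydrate` are hypothetical names, not the ones used in this PR.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical, trimmed-down stand-in for Ballista's `ExecutorMetadata`.
#[derive(Debug, Clone, PartialEq)]
pub struct ExecutorMetadata {
    pub id: String,
    pub host: String,
    pub port: u16,
}

/// Hypothetical wire form of a location: executor referenced by id.
#[derive(Debug, Clone)]
pub struct WireLocation {
    pub map_partition_id: usize,
    pub executor_id: String,
}

/// Native form: the metadata is shared, not copied.
#[derive(Debug, Clone)]
pub struct PartitionLocation {
    pub map_partition_id: usize,
    pub executor_meta: Arc<ExecutorMetadata>,
}

/// Rebuild native locations from the wire list plus the executor map.
pub fn rehydrate(
    wire: Vec<WireLocation>,
    executor_map: HashMap<String, ExecutorMetadata>,
) -> Result<Vec<PartitionLocation>, String> {
    // Allocate one Arc per executor, regardless of how many locations use it.
    let shared: HashMap<String, Arc<ExecutorMetadata>> = executor_map
        .into_iter()
        .map(|(id, meta)| (id, Arc::new(meta)))
        .collect();

    wire.into_iter()
        .map(|w| -> Result<PartitionLocation, String> {
            let meta = shared
                .get(&w.executor_id)
                .ok_or_else(|| format!("unknown executor id: {}", w.executor_id))?;
            Ok(PartitionLocation {
                map_partition_id: w.map_partition_id,
                // Cloning the Arc bumps a reference count; no metadata is copied.
                executor_meta: Arc::clone(meta),
            })
        })
        .collect()
}
```

A location whose executor id is missing from the map is reported as an error in this sketch; how the real code should handle that case is a separate design decision.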

@sandugood sandugood marked this pull request as ready for review June 16, 2026 10:27
@sandugood sandugood changed the title [WIP] feat: decouple partition location from executor metadata feat: decouple partition location from executor metadata Jun 16, 2026
@coderfender

Contributor

@sandugood thank you for the PR. I was wondering if you could provide some before and after benchmarks so we can gain more confidence here.

@sandugood

Contributor Author

The OOM crash was reproduced by @gabotechs.
I wonder if you could test it against this version? Thanks a lot

@milenkovicm

Contributor

Had a quick look; decoupling executor connection information from partition location makes sense. Will try to have another look later. Thanks @sandugood

Comment thread ballista/core/src/serde/scheduler/to_proto.rs Outdated
@coderfender

Contributor

@sandugood, @milenkovicm I closed my PR to prioritize this approach, which seems more promising and pragmatic.

@milenkovicm

Contributor

I'm running TPCH (SF10) Q2 with 512 partitions

cargo run --bin tpch -- benchmark ballista -p /Users/marko/TMP/tpch_data/tpch-data-sf10/ -f parquet -i 1 --port 50050 --host 127.0.0.1 -c datafusion.execution.target_partitions=512 -q 2 

before PR

Error, decoded message length too large: found 43952013 bytes, the limit is: 16777216 bytes

after PR has been applied

Error, decoded message length too large: found 18947588 bytes, the limit is: 16777216 byte

so it's roughly a 2.3x reduction, which is a good direction

I know there are other things we have to do to use even less data, like using UUIDs instead of strings as proposed in #1876 🤔

good progress @sandugood, any suggestions @coderfender?
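The numbers in the two error messages above can be checked with a few lines of arithmetic. The constant and function names are made up for this example; the byte counts are copied from the messages.

```rust
/// Decoded message size reported before the PR (TPC-H SF10 Q2, 512 partitions).
pub const BEFORE_BYTES: u64 = 43_952_013;
/// Decoded message size reported after the PR for the same run.
pub const AFTER_BYTES: u64 = 18_947_588;
/// The limit quoted in the error message, which equals 16 MiB.
pub const LIMIT_BYTES: u64 = 16 * 1024 * 1024;

/// How many times smaller the message became.
pub fn reduction_factor(before: u64, after: u64) -> f64 {
    before as f64 / after as f64
}

/// How many bytes a message exceeds the limit by (0 if it fits).
pub fn bytes_over_limit(size: u64, limit: u64) -> u64 {
    size.saturating_sub(limit)
}
```

Here `reduction_factor(BEFORE_BYTES, AFTER_BYTES)` is about 2.32, and `bytes_over_limit(AFTER_BYTES, LIMIT_BYTES)` is 2170372, so the message after the PR is still roughly 13% above the limit.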

@milenkovicm milenkovicm left a comment

Contributor

thanks @sandugood

One question for discussion, as I would like to challenge us to make that Q2 query run on the default gRPC configuration.

Would it make sense to introduce PartitionLocationsExt (proto), or some better name, which would have

message PartitionLocations {
  repeated PartitionLocation partition_location = 1;
  map<string, ExecutorMetadata> executor_map = 2;
}

and use it in SuccessfulJob and ShuffleReaderPartition instead of adding a map to those messages? This PartitionLocations would encapsulate the transformation from/to proto.

so from

message ShuffleReaderPartition {
  // each partition of a shuffle read can read data from multiple locations
  repeated PartitionLocation location = 1;
  map<string, ExecutorMetadata> executor_map = 2;
}

to

message ShuffleReaderPartition {
  repeated PartitionLocations locations = 1;
}

Executors and partition locations need each other, so having a message to encapsulate them makes sense in my head; not sure what you think.

Furthermore, ExecutorMetadata has executor_id (currently a string), which is duplicated information. Not sure if it would make sense to have map<string, ExecutorConnection> executor_map = 2; where ExecutorConnection would just be

message ExecutorConnection {
  string host = 2;
  uint32 port = 3;
  uint32 grpc_port = 4;
}

which could save a few more bytes.

IMHO this looks good, but let's make a challenge of it if you agree.
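To get a feel for how many bytes dropping the duplicated executor_id would save, here is a back-of-the-envelope sketch based on the protobuf wire format. It assumes the id is a non-empty string in a field numbered below 16 (so the tag is one byte), and it ignores the rare case where the enclosing message's length prefix also shrinks. The function names are hypothetical.

```rust
/// Number of bytes a protobuf varint needs to encode `value`.
pub fn varint_len(mut value: u64) -> usize {
    let mut len = 1;
    while value >= 0x80 {
        value >>= 7;
        len += 1;
    }
    len
}

/// Encoded size of a non-empty `string` field with a field number below 16:
/// one tag byte, a varint length prefix, then the UTF-8 payload.
pub fn string_field_len(s: &str) -> usize {
    1 + varint_len(s.len() as u64) + s.len()
}

/// Approximate bytes saved per map entry if the value no longer repeats
/// the executor id that is already present as the map key.
pub fn saved_per_executor(executor_id: &str) -> usize {
    string_field_len(executor_id)
}
```

For a 36-character UUID string this gives 38 bytes per executor. Since the map holds one entry per executor rather than one per partition, the total saving stays small unless the cluster has a very large number of executors.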

@sandugood

Contributor Author

Yeah, I will definitely dig into that
Saving a few bytes here and there makes a massive difference at scale
Thanks for review @milenkovicm

@milenkovicm

Contributor

furthermore,

When running Q2 with 256 partitions, the executor takes around 460MB with this patch and 550MB without (which might not be a very good metric to start with), but here is what I see

with this PR:

Query 2 took 6.989 s and returned 100 rows
Total time: 6.989 s

and without:

Query 2 took 12.037 s and returned 100 rows
Total time: 12.037 s

(unless I messed up the test runs)

I'm running latest main with

cargo run  --bin ballista-executor --release  -- -c 4  --work-dir /path/to/work/dir./

@milenkovicm

Copy link
Copy Markdown
Contributor

We might not be able to save much on top of these changes, and removing executor_id from the metadata might not justify introducing a new type (especially as the number of executors is usually not that big). So I'd suggest encapsulating the partition locations and the executor map first; if further optimisations are possible after that, we can take them.

@milenkovicm

Copy link
Copy Markdown
Contributor

running with 1024 partitions

Error, decoded message length too large: found 51824978 bytes, the limit is: 16777216 bytes

@sandugood

sandugood commented Jun 20, 2026

Contributor Author

Refactored the code and placed it inside the PartitionLocationExt .proto spec. It seemed like a good opportunity to save some bytes there.

However, I ran the same Q2 query on TPCH SF-2:

cargo run --bin tpch -- benchmark ballista -p /home/sandu/datafusion-ballista/data -f parquet -i 1 --port 50050 --host 127.0.0.1 -c datafusion.execution.target_partitions=512 -q 2

Got:
Error, decoded message length too large: found 18834462 bytes, the limit is: 16777216 bytes

This change saved about 113 KB compared with the previous test report (18947588 → 18834462 bytes), but the message still doesn't fit within the default limit.
cc @milenkovicm @coderfender


Development

Successfully merging this pull request may close these issues.

ExecutorMetatada too verbose

3 participants