Add cost model #486
Open
gabotechs wants to merge 2 commits into
gabotechs added a commit that referenced this pull request on Jun 11, 2026

This is one PR from the following stack of PRs:
- #477
- #463 <- you are here
- #464
- #478
- #479
- #486
- #432

This PR introduces a `NetworkBoundaryBuilder` argument to the network boundary injection logic, allowing more flexible and configurable strategies for determining which exchanges require network communication. This enables better optimization of data movement across distributed tasks.
gabotechs added a commit that referenced this pull request on Jun 11, 2026

This is one PR from the following stack of PRs:
- #477
- #463
- #464 <- you are here
- #478
- #479
- #486
- #432

This PR introduces a `MaxGauge` metric to provide better tracking of peak values in distributed metrics collection. This enables more accurate monitoring of resource utilization and helps identify bottlenecks in the execution pipeline.
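
As an illustration of what a "max gauge" can look like, here is a minimal sketch and not the implementation from that commit. The name `MaxGaugeSketch` and its methods are hypothetical; the only assumption is that the metric keeps the largest value ever reported, which is what "tracking of peak values" suggests.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical gauge that remembers only the largest value ever observed.
/// This is an illustrative stand-in, not the `MaxGauge` added by the commit.
#[derive(Debug, Default)]
pub struct MaxGaugeSketch {
    max: AtomicUsize,
}

impl MaxGaugeSketch {
    pub fn new() -> Self {
        Self::default()
    }

    /// Records an observation. The stored value only ever moves upward.
    pub fn observe(&self, value: usize) {
        self.max.fetch_max(value, Ordering::Relaxed);
    }

    /// Returns the peak value seen so far (0 if nothing was observed).
    pub fn value(&self) -> usize {
        self.max.load(Ordering::Relaxed)
    }
}
```

Using an atomic `fetch_max` keeps the gauge lock-free, so several tasks could report into the same instance without coordination.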
gabotechs added a commit that referenced this pull request on Jun 11, 2026

This is one PR from the following stack of PRs:
- #477
- #463
- #464
- #478 <- you are here
- #479
- #486
- #432

---

Introduces the `ProducerHead` type:

```rust
pub enum ProducerHead {
    /// No specific head node is necessary.
    None,
    /// The head node should be a [BroadcastExec].
    BroadcastExec { output_partitions: usize },
    /// The head node should be a [RepartitionExec].
    RepartitionExec { partitioning: Partitioning },
}
```

This type is passed over the network while remotely executing tasks in order to set the appropriate node at the head of a stage.

Today, this is a no-op because the right head node in stages is ensured statically at planning time, but in follow-up PRs, network boundaries can get swapped and reorganized arbitrarily. One example that happens in AQE:

1. A JOIN is planned as a CollectLeft

```js
HashJoinExec: mode=CollectLeft
  CoalescePartitionsExec:
    [Stage 1] => NetworkBroadcastExec
      BroadcastExec
        DistributedLeafExec: unknown size
  DistributedLeafExec: unknown size
```

2. While collecting runtime statistics, it turns out that `Stage 1` is huge, and during AQE the JOIN sides are swapped

```js
HashJoinExec: mode=CollectLeft
  DistributedLeafExec: small size
  CoalescePartitionsExec:
    [Stage 1] => NetworkBroadcastExec
      BroadcastExec
        DistributedLeafExec: big size
```

3. `Stage 1` is now on the probe side, so it needs to be rewritten to a `NetworkShuffleExec`, otherwise duplicate data will be returned:

```js
HashJoinExec: mode=CollectLeft
  DistributedLeafExec: small size
  CoalescePartitionsExec:
    [Stage 1] => NetworkShuffleExec
      RepartitionExec // <- dynamically swapped at runtime based on the passed `ProducerHead`
        DistributedLeafExec: big size
```

Passing a `ProducerHead` at execution time unlocks two things:

1. dynamically setting the fanout width, accounting for a dynamically scaled upper stage
2. dynamically setting the correct operator, `BroadcastExec` or `RepartitionExec`, based on the network boundary above, which might have changed because of AQE
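
The head swap described in that commit message can be sketched on a toy plan tree. This is only an illustration under stated assumptions: `PlanSketch`, `strip_head`, and `apply_producer_head` are hypothetical names, `Partitioning` is a simplified stand-in for DataFusion's type, and treating `ProducerHead::None` as "leave the plan untouched" is a guess, not confirmed by the source.

```rust
/// Simplified stand-in for DataFusion's `Partitioning` (assumption: only the
/// partition count matters for this sketch).
#[derive(Debug, Clone, PartialEq)]
pub enum Partitioning {
    RoundRobinBatch(usize),
    Hash(usize),
}

/// Mirrors the `ProducerHead` enum from the commit message.
#[derive(Debug, Clone, PartialEq)]
pub enum ProducerHead {
    None,
    BroadcastExec { output_partitions: usize },
    RepartitionExec { partitioning: Partitioning },
}

/// Hypothetical toy plan tree, used instead of real `ExecutionPlan` nodes.
#[derive(Debug, Clone, PartialEq)]
pub enum PlanSketch {
    Leaf(String),
    Broadcast { output_partitions: usize, input: Box<PlanSketch> },
    Repartition { partitioning: Partitioning, input: Box<PlanSketch> },
}

/// Removes a statically planned head node, if there is one.
fn strip_head(plan: PlanSketch) -> PlanSketch {
    match plan {
        PlanSketch::Broadcast { input, .. } | PlanSketch::Repartition { input, .. } => *input,
        other => other,
    }
}

/// Re-heads a stage plan according to the `ProducerHead` received at execution time.
pub fn apply_producer_head(plan: PlanSketch, head: ProducerHead) -> PlanSketch {
    match head {
        // Assumption: `None` means the plan is executed as it was planned.
        ProducerHead::None => plan,
        ProducerHead::BroadcastExec { output_partitions } => PlanSketch::Broadcast {
            output_partitions,
            input: Box::new(strip_head(plan)),
        },
        ProducerHead::RepartitionExec { partitioning } => PlanSketch::Repartition {
            partitioning,
            input: Box::new(strip_head(plan)),
        },
    }
}
```

In the AQE example, a stage planned with a broadcast head and later moved to the probe side would be re-headed with `ProducerHead::RepartitionExec`, and the partition count carried by the head is what lets the fanout width follow a rescaled upper stage.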
gabotechs added a commit that referenced this pull request on Jun 17, 2026

This is one PR from the following stack of PRs:
- #477
- #463
- #464
- #478
- #479 <- you are here
- #486
- #432

---

This is a pure refactor PR with a couple of implementation detail changes:

# Reducing the bloat of complex functions in already complicated parts of the codebase.

For this, two key structs are introduced:
- `QueryCoordinator`: scoped to a whole distributed query, it handles references to pieces of data global to a query's lifetime, like the `TaskContext`, the metrics, the `JoinSet` used for spawning tasks, etc. It is also in charge of building `StageCoordinator` instances, which are scoped per-stage instead of per-query.
- `StageCoordinator`: this is the old `CoordinatorToWorkerTaskSpawner`, but with some more methods that allow reusability and some better naming. It handles all the comms between workers and coordinator needed for driving a stage forward.

This allows reducing the bloat in `prepare_static_plan` and the future `prepare_dynamic_plan` functions.

# Ensuring a coordinator->worker channel is held active for as long as the `DistributedExec` node is executing the query on the coordinator.

For the static planner, this is a no-op, as the previous model worked fine, but this will become important in the future for the dynamic planner. In the dynamic planner, the plan can be set for some stages that might never reach execution, so instead of coupling the task entry cache invalidation to the end of task execution, it is coupled to the lifetime of the `coordinator->worker` channel.

This has one side effect: `WorkUnit` feeds can no longer rely on the global `coordinator->worker` EOS signal to know that the coordinator will send no further `WorkUnit` feed, so they need an explicit EOS message that signals that no further `WorkUnit`s will be received, even though the `coordinator->worker` channel will still be alive for a while.

# Add a `plan_for_viz` field in `DistributedExec`.

This is a new slot in `DistributedExec` that holds a reference to the plan that is supposed to be rewritten with metrics for visualization purposes. Again, for the static planner this is a no-op, because the plan meant for visualization is equal to the plan that arrived as a child to `DistributedExec` in the first place. However, during dynamic planning, the plan that arrives at `DistributedExec` is not going to be the same as the final one after execution, so we need a slot for storing that final plan.
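
The explicit EOS message for `WorkUnit` feeds can be illustrated with a small sketch. This is not the project's code: `WorkUnitFeedMsg` and `drain_feed` are hypothetical names, a `String` stands in for a real `WorkUnit`, and a standard-library channel stands in for the `coordinator->worker` channel.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical message type for a work unit feed. `Eos` is sent explicitly
/// because the underlying channel may stay open long after the feed is done.
#[derive(Debug, PartialEq)]
pub enum WorkUnitFeedMsg {
    WorkUnit(String),
    Eos,
}

/// Collects work units until the explicit `Eos` marker arrives.
/// It does not wait for the channel itself to close.
pub fn drain_feed(rx: &Receiver<WorkUnitFeedMsg>) -> Vec<String> {
    let mut units = Vec::new();
    while let Ok(msg) = rx.recv() {
        match msg {
            WorkUnitFeedMsg::WorkUnit(unit) => units.push(unit),
            // The feed is over, even though the sender is still alive.
            WorkUnitFeedMsg::Eos => break,
        }
    }
    units
}
```

The point of the sketch is that the consumer stops on the in-band marker, so the channel lifetime is free to be tied to something else, such as cache invalidation.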
Base automatically changed from `gabrielmusat/task-spawner-refactor-and-cache-invalidation` to `main` on June 17, 2026 07:08
This is one PR from the following stack of PRs:
Mostly rescued from:
This PR is pretty big, but the whole purpose is to expose a very simple function:
While keeping everything else private to the rest of the codebase.
This PR introduces a cost model, used in adaptive task count assignment, that attributes a compute cost to a plan based on:

- `Statistics` available through normal DataFusion mechanisms (`ExecutionPlan::partition_statistics()`)

The core of this PR lives in the `src/distributed_planner/statistics/compute_per_node.rs` file: it is in charge of attributing a time complexity to the different `ExecutionPlan` implementations using big O notation. For example:

In this example:

- `DataSourceExec` needs to allocate all the `RecordBatch`es for all the output columns, as it's the data source, so its time complexity is `O(out_Cols)`: its compute cost scales linearly with the number of columns it needs to allocate.
- `FilterExec` needs to:
  - Evaluate the predicate on the `MinTemp` column (`O(Col0)`)
  - Allocate `RecordBatch`es for all output columns based on the rows filtered by the evaluated predicate (`O(out_Cols)`)

The tests in `compute_per_node.rs` contain several examples of this for the different operators, which range from pretty simple (`FilterExec`, `ProjectionExec`) to very complex (`HashJoinExec`).

Once a time complexity in big O notation is attributed to a node, multiplying the columns appearing in the notation by the estimated `byte_size` of those columns (given by `Statistics`) gives the compute cost measured in bytes.
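
To make the last step concrete, here is a minimal sketch of one possible reading of it, not the code in `compute_per_node.rs`. The names `Term`, `ColumnBytes`, and `compute_cost` are hypothetical, and the byte sizes are made-up inputs standing in for what `Statistics` would estimate. Each term of the big O expression is replaced by the estimated byte size of the columns it names, and the terms are summed.

```rust
/// Hypothetical term of a big O expression such as `O(Col0 + out_Cols)`.
pub enum Term {
    /// A single input column, identified by index (e.g. `Col0`).
    Col(usize),
    /// All output columns of the node (`out_Cols`).
    OutCols,
}

/// Hypothetical container for estimated per-column byte sizes.
pub struct ColumnBytes {
    pub input: Vec<u64>,
    pub output: Vec<u64>,
}

/// Turns a complexity expression into a cost in bytes by substituting each
/// term with the estimated byte size of the columns it refers to.
pub fn compute_cost(terms: &[Term], bytes: &ColumnBytes) -> u64 {
    terms
        .iter()
        .map(|term| match term {
            // Unknown columns contribute nothing in this sketch.
            Term::Col(i) => bytes.input.get(*i).copied().unwrap_or(0),
            Term::OutCols => bytes.output.iter().sum::<u64>(),
        })
        .sum::<u64>()
}
```

For a `FilterExec` with complexity `O(Col0 + out_Cols)`, an input `Col0` estimated at 800 bytes and output columns estimated at 400 and 800 bytes would give `compute_cost(&[Term::Col(0), Term::OutCols], ..)` a cost of 800 + 1200 = 2000 bytes.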