Commit 590a8d8
feat: Pinot-style colocated-join optimizer for hash-bucketed tables
Brings three Pinot V2 Physical Optimizer ideas to Ballista so queries
against pre-bucketed tables can avoid the network shuffle on every join.
Architecture (all changes additive; no DataFusion fork required):
  DataFusion physical planner
    → DataFusion optimizer rules (incl. EnforceDistribution, JoinSelection)
    → ColocatedJoinRule         ← new: strips redundant RepartitionExec
                                  (also handles divisor sub-partitioning)
    → BroadcastSmallSideRule    ← new: replaces partitioned join with CollectLeft
    → DistributedExchangeRule   ← existing: maps remaining repartitions to ExchangeExec
    → DefaultDistributedPlanner ← existing: cuts stages at ExchangeExec
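
To make the ordering above concrete, here is a toy model of the pipeline, not the Ballista code. The `Plan` enum, the function names, and the "one stage per exchange plus a final stage" count are all simplifying assumptions for illustration. Join keys and hash functions are ignored, and only the equal-bucket-count case is modelled.

```rust
/// Toy physical plan (hypothetical; the real rules operate on DataFusion
/// ExecutionPlan trees).
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    /// `buckets` is Some(n) when the table declares on-disk hash bucketing.
    Scan { table: &'static str, buckets: Option<usize> },
    Repartition(Box<Plan>),
    Exchange(Box<Plan>),
    HashJoin(Box<Plan>, Box<Plan>),
}

/// Declared bucket count of a scan, looking through a Repartition above it.
fn declared_buckets(p: &Plan) -> Option<usize> {
    match p {
        Plan::Scan { buckets, .. } => *buckets,
        Plan::Repartition(inner) => declared_buckets(inner),
        _ => None,
    }
}

/// Drop a Repartition node if one sits at the top of `p`.
fn strip(p: Plan) -> Plan {
    match p {
        Plan::Repartition(inner) => *inner,
        other => other,
    }
}

/// Toy ColocatedJoinRule: strip the repartitions when both join inputs
/// declare the same bucket count.
pub fn colocated_join_rule(p: Plan) -> Plan {
    match p {
        Plan::HashJoin(l, r) => {
            let (l, r) = (colocated_join_rule(*l), colocated_join_rule(*r));
            match (declared_buckets(&l), declared_buckets(&r)) {
                (Some(a), Some(b)) if a == b => {
                    Plan::HashJoin(Box::new(strip(l)), Box::new(strip(r)))
                }
                _ => Plan::HashJoin(Box::new(l), Box::new(r)),
            }
        }
        Plan::Repartition(i) => Plan::Repartition(Box::new(colocated_join_rule(*i))),
        Plan::Exchange(i) => Plan::Exchange(Box::new(colocated_join_rule(*i))),
        scan => scan,
    }
}

/// Toy DistributedExchangeRule: every remaining Repartition becomes an Exchange.
pub fn distributed_exchange_rule(p: Plan) -> Plan {
    match p {
        Plan::Repartition(i) | Plan::Exchange(i) => {
            Plan::Exchange(Box::new(distributed_exchange_rule(*i)))
        }
        Plan::HashJoin(l, r) => Plan::HashJoin(
            Box::new(distributed_exchange_rule(*l)),
            Box::new(distributed_exchange_rule(*r)),
        ),
        scan => scan,
    }
}

fn exchanges(p: &Plan) -> usize {
    match p {
        Plan::Scan { .. } => 0,
        Plan::Repartition(i) => exchanges(i),
        Plan::Exchange(i) => 1 + exchanges(i),
        Plan::HashJoin(l, r) => exchanges(l) + exchanges(r),
    }
}

/// Toy stage cutter: one stage per Exchange plus the final stage.
pub fn stage_count(p: &Plan) -> usize {
    1 + exchanges(p)
}
```

In this model, a join of two 8-bucket scans collapses to a single stage because no exchange survives. The same join over undeclared tables keeps both exchanges and is cut into three stages.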
Metadata layer (ballista-core):
- BallistaPartitionMetadata trait — optional contract any TableProvider
can implement to declare on-disk hash bucketing (keys + hash function +
bucket count).
- HashDistribution / HashFn — the declared layout.
- PartitionedTableProvider wrapper — attaches a HashDistribution to any
existing TableProvider without modifying it.
- HashDistributedScanExec adapter — re-advertises the wrapped scan's
output_partitioning() as Partitioning::Hash so optimizer rules can
read it.
- BucketSubPartitionExec — chains input partitions per output partition
via stream::iter + flatten (pure local concat, no network) for the
divisor case.
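
A minimal sketch of what the metadata contract could look like. The commit message names the types but does not show their definitions, so the field names, the `HashFn` variants, the `hash_distribution` method, and the `BucketedTable` provider are assumptions. `input_buckets_for` illustrates one plausible bucket-to-partition mapping for the divisor case, derived from the modulo identity rather than taken from the source.

```rust
/// Hash function a table was bucketed with (variant names are illustrative).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HashFn {
    Murmur3,
    XxHash64,
}

/// Declared on-disk layout: bucketing keys, hash function, bucket count.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HashDistribution {
    pub keys: Vec<String>,
    pub hash_fn: HashFn,
    pub bucket_count: usize,
}

/// Optional contract: a provider returns `None` when it is not bucketed.
/// (The method name is an assumption.)
pub trait BallistaPartitionMetadata {
    fn hash_distribution(&self) -> Option<HashDistribution>;
}

/// Hypothetical provider that simply stores its declared layout.
pub struct BucketedTable {
    pub dist: HashDistribution,
}

impl BallistaPartitionMetadata for BucketedTable {
    fn hash_distribution(&self) -> Option<HashDistribution> {
        Some(self.dist.clone())
    }
}

/// Input buckets that output partition `output` would concatenate when
/// `input_count` buckets are folded down to `output_count` partitions.
/// Returns `None` when `output_count` does not divide `input_count`
/// or `output` is out of range.
pub fn input_buckets_for(
    output: usize,
    input_count: usize,
    output_count: usize,
) -> Option<Vec<usize>> {
    if output_count == 0 || input_count % output_count != 0 || output >= output_count {
        return None;
    }
    // Bucket b lands in partition b % output_count, so partition `output`
    // owns buckets output, output + output_count, output + 2 * output_count, ...
    Some((output..input_count).step_by(output_count).collect())
}
```

Under these assumptions, folding 8 buckets to 4 partitions gives partition 1 the input buckets 1 and 5, and a non-divisor pair such as 8 and 3 is rejected.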
Optimizer rules (ballista-scheduler):
- ColocatedJoinRule — for each HashJoinExec, when both inputs declare
matching keys + hash function and either equal or divisor-related
bucket counts, strips the RepartitionExec above each input or wraps
the side with more buckets in BucketSubPartitionExec. The divisor case
relies on the identity (hash(k) % 16) % 8 == hash(k) % 8, which holds
whenever the smaller bucket count divides the larger one.
- BroadcastSmallSideRule — promotes Partitioned HashJoinExec to
CollectLeft when one side's total_byte_size is below the configured
threshold; uses HashJoinExec::swap_inputs when the small side is on
the right. Restricted to JoinType::Inner pending issue #1055.
- default_optimizers now takes the SessionConfig so the broadcast
threshold flows through. ColocatedJoinRule runs before
BroadcastSmallSideRule so colocation wins when both could apply.
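
The colocation check described above can be sketched as a small classifier. This is an illustration, not the rule's actual code: `Side`, `Colocation`, and `classify` are hypothetical names, and `DefaultHasher` is only a stand-in for whatever hash function the tables were bucketed with.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// What one join input declares (hypothetical shape).
pub struct Side {
    pub keys: Vec<&'static str>,
    pub hash_fn: &'static str,
    pub buckets: usize,
}

#[derive(Debug, PartialEq, Eq)]
pub enum Colocation {
    /// Same bucket count: both repartitions can be stripped.
    Equal,
    /// Left has more buckets: fold it down to `target` partitions.
    SubPartitionLeft { target: usize },
    /// Right has more buckets: fold it down to `target` partitions.
    SubPartitionRight { target: usize },
    /// Not colocated: keep the shuffle.
    None,
}

pub fn classify(l: &Side, r: &Side) -> Colocation {
    if l.keys != r.keys || l.hash_fn != r.hash_fn || l.buckets == 0 || r.buckets == 0 {
        return Colocation::None;
    }
    if l.buckets == r.buckets {
        Colocation::Equal
    } else if l.buckets % r.buckets == 0 {
        Colocation::SubPartitionLeft { target: r.buckets }
    } else if r.buckets % l.buckets == 0 {
        Colocation::SubPartitionRight { target: l.buckets }
    } else {
        Colocation::None
    }
}

/// Bucket of `key` under `buckets` buckets, using a stand-in hash.
pub fn bucket(key: &str, buckets: u64) -> u64 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    h.finish() % buckets
}
```

Because 8 divides 16, `bucket(k, 16) % 8 == bucket(k, 8)` for every key. A row stored in bucket b of the 16-bucket table therefore joins only with bucket b % 8 of the 8-bucket table. A pair such as 16 and 6 has no such guarantee, and the classifier returns `Colocation::None`.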
Both features are opt-in. Tables get colocation behavior only if the
user wraps them with PartitionedTableProvider; broadcast fires only when
ballista.optimizer.broadcast_threshold_bytes > 0 (default 0). All
existing snapshot tests are unchanged.
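
The broadcast gating can be sketched as a pure decision function. The names here are hypothetical and the local `JoinType` enum is a stand-in for DataFusion's. Two choices are assumptions not stated in the commit message: a side with unknown statistics is never treated as small, and the left side is preferred when both sides qualify.

```rust
/// Stand-in for DataFusion's JoinType (subset, for illustration only).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinType {
    Inner,
    Left,
    Right,
    Full,
}

#[derive(Debug, PartialEq, Eq)]
pub enum Broadcast {
    /// Leave the partitioned join untouched.
    Keep,
    /// Promote to CollectLeft; `swap_inputs` is true when the small side
    /// started on the right and must be moved to the left first.
    CollectLeft { swap_inputs: bool },
}

pub fn choose_broadcast(
    join_type: JoinType,
    left_bytes: Option<usize>,
    right_bytes: Option<usize>,
    threshold_bytes: usize,
) -> Broadcast {
    // A threshold of 0 (the default) disables the rule; only inner joins qualify.
    if threshold_bytes == 0 || join_type != JoinType::Inner {
        return Broadcast::Keep;
    }
    // Assumption: missing statistics (None) never count as "small".
    let small = |bytes: Option<usize>| matches!(bytes, Some(n) if n < threshold_bytes);
    if small(left_bytes) {
        Broadcast::CollectLeft { swap_inputs: false }
    } else if small(right_bytes) {
        Broadcast::CollectLeft { swap_inputs: true }
    } else {
        Broadcast::Keep
    }
}
```

With a 1 MiB threshold, an inner join whose right input reports 4 KiB would be promoted with `swap_inputs: true`. The same join with the threshold left at 0 is returned unchanged.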
Tests:
- 86 ballista-core tests pass (trait, wrapper, scan adapter,
BucketSubPartitionExec correctness + non-divisor rejection).
- 84 ballista-scheduler tests pass: 5 ColocatedJoinRule, 4 divisor,
5 BroadcastSmallSideRule, 3 end-to-end plan-snapshot tests
(matching → no exchange; 8/4 divisor → BucketSubPartitionExec; plain
MemTable → ExchangeExec retained), plus all pre-existing.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent aaf8f08, commit 590a8d8
8 files changed
Lines changed: 1592 additions & 2 deletions
File tree
- ballista
- core/src
- scheduler/src
- physical_optimizer
- state/aqe
- test