Summary
cluster() can expose a partitioned ExecutionPlan whose per-partition output streams are not independently consumable.
After the fix in PR #145, ClusterExec computes globally unique cluster ids by introducing a cross-partition coordinator. Each output partition stream now waits until all partitions have reported cluster counts before it can emit rows.
That fixes correctness for global cluster ids, but it also means a caller can hang indefinitely if it consumes only some partition streams, for example via execute_stream_partitioned().
This is a follow-up issue to PR #145 and is distinct from #146:
- #146 is about duplicate cluster ids across partitions
- this issue is about ClusterExec violating the expectation that execute(partition) streams can be consumed independently
Current behavior
In datafusion/bio-function-ranges/src/cluster.rs, ClusterExec still advertises multi-partition output and returns one stream per output partition.
Internally, however, each stream enters a rendezvous path where it returns Poll::Pending until:
- every partition has reported counts
- global offsets have been computed for all contigs
The relevant path is the ClusterIdCoordinator / poll_cluster_offsets() logic added in PR #145.
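The minimal, std-only model below makes the failure mode concrete. It is a hypothetical simplification, not the actual ClusterIdCoordinator. Coordinator, report and poll_offset are illustrative names. Offsets are computed per partition rather than per contig for brevity, and waker registration is omitted. If only partition 0 is driven, its poll stays Poll::Pending no matter how often it is retried, because partition 1 never reports.

```rust
use std::task::Poll;

/// Hypothetical, simplified stand-in for a cross-partition rendezvous:
/// every partition must report its local cluster count before any
/// partition learns its global id offset.
struct Coordinator {
    counts: Vec<Option<u64>>,
}

impl Coordinator {
    fn new(partitions: usize) -> Self {
        Self { counts: vec![None; partitions] }
    }

    /// Called by a partition stream once it has clustered all of its input.
    fn report(&mut self, partition: usize, count: u64) {
        self.counts[partition] = Some(count);
    }

    /// Pending until every partition has reported; afterwards the offset is
    /// the sum of the counts of all lower-numbered partitions.
    fn poll_offset(&self, partition: usize) -> Poll<u64> {
        if self.counts.iter().any(Option::is_none) {
            return Poll::Pending;
        }
        Poll::Ready(self.counts[..partition].iter().flatten().sum())
    }
}

fn main() {
    let mut coord = Coordinator::new(2);
    // The caller drains only partition 0, so partition 1 never reports.
    coord.report(0, 3);
    for attempt in 0..3 {
        // Re-polling cannot help: nothing will ever call report(1, ..).
        println!("attempt {attempt}: {:?}", coord.poll_offset(0));
    }
}
```

In a real stream, the pending poll would register a waker. If the sibling stream is never polled, nothing ever triggers that waker, so the consuming task parks indefinitely.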
Why this is a problem
DataFusion’s partitioned execution API exposes output streams partition-by-partition.
execute_stream_partitioned() constructs the vector of streams by calling plan.execute(i, ...) for each output partition and returns them to the caller. It does not drive sibling streams automatically.
That means this pattern is valid:
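```rust
use futures::TryStreamExt;

// Valid per DataFusion's API: take one partition stream and drain only that one.
let mut streams = df.execute_stream_partitioned().await?;
let first = streams.remove(0);
let batches: Vec<_> = first.try_collect().await?;
```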
With the current ClusterExec design, the first stream can block forever if the other streams are never polled far enough to register with the coordinator.
So the plan currently has a hidden cross-partition dependency while still presenting itself as multi-partition output.
Expected behavior
One of the following should be true:
- ClusterExec output partitions are truly independently consumable, or
- ClusterExec does not expose multi-partition output and instead internalizes the cross-partition coordination behind a single output partition
At the moment it satisfies neither cleanly.
Likely fix direction
The most straightforward fix is to stop exposing the coordinator through multi-partition output:
- keep the required input distribution (HashPartitioned by contig)
- change ClusterExec output partitioning to a single partition
- make execute(0) drive all child input partitions internally
- collect per-partition groups/counts, compute global offsets once, then emit final rows from the single output stream
- reject execute(partition) for partition != 0
- remove the externally visible AwaitingOffsets barrier between output partitions
This preserves globally unique, partition-count-invariant cluster ids without depending on the caller to poll all output streams concurrently.
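The following is a synchronous sketch of that single-output-partition shape. It assumes that a per-partition local clustering step has already produced dense local cluster ids (0, 1, 2, ...) per contig. LocalRow and this free function execute are hypothetical stand-ins, not DataFusion's ExecutionPlan::execute. A real implementation would drain SendableRecordBatchStreams asynchronously. Assigning offsets in sorted contig order is one possible way, and an assumption here, to keep ids independent of how contigs were hashed into partitions.

```rust
use std::collections::BTreeMap;

/// Hypothetical row after local clustering: (contig, local cluster id within that contig).
type LocalRow = (String, u64);

/// Hypothetical single-output-partition execute: drains all inputs, then renumbers.
fn execute(partition: usize, inputs: &[Vec<LocalRow>]) -> Result<Vec<(String, u64)>, String> {
    if partition != 0 {
        return Err(format!("ClusterExec has one output partition, got {partition}"));
    }
    // Pass 1: drain every input partition and record the cluster count per contig.
    let mut counts: BTreeMap<&str, u64> = BTreeMap::new();
    for part in inputs {
        for (contig, local) in part {
            let c = counts.entry(contig.as_str()).or_insert(0);
            *c = (*c).max(local + 1);
        }
    }
    // Compute global offsets once, in sorted contig order, so the result does
    // not depend on how many input partitions there were.
    let mut offsets = BTreeMap::new();
    let mut next = 0u64;
    for (contig, n) in &counts {
        offsets.insert(*contig, next);
        next += *n;
    }
    // Pass 2: emit rows with globally unique ids from the single output stream.
    Ok(inputs
        .iter()
        .flatten()
        .map(|(contig, local)| (contig.clone(), offsets[contig.as_str()] + local))
        .collect())
}

fn main() {
    let inputs: Vec<Vec<LocalRow>> = vec![
        vec![("chr2".to_string(), 0), ("chr2".to_string(), 1)],
        vec![("chr1".to_string(), 0)],
    ];
    println!("{:?}", execute(0, &inputs));
    println!("{:?}", execute(1, &inputs));
}
```

Because nothing is emitted until every input partition has been drained inside execute(0), no caller-visible stream depends on a sibling being polled.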
If preserving multi-partition output is a hard requirement, that would need a larger redesign, such as:
- local cluster stage per partition
- single-partition renumbering / global offset stage
- optional repartition after global ids have been assigned
Reproduction idea
Add an integration test that exercises DataFusion’s partitioned stream API directly:
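```rust
use futures::TryStreamExt;
use std::time::Duration;

let df = ctx.sql("SELECT * FROM cluster('reads')").await?;
let mut streams = df.execute_stream_partitioned().await?;
assert_eq!(streams.len(), 1 /* after the fix */);

// Fail instead of hanging if the single stream never makes progress.
let result = tokio::time::timeout(
    Duration::from_secs(5),
    streams.remove(0).try_collect::<Vec<_>>(),
)
.await;
assert!(result.is_ok(), "cluster() stream did not complete within 5s");
```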
Before the fix, a similar test that consumes only one stream from a multi-partition ClusterExec can hang, which the timeout turns into a test failure.
Acceptance criteria
- cluster() does not rely on sibling output partitions being polled in order to make progress
- consuming only the returned cluster() stream(s) cannot hang because another output partition was never polled
- global cluster ids remain unique and partition-count invariant
- existing regressions for issue #146 still pass
- add a targeted test covering execute_stream_partitioned() behavior
Related