
cluster(): partitioned execution can hang because output streams wait for all partitions #147

@mwiewior

Summary

cluster() can expose a partitioned ExecutionPlan whose per-partition output streams are not independently consumable.

After the fix in PR #145, ClusterExec computes globally unique cluster ids by introducing a cross-partition coordinator. Each output partition stream now waits until all partitions have reported cluster counts before it can emit rows.

That fixes correctness for global cluster ids, but it also means a caller can hang indefinitely if it consumes only some partition streams, for example via execute_stream_partitioned().

This is a follow-up issue to PR #145 and is distinct from #146:

  • #146 is about duplicate cluster ids across partitions
  • this issue is about ClusterExec violating the expectation that execute(partition) streams can be consumed independently

Current behavior

In datafusion/bio-function-ranges/src/cluster.rs, ClusterExec still advertises multi-partition output and returns one stream per output partition.

Internally, however, each stream enters a rendezvous path where it returns Poll::Pending until:

  • every partition has reported counts
  • global offsets have been computed for all contigs

The relevant path is the ClusterIdCoordinator / poll_cluster_offsets() logic added in PR #145.
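The shape of that barrier can be modelled with the standard library alone. The sketch below is only an illustration: `ToyCoordinator`, `report`, and `poll_offset` are hypothetical names, not the code from PR #145, and the offset rule (sum of the counts of lower-numbered partitions) is a simplified placeholder for the per-contig offsets.

```rust
use std::task::Poll;

/// Hypothetical stand-in for the coordinator; not the real ClusterIdCoordinator.
struct ToyCoordinator {
    /// One slot per output partition; `None` until that partition reports.
    reported: Vec<Option<u64>>,
}

impl ToyCoordinator {
    fn new(partitions: usize) -> Self {
        Self { reported: vec![None; partitions] }
    }

    /// Called by a partition stream once it knows its local cluster count.
    fn report(&mut self, partition: usize, count: u64) {
        self.reported[partition] = Some(count);
    }

    /// Placeholder offset rule: sum of the counts of lower-numbered partitions.
    /// Stays Pending until every partition has reported.
    fn poll_offset(&self, partition: usize) -> Poll<u64> {
        if self.reported.iter().any(|c| c.is_none()) {
            return Poll::Pending;
        }
        let offset: u64 = self.reported[..partition]
            .iter()
            .map(|c| c.unwrap_or(0))
            .sum();
        Poll::Ready(offset)
    }
}
```

With two partitions, calling `report(0, 3)` and then `poll_offset(0)` yields `Poll::Pending` for as long as nobody drives partition 1. A caller that holds only the first stream is stuck in exactly that state.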

Why this is a problem

DataFusion’s partitioned execution API exposes output streams partition-by-partition.

execute_stream_partitioned() constructs the vector of streams by calling plan.execute(i, ...) for each output partition and returns them to the caller. It does not drive sibling streams automatically.

That means this pattern is valid:

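use futures::TryStreamExt; // provides try_collect() on the record-batch stream

let mut streams = df.execute_stream_partitioned().await?;
// Consume only partition 0; the sibling streams are never polled.
let first = streams.remove(0);
let batches: Vec<_> = first.try_collect().await?;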

With the current ClusterExec design, the first stream can block forever if the other streams are never polled far enough to register with the coordinator.

So the plan currently has a hidden cross-partition dependency while still presenting itself as multi-partition output.

Expected behavior

One of the following should be true:

  1. ClusterExec output partitions are truly independently consumable, or
  2. ClusterExec does not expose multi-partition output and instead internalizes the cross-partition coordination behind a single output partition

At the moment it satisfies neither: it advertises multiple output partitions, but those partitions cannot make progress independently.

Likely fix direction

The most straightforward fix is to stop exposing the coordinator through multi-partition output:

  1. keep the required input distribution (HashPartitioned by contig)
  2. change ClusterExec output partitioning to a single partition
  3. make execute(0) drive all child input partitions internally
  4. collect per-partition groups/counts, compute global offsets once, then emit final rows from the single output stream
  5. reject execute(partition) for partition != 0
  6. remove the externally visible AwaitingOffsets barrier between output partitions

This preserves globally unique, partition-count-invariant cluster ids without depending on the caller to poll all output streams concurrently.
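The steps above can be sketched without DataFusion types. This is a rough model under stated assumptions, not the planned implementation: `ContigCounts`, `global_offsets`, and this free-standing `execute` are hypothetical, and the ordering rule (contigs sorted lexicographically, ids dense) is an assumption chosen only because it makes the result independent of how contigs were hashed to partitions.

```rust
use std::collections::BTreeMap;

/// Local cluster count per contig, as reported by one input partition.
type ContigCounts = BTreeMap<String, u64>;

/// Merge per-partition counts and assign each contig its first global id.
/// Assumption: contigs are ordered lexicographically and ids are dense.
fn global_offsets(per_partition: &[ContigCounts]) -> BTreeMap<String, u64> {
    // Merge first, so the result does not depend on which partition held a contig.
    let mut totals: BTreeMap<&str, u64> = BTreeMap::new();
    for counts in per_partition {
        for (contig, n) in counts {
            *totals.entry(contig.as_str()).or_insert(0) += *n;
        }
    }
    // Exclusive prefix sum over contigs in sorted order.
    let mut next = 0u64;
    let mut offsets = BTreeMap::new();
    for (contig, n) in totals {
        offsets.insert(contig.to_string(), next);
        next += n;
    }
    offsets
}

/// Hypothetical single-partition entry point: only partition 0 is valid.
fn execute(
    partition: usize,
    inputs: &[ContigCounts],
) -> Result<BTreeMap<String, u64>, String> {
    if partition != 0 {
        return Err(format!(
            "ClusterExec has a single output partition, got {}",
            partition
        ));
    }
    // All child partitions are consumed here, not by sibling output streams.
    Ok(global_offsets(inputs))
}
```

Because the counts are merged before offsets are assigned, the same contigs produce the same offsets whether they arrive in one input partition or in several, which is the partition-count invariance the issue asks to keep.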

If preserving multi-partition output is a hard requirement, that would need a larger redesign, such as:

  • local cluster stage per partition
  • single-partition renumbering / global offset stage
  • optional repartition after global ids have been assigned
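A hedged sketch of the first two stages of that redesign, again with std only. `Interval`, `local_clusters`, and `renumber` are hypothetical names, and the clustering rule (an interval joins the current cluster when it starts before the cluster's end, on input sorted by start) is an assumption for illustration, not necessarily the rule cluster() uses.

```rust
/// Half-open interval [start, end) on a single contig.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Interval {
    start: u64,
    end: u64,
}

/// Stage 1 (per partition): assign local, zero-based cluster ids.
/// Input must be sorted by `start`. Returns the ids and the cluster count.
fn local_clusters(sorted: &[Interval]) -> (Vec<u64>, u64) {
    let mut ids = Vec::with_capacity(sorted.len());
    let mut count = 0u64;
    let mut cluster_end = 0u64;
    for iv in sorted {
        if count == 0 || iv.start >= cluster_end {
            // No overlap with the current cluster: open a new one.
            count += 1;
            cluster_end = iv.end;
        } else {
            // Overlap: extend the current cluster if this interval reaches further.
            cluster_end = cluster_end.max(iv.end);
        }
        ids.push(count - 1);
    }
    (ids, count)
}

/// Stage 2 (single partition): shift local ids by the contig's global offset.
fn renumber(local_ids: &[u64], offset: u64) -> Vec<u64> {
    local_ids.iter().map(|id| id + offset).collect()
}
```

Stage 1 needs no cross-partition state, so it can stay parallel. Only the renumbering stage needs the counts from every partition, which is why it is the natural place for a single-partition step.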

Reproduction idea

Add an integration test that exercises DataFusion’s partitioned stream API directly:

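use futures::TryStreamExt; // provides try_collect()

let df = ctx.sql("SELECT * FROM cluster('reads')").await?;
let mut streams = df.execute_stream_partitioned().await?;
assert_eq!(streams.len(), 1 /* after the fix */);

// A timeout turns a hang into a test failure instead of a stuck test run.
let result = tokio::time::timeout(
    std::time::Duration::from_secs(5),
    streams.remove(0).try_collect::<Vec<_>>(),
).await;
assert!(result.is_ok(), "cluster() stream did not finish within 5 seconds");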

Before the fix, a similar test that consumes only one stream from a multi-partition ClusterExec is expected to hang and hit the timeout.

Acceptance criteria

  • cluster() does not rely on sibling output partitions being polled in order to make progress
  • consuming only the returned cluster() stream(s) cannot hang because another output partition was never polled
  • global cluster ids remain unique and partition-count invariant
  • existing regression tests for issue #146 still pass
  • a targeted test covers execute_stream_partitioned() behavior
