Skip to content

Commit 16ae692

Browse files
JanKaulclaude
andcommitted
distributor_channels: zero-byte sends bypass the gate
Sentinel messages (end-of-stream `None`, error markers, spill markers) all flow through `DistributionSender::send` with `bytes = 0`. Previously they were subject to the same park-on-closed-gate logic as data sends, which deadlocked the pipeline under skew: 1. One output channel accumulates enough data to push total bytes above `max_buffered_bytes`. Gate B closes. 2. An input pump finishes its source and `wait_for_task` sends a `None` sentinel to every output channel. 3. The sentinel targeting the swollen non-empty channel parks behind the queued data (gate closed && !was_empty). 4. The consumer of the swollen channel keeps draining, but the sentinel never reaches it, so `remaining_partitions` never decrements and the consumer never learns the producer is done. 5. Result: AggFinal (or any downstream consumer) waits forever for EOF on partitions whose pumps completed long ago. A zero-byte send contributes nothing to the byte budget, so throttling it is semantically meaningless and operationally fatal. Allow it through unconditionally, keeping the EOF/error/spill control plane decoupled from the data-plane byte budget. Empirically, this is the deadlock at trace_lines ~169k–288k in TPC-H Q18 SF=100 with prefer_hash_join=false on the reclaimer composite: all 16 Partial aggregates EOF, 8 of 16 Repartition pumps for the l_orderkey hash partition signal output EOF; the other 8 are indefinitely parked sending sentinels to the swollen channel. Existing tests that used `.send(X, 0)` for data payloads were updated to `.send(X, 1)` so they continue to exercise the gate; a new test asserts the zero-byte bypass directly. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 7606f30 commit 16ae692

1 file changed

Lines changed: 82 additions & 42 deletions

File tree

datafusion/physical-plan/src/repartition/distributor_channels.rs

Lines changed: 82 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@
4848
//!
4949
//! As a soft cap, an empty channel is always allowed one push regardless of Gate B (the "overdraw escape"). Without
5050
//! this, a single batch larger than `max_buffered_bytes` would head-of-line block its consumer indefinitely.
51+
//!
52+
//! Zero-byte sends (end-of-stream `None`, error markers, spilled-batch markers) also bypass the gate
53+
//! unconditionally — they carry no in-memory bytes, so the byte budget cannot meaningfully back-pressure them, and
54+
//! parking them behind a swollen-channel backlog would starve the EOF/error/spill control plane and deadlock the
55+
//! pipeline.
5156
use std::{
5257
collections::VecDeque,
5358
future::Future,
@@ -252,12 +257,22 @@ impl<T> Future for SendFuture<'_, T> {
252257

253258
let was_empty = data.is_empty();
254259

255-
// Park if the gate is closed *and* this channel is not empty.
260+
// Park if the gate is closed *and* this channel is not empty *and* this push would
261+
// actually consume gate budget (`bytes > 0`).
262+
//
263+
// Three escapes from parking:
264+
//
265+
// - Overdraw escape: if `was_empty` is true, this channel's consumer has nothing to chew on. Blocking the
266+
// push would starve the consumer. Gate A cannot be closed when `was_empty` is true (the empty-channels
267+
// count is at least 1), so `was_empty` only short-circuits Gate B.
256268
//
257-
// Overdraw escape: if `was_empty` is true, this channel's consumer has nothing to chew on. Blocking the
258-
// push would starve the consumer. Gate A cannot be closed when `was_empty` is true (the empty-channels
259-
// count is at least 1), so `was_empty` only short-circuits Gate B.
260-
if !was_empty && this.gate.is_closed() {
269+
// - Zero-byte escape: sentinel sends (end-of-stream `None`, error markers, spill markers) carry no
270+
// in-memory bytes. Throttling them serves no purpose (they don't grow the budget) and parking them
271+
// deadlocks the pipeline: an EOF sentinel queued behind a backlog on a swollen channel never reaches
272+
// the consumer, who never decrements `remaining_partitions`, who never learns the producer finished.
273+
// Allowing zero-byte sends through unconditionally keeps the EOF/error/spill control plane independent
274+
// of the data-plane byte budget.
275+
if this.bytes > 0 && !was_empty && this.gate.is_closed() {
261276
// Re-check under the send_wakers lock to avoid a lost-wakeup race:
262277
// - if the gate is still closed, park.
263278
// - if the gate opened between our first read and acquiring this lock, fall through and push.
@@ -570,13 +585,13 @@ mod tests {
570585
let mut recv_fut = rxs[0].recv();
571586
let waker = poll_pending(&mut recv_fut);
572587

573-
poll_ready(&mut txs[0].send("foo", 0)).unwrap();
588+
poll_ready(&mut txs[0].send("foo", 1)).unwrap();
574589
assert!(waker.woken());
575590
assert_eq!(poll_ready(&mut recv_fut), Some("foo"),);
576591

577-
poll_ready(&mut txs[0].send("bar", 0)).unwrap();
578-
poll_ready(&mut txs[0].send("baz", 0)).unwrap();
579-
poll_ready(&mut txs[0].send("end", 0)).unwrap();
592+
poll_ready(&mut txs[0].send("bar", 1)).unwrap();
593+
poll_ready(&mut txs[0].send("baz", 1)).unwrap();
594+
poll_ready(&mut txs[0].send("end", 1)).unwrap();
580595
assert_eq!(poll_ready(&mut rxs[0].recv()), Some("bar"),);
581596
assert_eq!(poll_ready(&mut rxs[0].recv()), Some("baz"),);
582597

@@ -594,8 +609,8 @@ mod tests {
594609

595610
let tx_clone = txs[0].clone();
596611

597-
poll_ready(&mut txs[0].send("foo", 0)).unwrap();
598-
poll_ready(&mut tx_clone.send("bar", 0)).unwrap();
612+
poll_ready(&mut txs[0].send("foo", 1)).unwrap();
613+
poll_ready(&mut tx_clone.send("bar", 1)).unwrap();
599614

600615
assert_eq!(poll_ready(&mut rxs[0].recv()), Some("foo"),);
601616
assert_eq!(poll_ready(&mut rxs[0].recv()), Some("bar"),);
@@ -606,17 +621,17 @@ mod tests {
606621
let (txs, mut rxs) = channels(2, UNLIMITED);
607622

608623
// gate initially open
609-
poll_ready(&mut txs[0].send("0_a", 0)).unwrap();
624+
poll_ready(&mut txs[0].send("0_a", 1)).unwrap();
610625

611626
// gate still open because channel 1 is still empty
612-
poll_ready(&mut txs[0].send("0_b", 0)).unwrap();
627+
poll_ready(&mut txs[0].send("0_b", 1)).unwrap();
613628

614629
// gate still open because channel 1 is still empty prior to this call, so this call still goes through
615-
poll_ready(&mut txs[1].send("1_a", 0)).unwrap();
630+
poll_ready(&mut txs[1].send("1_a", 1)).unwrap();
616631

617632
// both channels non-empty => gate closed
618633

619-
let mut send_fut = txs[1].send("1_b", 0);
634+
let mut send_fut = txs[1].send("1_b", 1);
620635
let waker = poll_pending(&mut send_fut);
621636

622637
// drain channel 0
@@ -639,34 +654,34 @@ mod tests {
639654

640655
let mut recv_fut = rxs[0].recv();
641656

642-
poll_ready(&mut tx1.send("a", 0)).unwrap();
657+
poll_ready(&mut tx1.send("a", 1)).unwrap();
643658
let recv_waker = poll_pending(&mut recv_fut);
644659

645660
// drop original sender
646661
drop(tx0);
647662

648663
// not yet closed (there's a clone left)
649664
assert!(!recv_waker.woken());
650-
poll_ready(&mut tx1.send("b", 0)).unwrap();
665+
poll_ready(&mut tx1.send("b", 1)).unwrap();
651666
let recv_waker = poll_pending(&mut recv_fut);
652667

653668
// create new clone
654669
let tx0_clone2 = tx0_clone.clone();
655670
assert!(!recv_waker.woken());
656-
poll_ready(&mut tx1.send("c", 0)).unwrap();
671+
poll_ready(&mut tx1.send("c", 1)).unwrap();
657672
let recv_waker = poll_pending(&mut recv_fut);
658673

659674
// drop first clone
660675
drop(tx0_clone);
661676
assert!(!recv_waker.woken());
662-
poll_ready(&mut tx1.send("d", 0)).unwrap();
677+
poll_ready(&mut tx1.send("d", 1)).unwrap();
663678
let recv_waker = poll_pending(&mut recv_fut);
664679

665680
// drop last clone
666681
drop(tx0_clone2);
667682

668683
// channel closed => also close gate
669-
poll_pending(&mut tx1.send("e", 0));
684+
poll_pending(&mut tx1.send("e", 1));
670685
assert!(recv_waker.woken());
671686
assert_eq!(poll_ready(&mut recv_fut), None,);
672687
}
@@ -678,14 +693,14 @@ mod tests {
678693
let rx0 = rxs.remove(0);
679694
let _rx1 = rxs.remove(0);
680695

681-
poll_ready(&mut txs[1].send("a", 0)).unwrap();
696+
poll_ready(&mut txs[1].send("a", 1)).unwrap();
682697

683698
// drop receiver => also close gate
684699
drop(rx0);
685700

686-
poll_pending(&mut txs[1].send("b", 0));
701+
poll_pending(&mut txs[1].send("b", 1));
687702
assert_eq!(
688-
poll_ready(&mut txs[0].send("foo", 0)),
703+
poll_ready(&mut txs[0].send("foo", 1)),
689704
Err(SendError("foo")),
690705
);
691706
}
@@ -698,11 +713,11 @@ mod tests {
698713
let mut rx1 = rxs.remove(0);
699714

700715
// fill both channels
701-
poll_ready(&mut txs[0].send("0_a", 0)).unwrap();
702-
poll_ready(&mut txs[1].send("1_a", 0)).unwrap();
716+
poll_ready(&mut txs[0].send("0_a", 1)).unwrap();
717+
poll_ready(&mut txs[1].send("1_a", 1)).unwrap();
703718

704-
let mut send_fut0 = txs[0].send("0_b", 0);
705-
let mut send_fut1 = txs[1].send("1_b", 0);
719+
let mut send_fut0 = txs[0].send("0_b", 1);
720+
let mut send_fut1 = txs[1].send("1_b", 1);
706721
let waker0 = poll_pending(&mut send_fut0);
707722
let waker1 = poll_pending(&mut send_fut1);
708723

@@ -732,9 +747,9 @@ mod tests {
732747
let _rx2 = rxs.remove(0);
733748

734749
// fill channels
735-
poll_ready(&mut tx0.send("0_a", 0)).unwrap();
736-
poll_ready(&mut tx1.send("1_a", 0)).unwrap();
737-
poll_ready(&mut tx2.send("2_a", 0)).unwrap();
750+
poll_ready(&mut tx0.send("0_a", 1)).unwrap();
751+
poll_ready(&mut tx1.send("1_a", 1)).unwrap();
752+
poll_ready(&mut tx2.send("2_a", 1)).unwrap();
738753

739754
// drop / close one channel
740755
drop(rx1);
@@ -743,9 +758,9 @@ mod tests {
743758
assert_eq!(poll_ready(&mut rx0.recv()), Some("0_a"),);
744759

745760
// use senders again
746-
poll_ready(&mut tx0.send("0_b", 0)).unwrap();
747-
assert_eq!(poll_ready(&mut tx1.send("1_b", 0)), Err(SendError("1_b")),);
748-
poll_pending(&mut tx2.send("2_b", 0));
761+
poll_ready(&mut tx0.send("0_b", 1)).unwrap();
762+
assert_eq!(poll_ready(&mut tx1.send("1_b", 1)), Err(SendError("1_b")),);
763+
poll_pending(&mut tx2.send("2_b", 1));
749764
}
750765

751766
#[test]
@@ -757,7 +772,7 @@ mod tests {
757772
assert_eq!(counter.strong_count(), 1);
758773

759774
// add object to channel
760-
poll_ready(&mut txs[0].send(obj, 0)).unwrap();
775+
poll_ready(&mut txs[0].send(obj, 1)).unwrap();
761776
assert_eq!(counter.strong_count(), 1);
762777

763778
// drop receiver
@@ -778,14 +793,14 @@ mod tests {
778793
let mut recv_fut = rxs[0].recv();
779794
let waker_2 = poll_pending(&mut recv_fut);
780795

781-
poll_ready(&mut txs[0].send("a", 0)).unwrap();
796+
poll_ready(&mut txs[0].send("a", 1)).unwrap();
782797
assert!(waker_1a.woken());
783798
assert!(waker_1b.woken());
784799
assert!(waker_2.woken());
785800
assert_eq!(poll_ready(&mut recv_fut), Some("a"),);
786801

787-
poll_ready(&mut txs[0].send("b", 0)).unwrap();
788-
let mut send_fut = txs[0].send("c", 0);
802+
poll_ready(&mut txs[0].send("b", 1)).unwrap();
803+
let mut send_fut = txs[0].send("c", 1);
789804
let waker_3 = poll_pending(&mut send_fut);
790805
assert_eq!(poll_ready(&mut rxs[0].recv()), Some("b"),);
791806
assert!(waker_3.woken());
@@ -798,8 +813,8 @@ mod tests {
798813
let mut recv_fut = rxs[0].recv();
799814
let waker_5 = poll_pending(&mut recv_fut);
800815

801-
poll_ready(&mut txs[0].send("d", 0)).unwrap();
802-
let mut send_fut = txs[0].send("e", 0);
816+
poll_ready(&mut txs[0].send("d", 1)).unwrap();
817+
let mut send_fut = txs[0].send("e", 1);
803818
let waker_6a = poll_pending(&mut send_fut);
804819
let waker_6b = poll_pending(&mut send_fut);
805820

@@ -816,7 +831,7 @@ mod tests {
816831
#[should_panic(expected = "polled ready future")]
817832
fn test_panic_poll_send_future_after_ready_ok() {
818833
let (txs, _rxs) = channels(1, UNLIMITED);
819-
let mut fut = txs[0].send("foo", 0);
834+
let mut fut = txs[0].send("foo", 1);
820835
poll_ready(&mut fut).unwrap();
821836
poll_ready(&mut fut).ok();
822837
}
@@ -828,7 +843,7 @@ mod tests {
828843

829844
drop(rxs);
830845

831-
let mut fut = txs[0].send("foo", 0);
846+
let mut fut = txs[0].send("foo", 1);
832847
poll_ready(&mut fut).unwrap_err();
833848
poll_ready(&mut fut).ok();
834849
}
@@ -838,7 +853,7 @@ mod tests {
838853
fn test_panic_poll_recv_future_after_ready_some() {
839854
let (txs, mut rxs) = channels(1, UNLIMITED);
840855

841-
poll_ready(&mut txs[0].send("foo", 0)).unwrap();
856+
poll_ready(&mut txs[0].send("foo", 1)).unwrap();
842857

843858
let mut fut = rxs[0].recv();
844859
poll_ready(&mut fut).unwrap();
@@ -961,6 +976,31 @@ mod tests {
961976
assert!(waker_b.woken());
962977
}
963978

979+
/// Zero-byte sends (EOF sentinels, error markers, spill markers) bypass the gate even when the gate is
980+
/// closed and the target channel is non-empty. Without this escape, EOF queued behind a data backlog on a
981+
/// swollen channel would never reach the consumer.
982+
#[test]
983+
fn test_zero_byte_send_bypasses_closed_gate() {
984+
let (txs, mut rxs) = channels(2, 100);
985+
986+
// Saturate the byte budget on ch0 (overdraw on the second push allowed because the first put us at the limit).
987+
poll_ready(&mut txs[0].send("a", 50)).unwrap();
988+
poll_ready(&mut txs[0].send("b", 50)).unwrap();
989+
// Gate is now closed on Gate B (bytes >= max).
990+
991+
// A non-zero-byte send to non-empty ch0 parks.
992+
let mut data_fut = txs[0].send("data", 1);
993+
poll_pending(&mut data_fut);
994+
995+
// A zero-byte send to the same non-empty ch0 must NOT park — it carries no bytes, so throttling is
996+
// meaningless and parking would block EOF/error/spill control plane progress.
997+
poll_ready(&mut txs[0].send("eof", 0)).unwrap();
998+
999+
// Drain so the data future can also complete.
1000+
assert_eq!(poll_ready(&mut rxs[0].recv()), Some("a"));
1001+
poll_ready(&mut data_fut).unwrap();
1002+
}
1003+
9641004
/// Dropping a receiver returns its bytes to the budget and wakes parked senders on surviving channels.
9651005
#[test]
9661006
fn test_drop_receiver_returns_bytes_to_budget() {

0 commit comments

Comments
 (0)