99 "github.com/stretchr/testify/require"
1010)
1111
12- // Tests for MultiplexedChannel fan-out, covering the fix for the goroutine
13- // leak that occurred when a subscriber disconnected mid-send.
12+ // Tests for MultiplexedChannel fan-out.
1413
1514const multiplexTestTimeout = 500 * time .Millisecond
1615
@@ -84,15 +83,15 @@ func TestMultiplexedChannel_BasicFanOut(t *testing.T) {
8483 assert .Equal (t , []int {1 , 2 , 3 }, gotB )
8584}
8685
87- // Regression: an abandoned consumer must not wedge the fan-out loop.
86+ // An abandoned consumer must not wedge the fan-out loop.
8887func TestMultiplexedChannel_AbandonedConsumerDoesNotWedgeFanOut (t * testing.T ) {
8988 t .Parallel ()
9089
9190 m := NewMultiplexedChannel [int ](1 )
9291
9392 abandoned , cancelAbandoned := m .Fork ()
9493
95- // Consumer reads one value then exits, modeling a disconnected client .
94+ // Consumer reads one value then exits.
9695 abandonReader := make (chan struct {})
9796 go func () {
9897 <- abandoned
@@ -110,7 +109,7 @@ func TestMultiplexedChannel_AbandonedConsumerDoesNotWedgeFanOut(t *testing.T) {
110109 t .Fatal ("abandoned consumer should have read its single value" )
111110 }
112111
113- // Simulate the handler's deferred cancel after return .
112+ // Cancel the subscriber .
114113 cancelDone := make (chan struct {})
115114 go func () {
116115 cancelAbandoned ()
@@ -122,18 +121,15 @@ func TestMultiplexedChannel_AbandonedConsumerDoesNotWedgeFanOut(t *testing.T) {
122121 t .Fatal ("cancel func did not return promptly; fan-out is wedged" )
123122 }
124123
125- // With no subscribers, the fan-out stops draining Source to apply
126- // back-pressure. After the in-flight value is consumed, Source
127- // fills and further sends block.
124+ // With no subscribers, back-pressure kicks in and further sends block.
128125 sent := 0
129126 for i := 2 ; i <= 8 ; i ++ {
130127 if ! sendOrTimeout (t , m .Source , i , 100 * time .Millisecond ) {
131128 break
132129 }
133130 sent ++
134131 }
135- // The buffer (size 1) plus at most one in-flight value means
136- // at most ~2 sends succeed before blocking.
132+ // Buffer (size 1) plus at most one in-flight value.
137133 assert .LessOrEqual (t , sent , 3 ,
138134 "fan-out should stop draining Source when no subscribers remain (back-pressure)" )
139135
@@ -146,7 +142,7 @@ func TestMultiplexedChannel_AbandonedConsumerDoesNotWedgeFanOut(t *testing.T) {
146142 }
147143}
148144
149- // Regression: an abandoned consumer must not starve other subscribers.
145+ // An abandoned consumer must not starve other subscribers.
150146func TestMultiplexedChannel_AbandonedConsumerDoesNotStarveOthers (t * testing.T ) {
151147 t .Parallel ()
152148
@@ -191,15 +187,15 @@ func TestMultiplexedChannel_AbandonedConsumerDoesNotStarveOthers(t *testing.T) {
191187 }
192188}
193189
194- // cancel must be idempotent and non-blocking even under producer load.
190+ // Cancel must be idempotent and non-blocking even under producer load.
195191func TestMultiplexedChannel_CancelIsIdempotentAndPrompt (t * testing.T ) {
196192 t .Parallel ()
197193
198194 m := NewMultiplexedChannel [int ](0 )
199195
200196 _ , cancel := m .Fork ()
201197
202- // Concurrently push values without anyone draining the consumer chan .
198+ // Push values without anyone draining.
203199 stop := make (chan struct {})
204200 producerDone := make (chan struct {})
205201 go func () {
@@ -286,44 +282,23 @@ func TestMultiplexedChannel_ForkAfterSourceCloseReturnsClosedChan(t *testing.T)
286282 assert .False (t , ok , "Fork after shutdown must return a pre-closed channel" )
287283}
288284
289- // Regression: sending to Source after the last subscriber cancelled
290- // must not deadlock. This reproduces the bug where handler.Wait()
291- // calls `p.EndEvent.Source <- event` after the handleStart/handleConnect
292- // handler has returned and run `defer endCancel()`.
293- //
294- // The fix is to give EndEvent a buffer of 1 so the single send in
295- // Wait() always succeeds, and then call CloseSource() so the fan-out
296- // goroutine can exit cleanly.
285+ // Sending to a buffered Source after the last subscriber cancelled must
286+ // not deadlock. This mirrors the EndEvent pattern in handler.Wait().
297287func TestMultiplexedChannel_SendToSourceAfterLastCancelDoesNotDeadlock (t * testing.T ) {
298288 t .Parallel ()
299289
300- // Mirror EndEvent: buffer=1 (the fix — was 0 before).
301290 m := NewMultiplexedChannel [int ](1 )
302291
303292 _ , cancel := m .Fork ()
304-
305- // Cancel the only subscriber immediately, without ever
306- // consuming a value. This simulates the case where
307- // handleStart returns (running `defer endCancel()`) before
308- // the child process exits and Wait() tries to send the
309- // EndEvent.
310293 cancel ()
311294
312- // Give the fan-out goroutine time to re-enter receiveWhenReady,
313- // observe HasSubscribers()==false, and park on <-sig.
314295 time .Sleep (50 * time .Millisecond )
315296
316- // Now attempt to send — this models handler.Wait()'s
317- // `p.EndEvent.Source <- event`. With buffer=1 the send
318- // succeeds immediately even though no subscriber is draining.
319297 sent := sendOrTimeout (t , m .Source , 1 , 2 * time .Second )
320298 if ! sent {
321- t .Fatal ("Source send deadlocked after last subscriber cancelled; " +
322- "EndEvent must use buffer >= 1 so Wait() never blocks" )
299+ t .Fatal ("Source send deadlocked after last subscriber cancelled" )
323300 }
324301
325- // CloseSource wakes the fan-out so it drains the buffered
326- // value and exits. Without this the fan-out goroutine leaks.
327302 m .CloseSource ()
328303
329304 select {
@@ -333,12 +308,8 @@ func TestMultiplexedChannel_SendToSourceAfterLastCancelDoesNotDeadlock(t *testin
333308 }
334309}
335310
336- // Regression: CloseSource after the last subscriber is cancelled must
337- // not leak the fan-out goroutine. This reproduces the pattern from
338- // start.go where startCancel() runs before CloseSource() (LIFO defer
339- // order). Previously start.go used bare close(Source) which bypassed
340- // the closed flag and NotifySubscriberChange, leaving the fan-out
341- // goroutine parked on <-sig forever.
311+ // CloseSource after the last subscriber is cancelled must not leak
312+ // the fan-out goroutine. Mirrors the LIFO defer order in handleStart.
342313func TestMultiplexedChannel_CloseSourceAfterCancelDoesNotLeakFanOut (t * testing.T ) {
343314 t .Parallel ()
344315
@@ -348,11 +319,8 @@ func TestMultiplexedChannel_CloseSourceAfterCancelDoesNotLeakFanOut(t *testing.T
348319 m := NewMultiplexedChannel [int ](0 )
349320 _ , cancel := m .Fork ()
350321
351- // Simulate LIFO defer order in handleStart:
352- // 1. startCancel() runs first — removes subscriber
353- // 2. CloseSource() runs second (the fix — was bare close before)
354322 cancel ()
355- time .Sleep (10 * time .Millisecond ) // let fan-out park on <-sig
323+ time .Sleep (10 * time .Millisecond )
356324 m .CloseSource ()
357325
358326 select {
@@ -363,7 +331,7 @@ func TestMultiplexedChannel_CloseSourceAfterCancelDoesNotLeakFanOut(t *testing.T
363331 }
364332}
365333
366- // Fan-out goroutine must exit after cancelled subscribers and CloseSource.
334+ // Fan-out goroutine exits after cancelled subscribers and CloseSource.
367335func TestMultiplexedChannel_NoGoroutineLeakOnAbandon (t * testing.T ) {
368336 t .Parallel ()
369337
0 commit comments