Skip to content

Commit a0d58fb

Browse files
committed
fix(envd): fix data race in fan-out when subscriber is removed mid-iteration
run() copied only the slice header under RLock, sharing the backing array with remove(). When remove() shifted elements in-place via append(channels[:i], channels[i+1:]...), the fan-out's stale snapshot would skip one subscriber and deliver to another twice — silent stdout/stderr corruption. Deep-copy the slice under RLock so the iteration is immune to concurrent mutations. Fixes: a67f983 ("fix(envd): fix fan-out deadlock when process subscriber disconnects (#2579)")
1 parent 66daf3b commit a0d58fb

2 files changed

Lines changed: 88 additions & 1 deletion

File tree

packages/envd/internal/services/process/handler/multiplex.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func NewMultiplexedChannel[T any](buffer int) *MultiplexedChannel[T] {
5454
func (m *MultiplexedChannel[T]) run() {
5555
for v := range m.Source {
5656
m.mu.RLock()
57-
subs := m.channels
57+
subs := append([]*subscriber[T](nil), m.channels...)
5858
m.mu.RUnlock()
5959

6060
for _, s := range subs {

packages/envd/internal/services/process/handler/multiplex_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,3 +312,90 @@ func TestMultiplexedChannel_NoGoroutineLeakOnAbandon(t *testing.T) { //nolint:pa
312312
"before=%d after=%d (expected ~0)", leaked, wedges, before, after,
313313
)
314314
}
315+
316+
// Regression: removing a subscriber while the fan-out is mid-iteration
317+
// must not corrupt delivery to the remaining subscribers. Before the
318+
// fix, run() copied only the slice header (sharing the backing array),
319+
// so remove()'s in-place shift could cause duplicates and skips.
320+
// Run with -race to verify no data race.
321+
func TestMultiplexedChannel_RemoveDuringFanOutDoesNotCorrupt(t *testing.T) {
322+
t.Parallel()
323+
324+
const iterations = 500
325+
const values = 20
326+
327+
for iter := range iterations {
328+
m := NewMultiplexedChannel[int](values)
329+
330+
chA, cancelA := m.Fork()
331+
chB, cancelB := m.Fork()
332+
chC, cancelC := m.Fork()
333+
334+
// Pump values into the buffered Source — all fit without blocking.
335+
for i := 1; i <= values; i++ {
336+
m.Source <- i
337+
}
338+
339+
// A reads one value then cancels — triggers remove() while
340+
// the fan-out is delivering to the remaining subscribers.
341+
// No drain needed: after cancel the fan-out skips A via <-s.done.
342+
go func() {
343+
<-chA
344+
cancelA()
345+
}()
346+
347+
// B and C drain everything.
348+
bDone := make(chan []int, 1)
349+
cDone := make(chan []int, 1)
350+
go func() {
351+
var got []int
352+
for v := range chB {
353+
got = append(got, v)
354+
}
355+
bDone <- got
356+
}()
357+
go func() {
358+
var got []int
359+
for v := range chC {
360+
got = append(got, v)
361+
}
362+
cDone <- got
363+
}()
364+
365+
// Shut down. close(Source) causes run() to exit and close
366+
// all remaining subscriber channels (B and C), which lets
367+
// the drainer goroutines finish and send on bDone/cDone.
368+
close(m.Source)
369+
370+
// Collect results. B and C channels are closed by run()'s
371+
// cleanup, so the drainers will terminate. Cancel only
372+
// after collecting to avoid racing with run()'s cleanup.
373+
bGot := <-bDone
374+
cGot := <-cDone
375+
376+
cancelB()
377+
cancelC()
378+
379+
// B and C must each receive all values exactly once.
380+
if len(bGot) != values {
381+
t.Errorf("iter %d: B got %d values, want %d: %v", iter, len(bGot), values, bGot)
382+
}
383+
if len(cGot) != values {
384+
t.Errorf("iter %d: C got %d values, want %d: %v", iter, len(cGot), values, cGot)
385+
}
386+
387+
cCount := map[int]int{}
388+
for _, v := range cGot {
389+
cCount[v]++
390+
}
391+
for v, n := range cCount {
392+
if n > 1 {
393+
t.Errorf("iter %d: C got value %d %d times (duplicate delivery)", iter, v, n)
394+
}
395+
}
396+
397+
if t.Failed() {
398+
break
399+
}
400+
}
401+
}

0 commit comments

Comments
 (0)