Skip to content

Commit 8b9a572

Browse files
committed
fix(envd): fix data race in fan-out when subscriber is removed mid-iteration
run() copied only the slice header under RLock, sharing the backing array with remove(). When remove() shifted elements in-place via append(channels[:i], channels[i+1:]...), the fan-out's stale snapshot would skip one subscriber and deliver to another twice — silent stdout/stderr corruption. Deep-copy the slice under RLock so the iteration is immune to concurrent mutations. Fixes: a67f983 ("fix(envd): fix fan-out deadlock when process subscriber disconnects (#2579)")
1 parent 66daf3b commit 8b9a572

2 files changed

Lines changed: 96 additions & 1 deletion

File tree

packages/envd/internal/services/process/handler/multiplex.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func NewMultiplexedChannel[T any](buffer int) *MultiplexedChannel[T] {
5454
func (m *MultiplexedChannel[T]) run() {
5555
for v := range m.Source {
5656
m.mu.RLock()
57-
subs := m.channels
57+
subs := append([]*subscriber[T](nil), m.channels...)
5858
m.mu.RUnlock()
5959

6060
for _, s := range subs {

packages/envd/internal/services/process/handler/multiplex_test.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,3 +312,98 @@ func TestMultiplexedChannel_NoGoroutineLeakOnAbandon(t *testing.T) { //nolint:pa
312312
"before=%d after=%d (expected ~0)", leaked, wedges, before, after,
313313
)
314314
}
315+
316+
// Regression: removing a subscriber while the fan-out is mid-iteration
317+
// must not corrupt delivery to the remaining subscribers. Before the
318+
// fix, run() copied only the slice header (sharing the backing array),
319+
// so remove()'s in-place shift could cause duplicates and skips.
320+
// Run with -race to verify no data race.
321+
func TestMultiplexedChannel_RemoveDuringFanOutDoesNotCorrupt(t *testing.T) {
322+
t.Parallel()
323+
324+
const iterations = 500
325+
const values = 20
326+
327+
for iter := range iterations {
328+
m := NewMultiplexedChannel[int](values)
329+
330+
chA, cancelA := m.Fork()
331+
chB, cancelB := m.Fork()
332+
chC, cancelC := m.Fork()
333+
334+
// Pump values into the buffered Source — all fit without blocking.
335+
for i := 1; i <= values; i++ {
336+
m.Source <- i
337+
}
338+
339+
// A reads one value then cancels — triggers remove() while
340+
// the fan-out is delivering to the remaining subscribers.
341+
go func() {
342+
<-chA
343+
cancelA()
344+
for range chA {
345+
}
346+
}()
347+
348+
// B and C drain everything.
349+
bDone := make(chan []int, 1)
350+
cDone := make(chan []int, 1)
351+
go func() {
352+
var got []int
353+
for v := range chB {
354+
got = append(got, v)
355+
}
356+
bDone <- got
357+
}()
358+
go func() {
359+
var got []int
360+
for v := range chC {
361+
got = append(got, v)
362+
}
363+
cDone <- got
364+
}()
365+
366+
// Shut down.
367+
close(m.Source)
368+
369+
// Wait for the fan-out to finish.
370+
deadline := time.Now().Add(5 * time.Second)
371+
for !m.exited.Load() {
372+
if time.Now().After(deadline) {
373+
cancelA()
374+
cancelB()
375+
cancelC()
376+
t.Fatalf("iter %d: fan-out did not exit", iter)
377+
}
378+
time.Sleep(time.Millisecond)
379+
}
380+
381+
cancelB()
382+
cancelC()
383+
384+
bGot := <-bDone
385+
cGot := <-cDone
386+
387+
// B and C must each receive all values exactly once.
388+
if len(bGot) != values {
389+
t.Errorf("iter %d: B got %d values, want %d: %v", iter, len(bGot), values, bGot)
390+
}
391+
if len(cGot) != values {
392+
t.Errorf("iter %d: C got %d values, want %d: %v", iter, len(cGot), values, cGot)
393+
}
394+
395+
cCount := map[int]int{}
396+
for _, v := range cGot {
397+
cCount[v]++
398+
}
399+
for v, n := range cCount {
400+
if n > 1 {
401+
t.Errorf("iter %d: C got value %d %d times (duplicate delivery)", iter, v, n)
402+
}
403+
}
404+
405+
if t.Failed() {
406+
break
407+
}
408+
}
409+
}

0 commit comments

Comments
 (0)