From 3f0b1f8948c507a9babe3265035881706f262815 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Wed, 13 May 2026 16:44:08 +0200 Subject: [PATCH 1/5] test(envd): add data race in fan-out when subscriber is removed mid-iteration reproducer --- .../process/handler/multiplex_test.go | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/packages/envd/internal/services/process/handler/multiplex_test.go b/packages/envd/internal/services/process/handler/multiplex_test.go index ae79495157..f70a1464e1 100644 --- a/packages/envd/internal/services/process/handler/multiplex_test.go +++ b/packages/envd/internal/services/process/handler/multiplex_test.go @@ -312,3 +312,90 @@ func TestMultiplexedChannel_NoGoroutineLeakOnAbandon(t *testing.T) { //nolint:pa "before=%d after=%d (expected ~0)", leaked, wedges, before, after, ) } + +// Regression: removing a subscriber while the fan-out is mid-iteration +// must not corrupt delivery to the remaining subscribers. Before the +// fix, run() copied only the slice header (sharing the backing array), +// so remove()'s in-place shift could cause duplicates and skips. +// Run with -race to verify no data race. +func TestMultiplexedChannel_RemoveDuringFanOutDoesNotCorrupt(t *testing.T) { + t.Parallel() + + const iterations = 500 + const values = 20 + + for iter := range iterations { + m := NewMultiplexedChannel[int](values) + + chA, cancelA := m.Fork() + chB, cancelB := m.Fork() + chC, cancelC := m.Fork() + + // Pump values into the buffered Source — all fit without blocking. + for i := 1; i <= values; i++ { + m.Source <- i + } + + // A reads one value then cancels — triggers remove() while + // the fan-out is delivering to the remaining subscribers. + // No drain needed: after cancel the fan-out skips A via <-s.done. + go func() { + <-chA + cancelA() + }() + + // B and C drain everything. + bDone := make(chan []int, 1) + cDone := make(chan []int, 1) + go func() { + var got []int + for v := range chB { + got = append(got, v) + } + bDone <- got + }() + go func() { + var got []int + for v := range chC { + got = append(got, v) + } + cDone <- got + }() + + // Shut down. close(Source) causes run() to exit and close + // all remaining subscriber channels (B and C), which lets + // the drainer goroutines finish and send on bDone/cDone. + close(m.Source) + + // Collect results. B and C channels are closed by run()'s + // cleanup, so the drainers will terminate. Cancel only + // after collecting to avoid racing with run()'s cleanup. + bGot := <-bDone + cGot := <-cDone + + cancelB() + cancelC() + + // B and C must each receive all values exactly once. + if len(bGot) != values { + t.Errorf("iter %d: B got %d values, want %d: %v", iter, len(bGot), values, bGot) + } + if len(cGot) != values { + t.Errorf("iter %d: C got %d values, want %d: %v", iter, len(cGot), values, cGot) + } + + cCount := map[int]int{} + for _, v := range cGot { + cCount[v]++ + } + for v, n := range cCount { + if n > 1 { + t.Errorf("iter %d: C got value %d %d times (duplicate delivery)", iter, v, n) + } + } + + if t.Failed() { + break + } + } +} From 3aea0b709c0c3f516632525e1856b18b316276b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Thu, 14 May 2026 09:00:45 +0200 Subject: [PATCH 2/5] fix(envd): fix data race in fan-out when subscriber is removed mid-iteration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit run() snapshots the m.channels slice header under RLock and iterates after unlocking. remove() used append(channels[:i], channels[i+1:]...) which shifts the shared backing array in-place, corrupting the concurrent iteration — a subscriber can receive a value twice while another misses it entirely. Fix remove() to allocate a new backing array via slices.Concat so the old slice that run() holds remains stable. This moves the allocation to the cold path (subscriber disconnect) instead of the hot path (every fan-out delivery). Fixes: a67f9830cda7 ("fix(envd): fix fan-out deadlock when process subscriber disconnects (#2579)") --- packages/envd/internal/services/process/handler/multiplex.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/envd/internal/services/process/handler/multiplex.go b/packages/envd/internal/services/process/handler/multiplex.go index 131f097981..45b88e68ad 100644 --- a/packages/envd/internal/services/process/handler/multiplex.go +++ b/packages/envd/internal/services/process/handler/multiplex.go @@ -1,6 +1,7 @@ package handler import ( + "slices" "sync" "sync/atomic" ) @@ -128,7 +129,8 @@ func (m *MultiplexedChannel[T]) remove(s *subscriber[T]) { for i, sub := range m.channels { if sub == s { - m.channels = append(m.channels[:i], m.channels[i+1:]...) + // New backing array so run()'s lock-free iteration is safe. + m.channels = slices.Concat(m.channels[:i], m.channels[i+1:]) return } From 27cf1eabf710f83982f3050bbc8ac85b0b0a740f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Wed, 13 May 2026 17:10:25 +0200 Subject: [PATCH 3/5] chore(envd): bump version to 0.5.18 --- packages/envd/pkg/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/envd/pkg/version.go b/packages/envd/pkg/version.go index a5ccaf4cab..e251abd82f 100644 --- a/packages/envd/pkg/version.go +++ b/packages/envd/pkg/version.go @@ -1,3 +1,3 @@ package pkg -const Version = "0.5.17" +const Version = "0.5.18" From 543198979529859ae236b5d1f480e994e797cb29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Wed, 13 May 2026 18:00:10 +0200 Subject: [PATCH 4/5] test(envd): drop the reproducer --- .../process/handler/multiplex_test.go | 87 ------------------- 1 file changed, 87 deletions(-) diff --git a/packages/envd/internal/services/process/handler/multiplex_test.go b/packages/envd/internal/services/process/handler/multiplex_test.go index f70a1464e1..ae79495157 100644 --- a/packages/envd/internal/services/process/handler/multiplex_test.go +++ b/packages/envd/internal/services/process/handler/multiplex_test.go @@ -312,90 +312,3 @@ func TestMultiplexedChannel_NoGoroutineLeakOnAbandon(t *testing.T) { //nolint:pa "before=%d after=%d (expected ~0)", leaked, wedges, before, after, ) } - -// Regression: removing a subscriber while the fan-out is mid-iteration -// must not corrupt delivery to the remaining subscribers. Before the -// fix, run() copied only the slice header (sharing the backing array), -// so remove()'s in-place shift could cause duplicates and skips. -// Run with -race to verify no data race. -func TestMultiplexedChannel_RemoveDuringFanOutDoesNotCorrupt(t *testing.T) { - t.Parallel() - - const iterations = 500 - const values = 20 - - for iter := range iterations { - m := NewMultiplexedChannel[int](values) - - chA, cancelA := m.Fork() - chB, cancelB := m.Fork() - chC, cancelC := m.Fork() - - // Pump values into the buffered Source — all fit without blocking. - for i := 1; i <= values; i++ { - m.Source <- i - } - - // A reads one value then cancels — triggers remove() while - // the fan-out is delivering to the remaining subscribers. - // No drain needed: after cancel the fan-out skips A via <-s.done. - go func() { - <-chA - cancelA() - }() - - // B and C drain everything. - bDone := make(chan []int, 1) - cDone := make(chan []int, 1) - go func() { - var got []int - for v := range chB { - got = append(got, v) - } - bDone <- got - }() - go func() { - var got []int - for v := range chC { - got = append(got, v) - } - cDone <- got - }() - - // Shut down. close(Source) causes run() to exit and close - // all remaining subscriber channels (B and C), which lets - // the drainer goroutines finish and send on bDone/cDone. - close(m.Source) - - // Collect results. B and C channels are closed by run()'s - // cleanup, so the drainers will terminate. Cancel only - // after collecting to avoid racing with run()'s cleanup. - bGot := <-bDone - cGot := <-cDone - - cancelB() - cancelC() - - // B and C must each receive all values exactly once. - if len(bGot) != values { - t.Errorf("iter %d: B got %d values, want %d: %v", iter, len(bGot), values, bGot) - } - if len(cGot) != values { - t.Errorf("iter %d: C got %d values, want %d: %v", iter, len(cGot), values, cGot) - } - - cCount := map[int]int{} - for _, v := range cGot { - cCount[v]++ - } - for v, n := range cCount { - if n > 1 { - t.Errorf("iter %d: C got value %d %d times (duplicate delivery)", iter, v, n) - } - } - - if t.Failed() { - break - } - } -} From a354a07a4ee481ba67ed40c950967b0a9844d11f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Van=C4=9Bk?= Date: Thu, 14 May 2026 09:33:55 +0200 Subject: [PATCH 5/5] fixup! fix(envd): fix data race in fan-out when subscriber is removed mid-iteration --- packages/envd/internal/services/process/handler/multiplex.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/envd/internal/services/process/handler/multiplex.go b/packages/envd/internal/services/process/handler/multiplex.go index 45b88e68ad..ff699812db 100644 --- a/packages/envd/internal/services/process/handler/multiplex.go +++ b/packages/envd/internal/services/process/handler/multiplex.go @@ -129,7 +129,7 @@ func (m *MultiplexedChannel[T]) remove(s *subscriber[T]) { for i, sub := range m.channels { if sub == s { - // New backing array so run()'s lock-free iteration is safe. + // New backing array so run()'s concurrent iteration is safe. m.channels = slices.Concat(m.channels[:i], m.channels[i+1:]) return