fix(envd): discard output safely when no subscriber is connected #2639

Draft

arkamar wants to merge 3 commits into main from fix/envd-discard-output-on-disconnect

Conversation

arkamar (Contributor) commented May 13, 2026

When a Start/Connect client disconnects while a process is producing output, the fan-out loop in MultiplexedChannel blocks trying to deliver to the dead subscriber's channel. This back-pressures the reader goroutine, which allocates a fresh 32 KiB buffer on every Read call, causing envd's RSS to grow without bound.

The fix makes the fan-out skip cancelled subscribers and keep draining Source, discarding values, when none remain, so the child process is never back-pressured. The reader goroutines now reuse a single buffer and only allocate and copy when HasSubscribers() returns true. The atomic.Bool exit flag is replaced with a done channel; this is more idiomatic for broadcast shutdown signaling and makes tests deterministic, replacing the old runtime.NumGoroutine() polling with a direct select on <-m.done.
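
For illustration, a simplified sketch of the fan-out loop this describes, using names (subscriber, isCancelled, s.ch, s.done) that appear in the review discussion below; the real implementation differs in detail:

    func (m *MultiplexedChannel[T]) run() {
        for v := range m.Source {
            m.mu.RLock()
            subs := append([]*subscriber[T]{}, m.channels...) // copy under lock
            m.mu.RUnlock()

            for _, s := range subs {
                if s.isCancelled() {
                    continue // dead subscriber: never block on its channel
                }
                select {
                case s.ch <- v:
                case <-s.done: // subscriber went away mid-send
                }
            }
            // With no live subscribers, v is simply dropped, so the child
            // process producing output is never back-pressured.
        }
        m.mu.Lock()
        close(m.done) // broadcast shutdown signal; see the third commit below
        m.mu.Unlock()
    }

Tests can then wait on the same signal instead of polling goroutine counts, e.g.:

    select {
    case <-m.done:
        // fan-out goroutine has shut down
    case <-time.After(time.Second): // timeout value is illustrative
        t.Fatal("multiplexer did not shut down")
    }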


Alternative solution to #2620

arkamar added 2 commits May 13, 2026 14:38
…ing process

When a Start/Connect client disconnects while a process is producing
output, the fan-out loop drained the Source channel with no
subscribers, allocating 32 KiB per read cycle. With a fast producer,
envd RSS grew to hundreds of MiB in seconds, OOM-killing sandbox
processes.

Readers now reuse a single read buffer and only allocate + send when
HasSubscribers is true. The fan-out always consumes from Source and
drops values with no subscribers. The child process is never blocked
so servers and background processes stay responsive during disconnects.
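
A minimal sketch of the reader pattern this describes, assuming the multiplexer exposes Source as a chan []byte and a HasSubscribers() method (the actual envd reader differs):

    func readLoop(r io.Reader, m *MultiplexedChannel[[]byte]) {
        buf := make([]byte, 32*1024) // one buffer, reused across Read calls
        for {
            n, err := r.Read(buf)
            if n > 0 && m.HasSubscribers() {
                out := make([]byte, n)
                copy(out, buf[:n]) // copy out: buf is overwritten on the next Read
                m.Source <- out
            }
            if err != nil {
                return // typically io.EOF when the child closes its pipe
            }
        }
    }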
cursor (Bot) commented May 13, 2026

PR Summary

Medium Risk
Touches concurrent fan-out and process I/O streaming behavior; regressions could drop output or hang/terminate streams under load or disconnect scenarios.

Overview
Fixes a deadlock/backpressure path where MultiplexedChannel could block on cancelled subscribers, causing stdout/stderr readers to allocate repeatedly and grow RSS when no clients are connected. Readers now reuse a buffer and only allocate/copy output when HasSubscribers() is true, while the fan-out loop always drains Source and uses a done channel (instead of an atomic flag) for deterministic shutdown. Tests were updated and added to cover disconnects, shutdown ordering, and output delivery edge cases, and the version is bumped to 0.5.18.

Reviewed by Cursor Bugbot for commit 47ee515. Bugbot is set up for automated code reviews on this repo.

codecov (Bot) commented May 13, 2026

❌ 9 Tests Failed:

Tests completed  Failed  Passed  Skipped
2621             9       2612    7
View the full list of 13 ❄️ flaky test(s)
github.com/e2b-dev/infra/tests/integration/internal/tests/api/sandboxes::TestEgressFirewallAllowSpecificIP

Flake rate in main: 54.88% (Passed 148 times, Failed 180 times)

Stack Traces | 14.8s run time
=== RUN   TestEgressFirewallAllowSpecificIP
=== PAUSE TestEgressFirewallAllowSpecificIP
=== CONT  TestEgressFirewallAllowSpecificIP
    sandbox_network_out_test.go:117: Sandbox creation failed status=500 body={"code":500,"message":"Failed to place sandbox"}
    sandbox_network_out_test.go:117: Sandbox creation={Body:[123 34 99 111 100 101 34 58 53 48 48 44 34 109 101 115 115 97 103 101 34 58 34 70 97 105 108 101 100 32 116 111 32 112 108 97 99 101 32 115 97 110 100 98 111 120 34 125] HTTPResponse:0xc0009db290 JSON201:<nil> JSON400:<nil> JSON401:<nil> JSON500:0xc00081bdd0}
    sandbox_network_out_test.go:117: 
        	Error Trace:	.../internal/utils/sandbox.go:163
        	            				.../api/sandboxes/sandbox_network_out_test.go:117
        	Error:      	Not equal: 
        	            	expected: 201
        	            	actual  : 500
        	Test:       	TestEgressFirewallAllowSpecificIP
--- FAIL: TestEgressFirewallAllowSpecificIP (14.84s)
github.com/e2b-dev/infra/tests/integration/internal/tests/api/sandboxes::TestEgressFirewallBlockSpecificIP

Flake rate in main: 55.15% (Passed 148 times, Failed 182 times)

Stack Traces | 14.9s run time
=== RUN   TestEgressFirewallBlockSpecificIP
=== PAUSE TestEgressFirewallBlockSpecificIP
=== CONT  TestEgressFirewallBlockSpecificIP
    sandbox_network_out_test.go:143: Sandbox creation failed status=500 body={"code":500,"message":"Failed to place sandbox"}
    sandbox_network_out_test.go:143: Sandbox creation={Body:[123 34 99 111 100 101 34 58 53 48 48 44 34 109 101 115 115 97 103 101 34 58 34 70 97 105 108 101 100 32 116 111 32 112 108 97 99 101 32 115 97 110 100 98 111 120 34 125] HTTPResponse:0xc0009daf30 JSON201:<nil> JSON400:<nil> JSON401:<nil> JSON500:0xc00081bc68}
    sandbox_network_out_test.go:143: 
        	Error Trace:	.../internal/utils/sandbox.go:163
        	            				.../api/sandboxes/sandbox_network_out_test.go:143
        	Error:      	Not equal: 
        	            	expected: 201
        	            	actual  : 500
        	Test:       	TestEgressFirewallBlockSpecificIP
--- FAIL: TestEgressFirewallBlockSpecificIP (14.88s)
github.com/e2b-dev/infra/tests/integration/internal/tests/api/sandboxes::TestEgressFirewallPersistsAfterResume

Flake rate in main: 55.15% (Passed 148 times, Failed 182 times)

Stack Traces | 16.2s run time
=== RUN   TestEgressFirewallPersistsAfterResume
=== PAUSE TestEgressFirewallPersistsAfterResume
=== CONT  TestEgressFirewallPersistsAfterResume
    sandbox_network_out_test.go:257: Command [curl] output: event:{start:{pid:1316}}
    sandbox_network_out_test.go:257: Command [curl] output: event:{data:{stdout:"HTTP/2 302 \r\nx-content-type-options: nosniff\r\nlocation: https://dns.google/\r\ndate: Wed, 13 May 2026 13:54:55 GMT\r\ncontent-type: text/html; charset=UTF-8\r\nserver: HTTP server (unknown)\r\ncontent-length: 216\r\nx-xss-protection: 0\r\nx-frame-options: SAMEORIGIN\r\nalt-svc: h3=\":443\"; ma=2592000,h3-29=\":443\"; ma=2592000\r\n\r\n"}}
    sandbox_network_out_test.go:257: Command [curl] output: event:{end:{exited:true status:"exit status 0"}}
    sandbox_network_out_test.go:257: Command [curl] completed successfully in sandbox il5tbllskcymm9mm8gz3q
Executing command curl in sandbox il5tbllskcymm9mm8gz3q
    sandbox_network_out_test.go:258: Command [curl] output: event:{start:{pid:1317}}
    sandbox_network_out_test.go:258: Command [curl] output: event:{end:{exit_code:35 exited:true status:"exit status 35" error:"exit status 35"}}
Executing command curl in sandbox i56sp6kncusjxgjw4jii0
    sandbox_network_out_test.go:270: 
        	Error Trace:	.../api/sandboxes/sandbox_network_out_test.go:270
        	Error:      	Not equal: 
        	            	expected: 201
        	            	actual  : 500
        	Test:       	TestEgressFirewallPersistsAfterResume
        	Messages:   	Expected status code 201 Created, got 500
--- FAIL: TestEgressFirewallPersistsAfterResume (16.17s)
github.com/e2b-dev/infra/tests/integration/internal/tests/api/sandboxes::TestUpdateNetworkConfig

Flake rate in main: 76.54% (Passed 156 times, Failed 509 times)

Stack Traces | 37.6s run time
=== RUN   TestUpdateNetworkConfig
=== PAUSE TestUpdateNetworkConfig
=== CONT  TestUpdateNetworkConfig
--- FAIL: TestUpdateNetworkConfig (37.57s)
github.com/e2b-dev/infra/tests/integration/internal/tests/api/sandboxes::TestUpdateNetworkConfig/pause_resume_preserves_allow_internet_access_false

Flake rate in main: 77.03% (Passed 150 times, Failed 503 times)

Stack Traces | 5.67s run time
=== RUN   TestUpdateNetworkConfig/pause_resume_preserves_allow_internet_access_false
Executing command curl in sandbox i6gd4885baiyyyy7ay6v3
    sandbox_network_update_test.go:372: Command [curl] output: event:{start:{pid:1359}}
    sandbox_network_update_test.go:372: Command [curl] output: event:{end:{exit_code:35 exited:true status:"exit status 35" error:"exit status 35"}}
Executing command curl in sandbox ixwahdxd2zhfg5ekz99zi
    sandbox_network_update_test.go:372: Command [curl] output: event:{start:{pid:1360}}
    sandbox_network_update_test.go:372: Command [curl] output: event:{end:{exit_code:35 exited:true status:"exit status 35" error:"exit status 35"}}
    sandbox_network_update_test.go:391: Command [curl] output: event:{start:{pid:1361}}
    sandbox_network_update_test.go:391: Command [curl] output: event:{data:{stdout:"HTTP/2 302 \r\nx-content-type-options: nosniff\r\nlocation: https://dns.google/\r\ndate: Wed, 13 May 2026 13:56:09 GMT\r\ncontent-type: text/html; charset=UTF-8\r\nserver: HTTP server (unknown)\r\ncontent-length: 216\r\nx-xss-protection: 0\r\nx-frame-options: SAMEORIGIN\r\nalt-svc: h3=\":443\"; ma=2592000,h3-29=\":443\"; ma=2592000\r\n\r\n"}}
    sandbox_network_update_test.go:391: Command [curl] output: event:{end:{exited:true status:"exit status 0"}}
    sandbox_network_update_test.go:391: Command [curl] completed successfully in sandbox ixwahdxd2zhfg5ekz99zi
    sandbox_network_update_test.go:391: 
        	Error Trace:	.../api/sandboxes/sandbox_network_out_test.go:74
        	            				.../api/sandboxes/sandbox_network_update_test.go:60
        	            				.../api/sandboxes/sandbox_network_update_test.go:391
        	Error:      	An error is expected but got nil.
        	Test:       	TestUpdateNetworkConfig/pause_resume_preserves_allow_internet_access_false
        	Messages:   	https://8.8.8.8 should be blocked
--- FAIL: TestUpdateNetworkConfig/pause_resume_preserves_allow_internet_access_false (5.67s)
github.com/e2b-dev/infra/tests/integration/internal/tests/api/templates::TestTemplateBuildENV

Flake rate in main: 58.99% (Passed 146 times, Failed 210 times)

Stack Traces | 0s run time
=== RUN   TestTemplateBuildENV
=== PAUSE TestTemplateBuildENV
=== CONT  TestTemplateBuildENV
--- FAIL: TestTemplateBuildENV (0.00s)
github.com/e2b-dev/infra/tests/integration/internal/tests/api/templates::TestTemplateBuildENV/ENV_with_multiline_value

Flake rate in main: 59.83% (Passed 139 times, Failed 207 times)

Stack Traces | 25.9s run time
=== RUN   TestTemplateBuildENV/ENV_with_multiline_value
=== PAUSE TestTemplateBuildENV/ENV_with_multiline_value
=== CONT  TestTemplateBuildENV/ENV_with_multiline_value
    build_template_test.go:134: test-ubuntu-env-multiline: [info] Building template kpuoq2c1tloji6veuv8g/0cf619d8-e1f4-4af9-b2a1-c30931999400
    build_template_test.go:134: test-ubuntu-env-multiline: [info] CACHED [base] FROM ubuntu:22.04 [ffd709f131f42dfab282de47a91dd2c139e900c1c11fc574b49b517a05ef0a32]
    build_template_test.go:134: test-ubuntu-env-multiline: [info] CACHED [base] DEFAULT USER user [90bdd4afa342293c931373351bf578872dec9179214ba3e8bf9edba311466213]
    build_template_test.go:134: test-ubuntu-env-multiline: [info] [builder 1/2] ENV MULTILINE line1
        line2
        line3 [e93da3f3765f20eb6407c336b9e4e0b9321d994ec5f6cb547743a2a4070eed23]
    build_template_test.go:134: test-ubuntu-env-multiline: [info] [builder 2/2] RUN [[ $(echo "$MULTILINE" | wc -l) -eq 3 ]] || exit 1 [477610d61cdf858776262d3331809539bcbcf16f706aac18515a57337bae1786]
    build_template_test.go:134: test-ubuntu-env-multiline: [error] Build failed: failed to run command '[[ $(echo "$MULTILINE" | wc -l) -eq 3 ]] || exit 1': exit status 1
    build_template_test.go:374: Build failed: {<nil> failed to run command '[[ $(echo "$MULTILINE" | wc -l) -eq 3 ]] || exit 1': exit status 1 0xc000484f30}
--- FAIL: TestTemplateBuildENV/ENV_with_multiline_value (25.85s)
github.com/e2b-dev/infra/tests/integration/internal/tests/envd::TestBindLocalhost

Flake rate in main: 57.14% (Passed 258 times, Failed 344 times)

Stack Traces | 0s run time
=== RUN   TestBindLocalhost
=== PAUSE TestBindLocalhost
=== CONT  TestBindLocalhost
--- FAIL: TestBindLocalhost (0.00s)
github.com/e2b-dev/infra/tests/integration/internal/tests/envd::TestBindLocalhost/bind_0_0_0_0

Flake rate in main: 62.85% (Passed 146 times, Failed 247 times)

Stack Traces | 7.31s run time
=== RUN   TestBindLocalhost/bind_0_0_0_0
=== PAUSE TestBindLocalhost/bind_0_0_0_0
=== CONT  TestBindLocalhost/bind_0_0_0_0
    localhost_bind_test.go:69: Command [python] output: event:{start:{pid:1263}}
Executing command python in sandbox izdubvpssrk9s5dtadwty
    localhost_bind_test.go:90: 
        	Error Trace:	.../tests/envd/localhost_bind_test.go:90
        	Error:      	Not equal: 
        	            	expected: 200
        	            	actual  : 502
        	Test:       	TestBindLocalhost/bind_0_0_0_0
        	Messages:   	Unexpected status code 502 for bind address 0.0.0.0
--- FAIL: TestBindLocalhost/bind_0_0_0_0 (7.31s)
github.com/e2b-dev/infra/tests/integration/internal/tests/envd::TestBindLocalhost/bind_::1

Flake rate in main: 64.73% (Passed 146 times, Failed 268 times)

Stack Traces | 7.38s run time
=== RUN   TestBindLocalhost/bind_::1
=== PAUSE TestBindLocalhost/bind_::1
=== CONT  TestBindLocalhost/bind_::1
    localhost_bind_test.go:69: Command [python] output: event:{start:{pid:1263}}
Executing command python in sandbox idh70kbujpkqib6z5qfxx
    localhost_bind_test.go:90: 
        	Error Trace:	.../tests/envd/localhost_bind_test.go:90
        	Error:      	Not equal: 
        	            	expected: 200
        	            	actual  : 502
        	Test:       	TestBindLocalhost/bind_::1
        	Messages:   	Unexpected status code 502 for bind address ::1
--- FAIL: TestBindLocalhost/bind_::1 (7.38s)
github.com/e2b-dev/infra/tests/integration/internal/tests/envd::TestBindLocalhost/bind_localhost

Flake rate in main: 64.65% (Passed 146 times, Failed 267 times)

Stack Traces | 7.93s run time
=== RUN   TestBindLocalhost/bind_localhost
=== PAUSE TestBindLocalhost/bind_localhost
=== CONT  TestBindLocalhost/bind_localhost
    localhost_bind_test.go:69: Command [python] output: event:{start:{pid:1263}}
    localhost_bind_test.go:90: 
        	Error Trace:	.../tests/envd/localhost_bind_test.go:90
        	Error:      	Not equal: 
        	            	expected: 200
        	            	actual  : 502
        	Test:       	TestBindLocalhost/bind_localhost
        	Messages:   	Unexpected status code 502 for bind address localhost
Executing command python in sandbox imh7vsbvgn8xjnfwhul5c
--- FAIL: TestBindLocalhost/bind_localhost (7.93s)
github.com/e2b-dev/infra/tests/integration/internal/tests/orchestrator::TestSandboxMemoryIntegrity

Flake rate in main: 66.23% (Passed 156 times, Failed 306 times)

Stack Traces | 90.2s run time
=== RUN   TestSandboxMemoryIntegrity
=== PAUSE TestSandboxMemoryIntegrity
=== CONT  TestSandboxMemoryIntegrity
Executing command bash in sandbox iolubm7pxnsk4l583o2gq (user: root)
    sandbox_memory_integrity_test.go:26: Build completed successfully
--- FAIL: TestSandboxMemoryIntegrity (90.17s)
github.com/e2b-dev/infra/tests/integration/internal/tests/orchestrator::TestSandboxMemoryIntegrity/tmpfs_hash

Flake rate in main: 67.26% (Passed 146 times, Failed 300 times)

Stack Traces | 51.5s run time
=== RUN   TestSandboxMemoryIntegrity/tmpfs_hash
=== PAUSE TestSandboxMemoryIntegrity/tmpfs_hash
=== CONT  TestSandboxMemoryIntegrity/tmpfs_hash
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{start:{pid:1264}}
Executing command bash in sandbox ik259l4el5mbs7izyixbz (user: root)
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stdout:"Total memory: 985 MB\nUsed memory before tmpfs mount: 186 MB\nFree memory before tmpfs mount: 798 MB\nMemory to use in integrity test (80% of free, min 64MB): 638 MB\n"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"638+0 records in\n638+0 records out\n668991488 bytes (669 MB, 638 MiB) copied, 17.2005 s, 38.9 MB/s\n"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"\tCommand being timed: \"dd if=/dev/urandom of=/mnt/testfile bs=1M count=638\"\n\tUser time (seconds): 0.00\n\tSystem time (seconds): 16.86\n\tPercent of CPU this job got: 97%\n\tElapsed (wall clock) time (h:mm:ss or m:ss): 0:17.28\n\tAverage shared text size (kbytes): 0\n\tAverage unshared data size (kbytes): 0\n\tAverage stack size (kbytes): 0\n\tAverage total size (kbytes): 0\n\tMaximum resident set size (kbytes): 2632\n\tAverage resident set size (kbytes): 0\n\tMajor (requiring I/O) page faults: 2\n\tMinor (reclaiming a frame) page faults: 343\n\tVoluntary context switches: 3\n\tInvoluntary context switches: 87\n\tSwaps: 0\n\tFile system inputs: 176\n\tFile system outputs: 0\n\tSocket messages sent: 0\n\tSocket messages received: 0\n\tSignals delivered: 0\n\tPage size (bytes): 4096\n\tExit status: 0\n"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stdout:"Used memory after tmpfs mount and file fill: 831 MB\n"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{end:{exited:true  status:"exit status 0"}}
    sandbox_memory_integrity_test.go:70: Command [bash] completed successfully in sandbox id2ogmgdmdmbhjtznbe7f
Executing command bash in sandbox id2ogmgdmdmbhjtznbe7f (user: root)
    sandbox_memory_integrity_test.go:74: Command [bash] output: event:{start:{pid:1280}}
    sandbox_memory_integrity_test.go:74: Command [bash] output: event:{data:{stdout:"6af84fc5c7a1e792bd6345c45634e4580be2ce990fdf12ada3941269ca9eac66\n"}}
    sandbox_memory_integrity_test.go:74: Command [bash] output: event:{end:{exited:true  status:"exit status 0"}}
    sandbox_memory_integrity_test.go:74: Command [bash] completed successfully in sandbox id2ogmgdmdmbhjtznbe7f
Executing command bash in sandbox id2ogmgdmdmbhjtznbe7f (user: root)
    sandbox_memory_integrity_test.go:99: Command [bash] output: event:{start:{pid:1284}}
    sandbox_memory_integrity_test.go:100: 
        	Error Trace:	.../tests/orchestrator/sandbox_memory_integrity_test.go:100
        	Error:      	Received unexpected error:
        	            	failed to execute command bash in sandbox id2ogmgdmdmbhjtznbe7f: invalid_argument: protocol error: incomplete envelope: unexpected EOF
        	Test:       	TestSandboxMemoryIntegrity/tmpfs_hash
--- FAIL: TestSandboxMemoryIntegrity/tmpfs_hash (51.48s)


chatgpt-codex-connector (Bot) left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 1440f94c7c


Comment thread packages/envd/internal/services/process/start_test.go
linear-code (Bot) commented May 13, 2026

ENG-3933

gemini-code-assist (Bot) left a comment


Code Review

Iterating over the m.channels slice header outside the lock in the fan-out loop is unsafe and can lead to race conditions where subscribers are skipped or receive duplicate values. The HasSubscribers() check in the process handler prevents stdout and stderr output from being sent to the system logs when no RPC client is connected, which results in a loss of observability for background processes.

Comment thread packages/envd/internal/services/process/handler/multiplex.go
Comment thread packages/envd/internal/services/process/handler/handler.go
Comment thread packages/envd/internal/services/process/handler/handler.go
cursor (Bot) left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.

Reviewed by Cursor Bugbot for commit 1440f94.

Comment thread packages/envd/internal/services/process/handler/handler.go
…er on Fork

Move close(m.done) inside the m.mu critical section in run() so that
Fork()'s re-check under the same lock always observes the shutdown.
Previously, close(m.done) happened after Unlock, creating a window
where Fork could add a subscriber that run() never cleans up —
its channel is never closed, leaking the subscriber.
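
A sketch of the ordering this commit establishes (simplified; errClosed and the cleanup details are illustrative, not the actual envd code):

    // In run(), once Source is exhausted:
    m.mu.Lock()
    close(m.done) // now inside the critical section
    for _, s := range m.channels {
        close(s.ch) // every registered subscriber is cleaned up
    }
    m.channels = nil
    m.mu.Unlock()

    // In Fork(), the re-check under the same lock cannot miss the close:
    m.mu.Lock()
    defer m.mu.Unlock()
    select {
    case <-m.done:
        return nil, errClosed // shutdown already ran; refuse to subscribe
    default:
    }
    m.channels = append(m.channels, s)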
@arkamar arkamar force-pushed the fix/envd-discard-output-on-disconnect branch from 1440f94 to 47ee515 Compare May 13, 2026 13:41
@arkamar arkamar marked this pull request as draft May 13, 2026 13:44
claude (Bot) left a comment


Additional findings (outside current diff — PR may have been updated during review):

  • 🟣 packages/envd/internal/services/process/handler/multiplex.go:55-67: Pre-existing data race in the fan-out loop that this PR leaves intact. run() takes a slice-header snapshot of m.channels under RLock and iterates after unlocking, while remove() concurrently mutates the same backing array in place via append(m.channels[:i], m.channels[i+1:]...). With 3+ subscribers (Start RPC + concurrent Connect RPCs both call DataEvent.Fork()) and a non-tail cancel mid-iteration, the same value can be delivered twice to one client and skipped on another — silent stdout/stderr corruption. One-line fix: deep-copy under RLock (subs := append([]*subscriber[T]{}, m.channels...)).

    Extended reasoning...

    What is wrong

    In multiplex.go the fan-out loop snapshots the subscriber slice header under RLock and then iterates after releasing the lock:

    m.mu.RLock()
    subs := m.channels   // copies header only; shares backing array
    m.mu.RUnlock()
    
    for _, s := range subs {
        if s.isCancelled() { continue }
        select {
        case s.ch <- v:
        case <-s.done:
        }
    }

    Meanwhile remove() (line 159) mutates the same backing array in place under the write Lock:

    m.channels = append(m.channels[:i], m.channels[i+1:]...)

    Because m.channels[:i] has enough capacity (we are shrinking), append does not allocate a new array — it performs an in-place forward memmove. After removing index i from a length-N slice, slots i..N-2 are overwritten with the pointers from i+1..N-1, and the trailing slot N-1 retains its old pointer. This is a well-known Go gotcha (see the SliceTricks wiki note about zeroing the trailing element so the GC can reclaim it).

    Step-by-step proof

    Start state: m.channels = [A, B, C] (cap=3). One Start RPC plus two Connect RPCs gives this exact 3-subscriber shape via proc.DataEvent.Fork() (see start.go:106 and connect.go:30).

    1. run() enters with value v. It snapshots subs := m.channels → header {ptr=base, len=3, cap=3}. RUnlock.
    2. i=0: reads subs[0]=A, enters select { case A.ch <- v: case <-A.done: }. A.ch is unbuffered; the goroutine parks.
    3. Concurrently, A’s owner disconnects → cancel() calls remove(A). remove first calls s.cancel() (closes A.done) — the fan-out send unblocks via the <-s.done branch (A receives nothing). Then remove takes the write Lock and executes append(m.channels[:0], m.channels[1:]...). The in-place memmove writes base[0]=B, then base[1]=C. Result: backing array is [B, C, C]; m.channels header is {ptr=base, len=2}. The snapshot subs still has len=3.
    4. i=1: range subs reads subs[1] from the shared backing array. It is now C. The check s.isCancelled() returns false for C. Send v to C. ✓
    5. i=2: subs[2] is also C (the duplicated trailing pointer). isCancelled() is still false. Send v to C again. ✗

    Net effect: subscriber B misses v entirely; subscriber C receives v twice. For a stdout/stderr stream this is a duplicated chunk in one client and a missing chunk in another — silent corruption of the data stream. (With only 2 subscribers [A, B], removing A leaves [B, B] and B receives v twice — the bug is actually easier to trigger.)

    This is also a textbook data race under the Go memory model: the unsynchronized subs[i] reads in run() race with the Lock-protected backing-array writes in remove(). go test -race would flag it.

    Why existing code does not prevent it

    • The RLock/RUnlock pair only protects the header read; the backing array remains shared.
    • The isCancelled() check inside the loop does not save the duplicated subscriber — C is not cancelled, so the send proceeds the second time.
    • s.cancel() is called before remove() takes the write lock, so the fan-out’s per-subscriber select can unblock via <-s.done while remove() races to shift the backing array — providing the exact timing window.

    Trigger conditions and PR relevance

    The per-subscriber case s.ch <- v: blocks until the gRPC stream.Send completes; on a slow network this is arbitrarily long, so the window between iterations is wide. This PR is pre-existing in the strict sense — the snapshot+in-place-shift pattern was introduced in PR #2579 and the two offending lines are not modified here. However, this PR is centrally about fan-out correctness (it rewrites run(), Fork(), and adds HasSubscribers()), and the new “drain to void after cancel” semantics widen the window: previously a cancelled subscriber would wedge the fan-out at the first blocked send, so a race could only corrupt one value; now iteration continues across many values, so the corruption can repeat per value emitted while two clients remain alive.

    Fix

    One-line change in run() — deep-copy the slice under RLock:

    m.mu.RLock()
    subs := append([]*subscriber[T]{}, m.channels...)
    m.mu.RUnlock()

    Alternatively, change remove() to allocate a new backing array instead of shifting in place (m.channels = append([]*subscriber[T]{}, append(m.channels[:i:i], m.channels[i+1:]...)...)), but the snapshot fix is simpler and avoids the trailing-pointer pitfall everywhere.
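
    The trailing-pointer behavior is easy to demonstrate in isolation; a minimal standalone program (hypothetical, not envd code):

    package main

    import "fmt"

    func main() {
        a, b, c := 1, 2, 3
        channels := []*int{&a, &b, &c}

        snapshot := channels // header copy only; the backing array is shared

        // In-place removal of index 0, as in remove(): a forward memmove,
        // no new backing array is allocated.
        channels = append(channels[:0], channels[1:]...)

        // snapshot still has len=3 over the mutated array: indexes 1 and 2
        // now both point at c, and b has vanished from the snapshot's view.
        for i, p := range snapshot {
            fmt.Printf("snapshot[%d] = %d\n", i, *p) // prints 2, 3, 3
        }
    }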

Comment thread packages/envd/internal/services/process/handler/handler.go