fix: evict a crashed worker from the container so destroy/reset don't hang #48
rustatian wants to merge 2 commits into
…replacement When a worker crashed unexpectedly, wait() removed it from the workers map but left it parked (in StateErrored) in the container channel and then allocated a replacement on top, leaving container.Len() at numWorkers+1. The stale entry was only evicted lazily by a later Take, so with an idle pool (no Take) Destroy/Reset — which wait for numWorkers == container.Len() — never settled and hung until the destroy/reset timeout fired. Evict the dead worker via w.Callback() (container.Remove()) before allocating the replacement, mirroring the Reset/Destroy cleanup. Adds a regression test.
Walkthrough

The watcher now calls a worker's Callback when the worker crashes (not manually destroyed), evicting the dead worker from the container so container.Len() can reach the configured worker count. A new test kills a worker process and asserts the container is replenished and consistent.

Changes: Crashed Worker Eviction
Estimated code review effort: 3 (Moderate), ~20 minutes
Actionable comments posted: 1
🧹 Nitpick comments (1)
worker_watcher/worker_watcher_test.go (1)
334-341: Quick win: replace fixed sleep with deterministic settle polling.

Line 340 uses a fixed `time.Sleep(500 * time.Millisecond)`, which can make this regression test flaky on slow CI. Poll until `ww.container.Len() == 2` (with a timeout) instead of sleeping.

Proposed patch:

     select {
     case <-allocated:
     case <-time.After(10 * time.Second):
    -	t.Error("watcher did not allocate a replacement worker")
    +	t.Fatal("watcher did not allocate a replacement worker")
     }
    -time.Sleep(500 * time.Millisecond)
    +deadline := time.After(2 * time.Second)
    +for ww.container.Len() != 2 {
    +	select {
    +	case <-deadline:
    +		t.Fatalf("container did not settle to expected size, got=%d", ww.container.Len())
    +	default:
    +		time.Sleep(10 * time.Millisecond)
    +	}
    +}

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@worker_watcher/worker_watcher_test.go` around lines 334 - 341, After the allocation select, replace the fixed time.Sleep(500 * time.Millisecond) with a deterministic polling loop that checks ww.container.Len() until it equals 2 (or until a timeout elapses) to avoid flakiness; implement this by creating a timeout (e.g., time.After with a few seconds) and a short ticker (e.g., 50-100ms) and in a select loop return when ww.container.Len() == 2 or call t.Error/fail when the timeout channel fires, removing the hardcoded sleep and referencing the existing allocated signal and ww.container.Len() for the condition.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@worker_watcher/worker_watcher_test.go`:
- Line 293: Update the test comments in worker_watcher_test.go to use US
spelling to satisfy misspell lint: change "signalling" to "signaling" and
"signalled" to "signaled" in the comment near the mkCmd usage inside the test
(search for mkCmd and the surrounding handshake explanation). Ensure both
occurrences (the one referenced around line 293 and the one around line 309) are
corrected and run golangci-lint to verify the misspell check passes.
Pull request overview
Fixes a shutdown hang in WorkerWatcher when a worker crashes while parked in the container: the crashed worker could be removed from the workers map but still remain in the container channel, so after a replacement is allocated container.Len() becomes numWorkers + 1, causing Destroy/Reset to spin until timeout.
Changes:
- Evict a crashed worker from the container (via the worker's callback) before allocating a replacement in `wait()`.
- Add a regression test that kills a worker process and asserts the container length returns to `numWorkers` after replacement.
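The effect of evicting before replacing can be sketched with a toy model. Everything here (`toyWorker`, `toyContainer`, `onCrash`, `newPool`) is a hypothetical stand-in rather than the pool's real types, and `Remove` is assumed to drop errored workers, as the PR description states for `StateErrored`.

```go
package main

import "fmt"

// toyWorker and toyContainer are hypothetical stand-ins for illustration only.
type toyWorker struct {
	id      int
	errored bool
}

type toyContainer struct {
	parked []*toyWorker
}

func (c *toyContainer) Push(w *toyWorker) { c.parked = append(c.parked, w) }

func (c *toyContainer) Len() int { return len(c.parked) }

// Remove drops every errored worker (assumed behaviour for this sketch).
func (c *toyContainer) Remove() {
	kept := c.parked[:0]
	for _, w := range c.parked {
		if !w.errored {
			kept = append(kept, w)
		}
	}
	c.parked = kept
}

// onCrash models the watcher reacting to a crashed worker: mark it errored,
// optionally evict it, then push a replacement.
func onCrash(c *toyContainer, crashed *toyWorker, evictFirst bool) {
	crashed.errored = true
	if evictFirst {
		c.Remove()
	}
	c.Push(&toyWorker{id: crashed.id + 100})
}

// newPool builds a container holding n healthy parked workers.
func newPool(n int) (*toyContainer, []*toyWorker) {
	c := &toyContainer{}
	ws := make([]*toyWorker, 0, n)
	for i := 1; i <= n; i++ {
		w := &toyWorker{id: i}
		ws = append(ws, w)
		c.Push(w)
	}
	return c, ws
}

func main() {
	const numWorkers = 2

	before, ws := newPool(numWorkers)
	onCrash(before, ws[0], false) // old behaviour: stale entry stays parked
	fmt.Println("without eviction:", before.Len())

	after, ws := newPool(numWorkers)
	onCrash(after, ws[0], true) // fixed behaviour: evict, then replace
	fmt.Println("with eviction:", after.Len())
}
```

In this model the container only returns to `numWorkers` when the dead worker is evicted before the replacement is pushed, which is the invariant the regression test asserts.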
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| `worker_watcher/worker_watcher.go` | Evicts crashed workers from the container before replacement allocation to prevent Destroy/Reset hangs. |
| `worker_watcher/worker_watcher_test.go` | Adds a regression test covering the crash-and-replace path and asserting container length invariants. |
    // parked in the container channel (now in StateErrored). Evict it so container.Len() stays
    // consistent with numWorkers; otherwise Destroy/Reset, which wait for
    // numWorkers == container.Len(), never settle and hang until their timeout fires.
    w.Callback()

    if err = cmd.Start(); err != nil {
    	return nil, nil, err
    }

    // mkCmd starts a real (sleep) worker process and returns the worker plus its command, so
    // the caller can kill the OS process directly — worker.Pid() is 0 without the goridge
    // handshake, so signalling it would target the wrong process. No require.* here: mkCmd is
    // also called from the watcher goroutine (via the allocator).

    // Wait until the replacement has been allocated, then let the evict+push settle.
    select {
    case <-allocated:
    case <-time.After(10 * time.Second):
    	t.Error("watcher did not allocate a replacement worker")
    }
    time.Sleep(500 * time.Millisecond)

    // The container must hold exactly numWorkers — the dead worker evicted and the replacement
    // pushed. Before the fix the dead worker lingered in the channel, leaving Len() == 3,
    // which makes Destroy/Reset hang waiting for numWorkers == container.Len().
    assert.Equal(t, 2, ww.container.Len(), "crashed worker was not evicted from the container")

…wn fix The test reliably exposed a pre-existing pool teardown hang (an idle pool that lost a worker hung Destroy for ~60s), fixed separately in roadrunner-server/pool#48. Re-add the (reliable, no-workflow) version once that fix is released and bumped here.
When a worker crashed unexpectedly, `worker_watcher.wait()` removed it from the workers map but left it parked (in `StateErrored`) in the container channel, then allocated a replacement on top, so `container.Len()` stayed at `numWorkers + 1`. The stale entry was only evicted lazily by a later `Take`. With an idle pool (no `Take` between the crash and shutdown), `Destroy`/`Reset`, which spin until `numWorkers == container.Len()`, never settle and hang until the destroy/reset timeout fires, then force-kill.

Evict the dead worker via `w.Callback()` (`container.Remove()`) before allocating the replacement, mirroring the existing `Reset`/`Destroy` cleanup. After `Wait()` the worker is in `StateErrored`, so `Remove()` drops it.

Adds a regression test (`TestWorkerWatcher_Wait_EvictsCrashedWorkerFromContainer`): kill one worker, then assert the container returns to `numWorkers` (not `numWorkers + 1`).

Surfaced by the roadrunner-temporal plugin: an idle activity pool that loses a worker hung shutdown for the full destroy timeout (~60s).