fix: protect FrameProcessor input handler from CancelledError death #4095
namanbansal013 wants to merge 2 commits into
The __input_frame_task_handler loop in FrameProcessor has no protection against asyncio.CancelledError escaping from InterruptionFrame processing. When CancelledError propagates up (e.g. from cancel_task during _start_interruption, from event handlers, or from httpx streaming cleanup), it kills the input handler permanently. Since this is the only consumer of the input queue, the entire pipeline stalls silently: no frames are processed, no errors are logged, and no recovery is possible.

This was observed in production where an LLM mid-sentence was interrupted by a system notification. The interruption triggered _start_interruption -> __cancel_process_task -> cancel_task, and the CancelledError escaped back into __input_frame_task_handler, killing it. The bot went permanently silent while the user kept speaking for ~4 minutes until the session timed out.

Three defensive changes:

1. Wrap the frame-dispatch block in __input_frame_task_handler with a CancelledError catch that logs and continues (re-raises only during legitimate shutdown when self._cancelling is True).
2. Catch CancelledError in _start_interruption so that if cancellation propagates during process task teardown, the process task is still re-created rather than left in a broken state.
3. After __cancel_process_task awaits cancel_task, check if the old task is actually done. If it survived the cancel timeout (zombie task), replace the process queue to prevent it from stealing frames meant for the new process task.
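As a rough illustration of the first change, the sketch below models a single-consumer input loop that catches CancelledError around the dispatch step, logs it, and keeps going unless a shutdown flag is set. ToyProcessor, _dispatch, input_loop, run_demo, and the "interrupt" frame are hypothetical names made up for this example; only the _cancelling flag mirrors a name used in this PR, and none of this is the actual pipecat code.

```python
import asyncio
import logging

logger = logging.getLogger("toy_processor")


class ToyProcessor:
    """Hypothetical, simplified stand-in for a frame processor (not pipecat code)."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()
        self.processed: list = []
        self._cancelling = False  # set to True only for a real shutdown

    async def _dispatch(self, frame: str) -> None:
        if frame == "interrupt":
            # Simulates a CancelledError leaking out of interruption handling.
            raise asyncio.CancelledError()
        self.processed.append(frame)

    async def input_loop(self) -> None:
        while True:
            frame = await self.queue.get()
            try:
                await self._dispatch(frame)
            except asyncio.CancelledError:
                if self._cancelling:
                    raise  # legitimate shutdown: let the task end
                logger.warning("CancelledError while handling %r; continuing", frame)


async def run_demo() -> list:
    proc = ToyProcessor()
    task = asyncio.create_task(proc.input_loop())
    for frame in ("a", "interrupt", "b"):
        proc.queue.put_nowait(frame)
    await asyncio.sleep(0.05)  # give the loop time to drain the queue
    survived = not task.done()

    # Real shutdown: set the flag first, then cancel the task.
    proc._cancelling = True
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return [proc.processed, survived, task.cancelled()]
```

Calling asyncio.run(run_demo()) in this toy model leaves both "a" and "b" processed, with the loop still alive after the simulated leak. One caveat worth weighing: asyncio generally expects CancelledError to propagate, so the flag check is what keeps a genuine shutdown from being swallowed along with the spurious cancellations.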
Thanks for the details. Can we back up though? What is the scenario in which this happens and can you reproduce it? If you can reproduce it, I think a great place to start would be a minimal repro file along with a description of how to hit the issue. Without that, it's hard to accept this change as we haven't seen this as an issue and it hasn't been reported anywhere. It's certainly possible that an issue exists but in a way that's outside of what we've tested. This is where the repro case would come in handy.
Hi @markbackman

Scenario: An InterruptionFrame arrives while the LLM is actively streaming. _start_interruption → __cancel_process_task → cancel_task cancels the process task mid-stream. CancelledError escapes back into __input_frame_task_handler, killing it permanently. No frames are processed, no errors are logged, and the pipeline silently stalls.

We observed this in our pipecat cloud session 1b8fbcbf-aa05-42d3-be64-27653f5c8a60 (v0.0.104), where the bot went silent for ~4 minutes after a user interruption coincided with an LLMMessagesAppendFrame. The shutdown log confirmed it: "timeout waiting for CancelFrame to reach the end of the pipeline (being blocked somewhere?)".

Repro: The regression test test_input_handler_survives_cancelled_error in this PR demonstrates this: an event handler raises CancelledError during InterruptionFrame processing. Without the fix, the test hangs indefinitely. I'll also try to put together a standalone minimal repro script.

I understand that the bug is extremely narrow, as it requires CancelledError to escape from within _start_interruption, which mainly happens when interrupting an actively streaming LLM call (httpx cleanup + asyncio cancellation interaction). We have seen this only once in prod, but wanted to bring it to your attention.
Problem
The __input_frame_task_handler loop in FrameProcessor is the sole consumer of the input queue: every frame that enters a processor flows through this single while True loop. However, this loop has no protection against asyncio.CancelledError, which is a BaseException (not an Exception) in Python 3.8+.

When CancelledError escapes from any code called during frame processing (event handlers, _start_interruption, cancel_task, or downstream push_frame calls), it propagates up and kills the input handler permanently. Since no new task is created to replace it, frames placed on __input_queue are never consumed.

Root Cause
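The root cause hinges on CancelledError not being an Exception subclass, which a small self-contained model can make concrete. In the sketch below, handler, input_loop, run_demo, and the "interrupt" frame are hypothetical names chosen for illustration, not pipecat code; the handler's except Exception clause stands in for the kind of guard that misses a BaseException.

```python
import asyncio


async def handler(frame: str) -> None:
    """Hypothetical frame handler; 'interrupt' simulates a leaked cancellation."""
    try:
        if frame == "interrupt":
            raise asyncio.CancelledError()
    except Exception:
        # Not reached for CancelledError, which derives from BaseException.
        print("handler swallowed an error")


async def input_loop(queue: asyncio.Queue, processed: list) -> None:
    """Toy single-consumer loop with no CancelledError guard."""
    while True:
        frame = await queue.get()
        await handler(frame)
        processed.append(frame)


async def run_demo() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    processed: list = []
    task = asyncio.create_task(input_loop(queue, processed))
    for frame in ("a", "interrupt", "b"):
        queue.put_nowait(frame)
    await asyncio.sleep(0.05)  # give the loop time to run
    # The loop task has ended, and "b" is still sitting in the queue.
    return processed, task.done(), queue.qsize()
```

In this model, asyncio.run(run_demo()) reports that only "a" was processed, the consumer task is done, and one frame is left unconsumed. The task ends in the cancelled state, so asyncio emits no "exception was never retrieved" warning, which is consistent with the silent stall described here.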
During interruption handling, the call chain is _start_interruption → __cancel_process_task → cancel_task.

CancelledError can escape from multiple points in this chain:

- @processor.event_handler("on_after_process_frame"): BaseObject._run_handler only catches Exception, not BaseException
- _start_interruption: catches Exception but not CancelledError
- The _closing context manager can interact with asyncio cancellation in ways that leak CancelledError
- cancel_task timeout path: if asyncio.wait_for times out, the task cancellation and timeout interact, potentially leaking CancelledError to the caller

Evidence
Observed in a production voice AI pipeline (pipecat v0.0.104). The bot was mid-sentence when a system notification triggered an interruption:
The pipeline was completely unresponsive for ~4 minutes while the user kept speaking. The CancelFrame timeout during shutdown confirms the input handler was dead: it couldn't even process the shutdown frame.

Fix
Three defensive changes in src/pipecat/processors/frame_processor.py:

1. Protect __input_frame_task_handler (Primary fix)

Wrap the frame-dispatch block in a try/except asyncio.CancelledError that logs and continues the loop. During legitimate shutdown (self._cancelling is True), the CancelledError is re-raised to allow clean termination.

2. Protect _start_interruption (Defense-in-depth)

Add a CancelledError catch in _start_interruption so that if cancellation propagates during process task teardown, the process task is still re-created.

3. Zombie process task detection (Defense-in-depth)
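The zombie check in this third change can be sketched in isolation: request cancellation, wait up to a timeout, and if the old task is still running, hand the replacement a fresh queue. Everything named below (stubborn_worker, cancel_with_timeout, run_demo, the ignore_cancel event) is hypothetical, and the sketch uses asyncio.wait for the timeout rather than reproducing what pipecat's cancel_task actually does.

```python
import asyncio


async def stubborn_worker(
    queue: asyncio.Queue, seen: list, ignore_cancel: asyncio.Event
) -> None:
    """Hypothetical task that ignores cancellation while ignore_cancel is set."""
    while True:
        try:
            seen.append(await queue.get())
        except asyncio.CancelledError:
            if not ignore_cancel.is_set():
                raise  # behave normally once the demo is over


async def cancel_with_timeout(task: asyncio.Task, timeout: float) -> bool:
    """Request cancellation and report whether the task actually finished."""
    task.cancel()
    # asyncio.wait returns on timeout instead of raising.
    done, _pending = await asyncio.wait({task}, timeout=timeout)
    return task in done


async def run_demo() -> tuple:
    ignore_cancel = asyncio.Event()
    ignore_cancel.set()
    old_queue: asyncio.Queue = asyncio.Queue()
    stolen: list = []
    old_task = asyncio.create_task(stubborn_worker(old_queue, stolen, ignore_cancel))
    await asyncio.sleep(0)  # let the worker start waiting on the queue

    finished = await cancel_with_timeout(old_task, timeout=0.05)
    # If the old task survived, use a fresh queue so it cannot consume new frames.
    new_queue = old_queue if finished else asyncio.Queue()
    new_queue.put_nowait("frame-for-new-task")
    await asyncio.sleep(0.05)
    result = (finished, stolen, new_queue.qsize())

    # Cleanup so the event loop can shut down: let the worker honor cancellation.
    ignore_cancel.clear()
    old_task.cancel()
    await asyncio.gather(old_task, return_exceptions=True)
    return result
```

In this toy run the old task does not finish within the timeout, and because the replacement queue is a new object, the surviving task sees none of the frames put on it. Had the same queue been reused, the zombie would still be blocked on its get() and could take the frame first.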
After __cancel_process_task awaits cancel_task, check if the old task actually completed. If it survived the cancel timeout (zombie), replace the process queue to prevent it from stealing frames meant for the new task.

Testing
Existing tests
All 11 existing tests in tests/test_frame_processor.py pass with no regressions.

New regression tests

- test_input_handler_survives_cancelled_error: Registers an event handler that raises asyncio.CancelledError during InterruptionFrame processing. Verifies the input handler catches it and processes subsequent frames. This test hangs indefinitely without Fix 1, confirming the bug.
- test_interrupted_processor_handles_subsequent_frames: Verifies the overall interruption recovery flow: a slow processor is interrupted, its process task is cancelled, and subsequent frames are still processed by the replacement task.

uv run pytest tests/test_frame_processor.py -v # 11 passed

Changelog
Added changelog/4095.fixed.md.