Skip to content

fix: protect FrameProcessor input handler from CancelledError death#4095

Open
namanbansal013 wants to merge 2 commits into
pipecat-ai:mainfrom
namanbansal013:fix/input-handler-cancellederror-protection
Open

fix: protect FrameProcessor input handler from CancelledError death#4095
namanbansal013 wants to merge 2 commits into
pipecat-ai:mainfrom
namanbansal013:fix/input-handler-cancellederror-protection

Conversation

@namanbansal013
Copy link
Copy Markdown
Contributor

@namanbansal013 namanbansal013 commented Mar 21, 2026

Problem

The __input_frame_task_handler loop in FrameProcessor is the sole consumer of the input queue — every frame that enters a processor flows through this single while True loop. However, this loop has no protection against asyncio.CancelledError, which is a BaseException in Python 3.9+.

When CancelledError escapes from any code called during frame processing (event handlers, _start_interruption, cancel_task, or downstream push_frame calls), it propagates up and kills the input handler permanently. Since no new task is created to replace it:

  • All frames queued in __input_queue are never consumed
  • The pipeline appears to be running but is completely stalled
  • No error is logged — the failure is silent
  • No recovery is possible without restarting the entire pipeline

Root Cause

During interruption handling, the call chain is:

__input_frame_task_handler
  → __process_frame (for SystemFrame like InterruptionFrame)
    → process_frame (dispatches to subclass)
      → _start_interruption
        → __cancel_process_task
          → cancel_task (uses asyncio.wait_for internally)

CancelledError can escape from multiple points in this chain:

  1. Event handlers registered via @processor.event_handler("on_after_process_frame")BaseObject._run_handler only catches Exception, not BaseException
  2. _start_interruption — catches Exception but not CancelledError
  3. httpx streaming cleanup during LLM cancellation — the OpenAI client's _closing context manager can interact with asyncio cancellation in ways that leak CancelledError
  4. cancel_task timeout path — if asyncio.wait_for times out, the task cancellation and timeout interact, potentially leaking CancelledError to the caller

Evidence

Observed in a production voice AI pipeline (pipecat v0.0.104). The bot was mid-sentence when a system notification triggered an interruption:

# 17:09:23.858 — LLM is actively streaming a response
OpenAILLMService#0: Generating chat from universal context [...]

# 17:09:23.884 — User speaks, triggering interruption  
LLMUserAggregator#0: broadcasting interruption

# 17:09:24.184 — System notification arrives (LLMMessagesAppendFrame)
bot:handle_notification: Handling notification: {'type': 'dsa_transition', ...}

# 17:09:24.886 — Last LLM output ever seen
OpenAILLMService#0 processing time: 1.029s
CartesiaTTSService#0: Generating TTS [What was your strategy for data migrations
across both monolith schemas without]

# === COMPLETE SILENCE — input handler is dead ===

# 17:09:25 → 17:13:13 — User speaks repeatedly, transcriptions arrive,
# but LLM never responds. Multiple interruptions broadcast with no effect:
MinWordsUserTurnStartStrategy#0 should_trigger=True  # User keeps trying
LLMUserAggregator#0: broadcasting interruption       # 18+ interruptions, all ignored

# 17:13:15 — User leaves, pipeline shutdown initiated
Cancelling pipeline task PipelineTask#0

# 17:13:35 — CancelFrame can't reach end of pipeline (input handler is dead)
PipelineTask#0: timeout waiting for CancelFrame#0(reason: None) to reach the
end of the pipeline (being blocked somewhere?).

The pipeline was completely unresponsive for ~4 minutes while the user kept speaking. The CancelFrame timeout during shutdown confirms the input handler was dead — it couldn't even process the shutdown frame.

Fix

Three defensive changes in src/pipecat/processors/frame_processor.py:

1. Protect __input_frame_task_handler (Primary fix)

Wrap the frame-dispatch block in a try/except asyncio.CancelledError that logs and continues the loop. During legitimate shutdown (self._cancelling is True), the CancelledError is re-raised to allow clean termination.

try:
    if isinstance(frame, SystemFrame):
        await self.__process_frame(frame, direction, callback)
    elif self.__process_queue:
        await self.__process_queue.put((frame, direction, callback))
    else:
        raise RuntimeError(...)
except asyncio.CancelledError:
    if self._cancelling:
        raise  # Legitimate shutdown
    logger.error(
        f"{self}: CancelledError escaped while processing "
        f"{frame.name}, input handler recovered"
    )

2. Protect _start_interruption (Defense-in-depth)

Add a CancelledError catch in _start_interruption so that if cancellation propagates during process task teardown, the process task is still re-created:

except asyncio.CancelledError:
    logger.warning(f"{self}: CancelledError during _start_interruption, recovering")
    self.__process_frame_task = None
    self.__create_process_task()

3. Zombie process task detection (Defense-in-depth)

After __cancel_process_task awaits cancel_task, check if the old task actually completed. If it survived the cancel timeout (zombie), replace the process queue to prevent it from stealing frames meant for the new task:

old_task = self.__process_frame_task
await self.cancel_task(self.__process_frame_task)
self.__process_frame_task = None
if not old_task.done():
    logger.warning(
        f"{self}: process task still alive after cancel timeout, "
        "replacing process queue to prevent frame theft"
    )
    self.__process_queue = asyncio.Queue()

Testing

Existing tests

All 11 existing tests in tests/test_frame_processor.py pass with no regressions.

New regression tests

test_input_handler_survives_cancelled_error — Registers an event handler that raises asyncio.CancelledError during InterruptionFrame processing. Verifies the input handler catches it and processes subsequent frames. This test hangs indefinitely without Fix 1, confirming the bug.

test_interrupted_processor_handles_subsequent_frames — Verifies the overall interruption recovery flow: a slow processor is interrupted, its process task is cancelled, and subsequent frames are still processed by the replacement task.

uv run pytest tests/test_frame_processor.py -v
# 11 passed

Changelog

Added changelog/4095.fixed.md.

Naman Bansal added 2 commits March 21, 2026 13:11
The __input_frame_task_handler loop in FrameProcessor has no protection
against asyncio.CancelledError escaping from InterruptionFrame
processing. When CancelledError propagates up (e.g. from cancel_task
during _start_interruption, from event handlers, or from httpx streaming
cleanup), it kills the input handler permanently. Since this is the only
consumer of the input queue, the entire pipeline stalls silently — no
frames are processed, no errors are logged, and no recovery is possible.

This was observed in production where an LLM mid-sentence was
interrupted by a system notification. The interruption triggered
_start_interruption -> __cancel_process_task -> cancel_task, and the
CancelledError escaped back into __input_frame_task_handler, killing it.
The bot went permanently silent while the user kept speaking for ~4
minutes until the session timed out.

Three defensive changes:

1. Wrap the frame-dispatch block in __input_frame_task_handler with a
   CancelledError catch that logs and continues (re-raises only during
   legitimate shutdown when self._cancelling is True).

2. Catch CancelledError in _start_interruption so that if cancellation
   propagates during process task teardown, the process task is still
   re-created rather than left in a broken state.

3. After __cancel_process_task awaits cancel_task, check if the old task
   is actually done. If it survived the cancel timeout (zombie task),
   replace the process queue to prevent it from stealing frames meant
   for the new process task.
@markbackman
Copy link
Copy Markdown
Contributor

Thanks for the details. Can we back up though? What is the scenario in which this happens and can you reproduce it? If you can reproduce it, I think a great place to start would be a minimal repro file along with a description of how to hit the issue.

Without that, it's hard to accept this change as we haven't seen this as an issue and it hasn't been reported anywhere. It's certainly possible that an issue exists but in a way that's outside of what we've tested. This is where the repro case would come in handy.

@namanbansal013
Copy link
Copy Markdown
Contributor Author

Hi @markbackman

Scenario: InterruptionFrame arrives while the LLM is actively streaming. _start_interruption → __cancel_process_task → cancel_task cancels the process task mid-stream. CancelledError escapes back into __input_frame_task_handler, killing it permanently. No frames processed, no errors logged, pipeline silently stalled.

We observed this in our pipecat cloud session 1b8fbcbf-aa05-42d3-be64-27653f5c8a60 (v0.0.104) where bot went silent for ~4 minutes after a user interruption coincided with an LLMMessagesAppendFrame. Shutdown log confirmed it: "timeout waiting for CancelFrame to reach the end of the pipeline (being blocked somewhere?)".

Repro: The regression test test_input_handler_survives_cancelled_error in this PR demonstrates this where an event handler raises CancelledError during InterruptionFrame processing. Without the fix, the test hangs indefinitely. I'll also try to put together a standalone minimal repro script.

I understand that the bug is extremely narrow as it requires CancelledError to escape from within _start_interruption, which mainly happens when interrupting an actively-streaming LLM call (httpx cleanup + asyncio cancellation interaction). We have also seen this once only in prod but wanted to bring this to your attention.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants