Skip to content

fix(pipeline): stop proxy tasks before cleaning up observer resources#4200

Closed
1Ckpwee wants to merge 1 commit into
pipecat-ai:mainfrom
1Ckpwee:fix/task-observer-cleanup-order
Closed

fix(pipeline): stop proxy tasks before cleaning up observer resources#4200
1Ckpwee wants to merge 1 commit into
pipecat-ai:mainfrom
1Ckpwee:fix/task-observer-cleanup-order

Conversation

@1Ckpwee
Copy link
Copy Markdown

@1Ckpwee 1Ckpwee commented Mar 30, 2026

Summary

Fixes #4195

During pipeline shutdown, TaskObserver.cleanup() was closing observer resources (file handles, connections) before cancelling the asyncio proxy tasks that were still consuming frames from queues. Late-arriving frames would hit the already-closed resource, producing ValueError: write to closed file.

Two bugs compounded:

  1. Proxy tasks were never stopped before cleanup() — only cancelled later in the PipelineTask.run() finally block
  2. self._proxies is a Dict[BaseObserver, Proxy], so iterating it yielded BaseObserver keys rather than Proxy values

Changes

  • Call await self.stop() at the start of cleanup() to cancel proxy tasks before any resources are closed
  • Iterate self._proxies.values() to correctly access Proxy objects

Test plan

  • Pipeline shutdown with active observers no longer produces ValueError: write to closed file
  • Observer data (e.g. session recordings) is saved correctly as before
  • stop() called multiple times is safe (idempotent)

@aconchillo
Copy link
Copy Markdown
Contributor

Thanks for identifying this issue, @khulnasoft-lab! The root cause analysis was spot on — proxy tasks were indeed still running when observer resources got cleaned up during shutdown.

We've addressed this in #4267 with a slightly different approach: instead of having cleanup() call stop() internally, we made the shutdown sequence explicit at the call site in task.py, keeping stop() and cleanup() as independent operations.

Closing this in favor of #4267. Thanks again for the contribution!

@aconchillo aconchillo closed this Apr 9, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

TaskObserver.cleanup() closes observer resources before stopping proxy tasks

2 participants