From 265540b8ce3822f6e9da4f8605c749896e3b8ce4 Mon Sep 17 00:00:00 2001 From: filipi87 Date: Fri, 27 Mar 2026 18:15:25 -0300 Subject: [PATCH 1/4] Keeping audio and clock queue in sync. --- src/pipecat/transports/base_output.py | 77 +++++++++++++++++++++++++-- 1 file changed, 73 insertions(+), 4 deletions(-) diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index 01af97be8c..22d7d44e8e 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -359,6 +359,16 @@ async def _handle_frame(self, frame: Frame): await sender.handle_mixer_control_frame(frame) elif isinstance(frame, TTSStoppedFrame): await sender.handle_sync_frame(frame) + if frame.pts: + # The frame goes into both queues. Create fresh events to keep + # the queues in sync: the audio task signals _clock_flush_event + # to trigger an immediate drain; the clock task signals + # _clock_drained_event once the drain is complete so the audio + # task knows it can safely push TTSStoppedFrame downstream. + # This way we can keep audio and words in sync. + sender._clock_flush_event = asyncio.Event() + sender._clock_drained_event = asyncio.Event() + await sender.handle_timed_frame(frame) elif frame.pts: await sender.handle_timed_frame(frame) else: @@ -431,6 +441,22 @@ def __init__( self._video_task: Optional[asyncio.Task] = None self._clock_task: Optional[asyncio.Task] = None + # When a TTSStoppedFrame has a pts it is enqueued in both the + # audio_queue and the clock_queue. These two events keep the queues + # in sync around that frame: + # + # _clock_flush_event – set by the audio task once all preceding + # audio has been sent, telling the clock task to stop waiting on + # timestamps and drain every pending frame immediately. + # + # _clock_drained_event – set by the clock task once it has drained + # all frames up to and including the TTSStoppedFrame, telling the + # audio task it is safe to push the TTSStoppedFrame downstream. + # This guarantees that all timed frames arrive before the stop + # signal regardless of which queue wins the race. + self._clock_flush_event: Optional[asyncio.Event] = None + self._clock_drained_event: Optional[asyncio.Event] = None + @property def sample_rate(self) -> int: """Get the audio sample rate. @@ -800,6 +826,18 @@ async def _audio_task_handler(self): await self._send_silence(self._params.audio_out_end_silence_secs) break + # If this TTSStoppedFrame is also in the clock queue, signal + # the clock task to drain immediately and then wait for it to + # confirm all timed frames have been pushed downstream before + # pushing TTSStoppedFrame here. This keeps the audio queue as + # the single owner of the downstream push for this frame. + if isinstance(frame, TTSStoppedFrame) and self._clock_flush_event is not None: + logger.debug(f"{self._transport} audio queue signalling clock queue flush") + self._clock_flush_event.set() + await self._clock_drained_event.wait() + self._clock_flush_event = None + self._clock_drained_event = None + # Handle frame. await self._handle_frame(frame) @@ -951,13 +989,44 @@ async def _clock_task_handler(self): # If we have a frame we check it's presentation timestamp. If it # has already passed we process it, otherwise we wait until it's # time to process it. + # + # When a TTSStoppedFrame with a pts is in flight, this queue and + # the audio_queue are kept in sync: the audio task signals + # _clock_flush_event as soon as all preceding audio has been + # sent, which wakes up the wait below early so every pending + # clock-queue frame is delivered immediately instead of stalling + # until its timestamp arrives. if running: current_time = self._transport.get_clock().get_time() if timestamp > current_time: wait_time = nanoseconds_to_seconds(timestamp - current_time) - await asyncio.sleep(wait_time) - - # Push frame downstream. - await self._transport.push_frame(frame) + if self._clock_flush_event: + # Race between the natural timestamp and a drain + # signal from the audio task. If the audio task sets + # the event first, we fall through immediately so + # this frame (and all subsequent ones up to the + # TTSStoppedFrame) are processed without delay. + try: + await asyncio.wait_for( + asyncio.shield(self._clock_flush_event.wait()), + timeout=wait_time, + ) + logger.debug( + f"{self._transport} clock queue flushed: delivering {frame} immediately" + ) + except asyncio.TimeoutError: + pass + else: + await asyncio.sleep(wait_time) + + # If this is the TTSStoppedFrame, signal the audio task that + # the drain is complete. The audio task owns the downstream + # push for this frame, so skip it here. + if isinstance(frame, TTSStoppedFrame) and self._clock_drained_event is not None: + logger.debug(f"{self._transport} clock queue drained, handing off TTSStoppedFrame to audio queue") + self._clock_drained_event.set() + else: + # Push frame downstream. + await self._transport.push_frame(frame) self._clock_queue.task_done() From de1fd67b2dc9b07c2a13d29bc6da16102dff5172 Mon Sep 17 00:00:00 2001 From: filipi87 Date: Fri, 27 Mar 2026 18:18:28 -0300 Subject: [PATCH 2/4] Adding fallback in case the clock queue is not drained. --- src/pipecat/transports/base_output.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index 22d7d44e8e..dba2f44128 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -834,7 +834,13 @@ async def _audio_task_handler(self): if isinstance(frame, TTSStoppedFrame) and self._clock_flush_event is not None: logger.debug(f"{self._transport} audio queue signalling clock queue flush") self._clock_flush_event.set() - await self._clock_drained_event.wait() + try: + await asyncio.wait_for(self._clock_drained_event.wait(), timeout=BOT_VAD_STOP_FALLBACK_SECS) + except asyncio.TimeoutError: + logger.warning( + f"{self._transport} timed out waiting for clock queue to drain, " + "pushing TTSStoppedFrame downstream anyway" + ) self._clock_flush_event = None self._clock_drained_event = None From 22b3a24548e75ba8d6a85d8262471934a5b99b41 Mon Sep 17 00:00:00 2001 From: filipi87 Date: Fri, 27 Mar 2026 18:21:55 -0300 Subject: [PATCH 3/4] Fixing ruff format. --- src/pipecat/transports/base_output.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index dba2f44128..73a97b5ebf 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -835,7 +835,9 @@ async def _audio_task_handler(self): logger.debug(f"{self._transport} audio queue signalling clock queue flush") self._clock_flush_event.set() try: - await asyncio.wait_for(self._clock_drained_event.wait(), timeout=BOT_VAD_STOP_FALLBACK_SECS) + await asyncio.wait_for( + self._clock_drained_event.wait(), timeout=BOT_VAD_STOP_FALLBACK_SECS + ) except asyncio.TimeoutError: logger.warning( f"{self._transport} timed out waiting for clock queue to drain, " @@ -1029,7 +1031,9 @@ async def _clock_task_handler(self): # the drain is complete. The audio task owns the downstream # push for this frame, so skip it here. if isinstance(frame, TTSStoppedFrame) and self._clock_drained_event is not None: - logger.debug(f"{self._transport} clock queue drained, handing off TTSStoppedFrame to audio queue") + logger.debug( + f"{self._transport} clock queue drained, handing off TTSStoppedFrame to audio queue" + ) self._clock_drained_event.set() else: # Push frame downstream. From edca44a9133348dc6320334c48f1858db3c0f3ff Mon Sep 17 00:00:00 2001 From: filipi87 Date: Fri, 27 Mar 2026 18:29:13 -0300 Subject: [PATCH 4/4] Adding changelog description to the fix. --- changelog/4178.fixed.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog/4178.fixed.md diff --git a/changelog/4178.fixed.md b/changelog/4178.fixed.md new file mode 100644 index 0000000000..191a9c375e --- /dev/null +++ b/changelog/4178.fixed.md @@ -0,0 +1 @@ +- Fixed timed frames (e.g. word-boundary events) arriving out of order or too late relative to TTS audio playback. When a `TTSStoppedFrame` carries a presentation timestamp, the clock queue now flushes all pending timed frames immediately once the audio task finishes sending the preceding audio, ensuring timed events always reach downstream processors before the stop signal.