|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
3 | 3 | import asyncio |
| 4 | +from collections.abc import AsyncIterator |
4 | 5 |
|
5 | 6 | import numpy as np |
6 | 7 | import numpy.typing as npt |
@@ -82,6 +83,39 @@ async def run(self, text: str, settings: TTSModelSettings): |
82 | 83 | assert audio_chunks == [np.array([1], dtype=np.int16).tobytes()] |
83 | 84 |
|
84 | 85 |
|
@pytest.mark.asyncio
async def test_streamed_audio_result_sends_short_custom_splitter_chunks() -> None:
    """Check that short text handed over by a custom splitter reaches the TTS model.

    The two-character text "ok" is fed through a ``StreamedAudioResult`` configured
    with ``buffer_size=1`` and a custom ``text_splitter``. The TTS stub must see
    exactly one ``run`` call with "ok", and the stream must emit a single audio
    chunk framed by the turn and session lifecycle events.
    """

    class CapturingTTS(FakeTTS):
        """TTS stub that records each text it is asked to synthesize."""

        def __init__(self) -> None:
            super().__init__()
            self.texts: list[str] = []

        async def run(self, text: str, settings: TTSModelSettings) -> AsyncIterator[bytes]:
            # The settings argument is deliberately unused by this stub.
            del settings
            self.texts.append(text)
            # Emit one fixed chunk: two int16 zero samples serialized to bytes.
            pcm = np.zeros(2, dtype=np.int16)
            yield pcm.tobytes()

    def flush_everything(text: str) -> tuple[str, str]:
        # Hands back the whole input with an empty second element --
        # presumably (text_to_speak, remaining_text); confirm against the
        # text_splitter contract.
        return text, ""

    recorder = CapturingTTS()
    tts_settings = TTSModelSettings(buffer_size=1, text_splitter=flush_everything)
    pipeline_config = VoicePipelineConfig()
    streamed = StreamedAudioResult(recorder, tts_settings, pipeline_config)

    # Drive one complete turn and then close the session.
    await streamed._add_text("ok")
    await streamed._turn_done()
    await streamed._done()

    events, audio_chunks = await extract_events(streamed)

    expected_events = ["turn_started", "audio", "turn_ended", "session_ended"]
    assert recorder.texts == ["ok"]
    assert events == expected_events
    assert len(audio_chunks) == 1
| 117 | + |
| 118 | + |
85 | 119 | @pytest.mark.asyncio |
86 | 120 | async def test_voicepipeline_run_single_turn() -> None: |
87 | 121 | # Single turn. Should produce a single audio output, which is the TTS output for "out_1". |
|
0 commit comments