diff --git a/.github/workflows/unittests.yml b/.github/workflows/unittests.yml index 2d2d5401..ee7ce48f 100644 --- a/.github/workflows/unittests.yml +++ b/.github/workflows/unittests.yml @@ -68,7 +68,7 @@ jobs: - name: Install Python dependencies (Ubuntu, <=3.13) if: matrix.os == 'ubuntu-latest' && matrix.python-version != '3.14' run: | - python -m pip install .[dev,audio,pocketsphinx,google-cloud,whisper-local,faster-whisper,openai,groq,vosk,cohere-api] + python -m pip install .[dev,audio,pocketsphinx,google-cloud,whisper-local,faster-whisper,openai,groq,vosk,cohere-api,audio-split] - name: Install Python dependencies (Ubuntu, 3.14) if: matrix.os == 'ubuntu-latest' && matrix.python-version == '3.14' run: | @@ -76,7 +76,7 @@ jobs: - name: Install Python dependencies (Windows) if: matrix.os == 'windows-latest' run: | - python -m pip install .[dev,whisper-local,faster-whisper,google-cloud,openai,groq,vosk,cohere-api] + python -m pip install .[dev,whisper-local,faster-whisper,google-cloud,openai,groq,vosk,cohere-api,audio-split] - name: Set up vosk model run: python -m speech_recognition.cli download vosk - name: Test with pytest @@ -120,6 +120,9 @@ jobs: install-spec: .[dev,vosk] setup-vosk-model: true verify-command: pytest -s -v tests/recognizers/test_vosk.py + - extra: audio-split + install-spec: .[dev,audio-split] + verify-command: pytest -s -v tests/test_audio.py steps: - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 - name: Set up Python 3.11 diff --git a/.gitignore b/.gitignore index 93e8d09f..1dcb7656 100644 --- a/.gitignore +++ b/.gitignore @@ -174,3 +174,8 @@ pocketsphinx-python/ examples/TEST.py *.geany *.out + +# Local AI tooling state (Claude Code, Cursor): worktree pointers, caches, +# session memory. Should never be committed. +.claude/ +.cursor/ diff --git a/pyproject.toml b/pyproject.toml index 2f75219f..b226f4c2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -83,6 +83,10 @@ cohere-api = [ ] assemblyai = ["requests"] vosk = ["vosk"] +audio-split = [ + "librosa>=0.10.2,<1.0", + "numpy>=1.26.0", +] [tool.setuptools] include-package-data = false diff --git a/speech_recognition/audio.py b/speech_recognition/audio.py index c35f9797..18614d12 100644 --- a/speech_recognition/audio.py +++ b/speech_recognition/audio.py @@ -24,6 +24,8 @@ class AudioData(object): Usually, instances of this class are obtained from ``recognizer_instance.record`` or ``recognizer_instance.listen``, or in the callback for ``recognizer_instance.listen_in_background``, rather than instantiating them directly. """ + _WAV_HEADER_OVERHEAD = 44 + def __init__(self, frame_data, sample_rate, sample_width): assert sample_rate > 0, "Sample rate must be a positive integer" assert ( @@ -72,6 +74,215 @@ def get_segment(self, start_ms=None, end_ms=None): self.sample_width, ) + def split( + self, max_bytes: int, *, silence_aware: bool = False + ) -> list[AudioData]: + """ + Splits this audio into a list of ``AudioData`` chunks targeting ``max_bytes`` per chunk when serialized as WAV (via ``get_wav_data()``). + + When ``silence_aware=False`` (the default), splits the audio mechanically on sample boundaries; each returned chunk's WAV-serialized size is guaranteed to be at most ``max_bytes``. No optional dependency is required. + + When ``silence_aware=True``, chooses chunk boundaries near silences via ``librosa.effects.split`` while keeping every chunk within ``max_bytes`` (the boundary search looks only before the target, never past it). When no suitable silence boundary is found in the look-back window, the chunk is cut at the size-derived target the same way as the fixed-time mode. Requires ``librosa`` and ``numpy``; raises ``SetupError`` if they are not installed or fail to initialize at runtime. + + Raises ``ValueError`` if ``len(frame_data)`` is not a multiple of ``sample_width`` (which ``AudioData`` would otherwise accept), since enforcing the ``max_bytes`` cap requires sample-aligned input. + + Returns ``[self]`` unchanged when the audio already fits within ``max_bytes`` (even when ``silence_aware=True``, in which case the librosa import is skipped). + + Example:: + + chunks = audio.split(max_bytes=24 * 1024 * 1024) + texts = [r.recognize_openai(c) for c in chunks] + """ + min_required = self._WAV_HEADER_OVERHEAD + self.sample_width + if max_bytes < min_required: + raise ValueError( + "``max_bytes`` must be at least " + f"{min_required} bytes (WAV header + one sample) for " + f"sample_width={self.sample_width}; got {max_bytes}" + ) + if len(self.frame_data) % self.sample_width != 0: + raise ValueError( + "``split`` requires ``frame_data`` length to be a multiple " + f"of sample_width ({self.sample_width}); got " + f"{len(self.frame_data)} bytes. Trim the audio to a sample " + "boundary before calling ``split``." + ) + + if ( + len(self.frame_data) + self._WAV_HEADER_OVERHEAD + <= max_bytes + ): + return [self] + + if silence_aware: + return self._split_silence_aware(max_bytes) + return self._split_fixed(max_bytes) + + def _split_fixed(self, max_bytes: int) -> list[AudioData]: + max_payload = max_bytes - self._WAV_HEADER_OVERHEAD + chunk_size = (max_payload // self.sample_width) * self.sample_width + + chunks: list[AudioData] = [] + for start in range(0, len(self.frame_data), chunk_size): + chunks.append( + AudioData( + self.frame_data[start:start + chunk_size], + self.sample_rate, + self.sample_width, + ) + ) + return chunks + + def _split_silence_aware(self, max_bytes: int) -> list[AudioData]: + # Force-load the exact dependencies we use so that lazy import or + # numba-style runtime errors from librosa surface here as a single + # ``SetupError`` rather than escaping later mid-loop. + try: + import numpy as np + from librosa.effects import split as librosa_split + except Exception as exc: + from speech_recognition.exceptions import SetupError + + if isinstance(exc, ImportError): + hint = ( + "install them with `pip install " + "SpeechRecognition[audio-split]`" + ) + else: + hint = ( + "the package(s) appear installed but failed to " + "initialize; check environment-specific issues such " + "as a non-writable numba cache directory" + ) + raise SetupError( + "silence-aware splitting could not initialize librosa/numpy: " + f"{type(exc).__name__}: {exc}. {hint}." + ) from exc + + target_payload = max_bytes - self._WAV_HEADER_OVERHEAD + chunk_samples = target_payload // self.sample_width + + sw = self.sample_width + total_samples = len(self.frame_data) // sw + silence_top_db = 40.0 + min_progress_samples = self.sample_rate // 2 + # Search window stays entirely before ``target`` so ``max_bytes`` is + # a hard ceiling on chunk size. Quality is recovered by snapping to + # silence within the look-back window instead of overshooting. + search_before = min(chunk_samples // 2, 10 * self.sample_rate) + + boundaries = [0] + start = 0 + while start < total_samples: + target = min(start + chunk_samples, total_samples) + if target >= total_samples: + boundaries.append(total_samples) + break + + search_start = max(start, target - search_before) + search_end = target + + proposed_end = target + if search_end > search_start: + # Materialize only the search window as float to keep peak + # memory bounded by the window size (≈ seconds of audio), + # not the entire recording (potentially hours). + segment = self._to_float_ndarray( + np, + raw=self.frame_data[ + search_start * sw:search_end * sw + ], + ) + # Call-time numba JIT/cache failures inside librosa can + # raise long after our import probe; translate them into + # the same SetupError surface. + try: + nonsilent_ranges = librosa_split( + segment, + top_db=silence_top_db, + frame_length=2048, + hop_length=512, + ) + except Exception as exc: + from speech_recognition.exceptions import SetupError + + raise SetupError( + "librosa.effects.split failed during invocation: " + f"{type(exc).__name__}: {exc}. The package is " + "installed but its runtime backend (numba/llvmlite) " + "could not initialize in this environment." + ) from exc + segment_len = len(segment) + candidates = [] + for nonsilent_range in nonsilent_ranges: + start_idx = int(nonsilent_range[0]) + end_idx = int(nonsilent_range[1]) + if start_idx > 0: + candidates.append(search_start + start_idx) + if end_idx < segment_len: + candidates.append(search_start + end_idx) + + min_allowed = start + min_progress_samples + valid = [ + c for c in candidates if min_allowed < c <= search_end + ] + if valid: + proposed_end = min(valid, key=lambda c: abs(c - target)) + + if proposed_end <= start: + proposed_end = min(start + chunk_samples, total_samples) + if proposed_end <= start: + break + + boundaries.append(proposed_end) + start = proposed_end + + chunks: list[AudioData] = [] + for i in range(len(boundaries) - 1): + sample_start = boundaries[i] + sample_end = boundaries[i + 1] + byte_start = sample_start * self.sample_width + byte_end = sample_end * self.sample_width + chunks.append( + AudioData( + self.frame_data[byte_start:byte_end], + self.sample_rate, + self.sample_width, + ) + ) + return chunks + + def _to_float_ndarray(self, np, raw=None): + # WAV PCM frame data is little-endian; use explicit byte-order + # dtypes so the conversion is correct on big-endian hosts. + if raw is None: + raw = self.frame_data + sw = self.sample_width + if sw == 1: + raw = audioop.bias(raw, 1, -128) + return np.frombuffer(raw, dtype=np.int8).astype(np.float32) / 128.0 + if sw == 2: + return ( + np.frombuffer(raw, dtype=" WAV header (44 bytes), so + # max_bytes=45 with sample_width=2 silently produced empty chunks + # when asserts were stripped (python -O). + for sample_width in (1, 2, 3, 4): + payload = b"\x00" * (sample_width * 100) + audio = sr.AudioData( + payload, sample_rate=16000, sample_width=sample_width + ) + min_required = sr.AudioData._WAV_HEADER_OVERHEAD + sample_width + with self.assertRaises(ValueError): + audio.split(max_bytes=min_required - 1) + with self.assertRaises(ValueError): + audio.split( + max_bytes=min_required - 1, silence_aware=True + ) + # boundary: exact minimum must NOT raise; should produce chunks + chunks = audio.split(max_bytes=min_required) + self.assertGreater(len(chunks), 0) + + def test_fixed_split_chunks_fit_within_max_bytes(self): + payload = b"\x00\x01" * 5_000 + audio = sr.AudioData(payload, sample_rate=16000, sample_width=2) + max_bytes = 2_048 + chunks = audio.split(max_bytes=max_bytes) + + self.assertGreater(len(chunks), 1) + for chunk in chunks: + self.assertLessEqual(len(chunk.get_wav_data()), max_bytes) + + joined = b"".join(c.frame_data for c in chunks) + self.assertEqual(joined, payload) + + for chunk in chunks: + self.assertEqual(chunk.sample_rate, 16000) + self.assertEqual(chunk.sample_width, 2) + + def test_fixed_split_aligns_to_sample_boundary(self): + payload = b"\x00\x00\x01\x00\x02\x00\x03\x00" * 1_000 + audio = sr.AudioData(payload, sample_rate=8000, sample_width=2) + chunks = audio.split(max_bytes=200) + for chunk in chunks: + self.assertEqual(len(chunk.frame_data) % 2, 0) + + def test_silence_aware_raises_setup_error_without_librosa(self): + # Pre-load numpy so mock.patch.dict's exit-time restore does not + # remove a freshly-imported numpy entry; numpy 2.x refuses to + # re-initialize once unloaded mid-process. + try: + import numpy # noqa: F401 + except ImportError: + pass + + payload = b"\x00\x01" * 5_000 + audio = sr.AudioData(payload, sample_rate=16000, sample_width=2) + with mock.patch.dict( + sys.modules, + {"librosa": None, "librosa.effects": None}, + ): + with self.assertRaises(SetupError): + audio.split(max_bytes=2_048, silence_aware=True) + + def test_silence_aware_translates_call_time_errors_to_setup_error(self): + # Regression: even after the dependency import succeeds, librosa's + # numba-backed implementation can raise mid-call (e.g. cache + # creation failures in read-only environments). Those must also + # surface as SetupError, not as a raw RuntimeError, so users get a + # single actionable failure mode. + try: + import numpy # noqa: F401 + import librosa.effects # noqa: F401 + except ImportError: + self.skipTest("librosa not installed; cannot exercise call-time error path") + + payload = b"\x00\x01" * 5_000 + audio = sr.AudioData(payload, sample_rate=16000, sample_width=2) + + def _boom(*args, **kwargs): + raise RuntimeError("simulated numba JIT failure") + + with mock.patch("librosa.effects.split", side_effect=_boom): + with self.assertRaises(SetupError): + audio.split(max_bytes=2_048, silence_aware=True) + + def test_silence_aware_translates_lazy_runtime_errors_to_setup_error(self): + # Regression: previously librosa.effects.split was looked up lazily + # at call time, so numba/librosa lazy-import RuntimeErrors escaped + # the SetupError guard. Now the dependency is force-loaded inside + # the guard, so any initialization-time error becomes SetupError. + try: + import numpy # noqa: F401 + except ImportError: + pass + + payload = b"\x00\x01" * 5_000 + audio = sr.AudioData(payload, sample_rate=16000, sample_width=2) + + import types + + class _RaisingEffects(types.ModuleType): + def __getattr__(self, name): + raise RuntimeError("simulated lazy load failure") + + fake_librosa = types.ModuleType("librosa") + fake_effects = _RaisingEffects("librosa.effects") + fake_librosa.effects = fake_effects + with mock.patch.dict( + sys.modules, + {"librosa": fake_librosa, "librosa.effects": fake_effects}, + ): + with self.assertRaises(SetupError): + audio.split(max_bytes=2_048, silence_aware=True) + + +class TestAudioDataSplitSilenceAware(unittest.TestCase): + def setUp(self): + # Probe the exact callable used at runtime so this also skips when + # librosa is installed but its numba-backed initialization fails + # (e.g., read-only cache directory). + try: + import numpy # noqa: F401 + from librosa.effects import split as _librosa_split # noqa: F401 + except Exception as exc: + raise unittest.SkipTest( + "silence-aware split tests require a functional librosa " + f"and numpy: {exc}" + ) + + def test_to_float_ndarray_normalizes_each_sample_width(self): + import numpy as np + + # Build payloads as explicit little-endian bytes so the assertions + # are independent of host byte order (WAV PCM is little-endian). + cases = { + 1: bytes([0, 128, 255, 64]), # WAV unsigned 8-bit + 2: b"".join( + int(v).to_bytes(2, "little", signed=True) + for v in (0, 32767, -32768, 100) + ), + 4: b"".join( + int(v).to_bytes(4, "little", signed=True) + for v in (0, (1 << 31) - 1, -(1 << 31), 1000) + ), + } + for sw, payload in cases.items(): + audio = sr.AudioData(payload, sample_rate=16000, sample_width=sw) + arr = audio._to_float_ndarray(np) + self.assertEqual(arr.dtype, np.float32) + self.assertTrue(np.all(np.abs(arr) <= 1.0 + 1e-6), f"sw={sw}") + + def test_to_float_ndarray_decodes_little_endian_regardless_of_host(self): + import numpy as np + + # Hand-built little-endian byte sequences with known values; the + # test fails on big-endian hosts if dtype lacks an explicit `<`. + payload_16 = b"\x01\x00" + b"\xff\xff" # +1, -1 + audio16 = sr.AudioData(payload_16, sample_rate=16000, sample_width=2) + arr16 = audio16._to_float_ndarray(np) + self.assertAlmostEqual(float(arr16[0]), 1 / 32768.0, places=6) + self.assertAlmostEqual(float(arr16[1]), -1 / 32768.0, places=6) + + payload_32 = b"\x01\x00\x00\x00" + b"\xff\xff\xff\xff" # +1, -1 + audio32 = sr.AudioData(payload_32, sample_rate=16000, sample_width=4) + arr32 = audio32._to_float_ndarray(np) + self.assertAlmostEqual(float(arr32[0]), 1 / (1 << 31), places=10) + self.assertAlmostEqual(float(arr32[1]), -1 / (1 << 31), places=10) + + def test_to_float_ndarray_24bit_sign_extension(self): + import numpy as np + + positive = (0x123456).to_bytes(3, "little", signed=False) + negative = (-0x123456).to_bytes(3, "little", signed=True) + zero = b"\x00\x00\x00" + payload = positive + negative + zero + audio = sr.AudioData(payload, sample_rate=16000, sample_width=3) + arr = audio._to_float_ndarray(np) + self.assertEqual(arr.shape, (3,)) + self.assertGreater(arr[0], 0) + self.assertLess(arr[1], 0) + self.assertEqual(arr[2], 0.0) + self.assertTrue(np.all(np.abs(arr) <= 1.0 + 1e-6)) + + def test_silence_aware_uses_single_nonsilent_range_boundary(self): + import numpy as np + + sample_rate = 16000 + silence = np.zeros(int(sample_rate * 1.5)) + tone = ( + np.sin(2 * np.pi * 440 * np.arange(int(sample_rate * 3)) / sample_rate) + * 0.5 + ) + more_silence = np.zeros(int(sample_rate * 1.0)) + combined = np.concatenate([silence, tone, more_silence, tone]) + pcm = (combined * 32767).astype(np.int16).tobytes() + audio = sr.AudioData(pcm, sample_rate=sample_rate, sample_width=2) + + target_seconds = 2.5 + max_bytes = ( + int(target_seconds * sample_rate * 2) + sr.AudioData._WAV_HEADER_OVERHEAD + ) + chunks = audio.split(max_bytes=max_bytes, silence_aware=True) + + self.assertGreater(len(chunks), 1) + # First chunk boundary must land on a silence sample (not mid-tone). + # The first silence ends near sample 24000 (1.5s); the cut should fall + # at or before that, so the last sample of chunk[0] is silence (≈0). + first_chunk_samples = np.frombuffer( + chunks[0].frame_data, dtype=np.int16 + ) + self.assertLess(abs(int(first_chunk_samples[-1])), 1000) + + joined = b"".join(c.frame_data for c in chunks) + self.assertEqual(joined, pcm) + + def test_silence_aware_respects_byte_budget_strictly(self): + # Regression: max_bytes must be a hard ceiling for silence-aware + # mode, not a soft target. The search window is now constrained to + # the look-back side of the target, so chunks cannot exceed the cap. + import numpy as np + + sample_rate = 16000 + sample_width = 2 + silence = np.zeros(int(sample_rate * 5)) + pcm = silence.astype(np.int16).tobytes() + audio = sr.AudioData( + pcm, sample_rate=sample_rate, sample_width=sample_width + ) + + max_bytes = 200 + chunks = audio.split(max_bytes=max_bytes, silence_aware=True) + + for chunk in chunks: + self.assertLessEqual(len(chunk.get_wav_data()), max_bytes) + + joined = b"".join(c.frame_data for c in chunks) + self.assertEqual(joined, pcm) + + def test_silence_aware_respects_byte_budget_on_realistic_audio(self): + # Same strict-cap invariant against audio that contains both + # speech-like and silence segments so the boundary search exercises + # the librosa path. + import numpy as np + + sample_rate = 16000 + sample_width = 2 + tone_a = ( + np.sin(2 * np.pi * 440 * np.arange(int(sample_rate * 2.0)) / sample_rate) + * 0.5 + ) + silence = np.zeros(int(sample_rate * 1.5)) + tone_b = ( + np.sin(2 * np.pi * 440 * np.arange(int(sample_rate * 2.0)) / sample_rate) + * 0.5 + ) + combined = np.concatenate([tone_a, silence, tone_b]) + pcm = (combined * 32767).astype(np.int16).tobytes() + audio = sr.AudioData( + pcm, sample_rate=sample_rate, sample_width=sample_width + ) + + target_seconds = 2.5 + max_bytes = ( + int(target_seconds * sample_rate * sample_width) + + sr.AudioData._WAV_HEADER_OVERHEAD + ) + chunks = audio.split(max_bytes=max_bytes, silence_aware=True) + for chunk in chunks: + self.assertLessEqual(len(chunk.get_wav_data()), max_bytes) + + def test_silence_aware_snaps_to_speech_end_within_lookback(self): + # Regression: the boundary search must consider the end of a + # nonsilent range (speech-to-silence transition), not just the + # start. When the most recent speech ends shortly before the + # target, the cleanest cut is at that speech end (which is also + # the start of trailing silence) — strictly before the target so + # the chunk stays within the byte budget. + import numpy as np + + sample_rate = 16000 + # 2.0s tone, then 1.5s silence, then 2.0s tone. Target ~2.5s, so the + # target falls inside the silence right after the first tone ends. + tone_a = ( + np.sin(2 * np.pi * 440 * np.arange(int(sample_rate * 2.0)) / sample_rate) + * 0.5 + ) + silence = np.zeros(int(sample_rate * 1.5)) + tone_b = ( + np.sin(2 * np.pi * 440 * np.arange(int(sample_rate * 2.0)) / sample_rate) + * 0.5 + ) + combined = np.concatenate([tone_a, silence, tone_b]) + pcm = (combined * 32767).astype(np.int16).tobytes() + audio = sr.AudioData(pcm, sample_rate=sample_rate, sample_width=2) + + target_seconds = 2.5 + max_bytes = ( + int(target_seconds * sample_rate * 2) + sr.AudioData._WAV_HEADER_OVERHEAD + ) + chunks = audio.split(max_bytes=max_bytes, silence_aware=True) + + self.assertGreater(len(chunks), 1) + # The first chunk should end inside the silence region (sample range + # ~32000-56000), not mid-speech in tone_b. + first_chunk_end_sample = len(chunks[0].frame_data) // 2 + self.assertGreaterEqual(first_chunk_end_sample, int(sample_rate * 2.0) - 200) + self.assertLessEqual(first_chunk_end_sample, int(sample_rate * 3.5) + 200) + + joined = b"".join(c.frame_data for c in chunks) + self.assertEqual(joined, pcm) + + def test_silence_aware_splits_at_silence_boundary(self): + import numpy as np + + sample_rate = 16000 + tone = ( + np.sin(2 * np.pi * 440 * np.arange(int(sample_rate * 2)) / sample_rate) + * 0.5 + ) + silence = np.zeros(int(sample_rate * 1.5)) + combined = np.concatenate([tone, silence, tone, silence, tone]) + pcm = (combined * 32767).astype(np.int16).tobytes() + audio = sr.AudioData(pcm, sample_rate=sample_rate, sample_width=2) + + target_seconds = 2.5 + max_bytes = ( + int(target_seconds * sample_rate * 2) + sr.AudioData._WAV_HEADER_OVERHEAD + ) + chunks = audio.split(max_bytes=max_bytes, silence_aware=True) + + self.assertGreater(len(chunks), 1) + joined = b"".join(c.frame_data for c in chunks) + self.assertEqual(joined, pcm) + + if __name__ == "__main__": unittest.main()