|
| 1 | +"""Audio chunking utilities for processing large audio files to prevent OOM errors.""" |
| 2 | + |
| 3 | +import os |
| 4 | +import logging |
| 5 | +from typing import List |
| 6 | +from pydub import AudioSegment |
| 7 | + |
| 8 | + |
class AudioChunker:
    """Split large audio files into fixed-duration chunks and merge them back.

    This class provides utilities to:
    - Split large audio files into fixed-duration chunks
    - Merge processed chunks back together with simple concatenation
    - Determine if a file should be chunked based on its duration

    Example:
        >>> chunker = AudioChunker(chunk_duration_seconds=600)  # 10-minute chunks
        >>> chunk_paths = chunker.split_audio("long_audio.wav", "/tmp/chunks")
        >>> # Process each chunk...
        >>> output_path = chunker.merge_chunks(processed_chunks, "output.wav")
    """

    def __init__(self, chunk_duration_seconds: float, logger: logging.Logger = None):
        """Initialize the AudioChunker.

        Args:
            chunk_duration_seconds: Duration of each chunk in seconds. Must
                round down to at least one millisecond.
            logger: Optional logger instance; defaults to this module's logger.

        Raises:
            ValueError: If the chunk duration is not at least one millisecond.
        """
        chunk_duration_ms = int(chunk_duration_seconds * 1000)
        # A zero or negative chunk length would later divide by zero or
        # silently produce no chunks, so reject it up front.
        if chunk_duration_ms <= 0:
            raise ValueError(
                "chunk_duration_seconds must be at least 0.001, "
                f"got {chunk_duration_seconds!r}"
            )
        self.chunk_duration_ms = chunk_duration_ms
        self.logger = logger or logging.getLogger(__name__)

    @staticmethod
    def _export_format(path: str, default: str = "wav") -> str:
        """Return the lower-cased export format implied by *path*'s extension."""
        extension = os.path.splitext(path)[1]
        # Lower-case so that e.g. ".WAV" maps to the "wav" format name; fall
        # back to the default for a missing or bare "." extension.
        return extension.lstrip(".").lower() or default

    def _chunk_bounds(self, total_duration_ms: int) -> List[tuple]:
        """Return the (start_ms, end_ms) pair of every chunk, in order."""
        step = self.chunk_duration_ms
        return [
            (start_ms, min(start_ms + step, total_duration_ms))
            for start_ms in range(0, total_duration_ms, step)
        ]

    def split_audio(self, input_path: str, output_dir: str) -> List[str]:
        """Split audio file into fixed-size chunks.

        Chunks are written to ``output_dir`` as ``chunk_0000<ext>``,
        ``chunk_0001<ext>``, ... using the input file's extension (``.wav``
        when the input has none). The last chunk may be shorter than the rest.

        Args:
            input_path: Path to the input audio file
            output_dir: Directory where chunk files will be saved; created if
                it does not exist

        Returns:
            List of paths to the created chunk files, in playback order

        Raises:
            FileNotFoundError: If input file doesn't exist
            IOError: If there's an error reading or writing audio files
        """
        if not os.path.exists(input_path):
            raise FileNotFoundError(f"Input file not found: {input_path}")

        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(output_dir, exist_ok=True)

        self.logger.debug("Loading audio file: %s", input_path)
        audio = AudioSegment.from_file(input_path)

        # len() of the loaded segment is used as its duration in milliseconds.
        total_duration_ms = len(audio)
        bounds = self._chunk_bounds(total_duration_ms)
        num_chunks = len(bounds)
        self.logger.info(
            "Splitting %.1fs audio into %d chunks of %.1fs each",
            total_duration_ms / 1000,
            num_chunks,
            self.chunk_duration_ms / 1000,
        )

        # Reuse the input's extension for the chunk files; default to WAV.
        ext = os.path.splitext(input_path)[1]
        if ext in ("", "."):
            ext = ".wav"
        export_format = self._export_format(input_path)

        chunk_paths = []
        for index, (start_ms, end_ms) in enumerate(bounds):
            chunk_path = os.path.join(output_dir, f"chunk_{index:04d}{ext}")
            self.logger.debug(
                "Exporting chunk %d/%d: %.1fs - %.1fs to %s",
                index + 1,
                num_chunks,
                start_ms / 1000,
                end_ms / 1000,
                chunk_path,
            )
            audio[start_ms:end_ms].export(chunk_path, format=export_format)
            chunk_paths.append(chunk_path)

        return chunk_paths

    def merge_chunks(self, chunk_paths: List[str], output_path: str) -> str:
        """Merge processed chunks with simple concatenation.

        Chunks are appended in the order given. Note that the whole merged
        audio is held in memory before it is exported.

        Args:
            chunk_paths: List of paths to chunk files to merge
            output_path: Path where the merged output will be saved; its
                extension selects the output format (WAV when absent)

        Returns:
            Path to the merged output file

        Raises:
            ValueError: If chunk_paths is empty
            FileNotFoundError: If any chunk file doesn't exist
            IOError: If there's an error reading or writing audio files
        """
        if not chunk_paths:
            raise ValueError("Cannot merge empty list of chunks")

        # Verify all chunks exist before doing any expensive decoding.
        for chunk_path in chunk_paths:
            if not os.path.exists(chunk_path):
                raise FileNotFoundError(f"Chunk file not found: {chunk_path}")

        total = len(chunk_paths)
        self.logger.info("Merging %d chunks into %s", total, output_path)

        combined = AudioSegment.empty()
        for position, chunk_path in enumerate(chunk_paths, start=1):
            self.logger.debug("Loading chunk %d/%d: %s", position, total, chunk_path)
            combined += AudioSegment.from_file(chunk_path)

        self.logger.info(
            "Exporting merged audio (%.1fs) to %s", len(combined) / 1000, output_path
        )
        combined.export(output_path, format=self._export_format(output_path))

        return output_path

    def should_chunk(self, audio_duration_seconds: float) -> bool:
        """Determine if file is large enough to benefit from chunking.

        Args:
            audio_duration_seconds: Duration of the audio file in seconds

        Returns:
            True if the duration is strictly longer than one chunk, False
            otherwise
        """
        return audio_duration_seconds > (self.chunk_duration_ms / 1000)
0 commit comments