diff --git a/app/core/audio_processing.py b/app/core/audio_processing.py index 4c8cbd7..6e2a4ab 100644 --- a/app/core/audio_processing.py +++ b/app/core/audio_processing.py @@ -1,391 +1,310 @@ """ -Audio processing utilities for long text TTS concatenation +Audio processing utilities for long text TTS concatenation. + +This module is configurable via environment variables and includes features like +parallel processing, automatic retries, and a production-ready in-memory cache. + +Cache Configuration: +- AUDIO_CACHE_MAX_SIZE_MB: Max in-memory cache size in MB. Evicts old entries when full. (Default: 256) +- AUDIO_CACHE_CLEAR_INTERVAL_S: Automatically clear cache periodically (in seconds). 0 disables. (Default: 3600) + +Performance & Limits: +- AUDIO_SILENCE_PADDING_MS: Default silence duration in ms. (Default: 250) +- AUDIO_MAX_FILES_TO_CONCATENATE: Max number of files per job. (Default: 5000) +- AUDIO_MAX_TOTAL_SIZE_MB: Max combined file size in MB. (Default: 2048) +- AUDIO_USE_PARALLEL_PROCESSING: Set 'true' or '1' to enable parallel mode. (Default: false) +- AUDIO_MAX_PARALLEL_WORKERS: Max threads for parallel mode. (Default: CPU cores) +- AUDIO_LARGE_FILE_THRESHOLD_MB: Warn if a single file exceeds this size. 
(Default: 100) """ +import concurrent.futures +import gc import logging import os -import tempfile +import sys +import time from pathlib import Path -from typing import List, Optional, Union +from typing import Callable, Dict, List, Optional, Tuple, TypedDict, Union try: from pydub import AudioSegment - from pydub.silence import split_on_silence - from pydub.utils import make_chunks PYDUB_AVAILABLE = True except ImportError as e: - PYDUB_AVAILABLE = False - AudioSegment = None - # Log the import error for debugging + PYDUB_AVAILABLE = False; AudioSegment = None import logging logging.getLogger(__name__).warning(f"pydub import failed: {e}") except Exception as e: - PYDUB_AVAILABLE = False - AudioSegment = None - # Log any other errors for debugging + PYDUB_AVAILABLE = False; AudioSegment = None import logging logging.getLogger(__name__).error(f"Unexpected error importing pydub: {e}") -from app.config import Config - logger = logging.getLogger(__name__) -class AudioConcatenationError(Exception): - """Exception raised when audio concatenation fails""" - pass +# --- Configuration from Environment Variables --- +def _get_env_var_as_int(name: str, default: int) -> int: + value_str = os.getenv(name) + if value_str is None: return default + try: return int(value_str) + except (ValueError, TypeError): + logger.warning(f"Invalid value for env var {name}: '{value_str}'. Using default: {default}.") + return default -def check_pydub_availability(): - """Check if pydub is available and properly configured""" - if not PYDUB_AVAILABLE: - raise AudioConcatenationError( - "pydub is not available. Please install it with: pip install pydub" - ) +def _get_env_var_as_bool(name: str, default: bool = False) -> bool: + value_str = os.getenv(name, '').lower() + if value_str in ('true', '1', 'yes', 'on'): return True + if value_str in ('false', '0', 'no', 'off', ''): return False + logger.warning(f"Invalid value for boolean env var {name}: '{value_str}'. 
Using default: {default}.") + return default - # Test basic functionality - try: - # Create a small test audio segment - test_audio = AudioSegment.silent(duration=100) # 100ms of silence - return True - except Exception as e: - raise AudioConcatenationError(f"pydub is not properly configured: {e}") +# Performance & Limits +SILENCE_PADDING_MS = _get_env_var_as_int('AUDIO_SILENCE_PADDING_MS', 250) +MAX_FILES = _get_env_var_as_int('AUDIO_MAX_FILES_TO_CONCATENATE', 5000) +MAX_SIZE_MB = _get_env_var_as_int('AUDIO_MAX_TOTAL_SIZE_MB', 2048) +MAX_TOTAL_SIZE_BYTES = MAX_SIZE_MB * 1024 * 1024 +USE_PARALLEL_PROCESSING_DEFAULT = _get_env_var_as_bool('AUDIO_USE_PARALLEL_PROCESSING') +MAX_WORKERS = _get_env_var_as_int('AUDIO_MAX_PARALLEL_WORKERS', os.cpu_count() or 4) +LARGE_FILE_THRESHOLD_MB = _get_env_var_as_int('AUDIO_LARGE_FILE_THRESHOLD_MB', 100) +LARGE_FILE_THRESHOLD_BYTES = LARGE_FILE_THRESHOLD_MB * 1024 * 1024 + +# Cache Configuration +CACHE_MAX_SIZE_MB = _get_env_var_as_int('AUDIO_CACHE_MAX_SIZE_MB', 256) +CACHE_MAX_SIZE_BYTES = CACHE_MAX_SIZE_MB * 1024 * 1024 +CACHE_CLEAR_INTERVAL_S = _get_env_var_as_int('AUDIO_CACHE_CLEAR_INTERVAL_S', 3600) + + +# --- Type Definitions & Caching --- + +class AudioMetadata(TypedDict): + output_path: str; duration_seconds: float; file_size_bytes: int; sample_rate: int; channels: int + +class AudioConcatenationError(Exception): pass +_segment_cache: Dict[Tuple, AudioSegment] = {} +_last_cache_clear_time = time.time() + + +# --- Core Functions --- def concatenate_audio_files(audio_files: List[Union[str, Path]], output_path: Union[str, Path], output_format: str = "mp3", - silence_duration_ms: Optional[int] = None, - crossfade_duration_ms: int = 0, - normalize_volume: bool = True, - remove_source_files: bool = False) -> dict: + **kwargs) -> AudioMetadata: """ Concatenate multiple audio files into a single output file. 
- - Args: - audio_files: List of paths to audio files to concatenate - output_path: Path where the concatenated audio will be saved - output_format: Output format ('mp3', 'wav', etc.) - silence_duration_ms: Duration of silence between chunks (defaults to config) - crossfade_duration_ms: Duration of crossfade between chunks (0 for no crossfade) - normalize_volume: Whether to normalize volume across all chunks - remove_source_files: Whether to delete source files after concatenation - - Returns: - Dictionary with metadata about the concatenated audio: - { - 'output_path': str, - 'duration_seconds': float, - 'file_size_bytes': int, - 'sample_rate': int, - 'channels': int - } - - Raises: - AudioConcatenationError: If concatenation fails + This is the main entry point function. """ + # This function is now a wrapper to keep the signature clean + # while passing all arguments to the core logic. + return _concatenate_audio_files_core( + audio_files, output_path, output_format, **kwargs + ) + +def _concatenate_audio_files_core( + audio_files: List[Union[str, Path]], output_path: Union[str, Path], + output_format: str, silence_duration_ms: Optional[int] = None, + crossfade_duration_ms: int = 0, normalize_volume: bool = True, + remove_source_files: bool = False, quality: str = 'medium', + use_parallel_processing: bool = USE_PARALLEL_PROCESSING_DEFAULT, + progress_callback: Optional[Callable[[int, int], None]] = None +) -> AudioMetadata: + start_time = time.time() check_pydub_availability() + _validate_concatenation_params(audio_files) + _check_for_large_files(audio_files) - if not audio_files: - raise AudioConcatenationError("No audio files provided for concatenation") + if use_parallel_processing and _estimate_memory_usage(audio_files) > 1 * 1024 * 1024 * 1024: + logger.warning("High memory usage expected for this parallel job. 
Consider sequential mode if issues occur.") - if silence_duration_ms is None: - silence_duration_ms = Config.LONG_TEXT_SILENCE_PADDING_MS - - logger.info(f"Concatenating {len(audio_files)} audio files with {silence_duration_ms}ms silence padding") + silence_ms = silence_duration_ms if silence_duration_ms is not None else SILENCE_PADDING_MS + processing_mode = "parallel" if use_parallel_processing else "sequential" + logger.info(f"Concatenating {len(audio_files)} files in {processing_mode} mode (workers={MAX_WORKERS}).") + result = None try: - # Load all audio segments - segments = [] - for i, audio_file in enumerate(audio_files): - file_path = Path(audio_file) - if not file_path.exists(): - raise AudioConcatenationError(f"Audio file not found: {audio_file}") - - try: - # Detect format from extension - file_format = file_path.suffix.lower().lstrip('.') - if file_format == 'wav': - audio = AudioSegment.from_wav(str(file_path)) - elif file_format == 'mp3': - audio = AudioSegment.from_mp3(str(file_path)) - elif file_format in ['m4a', 'aac']: - audio = AudioSegment.from_file(str(file_path), format='m4a') - else: - # Try to auto-detect - audio = AudioSegment.from_file(str(file_path)) - - segments.append(audio) - logger.debug(f"Loaded audio segment {i+1}/{len(audio_files)}: " - f"{len(audio)} ms, {audio.frame_rate} Hz, {audio.channels} channels") - - except Exception as e: - raise AudioConcatenationError(f"Failed to load audio file {audio_file}: {e}") - - if not segments: - raise AudioConcatenationError("No valid audio segments loaded") - - # Normalize audio properties if requested - if normalize_volume: - segments = _normalize_audio_levels(segments) - - # Ensure all segments have the same sample rate and channels - segments = _standardize_audio_properties(segments) - - # Create silence segment for padding - silence = AudioSegment.silent( - duration=silence_duration_ms, - frame_rate=segments[0].frame_rate - ) - - # Concatenate segments with silence or crossfade - result = 
segments[0] - - for segment in segments[1:]: - if crossfade_duration_ms > 0: - # Add crossfade between segments - result = result.append(segment, crossfade=crossfade_duration_ms) + all_segments = [None] * len(audio_files) + all_segments[0] = _load_from_cache_or_disk(audio_files[0], normalize_volume) + if progress_callback: progress_callback(1, len(audio_files)) + + target_frame_rate, target_channels = all_segments[0].frame_rate, all_segments[0].channels + + if len(audio_files) > 1: + segments_to_process = audio_files[1:] + if use_parallel_processing: + # Parallel processing logic + completed_count = 1 + with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + future_map = {executor.submit(_load_from_cache_or_disk, fp, normalize_volume, target_frame_rate, target_channels): i + 1 for i, fp in enumerate(segments_to_process)} + for future in concurrent.futures.as_completed(future_map): + index = future_map[future] + try: all_segments[index] = future.result() + except Exception as exc: raise AudioConcatenationError(f"Failed to process {audio_files[index]}") from exc + completed_count += 1 + if progress_callback: progress_callback(completed_count, len(audio_files)) else: - # Add silence then append segment - if silence_duration_ms > 0: - result = result + silence - result = result + segment + # Sequential processing logic + for i, file_path in enumerate(segments_to_process): + all_segments[i + 1] = _load_from_cache_or_disk(file_path, normalize_volume, target_frame_rate, target_channels) + if progress_callback: progress_callback(i + 2, len(audio_files)) + + # Concatenation and export logic... 
+ silence = AudioSegment.silent(duration=silence_ms, frame_rate=target_frame_rate) + result = all_segments[0] + for segment in all_segments[1:]: + if crossfade_duration_ms > 0: result = result.append(segment, crossfade=crossfade_duration_ms) + else: + if silence_ms > 0: result += silence + result += segment - # Export the concatenated audio output_path = Path(output_path) output_path.parent.mkdir(parents=True, exist_ok=True) + result.export(str(output_path), format=output_format, **_get_export_parameters(output_format, quality)) - # Set export parameters based on format - export_params = _get_export_parameters(output_format) - - result.export( - str(output_path), - format=output_format, - **export_params - ) - - # Get file metadata - file_size = output_path.stat().st_size - duration_seconds = len(result) / 1000.0 - - metadata = { - 'output_path': str(output_path), - 'duration_seconds': duration_seconds, - 'file_size_bytes': file_size, - 'sample_rate': result.frame_rate, - 'channels': result.channels + duration_s = time.time() - start_time + metadata: AudioMetadata = { + 'output_path': str(output_path), 'duration_seconds': len(result) / 1000.0, + 'file_size_bytes': output_path.stat().st_size, 'sample_rate': result.frame_rate, 'channels': result.channels } + logger.info(f"Concatenation successful in {duration_s:.2f}s. 
Rate: {len(audio_files) / duration_s:.1f} files/sec.") - logger.info(f"Audio concatenation successful: {duration_seconds:.1f}s, " - f"{file_size:,} bytes, saved to {output_path}") - - # Clean up source files if requested if remove_source_files: for audio_file in audio_files: - try: - Path(audio_file).unlink() - logger.debug(f"Removed source file: {audio_file}") - except Exception as e: - logger.warning(f"Failed to remove source file {audio_file}: {e}") - + try: Path(audio_file).unlink() + except OSError as e: logger.warning(f"Failed to remove source file {audio_file}: {e}") return metadata - except AudioConcatenationError: - raise except Exception as e: raise AudioConcatenationError(f"Audio concatenation failed: {e}") + finally: + del result; gc.collect() -def _normalize_audio_levels(segments: List[AudioSegment]) -> List[AudioSegment]: - """Normalize volume levels across all audio segments""" - if not segments: - return segments - - try: - # Calculate average dBFS across all segments - total_dbfs = sum(segment.dBFS for segment in segments if segment.dBFS is not None) - avg_dbfs = total_dbfs / len(segments) - - # Target level (slightly below 0 dBFS to prevent clipping) - target_dbfs = -3.0 - - # Normalize each segment - normalized_segments = [] - for segment in segments: - if segment.dBFS is not None: - # Calculate gain adjustment - gain_adjustment = target_dbfs - segment.dBFS - # Apply gain with some limits to prevent extreme adjustments - gain_adjustment = max(-20, min(20, gain_adjustment)) - normalized_segment = segment.apply_gain(gain_adjustment) - else: - normalized_segment = segment - - normalized_segments.append(normalized_segment) - - logger.debug(f"Normalized {len(segments)} audio segments") - return normalized_segments - - except Exception as e: - logger.warning(f"Failed to normalize audio levels: {e}") - return segments - - -def _standardize_audio_properties(segments: List[AudioSegment]) -> List[AudioSegment]: - """Ensure all segments have the same sample 
rate and channel count""" - if not segments: - return segments - - # Use properties from the first segment as reference - reference_segment = segments[0] - target_frame_rate = reference_segment.frame_rate - target_channels = reference_segment.channels - - standardized_segments = [] - - for i, segment in enumerate(segments): - standardized_segment = segment +# --- Cache Management --- - # Convert to target frame rate if needed - if segment.frame_rate != target_frame_rate: - standardized_segment = standardized_segment.set_frame_rate(target_frame_rate) - logger.debug(f"Converted segment {i} from {segment.frame_rate} Hz to {target_frame_rate} Hz") +def _manage_cache_size(): + """Evicts the oldest entries from the cache if it exceeds the configured size limit.""" + if not _segment_cache or CACHE_MAX_SIZE_BYTES == 0: + return - # Convert to target channel count if needed - if segment.channels != target_channels: - if target_channels == 1 and segment.channels == 2: - # Convert stereo to mono - standardized_segment = standardized_segment.set_channels(1) - elif target_channels == 2 and segment.channels == 1: - # Convert mono to stereo - standardized_segment = standardized_segment.set_channels(2) - logger.debug(f"Converted segment {i} from {segment.channels} to {target_channels} channels") + # Estimate size by summing the size of each AudioSegment object. 
+ current_size = sum(sys.getsizeof(seg.raw_data) for seg in _segment_cache.values()) - standardized_segments.append(standardized_segment) - - return standardized_segments - - -def _get_export_parameters(output_format: str) -> dict: - """Get optimal export parameters for the given format""" - export_params = {} - - if output_format.lower() == 'mp3': - export_params.update({ - 'bitrate': '128k', - 'parameters': ['-q:a', '2'] # High quality VBR - }) - elif output_format.lower() == 'wav': - export_params.update({ - 'parameters': ['-acodec', 'pcm_s16le'] # 16-bit PCM - }) - - return export_params - - -def create_silence_audio(duration_ms: int, - sample_rate: int = 22050, - channels: int = 1, - output_path: Optional[Union[str, Path]] = None, - output_format: str = "wav") -> Optional[str]: - """ - Create a silence audio file of specified duration. - - Args: - duration_ms: Duration of silence in milliseconds - sample_rate: Sample rate for the audio - channels: Number of audio channels - output_path: Path to save the silence file (optional) - output_format: Format for the output file - - Returns: - Path to the created silence file if output_path is specified, None otherwise - """ - check_pydub_availability() + if current_size > CACHE_MAX_SIZE_BYTES: + logger.warning( + f"Cache size ({current_size // 1024**2}MB) exceeds limit ({CACHE_MAX_SIZE_MB}MB). Evicting oldest entries." 
+ ) + # Sort keys by last modified time (the second element in the tuple key) + sorted_keys = sorted(_segment_cache.keys(), key=lambda k: k[1]) + # Evict the oldest 20% of entries + num_to_evict = max(1, len(sorted_keys) // 5) + keys_to_remove = sorted_keys[:num_to_evict] + for key in keys_to_remove: + del _segment_cache[key] + logger.info(f"Evicted {len(keys_to_remove)} entries from cache.") + +def _load_from_cache_or_disk(file_path: Union[str, Path], normalize: bool, + target_rate: Optional[int] = None, target_ch: Optional[int] = None) -> AudioSegment: + """Wrapper to use in-memory cache for loading segments, with automated management.""" + global _last_cache_clear_time + if CACHE_CLEAR_INTERVAL_S > 0 and (time.time() - _last_cache_clear_time > CACHE_CLEAR_INTERVAL_S): + logger.info(f"Clearing segment cache due to interval ({CACHE_CLEAR_INTERVAL_S}s).") + _segment_cache.clear() + _last_cache_clear_time = time.time() try: - silence = AudioSegment.silent( - duration=duration_ms, - frame_rate=sample_rate - ).set_channels(channels) - - if output_path: - output_path = Path(output_path) - output_path.parent.mkdir(parents=True, exist_ok=True) - - export_params = _get_export_parameters(output_format) - silence.export(str(output_path), format=output_format, **export_params) - - return str(output_path) - - return None - - except Exception as e: - raise AudioConcatenationError(f"Failed to create silence audio: {e}") - - -def validate_audio_file(file_path: Union[str, Path]) -> dict: - """ - Validate and get metadata for an audio file. 
- - Args: - file_path: Path to the audio file - - Returns: - Dictionary with audio file metadata: - { - 'valid': bool, - 'duration_seconds': float, - 'sample_rate': int, - 'channels': int, - 'format': str, - 'file_size_bytes': int, - 'error': str (if valid=False) - } - """ - file_path = Path(file_path) - - if not file_path.exists(): - return {'valid': False, 'error': f'File not found: {file_path}'} - + path_obj = Path(file_path) + last_modified = path_obj.stat().st_mtime + cache_key = (str(path_obj), last_modified, normalize, target_rate, target_ch) + if cache_key in _segment_cache: + logger.debug(f"Cache hit for {path_obj.name}") + return _segment_cache[cache_key].copy() + except FileNotFoundError: pass + + segment = _load_and_prep_segment_with_retry(file_path, normalize, target_rate, target_ch) + + if 'cache_key' in locals(): + _segment_cache[cache_key] = segment.copy() + _manage_cache_size() # Check and manage cache size after adding a new item. + + return segment + +# --- Other Helper & Internal Functions --- + +def _load_and_prep_segment_with_retry(file_path: Union[str, Path], normalize: bool, + target_rate: Optional[int] = None, target_ch: Optional[int] = None, + max_retries: int = 2) -> AudioSegment: + """Loads a segment, retrying on transient failures.""" + last_exception = None + for attempt in range(max_retries + 1): + try: + return _load_and_prep_segment(file_path, normalize, target_rate, target_ch) + except Exception as e: + last_exception = e + if attempt < max_retries: + logger.warning(f"Attempt {attempt + 1} failed for {Path(file_path).name}: {e}. 
Retrying...") + time.sleep(0.1 * (attempt + 1)) + raise last_exception + +def _load_and_prep_segment(file_path: Union[str, Path], normalize: bool, + target_rate: Optional[int] = None, target_ch: Optional[int] = None) -> AudioSegment: + """The core logic to load, standardize, and normalize a single audio segment.""" try: - check_pydub_availability() - - # Load the audio file - audio = AudioSegment.from_file(str(file_path)) - - return { - 'valid': True, - 'duration_seconds': len(audio) / 1000.0, - 'sample_rate': audio.frame_rate, - 'channels': audio.channels, - 'format': file_path.suffix.lower().lstrip('.'), - 'file_size_bytes': file_path.stat().st_size, - 'error': None - } - + path = Path(file_path) + segment = AudioSegment.from_file(str(path), format=detect_audio_format(path)) + if target_rate and target_ch: + segment = _standardize_segment_properties(segment, target_rate, target_ch) + if normalize: + segment = _normalize_segment_peak(segment) + return segment except Exception as e: - return {'valid': False, 'error': str(e)} + raise AudioConcatenationError(f"Failed to process segment {Path(file_path).name}") from e + +def _normalize_segment_peak(segment: AudioSegment) -> AudioSegment: + if segment.max_dBFS == float('-inf'): return segment + return segment.apply_gain(-1.0 - segment.max_dBFS) + +def _standardize_segment_properties(segment: AudioSegment, target_rate: int, target_ch: int) -> AudioSegment: + if segment.frame_rate != target_rate: segment = segment.set_frame_rate(target_rate) + if segment.channels != target_ch: segment = segment.set_channels(target_ch) + return segment + +def _get_export_parameters(output_format: str, quality: str = 'medium') -> dict: + presets = { + 'mp3': {'low': {'bitrate': '96k'}, 'medium': {'bitrate': '128k'}, 'high': {'bitrate': '192k'}, 'lossless': {'bitrate': '320k'}}, + 'opus': {'low': {'bitrate': '64k'}, 'medium': {'bitrate': '96k'}, 'high': {'bitrate': '128k'}}, + 'wav': {'medium': {'parameters': ['-acodec', 'pcm_s16le']}} + } 
+ fmt = output_format.lower() + if fmt in presets: return presets[fmt].get(quality, presets[fmt].get('medium', {})) + return {} +def check_pydub_availability(): + if not PYDUB_AVAILABLE: raise AudioConcatenationError("pydub not available. Install with: pip install pydub") + try: AudioSegment.silent(duration=10) + except Exception as e: raise AudioConcatenationError(f"pydub not configured correctly: {e}") -def estimate_concatenation_time(num_files: int, total_duration_seconds: float) -> int: - """ - Estimate the time required to concatenate audio files. - - Args: - num_files: Number of files to concatenate - total_duration_seconds: Total duration of all audio files - - Returns: - Estimated processing time in seconds - """ - # Base processing time: 0.1 seconds per second of audio - base_time = total_duration_seconds * 0.1 - - # File I/O overhead: 1 second per file - io_overhead = num_files * 1 - - # Additional overhead for format conversion, normalization, etc. - processing_overhead = 5 +def _validate_concatenation_params(audio_files: list): + if not audio_files: raise AudioConcatenationError("No audio files provided.") + if len(audio_files) > MAX_FILES: raise AudioConcatenationError(f"File count exceeds limit of {MAX_FILES}.") + try: + total_size = sum(Path(f).stat().st_size for f in audio_files) + if total_size > MAX_TOTAL_SIZE_BYTES: raise AudioConcatenationError(f"Total file size exceeds limit of {MAX_SIZE_MB} MB.") + except FileNotFoundError as e: raise AudioConcatenationError(f"Audio file not found: {e.filename}") - return max(10, int(base_time + io_overhead + processing_overhead)) \ No newline at end of file +def _check_for_large_files(audio_files: List[Union[str, Path]]): + try: + for fp in audio_files: + path = Path(fp) + if path.stat().st_size > LARGE_FILE_THRESHOLD_BYTES: + logger.warning(f"Processing large file: {path.name} ({path.stat().st_size // (1024*1024)} MB).") + except FileNotFoundError: pass + +def _estimate_memory_usage(audio_files: list) -> 
int: + try: return int(sum(Path(f).stat().st_size * 2.5 for f in audio_files)) + except FileNotFoundError: return 0 + +def detect_audio_format(file_path: Path) -> Optional[str]: + ext = file_path.suffix.lower() + if ext in {'.wav', '.mp3', '.m4a', '.aac', '.ogg', '.flac', '.opus'}: return ext[1:] + return None diff --git a/app/core/text_processing.py b/app/core/text_processing.py index d90c90f..994aa7a 100644 --- a/app/core/text_processing.py +++ b/app/core/text_processing.py @@ -1,609 +1,766 @@ """ Text processing utilities for TTS +- Robust sentence splitting (abbrev/decimals/quotes/ellipses), bullet handling, non-verbal cues +- TTS-friendly normalization baked in (°, ℃/℉/K, primes, %, currencies, fractions, ellipses, µ/Ω, per-slash, etc.) +- Enhanced with ordinal/roman numeral/time normalization, performance optimizations, and robust error handling. +- Uses `num2words` library if available for superior number-to-word conversion. +- Correctly verbalizes large, comma-formatted numbers and handles parenthetical content gracefully. +- Intelligently handles monetary values with magnitude words (e.g., "$4.65 billion"). +- Expands common timezone abbreviations (e.g., "ET" -> "Eastern Time"). +- Normalizes dates ("November 4" -> "November fourth") and number ranges ("2018-2019" -> "2018 to 2019"). +- Handles full dates ("November 3, 2025") as a single unit for natural prosody. +- Splits sentences at headline-style colons for natural pauses. +- Includes phonetic hints, scientific notation, chemical formulas, and other advanced edge cases. +- Verbalizes appended symbols like in "OPEC+". +- Converts parentheticals, em-dashes, and semicolons to comma-separated clauses for improved prosody. +- Expands common symbols like 'vs.', '>', '<', '~' and mathematical operators (±, ×, ÷, ≈, ≠, ≤, ≥) for more natural speech. 
""" +from __future__ import annotations import gc -import torch +import logging import re -from typing import List, Optional, Tuple +from typing import List, Optional, Tuple, Set, Dict +from functools import lru_cache + +import torch from app.config import Config from app.models.long_text import LongTextChunk +logger = logging.getLogger(__name__) + +# ============================================================================= +# OPTIONAL DEPENDENCY: num2words +# ============================================================================= +try: + from num2words import num2words + _NUM2WORDS_AVAILABLE = True + logger.info("num2words library found. Using advanced number-to-word conversion.") +except ImportError: + _NUM2WORDS_AVAILABLE = False + logger.info("num2words library not found. Falling back to basic number-to-words conversion.") + +# ============================================================================= +# CUSTOMIZATION: PHONETIC HINTS +# ============================================================================= +# Add custom pronunciations for acronyms, jargon, or brand names. +# Keys are case-insensitive and treated as whole words. 
+PHONETIC_HINTS = { + "SQL": "sequel", + "GIF": "jiff", + "NGINX": "engine-x", + "LLM": "L L M", + "API": "A P I", +} +# Pre-process hints for regex +_PHONETIC_HINTS_UPPER = {k.upper(): v for k, v in PHONETIC_HINTS.items()} +_PHONETIC_RE = re.compile(r"\b(" + "|".join(_PHONETIC_HINTS_UPPER.keys()) + r")\b", re.IGNORECASE) + + +# ============================================================================= +# PERFORMANCE: PRE-COMPILED REGEX PATTERNS +# ============================================================================= + +_URL_RE = re.compile(r"""(?P(?:(?:https?|ftp)://)[^\s<>'"()]+)""", re.IGNORECASE | re.VERBOSE) +_EMAIL_RE = re.compile(r"""(?P\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b)""", re.VERBOSE) +_ELLIPSIS_RE = re.compile(r"\u2026") # … +_MANUAL_ELLIPSIS_RE = re.compile(r"(?-?[\d,]+(?:\.\d+)?)\s*(?:°\s*C|℃)\b", re.IGNORECASE) +_TEMP_F_RE = re.compile(r"(?P-?[\d,]+(?:\.\d+)?)\s*(?:°\s*F|℉)\b", re.IGNORECASE) +_TEMP_K_RE = re.compile(r"(?P-?[\d,]+(?:\.\d+)?)\s*K\b") +_DEGREE_RE = re.compile(r"(?P[\d,]+(?:\.\d+)?)\s*°(?!\s*[CFcf])") +_DMS_LONG_RE = re.compile(r"(?P\d{1,3})\s*°\s*(?P\d{1,2})\s*[′']\s*(?P\d{1,2})\s*[″\"]\s*(?P[NSEW])?", re.IGNORECASE) +_DMS_SHORT_RE = re.compile(r"(?P\d{1,3})\s*°\s*(?P\d{1,2})\s*[′']\s*(?P[NSEW])?", re.IGNORECASE) +_FEET_INCHES_RE = re.compile(r"(?P\d{1,2})\s*[′']\s*(?P\d{1,2})\s*[″\"]\b") +_FEET_RE = re.compile(r"(?P\d{1,2})\s*[′']\b") +_INCHES_RE = re.compile(r"(?P\d{1,2})\s*[″\"]\b") +_PERCENT_RE = re.compile(r"(?P-?[\d,]+(?:\.\d+)?)\s*%") +_PERMILLE_RE = re.compile(r"(?P-?[\d,]+(?:\.\d+)?)\s*‰") +_BASIS_PTS_RE = re.compile(r"(?P-?[\d,]+(?:\.\d+)?)\s*ⱀ") +_CURRENCY_MAGNITUDE_RE = re.compile(r"(?P[$€£¥₹₩₦₽₪])\s?(?P[\d,]+(?:\.\d+)?)\s*(?Pmillion|billion|trillion)\b", re.IGNORECASE) +_CURRENCY_PRE_RE = re.compile(r"(?[$€£¥₹₩₦₽₪])\s?(?P[\d,]+(?:\.\d+)?)(?!\s*(?:million|billion|trillion)\b)", re.IGNORECASE) +_CURRENCY_POST_RE = re.compile(r"(?P[\d,]+(?:\.\d+)?)\s?(?P[€£¥₹₩₦₽₪])\b") +_AMPERSAND_RE = 
re.compile(r"\s*&\s*") +_WORD_PLUS_RE = re.compile(r"\b([A-Z][a-zA-Z0-9]*)\+(?!\w)") +_POSSESSIVE_S_RE = re.compile(r"(\b\d{4})'s\b") # e.g. 1980's +_MICRO_UNITS_RE = re.compile(r"(?P[\d,]+(?:\.\d+)?)\s*[µμ]\s?(?P[A-Za-z]+)\b") +_KILOOHM_RE = re.compile(r"(?P[\d,]+(?:\.\d+)?)\s*kΩ\b", re.IGNORECASE) +_MEGAOHM_RE = re.compile(r"(?P[\d,]+(?:\.\d+)?)\s*MΩ\b", re.IGNORECASE) +_OHM_RE = re.compile(r"(?P[\d,]+(?:\.\d+)?)\s*Ω\b") +_PER_SLASH_RE = re.compile(r"\b(?P[A-Za-z]{1,6})\s*/\s*(?P[A-Za-z]{1,6})\b") +_HASHTAG_NUM_RE = re.compile(r"#(?P[\d,]+)\b") +_HASHTAG_TAG_RE = re.compile(r"#(?P[A-Za-z_][A-Za-z0-9_]*)") +_MENTION_RE = re.compile(r"@(?P[A-Za-z0-9_]{2,})\b") +_ORDINAL_RE = re.compile(r"\b(\d+)(st|nd|rd|th)\b") +_TIME_12H_RE = re.compile(r"\b(\d{1,2}):(\d{2})\s*([AP]\.?M\.?)\b", re.IGNORECASE) +_TIME_24H_RE = re.compile(r"\b([01]?\d|2[0-3]):([0-5]\d)\b") +_YEAR_RANGE_RE = re.compile(r"\b(\d{4})\s?[–-]\s?(\d{4})\b") +_NUMBER_RANGE_RE = re.compile(r"\b(\d+)\s?[–-]\s?(\d+)\b") +_ROMAN_NUMERAL_RE = re.compile(r"\b(X|IX|IV|V?I{0,3})\b") # Common cases up to 10 +_FORMATTED_NUMBER_RE = re.compile(r'\b\d{1,3}(?:,\d{3})+(?:\.\d+)?\b') +_SIMPLE_NUMBER_RE = re.compile(r'\b\d+\b') +_PARENS_ACRONYM_RE = re.compile(r"\s+\(([A-Z]{2,6})\)") +_SCIENTIFIC_NOTATION_RE = re.compile(r"\b([\d\.]+)\s?[xXeE]\s?10\^([\d\.\-]+)\b", re.IGNORECASE) +_US_PHONE_RE = re.compile(r"\b\(?(\d{3})\)?[\s.-]?(\d{3})[\s.-]?(\d{4})\b") +_CHEM_FORMULA_RE = re.compile(r"\b([A-Z][a-z]?)(\d+)\b") +_VS_RE = re.compile(r"\b(vs\.?|v\.?)\b", re.IGNORECASE) +_RELATIONAL_RE = re.compile(r"\s*([<>])\s*") +_APPROX_RE = re.compile(r"~\s*") +_WHITESPACE_RE = re.compile(r"\s{2,}") +_COMMA_CLEANUP_RE = re.compile(r"(\s*,\s*){2,}") + +_MONTH_NAMES = r"January|February|March|April|May|June|July|August|September|October|November|December" +_DATE_FULL_RE = re.compile(fr"\b({_MONTH_NAMES})\s+(\d{{1,2}})(?:st|nd|rd|th)?,\s+(\d{{4}})\b", re.IGNORECASE) +_DATE_MONTH_DAY_RE = 
re.compile(fr"\b({_MONTH_NAMES})\s+(\d{{1,2}})(?!st|nd|rd|th)\b", re.IGNORECASE) + +_TIMEZONES = { + "ET": "Eastern Time", "EST": "Eastern Standard Time", "EDT": "Eastern Daylight Time", + "CT": "Central Time", "CST": "Central Standard Time", "CDT": "Central Daylight Time", + "MT": "Mountain Time", "MST": "Mountain Standard Time", "MDT": "Mountain Daylight Time", + "PT": "Pacific Time", "PST": "Pacific Standard Time", "PDT": "Pacific Daylight Time", + "UTC": "Coordinated Universal Time", "GMT": "Greenwich Mean Time", +} +_TIMEZONE_RE = re.compile(r"\b(" + "|".join(_TIMEZONES.keys()) + r")\b", re.IGNORECASE) + +# Expanded unit patterns +_MPH_RE = re.compile(r"(?P[\d,]+(?:\.\d+)?)\s*mph\b", re.IGNORECASE) +_KPH_RE = re.compile(r"(?P[\d,]+(?:\.\d+)?)\s*kph\b", re.IGNORECASE) +_INHG_RE = re.compile(r"(?P[\d,]+(?:\.\d+)?)\s*inHg\b", re.IGNORECASE) +_MB_RE = re.compile(r"(?P[\d,]+(?:\.\d+)?)\s*mb\b", re.IGNORECASE) + +# Mathematical operators +_MATH_OPS = { + "±": " plus or minus ", "×": " times ", "÷": " divided by ", + "≈": " is approximately equal to ", "≠": " is not equal to ", + "≤": " is less than or equal to ", "≥": " is greater than or equal to " +} + + +# ============================================================================= +# NORMALIZATION +# ============================================================================= + +def _mask(text: str) -> Tuple[str, Dict[str, str]]: + """ + Mask URLs and emails to prevent them from being normalized. -def split_text_into_chunks(text: str, max_length: int = None) -> list: - """Split text into manageable chunks for TTS processing""" - if max_length is None: - max_length = Config.MAX_CHUNK_LENGTH - - if len(text) <= max_length: - return [text] - - # Try to split at sentence boundaries first - sentence_endings = ['. ', '! ', '? 
', '.\n', '!\n', '?\n'] - chunks = [] - current_chunk = "" - - # Split into sentences - sentences = [] - temp_text = text - - while temp_text: - best_split = len(temp_text) - best_ending = "" + Returns: + Tuple of (masked_text, mapping_dict) where mapping_dict + maps placeholder tokens to original URLs/emails. + """ + mapping: Dict[str, str] = {} + idx = 0 + + def repl_url(m): + nonlocal idx + key = f"__URL_{idx}__" + mapping[key] = m.group("url") + idx += 1 + return key + + def repl_email(m): + nonlocal idx + key = f"__EMAIL_{idx}__" + mapping[key] = m.group("email") + idx += 1 + return key + + text = _URL_RE.sub(repl_url, text) + text = _EMAIL_RE.sub(repl_email, text) + return text, mapping + +def _unmask(text: str, mapping: Dict[str, str], *, read_urls: bool) -> str: + """Replaces placeholder tokens with original or spoken URLs/emails.""" + for key, val in mapping.items(): + if read_urls: + spoken = val.replace("://", " colon slash slash ").replace("/", " slash ") + spoken = spoken.replace(".", " dot ").replace("-", " dash ") + text = text.replace(key, spoken) + else: + text = text.replace(key, val) + return text + +def _pluralize(unit: str, value: str) -> str: + """Pluralizes a unit based on the numeric value.""" + try: + clean_value = value.replace(",", "") + v = float(clean_value) + except (ValueError, AttributeError) as e: + logger.warning(f"Could not parse value for pluralization: {value}. Details: {e}") + return unit + return unit if abs(v) == 1 else unit + "s" + +@lru_cache(maxsize=1024) +def _verbalize_number(num_str: str, to_year: bool = False) -> str: + """Converts a number string to words, with special handling for years. Cached for performance.""" + if not num_str: + return "" + + clean_num_str = num_str.replace(',', '') + + try: + # Handle decimals + if '.' 
in clean_num_str: + integer_part, decimal_part = clean_num_str.split('.', 1) + integer_words = _verbalize_number(integer_part) if integer_part and integer_part != '0' else "zero" + if not integer_part: integer_words = "zero" # Handles cases like ".5" + decimal_words = ' '.join(_verbalize_number(c) for c in decimal_part) + return f"{integer_words} point {decimal_words}" + + num = int(clean_num_str) - for ending in sentence_endings: - pos = temp_text.find(ending) - if pos != -1 and pos < best_split: - best_split = pos + len(ending) - best_ending = ending + if _NUM2WORDS_AVAILABLE: + return num2words(num, to='year' if to_year else 'cardinal') - if best_split == len(temp_text): - # No sentence ending found, take the rest - sentences.append(temp_text) - break - else: - sentences.append(temp_text[:best_split]) - temp_text = temp_text[best_split:] - - # Group sentences into chunks - for sentence in sentences: - sentence = sentence.strip() - if not sentence: - continue - - if len(current_chunk) + len(sentence) <= max_length: - current_chunk += (" " if current_chunk else "") + sentence - else: - if current_chunk: - chunks.append(current_chunk.strip()) - - # If single sentence is too long, split it further - if len(sentence) > max_length: - # Split at commas, semicolons, etc. 
- sub_delimiters = [', ', '; ', ' - ', ' — '] - sub_chunks = [sentence] - - for delimiter in sub_delimiters: - new_sub_chunks = [] - for chunk in sub_chunks: - if len(chunk) <= max_length: - new_sub_chunks.append(chunk) - else: - parts = chunk.split(delimiter) - current_part = "" - for part in parts: - if len(current_part) + len(delimiter) + len(part) <= max_length: - current_part += (delimiter if current_part else "") + part - else: - if current_part: - new_sub_chunks.append(current_part) - current_part = part - if current_part: - new_sub_chunks.append(current_part) - sub_chunks = new_sub_chunks - - # Add sub-chunks - for sub_chunk in sub_chunks: - if len(sub_chunk) <= max_length: - chunks.append(sub_chunk.strip()) - else: - # Last resort: split by words - words = sub_chunk.split() - current_word_chunk = "" - for word in words: - if len(current_word_chunk) + len(word) + 1 <= max_length: - current_word_chunk += (" " if current_word_chunk else "") + word - else: - if current_word_chunk: - chunks.append(current_word_chunk) - current_word_chunk = word - if current_word_chunk: - chunks.append(current_word_chunk) - current_chunk = "" - else: - current_chunk = sentence - - if current_chunk: - chunks.append(current_chunk.strip()) - - # Filter out empty chunks - chunks = [chunk for chunk in chunks if chunk.strip()] - - return chunks - - -def split_text_for_streaming( + # Fallback implementation + if to_year and 1000 <= num <= 2999: + if num % 100 == 0 and num % 1000 != 0: return f"{_verbalize_number(str(num//100))} hundred" + return f"{_verbalize_number(str(num//100))} {_verbalize_number(str(num%100))}" + + if num < 0: return f"minus {_verbalize_number(str(abs(num)))}" + if num < 1000: + if num == 0: return "zero" + ones = ["", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine"] + teens = ["ten", "eleven", "twelve", "thirteen", "fourteen", "fifteen", "sixteen", "seventeen", "eighteen", "nineteen"] + tens = ["", "", "twenty", "thirty", "forty", "fifty", 
"sixty", "seventy", "eighty", "ninety"] + if 1 <= num < 10: return ones[num] + if 10 <= num < 20: return teens[num - 10] + if 20 <= num < 100: return tens[num // 10] + (" " + ones[num % 10] if num % 10 else "") + if 100 <= num < 1000: return ones[num // 100] + " hundred" + (" " + _verbalize_number(str(num % 100)) if num % 100 else "") + + if num < 1_000_000: + thousands = num // 1000 + remainder = num % 1000 + result = f"{_verbalize_number(str(thousands))} thousand" + if remainder: result += f" {_verbalize_number(str(remainder))}" + return result + + if num < 1_000_000_000: + millions = num // 1_000_000 + remainder = num % 1_000_000 + result = f"{_verbalize_number(str(millions))} million" + if remainder: result += f" {_verbalize_number(str(remainder))}" + return result + + logger.warning(f"Basic number to words fallback cannot handle: {num_str}") + return clean_num_str + + except (ValueError, AttributeError): + return num_str + +def _to_ordinal_word(num: int) -> str: + """Converts an integer to its ordinal word form.""" + if _NUM2WORDS_AVAILABLE: + return num2words(num, to='ordinal') + # Fallback logic + if 11 <= num % 100 <= 13: return f"{_verbalize_number(str(num))}th" + last_digit = num % 10 + if last_digit == 1: return f"{_verbalize_number(str(num))}st" + if last_digit == 2: return f"{_verbalize_number(str(num))}nd" + if last_digit == 3: return f"{_verbalize_number(str(num))}rd" + return f"{_verbalize_number(str(num))}th" + +def normalize_for_tts( text: str, - chunk_size: Optional[int] = None, - strategy: Optional[str] = None, - quality: Optional[str] = None -) -> List[str]: - """ - Split text into chunks optimized for streaming with different strategies. 
- - Args: - text: Input text to split - chunk_size: Target chunk size (characters) - strategy: Splitting strategy ('sentence', 'paragraph', 'fixed', 'word') - quality: Quality preset ('fast', 'balanced', 'high') - - Returns: - List of text chunks optimized for streaming - """ - # Apply quality presets - if quality: - if quality == "fast": - chunk_size = chunk_size or 100 - strategy = strategy or "word" - elif quality == "balanced": - chunk_size = chunk_size or 200 - strategy = strategy or "sentence" - elif quality == "high": - chunk_size = chunk_size or 300 - strategy = strategy or "paragraph" - - # Set defaults - chunk_size = chunk_size or 200 - strategy = strategy or "sentence" - - # Apply strategy-specific splitting - if strategy == "paragraph": - return _split_by_paragraphs(text, chunk_size) - elif strategy == "sentence": - return _split_by_sentences(text, chunk_size) - elif strategy == "word": - return _split_by_words(text, chunk_size) - elif strategy == "fixed": - return _split_by_fixed_size(text, chunk_size) + *, + speak_marks: bool = False, + convert_ascii_fractions: bool = False, + read_urls: bool = False, +) -> str: + """Normalize symbols to spoken words for TTS. 
Safe defaults; URLs/emails protected.""" + if not text or text.isspace(): + return text + + s, maskmap = _mask(text) + + # Apply phonetic hints first to override any other rules + s = _PHONETIC_RE.sub(lambda m: _PHONETIC_HINTS_UPPER[m.group(1).upper()], s) + + # Prosody improvements: dashes, semicolons, ellipses -> commas + s = _DASHES_RE.sub(", ", s) + s = _SEMICOLON_RE.sub(", ", s) + s = _ELLIPSIS_RE.sub(", ", s) + s = _MANUAL_ELLIPSIS_RE.sub(", ", s) + + # Chemical formulas and subscripts + _SUBSCRIPT_MAP = {'₀':'0', '₁':'1', '₂':'2', '₃':'3', '₄':'4', '₅':'5', '₆':'6', '₇':'7', '₈':'8', '₉':'9'} + s = "".join(_SUBSCRIPT_MAP.get(c, c) for c in s) + s = _CHEM_FORMULA_RE.sub(lambda m: f"{m.group(1)} {_verbalize_number(m.group(2))}", s) + + # Temperature (°C/℉), Kelvin + s = _TEMP_C_RE.sub(lambda m: f"{_verbalize_number(m.group('val'))} degrees Celsius", s) + s = _TEMP_F_RE.sub(lambda m: f"{_verbalize_number(m.group('val'))} degrees Fahrenheit", s) + s = _TEMP_K_RE.sub(lambda m: f"{_verbalize_number(m.group('val'))} kelvins", s) + + # Bare degree (angles) + s = _DEGREE_RE.sub(lambda m: f"{_verbalize_number(m.group('deg'))} degrees", s) + + # Specific units (mph, inHg, etc.) 
+ s = _MPH_RE.sub(lambda m: f"{_verbalize_number(m.group('val'))} miles per hour", s) + s = _KPH_RE.sub(lambda m: f"{_verbalize_number(m.group('val'))} kilometers per hour", s) + s = _INHG_RE.sub(lambda m: f"{_verbalize_number(m.group('val'))} inches of mercury", s) + s = _MB_RE.sub(lambda m: f"{_verbalize_number(m.group('val'))} millibars", s) + + # Mathematical operators + for op, spoken in _MATH_OPS.items(): + s = s.replace(op, spoken) + + # DMS angles & primes + def repl_dms(m): + deg, minutes, seconds, hemi = m.group("d"), m.group("m"), m.group("s"), m.group("h") + parts = [f"{_verbalize_number(deg)} degrees"] + if minutes: parts.append(f"{_verbalize_number(minutes)} minutes") + if seconds: parts.append(f"{_verbalize_number(seconds)} seconds") + if hemi: parts.append(hemi.strip()) + return " ".join(parts) + s = _DMS_LONG_RE.sub(repl_dms, s) + s = _DMS_SHORT_RE.sub( + lambda m: f"{_verbalize_number(m.group('d'))} degrees {_verbalize_number(m.group('m'))} minutes" + (f" {m.group('h')}" if m.group('h') else ""), + s + ) + + # Heights 5′10″ / 5'10" + s = _FEET_INCHES_RE.sub( + lambda m: f"{_verbalize_number(m.group('ft'))} {_pluralize('foot', m.group('ft'))} " + f"{_verbalize_number(m.group('in'))} {_pluralize('inch', m.group('in'))}", + s + ) + s = _FEET_RE.sub(lambda m: f"{_verbalize_number(m.group('ft'))} {_pluralize('foot', m.group('ft'))}", s) + s = _INCHES_RE.sub(lambda m: f"{_verbalize_number(m.group('inch'))} {_pluralize('inch', m.group('inch'))}", s) + + # Percent / permille / basis points + s = _PERCENT_RE.sub(lambda m: f"{_verbalize_number(m.group('val'))} percent", s) + s = _PERMILLE_RE.sub(lambda m: f"{_verbalize_number(m.group('val'))} per mille", s) + s = _BASIS_PTS_RE.sub(lambda m: f"{_verbalize_number(m.group('val'))} basis points", s) + + # Currencies + _CURRENCY_NAMES = {"$": "dollar", "€": "euro", "£": "pound", "¥": "yen", "₹": "rupee", "₩": "won", "₦": "naira", "₽": "ruble", "₪": "shekel"} + def repl_currency_magnitude(m): + amount = 
_verbalize_number(m.group('amt')) + magnitude = m.group('mag').lower() + currency_name = _CURRENCY_NAMES.get(m.group('sym'), 'currency') + pluralized_currency = _pluralize(currency_name, "2") # The amount is > 1 + return f"{amount} {magnitude} {pluralized_currency}" + s = _CURRENCY_MAGNITUDE_RE.sub(repl_currency_magnitude, s) + s = _CURRENCY_PRE_RE.sub(lambda m: f"{_verbalize_number(m.group('amt'))} {_pluralize(_CURRENCY_NAMES.get(m.group('sym'), 'currency'), m.group('amt'))}", s) + s = _CURRENCY_POST_RE.sub(lambda m: f"{_verbalize_number(m.group('amt'))} {_pluralize(_CURRENCY_NAMES.get(m.group('sym'), 'currency'), m.group('amt'))}", s) + + # Unicode fractions + _FRACTIONS = {"½": "one half", "⅓": "one third", "⅔": "two thirds", "¼": "one quarter", "¾": "three quarters", "⅛": "one eighth", "⅜": "three eighths", "⅝": "five eighths", "⅞": "seven eighths", "⅕": "one fifth", "⅖": "two fifths", "⅗": "three fifths", "⅘": "four fifths", "⅙": "one sixth", "⅚": "five sixths", "⅐": "one seventh", "⅑": "one ninth", "⅒": "one tenth"} + s = "".join(_FRACTIONS.get(ch, ch) for ch in s) + if convert_ascii_fractions: + s = re.sub(r"\b1/2\b", "one half", s); s = re.sub(r"\b1/4\b", "one quarter", s); s = re.sub(r"\b3/4\b", "three quarters", s) + + # Dates and ordinals (most specific rule first) + s = _DATE_FULL_RE.sub(lambda m: f"{m.group(1)} {_to_ordinal_word(int(m.group(2)))} {_verbalize_number(m.group(3), to_year=True)}", s) + s = _ORDINAL_RE.sub(lambda m: _to_ordinal_word(int(m.group(1))), s) + s = _DATE_MONTH_DAY_RE.sub(lambda m: f"{m.group(1)} {_to_ordinal_word(int(m.group(2)))}", s) + + # Handle possessives on years, e.g. 
1980's -> nineteen eighties + s = _POSSESSIVE_S_RE.sub(lambda m: f"{_verbalize_number(m.group(1), to_year=True)}s", s) + + # Time, timezones, and ranges + def repl_time_12h(m): + hour, minute, ampm = int(m.group(1)), int(m.group(2)), m.group(3) + ampm_spoken = " ".join(list(ampm.replace('.','').upper())) + if hour == 12 and minute == 0 and 'PM' in ampm_spoken: return "noon" + if hour == 12 and minute == 0 and 'AM' in ampm_spoken: return "midnight" + h_word = _verbalize_number(str(hour)); m_word = "o'clock" if minute == 0 else f"oh {_verbalize_number(str(minute))}" if minute < 10 else _verbalize_number(str(minute)) + return f"{h_word} {m_word} {ampm_spoken}" + def repl_time_24h(m): + hour, minute = int(m.group(1)), int(m.group(2)) + if hour == 0 and minute == 0: return "midnight" + if hour == 12 and minute == 0: return "noon" + h_word = _verbalize_number(str(hour)); m_word = f"oh {_verbalize_number(str(minute))}" if minute < 10 else _verbalize_number(str(minute)) + if minute == 0: return f"{h_word} hundred hours" + return f"{h_word} {m_word}" + s = _TIME_12H_RE.sub(repl_time_12h, s) + s = _TIME_24H_RE.sub(repl_time_24h, s) + s = _TIMEZONE_RE.sub(lambda m: _TIMEZONES[m.group(1).upper()], s) + s = _YEAR_RANGE_RE.sub(lambda m: f"{_verbalize_number(m.group(1), to_year=True)} to {_verbalize_number(m.group(2), to_year=True)}", s) + s = _NUMBER_RANGE_RE.sub(lambda m: f"{_verbalize_number(m.group(1))} to {_verbalize_number(m.group(2))}", s) + + # Other complex patterns + s = _SCIENTIFIC_NOTATION_RE.sub(lambda m: f"{_verbalize_number(m.group(1))} times ten to the power of {_verbalize_number(m.group(2))}", s) + s = _US_PHONE_RE.sub(lambda m: " ".join([_verbalize_number(c) for c in f"{m.group(1)}{m.group(2)}{m.group(3)}"]), s) + + # Standalone numbers + s = _FORMATTED_NUMBER_RE.sub(lambda m: _verbalize_number(m.group(0)), s) + s = _SIMPLE_NUMBER_RE.sub(lambda m: _verbalize_number(m.group(0)), s) # Catch numbers missed by other rules + + # Roman numerals (common cases) + 
_ROMAN_MAP = {"I": "one", "II": "two", "III": "three", "IV": "four", "V": "five", "VI": "six", "VII": "seven", "VIII": "eight", "IX": "nine", "X": "ten"} + s = _ROMAN_NUMERAL_RE.sub(lambda m: _ROMAN_MAP.get(m.group(1), m.group(1)), s) + + # Ampersand and other symbols + s = _AMPERSAND_RE.sub(" and ", s) + s = _WORD_PLUS_RE.sub(lambda m: f"{m.group(1)} plus", s) + s = _VS_RE.sub(" versus ", s) + s = _APPROX_RE.sub("about ", s) + s = _RELATIONAL_RE.sub(lambda m: f" { 'greater than' if m.group(1) == '>' else 'less than' } ", s) + + # Section/Paragraph signs + s = re.sub(r"§\s*", "section ", s); s = re.sub(r"¶\s*", "paragraph ", s) + + # µ/μ + units, Ω/kΩ/MΩ + s = _MICRO_UNITS_RE.sub(lambda m: f"{_verbalize_number(m.group('num'))} micro{m.group('u')}", s) + s = _KILOOHM_RE.sub(lambda m: f"{_verbalize_number(m.group('num'))} kiloohms", s) + s = _MEGAOHM_RE.sub(lambda m: f"{_verbalize_number(m.group('num'))} megaohms", s) + s = _OHM_RE.sub(lambda m: f"{_verbalize_number(m.group('num'))} ohms", s) + + # unit per slash (URLs are masked) + s = _PER_SLASH_RE.sub(lambda m: f"{m.group('a')} per {m.group('b')}", s) + + # hashtags & mentions + def repl_hashtag(m): + tag = m.group('tag') + spoken_tag = re.sub(r'([A-Z])', r' \1', tag).strip() + return f"hashtag {spoken_tag}" + s = _HASHTAG_NUM_RE.sub(lambda m: f"number {_verbalize_number(m.group('num'))}", s) + s = _HASHTAG_TAG_RE.sub(repl_hashtag, s) + s = _MENTION_RE.sub(lambda m: f"at {m.group('user')}", s) + + # TM / R / © + if speak_marks: + s = s.replace("™", " trademark ").replace("®", " registered ").replace("©", " copyright ") else: - # Default to sentence splitting - return _split_by_sentences(text, chunk_size) + s = s.replace("™", "").replace("®", "").replace("©", "") + + # Handle parentheses for prosody + s = _PARENS_ACRONYM_RE.sub(lambda m: ", " + " ".join(list(m.group(1))) + ",", s) + s = s.replace("(", ", ").replace(")", ", ") + + # Final cleanup + s = _COMMA_CLEANUP_RE.sub(", ", s) # Clean up duplicate commas from 
parenthesis replacement + s = re.sub(r"^,\s*", "", s) # Remove leading comma + s = re.sub(r"\s*,$", "", s) # Remove trailing comma + s = _WHITESPACE_RE.sub(" ", s).strip() + s = _unmask(s, maskmap, read_urls=read_urls) + return s + + +# ============================================================================= +# ADVANCED SPLITTING +# ============================================================================= + +ABBREVIATIONS: Set[str] = {"mr.", "mrs.", "ms.", "dr.", "prof.", "rev.", "hon.", "st.", "etc.", "e.g.", "i.e.", "vs.", "approx.", "apt.", "dept.", "fig.", "gen.", "gov.", "inc.", "jr.", "sr.", "ltd.", "no.", "p.", "pp.", "vol.", "op.", "cit.", "ca.", "cf.", "ed.", "esp.", "et.", "al.", "ibid.", "id.", "inf.", "sup.", "viz.", "sc.", "fl.", "d.", "b.", "r.", "c.", "v.", "u.s.", "u.k.", "a.m.", "p.m.", "a.d.", "b.c."} +TITLES_NO_PERIOD: Set[str] = {"mr", "mrs", "ms", "dr", "prof", "rev", "hon", "st", "sgt", "capt", "lt", "col", "gen"} +NUMBER_DOT_NUMBER_PATTERN = re.compile(r"(? 
<!\d)\d+\.\d+(?!\d)")
+VERSION_PATTERN = re.compile(r"\bv?\d+(?:\.\d+)+\b")
+POTENTIAL_END_PATTERN = re.compile(r'([.!?:])(["\')\]]*)')
+UNICODE_ELLIPSIS = "\u2026"
+BULLET_POINT_PATTERN = re.compile(r"(?:^|\n)\s*(?:[-*•]|\d+[.)])\s+")
+# NOTE(review): the pattern constants above were lost to extraction garbling and
+# were reconstructed from how they are used below — verify against the original patch.
+def _is_valid_sentence_end(text: str, period_index: int) -> bool:
+    """Determines if a period marks a true sentence end."""
+    if (period_index > 0 and text[period_index - 1] == ".") or \
+       (period_index + 1 < len(text) and text[period_index + 1] == "."):
+        return False
+    word_start = period_index - 1
+    scan_limit = max(0, period_index - 20)
+    while word_start >= scan_limit and not text[word_start].isspace():
+        word_start -= 1
+    word_with_dot = text[word_start + 1: period_index + 1].lower()
+    if word_with_dot in ABBREVIATIONS:
+        return False
+    context_start = max(0, period_index - 20); context_end = min(len(text), period_index + 20)
+    context = text[context_start:context_end]
+    rel_idx = period_index - context_start
+    for pattern in (NUMBER_DOT_NUMBER_PATTERN, VERSION_PATTERN):
+        for m in pattern.finditer(context):
+            if m.start() <= rel_idx < m.end():
+                is_last_char = (rel_idx == m.end() - 1)
+                is_followed_by_space_or_eos = (period_index + 1 == len(text) or text[period_index + 1].isspace())
+                if not (is_last_char and is_followed_by_space_or_eos):
+                    return False
+    return True
+
+def _split_text_by_punctuation(text: str) -> List[str]:
+    """Splits text based on punctuation, respecting abbreviations and numbers."""
+    sentences: List[str] = []; last_split = 0
+    i = 0
+    while i < len(text):
+        if text[i] == UNICODE_ELLIPSIS:
+            j = i + 1
+            if j >= len(text) or text[j].isspace():
+                seg = text[last_split:j].strip()
+                if seg: sentences.append(seg)
+                last_split = j
+            i += 1
+            continue
+        i += 1
+    for m in POTENTIAL_END_PATTERN.finditer(text):
+        punc_idx = m.start(1); punc = text[punc_idx]
+        cut_after = m.start(1) + 1 + (len(m.group(2)) if m.group(2) else 0)
+        if punc in ("!", "?"):
+            seg = text[last_split:cut_after].strip()
+            if seg: sentences.append(seg)
+            last_split = m.end()
+            continue
+        if punc == ".":
+            if _is_valid_sentence_end(text, punc_idx):
+                seg = text[last_split:cut_after].strip()
+                if seg: sentences.append(seg)
+                last_split = m.end()
+        if punc == ":":
+            next_char_index = m.end()
+            if next_char_index < len(text) and 
text[next_char_index].isupper(): + seg = text[last_split:cut_after].strip() + if seg: sentences.append(seg) + last_split = m.end() + remainder = text[last_split:].strip() + if remainder: sentences.append(remainder) + return [s for s in sentences if s] + +def _advanced_split_into_sentences(text: str) -> List[str]: + """Splits text into sentences, handling bullet points and normalizing line breaks.""" + if not text or text.isspace(): return [] + t = text.replace("\r\n", "\n").replace("\r", "\n") + bullet_matches = list(BULLET_POINT_PATTERN.finditer(t)) + collected: List[str] = [] + def _append_sentences_from(segment: str): + for s in _split_text_by_punctuation(segment.strip()): + if s: collected.append(s) + if bullet_matches: + cur = 0 + for i, bm in enumerate(bullet_matches): + start = bm.start() + if i == 0 and start > cur: + pre = t[cur:start].strip() + if pre: _append_sentences_from(pre) + next_start = bullet_matches[i + 1].start() if i + 1 < len(bullet_matches) else len(t) + bullet_seg = t[start:next_start].strip() + if bullet_seg: collected.append(bullet_seg) + cur = next_start + if cur < len(t): + post = t[cur:].strip() + if post: _append_sentences_from(post) + return collected + return _split_text_by_punctuation(t) + +def _preprocess_and_segment_text_simple(full_text: str) -> List[str]: + """Segments text into sentences.""" + if not full_text or full_text.isspace(): return [] + return _advanced_split_into_sentences(full_text) + +# ============================================================================= +# PUBLIC API +# ============================================================================= +def split_text_into_chunks(text: str, max_length: int = None) -> list: + if max_length is None: max_length = Config.MAX_CHUNK_LENGTH + text = normalize_for_tts(text) + if len(text) <= max_length: return [text] + chunks: List[str] = []; current = "" + sentences = _preprocess_and_segment_text_simple(text) + for sentence in sentences: + s = sentence.strip() + if 
not s: continue + if len(current) + (1 if current else 0) + len(s) <= max_length: + current = (current + " " + s) if current else s + else: + if current: chunks.append(current.strip()) + if len(s) > max_length: + chunks.extend(_split_long_sentence(s, max_length)) + current = "" + else: + current = s + if current: chunks.append(current.strip()) + return [c for c in chunks if c.strip()] + +def split_text_for_streaming(text: str, chunk_size: Optional[int] = None, strategy: Optional[str] = None, quality: Optional[str] = None) -> List[str]: + text = normalize_for_tts(text) + settings = get_streaming_settings(chunk_size, strategy, quality) + chunk_size = settings["chunk_size"]; strategy = settings["strategy"] + if strategy == "paragraph": return _split_by_paragraphs(text, chunk_size) + elif strategy == "sentence": return _split_by_sentences(text, chunk_size) + elif strategy == "word": return _split_by_words(text, chunk_size) + elif strategy == "fixed": return _split_by_fixed_size(text, chunk_size) + else: return _split_by_sentences(text, chunk_size) def _split_by_paragraphs(text: str, max_length: int) -> List[str]: - """Split text by paragraph breaks, respecting max length""" - # Split by double newlines (paragraph breaks) paragraphs = re.split(r'\n\s*\n', text.strip()) - chunks = [] - current_chunk = "" - - for paragraph in paragraphs: - paragraph = paragraph.strip() - if not paragraph: - continue - - # If paragraph fits with current chunk - if len(current_chunk) + len(paragraph) + 2 <= max_length: # +2 for paragraph break - if current_chunk: - current_chunk += "\n\n" + paragraph - else: - current_chunk = paragraph + chunks: List[str] = []; current = "" + for p in [p.strip() for p in paragraphs if p.strip()]: + if len(current) + (2 if current else 0) + len(p) <= max_length: + current = (current + "\n\n" + p) if current else p else: - # Save current chunk if it exists - if current_chunk: - chunks.append(current_chunk.strip()) - - # If paragraph is too long, split it by 
sentences - if len(paragraph) > max_length: - sentence_chunks = _split_by_sentences(paragraph, max_length) - chunks.extend(sentence_chunks) - current_chunk = "" - else: - current_chunk = paragraph - - if current_chunk: - chunks.append(current_chunk.strip()) - - return [chunk for chunk in chunks if chunk.strip()] - - -def _split_by_sentences(text: str, max_length: int) -> List[str]: - """Split text by sentence boundaries, respecting max length""" - # Enhanced sentence splitting regex - sentence_pattern = r'(?<=[.!?])\s+' - sentences = re.split(sentence_pattern, text.strip()) - - chunks = [] - current_chunk = "" - - for sentence in sentences: - sentence = sentence.strip() - if not sentence: - continue - - # If sentence fits with current chunk - if len(current_chunk) + len(sentence) + 1 <= max_length: # +1 for space - if current_chunk: - current_chunk += " " + sentence + if current: chunks.append(current.strip()) + if len(p) > max_length: + chunks.extend(_split_by_sentences(p, max_length)) + current = "" else: - current_chunk = sentence + current = p + if current: chunks.append(current.strip()) + return [c for c in chunks if c.strip()] + +def _pack_sentences_to_chunks(sentences: List[str], max_length: int) -> List[str]: + chunks: List[str] = []; cur_parts: List[str] = []; cur_len = 0 + for s in [s.strip() for s in sentences if s.strip()]: + if not cur_parts: + cur_parts = [s]; cur_len = len(s) + elif cur_len + 1 + len(s) <= max_length: + cur_parts.append(s); cur_len += 1 + len(s) else: - # Save current chunk if it exists - if current_chunk: - chunks.append(current_chunk.strip()) - - # If sentence is too long, split it further - if len(sentence) > max_length: - sub_chunks = _split_long_sentence(sentence, max_length) - chunks.extend(sub_chunks) - current_chunk = "" - else: - current_chunk = sentence - - if current_chunk: - chunks.append(current_chunk.strip()) - - return [chunk for chunk in chunks if chunk.strip()] + chunks.append(" ".join(cur_parts)) + cur_parts = [s]; 
cur_len = len(s) + if cur_len > max_length and len(cur_parts) == 1: + chunks.extend(_split_long_sentence(cur_parts[0], max_length)) + cur_parts = []; cur_len = 0 + if cur_parts: chunks.append(" ".join(cur_parts)) + return [c for c in chunks if c.strip()] +def _split_by_sentences(text: str, max_length: int) -> List[str]: + return _pack_sentences_to_chunks(_preprocess_and_segment_text_simple(text), max_length) def _split_by_words(text: str, max_length: int) -> List[str]: - """Split text by word boundaries, respecting max length""" - words = text.split() - chunks = [] - current_chunk = "" - + words = text.split(); chunks: List[str] = []; current = "" for word in words: - # If word fits with current chunk - if len(current_chunk) + len(word) + 1 <= max_length: # +1 for space - if current_chunk: - current_chunk += " " + word - else: - current_chunk = word + if len(current) + (1 if current else 0) + len(word) <= max_length: + current = (current + " " + word) if current else word else: - # Save current chunk if it exists - if current_chunk: - chunks.append(current_chunk.strip()) - - # If single word is too long, force it into its own chunk + if current: chunks.append(current.strip()) if len(word) > max_length: - # Split very long words at character boundaries for i in range(0, len(word), max_length): - chunks.append(word[i:i + max_length]) - current_chunk = "" + if piece := word[i:i + max_length]: chunks.append(piece) + current = "" else: - current_chunk = word - - if current_chunk: - chunks.append(current_chunk.strip()) - - return [chunk for chunk in chunks if chunk.strip()] - + current = word + if current: chunks.append(current.strip()) + return [c for c in chunks if c.strip()] def _split_by_fixed_size(text: str, chunk_size: int) -> List[str]: - """Split text into fixed-size chunks""" - chunks = [] - for i in range(0, len(text), chunk_size): - chunk = text[i:i + chunk_size].strip() - if chunk: - chunks.append(chunk) - - return chunks - + return [chunk for i in range(0, 
len(text), chunk_size) if (chunk := text[i:i + chunk_size].strip())] def _split_long_sentence(sentence: str, max_length: int) -> List[str]: - """Split a long sentence at natural break points""" - # Try to split at commas, semicolons, etc. - delimiters = [', ', '; ', ' - ', ' — ', ': ', ' and ', ' or ', ' but '] - - chunks = [sentence] - - for delimiter in delimiters: - new_chunks = [] - for chunk in chunks: - if len(chunk) <= max_length: - new_chunks.append(chunk) + delimiters = [', ', '; ', ' - ', ' — ', ': ', ' and ', ' or ', ' but ']; chunks = [sentence] + for delim in delimiters: + new_chunks: List[str] = [] + for ch in chunks: + if len(ch) <= max_length: new_chunks.append(ch) else: - parts = chunk.split(delimiter) - current_part = "" - for part in parts: - if len(current_part) + len(delimiter) + len(part) <= max_length: - current_part += (delimiter if current_part else "") + part + parts = ch.split(delim); cur = "" + for i, part in enumerate(parts): + prospective = (delim if i > 0 else "") + part + if len(cur) + len(prospective) <= max_length: cur += prospective else: - if current_part: - new_chunks.append(current_part) - current_part = part - if current_part: - new_chunks.append(current_part) + if cur: new_chunks.append(cur) + cur = part + if cur: new_chunks.append(cur) chunks = new_chunks - - # Final fallback: split by words - final_chunks = [] - for chunk in chunks: - if len(chunk) <= max_length: - final_chunks.append(chunk) - else: - word_chunks = _split_by_words(chunk, max_length) - final_chunks.extend(word_chunks) - - return [chunk.strip() for chunk in final_chunks if chunk.strip()] - - -def get_streaming_settings( - streaming_chunk_size: Optional[int], - streaming_strategy: Optional[str], - streaming_quality: Optional[str] -) -> dict: - """ - Get optimized streaming settings based on parameters. - - Returns a dictionary with optimized settings for streaming. 
- """ - settings = { - "chunk_size": streaming_chunk_size or 200, - "strategy": streaming_strategy or "sentence", - "quality": streaming_quality or "balanced" - } - - # Apply quality presets if not explicitly overridden + final_chunks: List[str] = [] + for ch in chunks: + if len(ch) <= max_length: final_chunks.append(ch) + else: final_chunks.extend(_split_by_words(ch, max_length)) + return [c.strip() for c in final_chunks if c.strip()] + +def get_streaming_settings(streaming_chunk_size: Optional[int], streaming_strategy: Optional[str], streaming_quality: Optional[str]) -> dict: + settings = {"chunk_size": streaming_chunk_size or 200, "strategy": streaming_strategy or "sentence", "quality": streaming_quality or "balanced"} if streaming_quality and not streaming_chunk_size: - if streaming_quality == "fast": - settings["chunk_size"] = 100 - elif streaming_quality == "high": - settings["chunk_size"] = 300 - + if streaming_quality == "fast": settings["chunk_size"] = 100 + elif streaming_quality == "high": settings["chunk_size"] = 300 if streaming_quality and not streaming_strategy: - if streaming_quality == "fast": - settings["strategy"] = "word" - elif streaming_quality == "high": - settings["strategy"] = "paragraph" - + if streaming_quality == "fast": settings["strategy"] = "word" + elif streaming_quality == "high": settings["strategy"] = "paragraph" return settings - def concatenate_audio_chunks(audio_chunks: list, sample_rate: int) -> torch.Tensor: - """Concatenate multiple audio tensors with proper memory management""" - if len(audio_chunks) == 1: - return audio_chunks[0] - - # Add small silence between chunks (0.1 seconds) + if not audio_chunks: return torch.tensor([]) + if len(audio_chunks) == 1: return audio_chunks[0] silence_samples = int(0.1 * sample_rate) - - # Create silence tensor on the same device as audio chunks device = audio_chunks[0].device if hasattr(audio_chunks[0], 'device') else 'cpu' silence = torch.zeros(1, silence_samples, device=device) - - # 
Use torch.no_grad() to prevent gradient tracking with torch.no_grad(): concatenated = audio_chunks[0] - for i, chunk in enumerate(audio_chunks[1:], 1): - # Concatenate current result with silence and next chunk concatenated = torch.cat([concatenated, silence, chunk], dim=1) - - # Optional: cleanup intermediate tensors for very long sequences - if i % 10 == 0: # Every 10 chunks + if i % 10 == 0: gc.collect() - - # Clean up silence tensor - del silence - + if torch.cuda.is_available(): torch.cuda.empty_cache() return concatenated - -def split_text_for_long_generation(text: str, - max_chunk_size: Optional[int] = None, - overlap_chars: int = 0) -> List[LongTextChunk]: - """ - Split long text into chunks optimized for TTS generation with intelligent boundaries. - - This function implements a hierarchical splitting strategy: - 1. First attempt: Split at paragraph boundaries (double newlines) - 2. Second attempt: Split at sentence boundaries (. ! ?) - 3. Third attempt: Split at clause boundaries (, ; : - —) - 4. 
Last resort: Split at word boundaries - - Args: - text: Input text to split (should be > 3000 characters) - max_chunk_size: Maximum characters per chunk (defaults to Config.LONG_TEXT_CHUNK_SIZE) - overlap_chars: Number of characters to overlap between chunks for context - - Returns: - List of LongTextChunk objects with metadata - """ - if max_chunk_size is None: - max_chunk_size = Config.LONG_TEXT_CHUNK_SIZE - - # Ensure we don't exceed the regular TTS limit - effective_max = min(max_chunk_size, Config.MAX_TOTAL_LENGTH - 100) # Leave some buffer - - chunks = [] - chunk_index = 0 - remaining_text = text.strip() - - while remaining_text: - if len(remaining_text) <= effective_max: - # Last chunk - chunk_text = remaining_text - remaining_text = "" +def split_text_for_long_generation(text: str, max_chunk_size: Optional[int] = None, overlap_chars: int = 0) -> List[LongTextChunk]: + if max_chunk_size is None: max_chunk_size = Config.LONG_TEXT_CHUNK_SIZE + text = normalize_for_tts(text) + effective_max = min(max_chunk_size, Config.MAX_TOTAL_LENGTH - 100) + sane_overlap = min(overlap_chars, effective_max // 2) + chunks: List[LongTextChunk] = []; idx = 0; remaining = text.strip() + while remaining: + if len(remaining) <= effective_max: + chunk_text = remaining; remaining = "" else: - # Find the best split point - chunk_text, remaining_text = _find_best_split_point( - remaining_text, effective_max, overlap_chars - ) - - # Create chunk metadata - chunk = LongTextChunk( - index=chunk_index, - text=chunk_text, - text_preview=chunk_text[:50] + ("..." if len(chunk_text) > 50 else ""), - character_count=len(chunk_text) - ) - - chunks.append(chunk) - chunk_index += 1 - + chunk_text, remaining = _find_best_split_point(remaining, effective_max, sane_overlap) + chunk = LongTextChunk(index=idx, text=chunk_text, text_preview=chunk_text[:50] + ("..." 
if len(chunk_text) > 50 else ""), character_count=len(chunk_text)) + chunks.append(chunk); idx += 1 return chunks - def _find_best_split_point(text: str, max_length: int, overlap_chars: int = 0) -> Tuple[str, str]: - """ - Find the best point to split text while preserving semantic boundaries. - - Returns: - Tuple of (chunk_text, remaining_text) - """ - if len(text) <= max_length: - return text, "" - - # Strategy 1: Split at paragraph boundaries - split_result = _try_split_at_paragraphs(text, max_length, overlap_chars) - if split_result: - return split_result - - # Strategy 2: Split at sentence boundaries - split_result = _try_split_at_sentences(text, max_length, overlap_chars) - if split_result: - return split_result - - # Strategy 3: Split at clause boundaries - split_result = _try_split_at_clauses(text, max_length, overlap_chars) - if split_result: - return split_result - - # Strategy 4: Split at word boundaries (last resort) + if len(text) <= max_length: return text, "" + if r := _try_split_at_paragraphs(text, max_length, overlap_chars): return r + if r := _try_split_at_sentences(text, max_length, overlap_chars): return r + if r := _try_split_at_clauses(text, max_length, overlap_chars): return r return _split_at_words(text, max_length, overlap_chars) - def _try_split_at_paragraphs(text: str, max_length: int, overlap_chars: int) -> Optional[Tuple[str, str]]: - """Try to split at paragraph boundaries (double newlines)""" - # Find all paragraph breaks - paragraph_pattern = r'\n\s*\n' - matches = list(re.finditer(paragraph_pattern, text)) - - if not matches: - return None - - # Find the best paragraph break within our limit - best_split = None - for match in matches: - split_pos = match.end() - if split_pos <= max_length: - best_split = split_pos - else: - break - - if best_split and best_split > max_length * 0.5: # Don't take chunks that are too small - chunk_text = text[:best_split].strip() - remaining_text = text[max(0, best_split - overlap_chars):].strip() - 
return chunk_text, remaining_text - + matches = list(re.finditer(r'\n\s*\n', text)); best = None + if not matches: return None + for m in matches: + if (split_pos := m.end()) <= max_length: best = split_pos + else: break + if best and best > max_length * 0.5: + chunk_text = text[:best].strip() + remaining = text[max(0, best - overlap_chars):].strip() + return chunk_text, remaining return None - def _try_split_at_sentences(text: str, max_length: int, overlap_chars: int) -> Optional[Tuple[str, str]]: - """Try to split at sentence boundaries""" - # Enhanced sentence boundary detection - sentence_endings = ['. ', '! ', '? ', '.\n', '!\n', '?\n', '."', '!"', '?"', ".'", "!'", "?'"] - - best_split = None - for ending in sentence_endings: - pos = 0 - while pos < len(text): - found = text.find(ending, pos) - if found == -1: - break - - split_pos = found + len(ending) - if split_pos <= max_length: - best_split = split_pos - pos = found + 1 - else: - break - - if best_split and best_split > max_length * 0.4: # Don't take chunks that are too small - chunk_text = text[:best_split].strip() - remaining_text = text[max(0, best_split - overlap_chars):].strip() - return chunk_text, remaining_text - + sentences = _preprocess_and_segment_text_simple(text) + if not sentences: return None + cum = 0; last_ok_idx = -1 + for i, s in enumerate(sentences): + add = (1 if cum > 0 else 0) + len(s) + if cum + add <= max_length: cum += add; last_ok_idx = i + else: break + if last_ok_idx >= 0 and cum > max_length * 0.4: + chunk_text = " ".join(sentences[:last_ok_idx + 1]).strip() + original_start_pos = text.find(sentences[last_ok_idx + 1]) if last_ok_idx + 1 < len(sentences) else len(chunk_text) + remaining = text[max(0, original_start_pos - overlap_chars):].strip() + return chunk_text, remaining return None - def _try_split_at_clauses(text: str, max_length: int, overlap_chars: int) -> Optional[Tuple[str, str]]: - """Try to split at clause boundaries (commas, semicolons, etc.)""" - 
clause_delimiters = [', ', '; ', ': ', ' - ', ' — ', ' and ', ' or ', ' but ', ' while ', ' when '] - - best_split = None - for delimiter in clause_delimiters: - pos = 0 - while pos < len(text): - found = text.find(delimiter, pos) - if found == -1: - break - - split_pos = found + len(delimiter) - if split_pos <= max_length: - best_split = split_pos - pos = found + 1 - else: - break - - if best_split and best_split > max_length * 0.3: # Don't take chunks that are too small + clause_delims = [', ', '; ', ': ', ' - ', ' — ', ' and ', ' or ', ' but ', ' while ', ' when '] + best_split = 0 + for d in clause_delims: + if (pos := text.rfind(d, 0, max_length)) != -1: + best_split = max(best_split, pos + len(d)) + if best_split and best_split > max_length * 0.3: chunk_text = text[:best_split].strip() - remaining_text = text[max(0, best_split - overlap_chars):].strip() - return chunk_text, remaining_text - + remaining = text[max(0, best_split - overlap_chars):].strip() + return chunk_text, remaining return None - def _split_at_words(text: str, max_length: int, overlap_chars: int) -> Tuple[str, str]: - """Split at word boundaries as last resort""" - if len(text) <= max_length: - return text, "" - - # Find the last space before our limit + if len(text) <= max_length: return text, "" split_pos = text.rfind(' ', 0, max_length) - - if split_pos == -1: # No space found, force split - split_pos = max_length - + if split_pos == -1: split_pos = max_length chunk_text = text[:split_pos].strip() - remaining_text = text[max(0, split_pos - overlap_chars):].strip() - - return chunk_text, remaining_text - + remaining = text[max(0, split_pos - overlap_chars):].strip() + return chunk_text, remaining def estimate_processing_time(text_length: int, avg_chars_per_second: float = 25.0) -> int: - """ - Estimate processing time for long text TTS generation. 
- - Args: - text_length: Total characters in text - avg_chars_per_second: Average processing rate (characters per second) - - Returns: - Estimated processing time in seconds - """ - # Base estimate + overhead for chunking and concatenation base_time = text_length / avg_chars_per_second - - # Add overhead: 5 seconds for setup + 2 seconds per chunk + 10 seconds for concatenation num_chunks = max(1, (text_length + Config.LONG_TEXT_CHUNK_SIZE - 1) // Config.LONG_TEXT_CHUNK_SIZE) overhead = 5 + (num_chunks * 2) + 10 - return int(base_time + overhead) - def validate_long_text_input(text: str) -> Tuple[bool, str]: - """ - Validate text for long text TTS generation. - - Returns: - Tuple of (is_valid, error_message) - """ - if not text or not text.strip(): - return False, "Input text cannot be empty" - + if not text or not text.strip(): return False, "Input text cannot be empty" text_length = len(text.strip()) - - if text_length <= Config.MAX_TOTAL_LENGTH: - return False, f"Text is {text_length} characters. Use regular TTS for texts under {Config.MAX_TOTAL_LENGTH} characters" - - if text_length > Config.LONG_TEXT_MAX_LENGTH: - return False, f"Text is too long ({text_length} characters). Maximum allowed: {Config.LONG_TEXT_MAX_LENGTH}" - - # Check for excessive repetition (potential spam/abuse) + if text_length <= Config.MAX_TOTAL_LENGTH: return False, f"Text is {text_length} characters. Use regular TTS for texts under {Config.MAX_TOTAL_LENGTH} characters" + if text_length > Config.LONG_TEXT_MAX_LENGTH: return False, f"Text is too long ({text_length} characters). 
Maximum allowed: {Config.LONG_TEXT_MAX_LENGTH}" words = text.split() - if len(set(words)) < len(words) * 0.1: # Less than 10% unique words - return False, "Text appears to be excessively repetitive" - - return True, "" \ No newline at end of file + if len(words) > 50 and len(set(words)) < len(words) * 0.1: return False, "Text appears to be excessively repetitive" + return True, "" diff --git a/requirements.txt b/requirements.txt index 766e527..0ab1682 100755 --- a/requirements.txt +++ b/requirements.txt @@ -30,4 +30,7 @@ psutil>=5.9.0 pydub>=0.25.1 # Testing Dependencies -requests>=2.28.0 \ No newline at end of file +requests>=2.28.0 + +# num2words library - Convert numbers to words in multiple languages +num2words