Fix waveform tensor leak in audio pipeline and redesign AudioDataFilterStage for all VAD/Speaker combinations #1765
Conversation
…D/Speaker combos

Fixes the OverflowError crash in JsonlWriter caused by a torch.Tensor waveform leaking through the pipeline into serialization.

Root cause: When VAD and SpeakerSeparation were disabled, no cleanup stage ran before AudioToDocumentStage, so MonoConversionStage's waveform tensor leaked into the DataFrame and crashed ujson.

Changes:

- Redesign `AudioDataFilterStage.decompose()` as a topology selector with 4 distinct pipeline builders (one per VAD/Speaker combo), each producing a self-consistent stage sequence ending with TimestampMapper.
- Separate VAD (`_make_vad`) from quality filters (`_append_quality_filters`) for clarity: VAD is a segmentation stage, not a filter.
- Fix TimestampMapper to handle all 4 combos:
  1. `segment_mappings` (full pipeline with SegmentConcat)
  2. `start_ms`/`end_ms` (VAD fan-out)
  3. `diar_segments` (SpeakerSep with speaker timing)
  4. duration fallback (filters only, whole file)
- Replace the confusing `_STRIP_KEYS` blacklist with clean two-layer output control: a `_NEVER_PASS_KEYS` safety net (always blocks tensors) plus a `passthrough_keys` whitelist (user-configurable, with a sensible default).
- Fix SpeakerSeparation to propagate `diar_segments` from the diarization model so Combo 3 output has meaningful speaker timing data.
- Standardize the duration key to `"duration"` across all stages.
- Add a BandFilter warning on unexpected prediction values.
- Update extract_segments.py to handle all 4 combos with auto-detection, metadata.csv generation, and generic score extraction.
- Add an `--execution-mode` flag to pipeline.py for batch/streaming control.
- Update README.md and pipeline.yaml with topology documentation.
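The topology-selector redesign in the first bullet reduces to a four-way dispatch on the two feature flags. A minimal standalone sketch (the `select_topology` function and the string return type are illustrative only; in the actual PR the selection happens inside `AudioDataFilterStage.decompose()`, which returns stage lists, and the `_build_*` builder names come from the PR summary):

```python
def select_topology(vad_enabled: bool, speaker_enabled: bool) -> str:
    """Pick one of the four pipeline builders based on which features
    are enabled. Returns the builder name purely for illustration; the
    real decompose() would call the builder and return its stage list."""
    if vad_enabled and speaker_enabled:
        return "_build_full_pipeline"          # Combo 4: VAD + Speaker
    if vad_enabled:
        return "_build_vad_only_pipeline"      # Combo 2: VAD only
    if speaker_enabled:
        return "_build_speaker_only_pipeline"  # Combo 3: Speaker only
    return "_build_filters_only_pipeline"      # Combo 1: filters only
```

Every branch ends with TimestampMapper, which is what makes the output schema uniform enough for a single sanitization boundary.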
Greptile Summary

This PR fixes the waveform tensor leak by adding a …

Confidence Score: 4/5 — Safe to merge with awareness of the two open prior-thread data-integrity items (segment_index in CSV, Combo 1 detection in tutorial); the core pipeline fix is solid. The primary bug (tensor leak → ujson OverflowError) is definitively fixed. The four-topology redesign is coherent and the test suite covers all combos. Three remaining findings are P2: incomplete outputs() declaration, BandFilter warning noise, and O(n²) CSV rewrites. The still-open prior-thread items (segment_index=0 in metadata CSV, Combo 1 extraction in tutorial) are data-integrity concerns in the extraction utility, not in the pipeline itself.

Important files changed: timestamp_mapper.py (outputs() declaration), extract_segments.py (segment_index always 0 in metadata CSV for Combos 2/4), tutorials/audio/readspeech/extract_segments.py (Combo 1 unreachable detect_combo branch)
Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
Input([AudioTask])
Mono[MonoConversionStage]
Input --> Mono
Mono --> C1{VAD + Speaker?}
subgraph Combo1["Combo 1: no VAD, no Speaker"]
QF1[Quality Filters] --> TM1[TimestampMapper]
end
subgraph Combo2["Combo 2: VAD only"]
VAD2[VADSegmentation fan-out] --> QF2[Quality Filters] --> TM2[TimestampMapper]
end
subgraph Combo3["Combo 3: Speaker only"]
QF3a[Quality Filters] --> SS3[SpeakerSeparation fan-out] --> QF3b[Quality Filters] --> TM3[TimestampMapper]
end
subgraph Combo4["Combo 4: VAD + Speaker"]
VAD4a[VADSegmentation nested] --> QF4a[Quality Filters] --> SC4[SegmentConcat] --> SS4[SpeakerSeparation] --> VAD4b[VADSegmentation fan-out] --> QF4b[Quality Filters] --> TM4[TimestampMapper]
end
C1 -->|off, no spk| Combo1
C1 -->|on, no spk| Combo2
C1 -->|off, spk on| Combo3
C1 -->|on, spk on| Combo4
TM1 & TM2 & TM3 & TM4 --> Sanitize[AudioToDocumentStage _sanitize removes tensors]
Sanitize --> JSONL([JSONL Output])
```
Reviews (25): Last reviewed commit: "Merge branch 'main' into ADV_bug/6057972"
```python
def detect_combo(entries: list) -> int:
    """Detect which pipeline combo produced the manifest.

    Returns 1, 2, 3, or 4.
    """
    if not entries:
        return 1
    first = entries[0]
    has_speaker = "speaker_id" in first
    has_diar = "diar_segments" in first
    has_timestamps = "original_start_ms" in first and "original_end_ms" in first
    if has_speaker and has_diar:
        return 3
    if has_speaker and has_timestamps:
        return 4
    if has_timestamps and not has_speaker:
        return 2
    return 1


def _write_segment(
    output_path: str, segment_audio: np.ndarray, sample_rate: int, output_format: str
) -> None:
    ...
```
detect_combo() Combo 1 branch is unreachable
TimestampMapperStage always emits original_start_ms and original_end_ms — even for Combo 1 (duration-fallback path in _build_output_item_no_mapping, lines 230–239 of timestamp_mapper.py). This means has_timestamps is always True for any manifest produced by the new pipeline, so the third guard (if has_timestamps and not has_speaker) fires for Combo 1 manifests, misidentifying them as Combo 2 and making return 1 unreachable.
Consequence: Combo 1 runs are extracted via extract_combo2 instead of extract_combo1, producing {name}_segment_000.wav files (re-encoded through soundfile) rather than direct copies of the original file.
One fix is to emit a small metadata key (e.g. "pipeline_combo") from TimestampMapperStage so detect_combo can read it unambiguously. Alternatively, add a heuristic — e.g. Combo 1 outputs always have original_start_ms == 0 and a single entry per original_file, so detect by counting entries per file:
```python
def detect_combo(entries: list) -> int:
    if not entries:
        return 1
    first = entries[0]
    has_speaker = "speaker_id" in first
    has_diar = "diar_segments" in first
    has_timestamps = "original_start_ms" in first and "original_end_ms" in first
    if has_speaker and has_diar:
        return 3
    if has_speaker and has_timestamps:
        return 4
    if has_timestamps and not has_speaker:
        # Distinguish Combo 2 (multiple segments per file) from Combo 1 (whole file)
        files_seen = {e.get("original_file") for e in entries}
        entries_per_file = len(entries) / max(len(files_seen), 1)
        if entries_per_file > 1.0:
            return 2
        # Single entry per file with start=0 is Combo 1
        if first.get("original_start_ms", -1) == 0:
            return 1
        return 2
    return 1
```

Note: even this heuristic can be fooled (e.g. a Combo 2 run where every file produced exactly one segment). Emitting a "pipeline_combo" marker is the robust solution.
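A hedged sketch of the marker approach (the `stamp_combo` helper and its call site are hypothetical; only the `"pipeline_combo"` key name comes from the suggestion above):

```python
def stamp_combo(result: dict, vad_enabled: bool, speaker_enabled: bool) -> dict:
    """Hypothetical helper: attach an explicit combo marker at the
    TimestampMapper boundary so downstream tools need no schema heuristics."""
    result["pipeline_combo"] = {
        (False, False): 1,  # filters only
        (True, False): 2,   # VAD only
        (False, True): 3,   # Speaker only
        (True, True): 4,    # VAD + Speaker
    }[(vad_enabled, speaker_enabled)]
    return result


def detect_combo(entries: list) -> int:
    """Prefer the explicit marker; default to 1 for empty manifests."""
    if not entries:
        return 1
    return int(entries[0].get("pipeline_combo", 1))
```

With the marker present, `detect_combo` becomes a lookup instead of a schema inference, and Combo 1 manifests can no longer be misread as Combo 2.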
```python
diar_segments = item.get("diar_segments")
if diar_segments and len(diar_segments) > 0:
    first_start = diar_segments[0][0]
    last_end = diar_segments[-1][1]
    result["original_start_ms"] = int(first_start * 1000)
    result["original_end_ms"] = int(last_end * 1000)
    result["duration_ms"] = int((last_end - first_start) * 1000)
    result["duration"] = last_end - first_start
    speaking = sum(end - start for start, end in diar_segments)
    result["speaking_duration"] = round(speaking, 3)
    result["diar_segments"] = [[round(s, 3), round(e, 3)] for s, e in diar_segments]
self._copy_passthrough(item, result)
return result
```
diar_segments/speaking_duration bypass passthrough_keys control
Both speaking_duration (line 223) and diar_segments (line 224) are written directly into result, so they are always present in Combo 3 output regardless of what the user sets in passthrough_keys. This is inconsistent with the documented contract — users who restrict passthrough_keys to a small set would still get these fields injected.
If these are intended to be mandatory Combo 3 output fields (which is reasonable), a brief note in the docstring clarifying that diar_segments and speaking_duration are always emitted when diar_segments is available would set correct expectations.
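The intended two-layer contract could be sketched like this (a simplified standalone version; the key names follow the PR description, and the free function stands in for the stage method):

```python
# Safety net: these keys are never serialized, whatever the user requests.
_NEVER_PASS_KEYS = {"waveform", "segments", "audio_data"}


def copy_passthrough(item: dict, result: dict, passthrough_keys: set[str]) -> None:
    """Copy only whitelisted keys into the output, with the blacklist as
    a safety net: a key reaches the output only if it is both requested
    and not blocked."""
    for key in passthrough_keys:
        if key in _NEVER_PASS_KEYS:
            continue  # safety net beats the whitelist
        if key in item:
            result[key] = item[key]
```

Even if a user whitelists "waveform", the safety net drops it; mandatory fields such as `diar_segments` would then be written by the stage itself, outside this function, which is exactly the behavior the comment above asks to document.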
```python
first_start = diar_segments[0][0]
last_end = diar_segments[-1][1]
result["original_start_ms"] = int(first_start * 1000)
result["original_end_ms"] = int(last_end * 1000)
result["duration_ms"] = int((last_end - first_start) * 1000)
result["duration"] = last_end - first_start
speaking = sum(end - start for start, end in diar_segments)
result["speaking_duration"] = round(speaking, 3)
result["diar_segments"] = [[round(s, 3), round(e, 3)] for s, e in diar_segments]
self._copy_passthrough(item, result)
```
Unsorted diar_segments produces wrong span
This block uses diar_segments[0][0] as first_start and diar_segments[-1][1] as last_end, implicitly assuming the list is time-ordered. Neither the Sortformer post-processing stage nor the speaker-task builder guarantees chronological ordering. If a segment is out of order, original_start_ms, original_end_ms, and duration_ms in the manifest will be wrong — silently, since no error is raised.
The downstream extraction code in extract_segments.py already handles this correctly by explicitly sorting before iterating. The same sort should be applied here before computing the span to keep the manifest values consistent.
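A sketch of the suggested fix as a standalone helper (the name `span_ms` is hypothetical; the real code would sort in place before writing into `result`):

```python
def span_ms(diar_segments: list[tuple[float, float]]) -> tuple[int, int, int]:
    """Compute (start_ms, end_ms, duration_ms) without assuming the
    segments arrive time-ordered: sort by start time, take the max end."""
    ordered = sorted(diar_segments, key=lambda seg: seg[0])
    first_start = ordered[0][0]
    last_end = max(end for _, end in ordered)
    return (
        int(first_start * 1000),
        int(last_end * 1000),
        int((last_end - first_start) * 1000),
    )
```

With an out-of-order input like `[(5.0, 6.0), (0.0, 1.5)]` this returns the true span `(0, 6000, 6000)`, whereas the unsorted code above would report a start of 5000 ms and a negative-looking duration.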
Force-pushed e437b78 to 77e0bab
…assthrough fields

- Remove unreachable extract_combo1 (TimestampMapper always sets timestamps)
- Merge combos 1 & 2 into extract_segments_by_timestamps
- Rename extraction functions to be descriptive
- Document diar_segments/speaking_duration as core fields in TimestampMapper docstring
/ok to test dca8b9c

/ok to test 0f9319d
- Replace fragile tuple return type in get_speaker_audio_data with SpeakerResult NamedTuple for self-documenting named field access.
- Update caller in speaker_separation.py to use result.audio, result.duration, result.diar_segments instead of positional unpacking.
- Document _INHERITED_DROP_KEYS explaining why duration/num_samples are dropped from parent tasks during speaker separation.
- Add comments in pipeline.py and pipeline.yaml documenting the default passthrough_keys and how to restrict output columns.
- Add speaker separation note to README explaining dropped keys.
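The NamedTuple shape described in the first bullet might look like this (field names come from the commit message; the exact definition in `speaker_sep.py` may differ):

```python
from typing import NamedTuple

import numpy as np


class SpeakerResult(NamedTuple):
    """Self-documenting replacement for the old positional tuple
    returned by get_speaker_audio_data()."""
    audio: np.ndarray
    duration: float
    diar_segments: list[tuple[float, float]]


res = SpeakerResult(np.zeros(16000, dtype=np.float32), 1.0, [(0.0, 1.0)])
# Named access instead of positional unpacking:
print(res.duration, res.diar_segments)
```

Because a NamedTuple is still a tuple, old call sites that index positionally keep working, while new code gains readable attribute access and a place to hang type hints.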
/ok to test 55a389c
```diff
 speaker_data = {
-    "speaker_0": (_make_audio_segment(3000), 3.0),
-    "speaker_1": (_make_audio_segment(4000), 4.0),
+    "speaker_0": (_make_audio_segment(3000), 3.0, [(0.0, 3.0)]),
+    "speaker_1": (_make_audio_segment(4000), 4.0, [(0.0, 4.0)]),
 }
```
Test mocks use plain tuples instead of SpeakerResult NamedTuples
_build_speaker_tasks now accesses result.audio, result.duration, and result.diar_segments as named attributes (speaker_separation.py lines 177–188). Plain tuples don't have these attributes, so every test that calls stage.process() here will raise AttributeError: 'tuple' object has no attribute 'duration'. The same pattern appears in test_process_output_keys and test_min_duration_filters_short_speakers on the same file.
Suggested change:

```diff
 speaker_data = {
-    "speaker_0": (_make_audio_segment(3000), 3.0, [(0.0, 3.0)]),
-    "speaker_1": (_make_audio_segment(4000), 4.0, [(0.0, 4.0)]),
+    "speaker_0": SpeakerResult(_make_audio_segment(3000), 3.0, [(0.0, 3.0)]),
+    "speaker_1": SpeakerResult(_make_audio_segment(4000), 4.0, [(0.0, 4.0)]),
 }
```
Add the following import at the top of the test file:

```python
from nemo_curator.stages.audio.segmentation.speaker_separation_module.speaker_sep import SpeakerResult
```

The same fix is needed on lines 68–70 and 88–91 in this file.
Update test_speaker_separation.py to construct SpeakerResult instead of plain tuples, matching the named attribute access introduced in speaker_separation.py.
/ok to test b5a43d2

/ok to test ddbcbfd
Move reusable segment extraction functions from the tutorial script into the nemo_curator package as a proper ProcessingStage. The tutorial becomes a thin CLI wrapper importing from the package.

- Add nemo_curator/stages/audio/io/extract_segments.py with SegmentExtractionStage (ProcessingStage[AudioTask, AudioTask])
- Slim tutorials/audio/readspeech/extract_segments.py to CLI wrapper
- Add tests/stages/audio/io/test_extract_segments.py with clean imports
/ok to test 9448042

/ok to test efeb66d
```python
def _extract_by_timestamps(
    self, entries: list[dict],
) -> tuple[int, float, dict[str, int], list[dict]]:
    """Combo 2: extract by original_start_ms / original_end_ms."""
    counter: dict[str, int] = defaultdict(int)

    def _make_filename(name: str, _entry: dict, _seg_idx: int) -> str:
        idx = counter[name]
        counter[name] += 1
        return f"{name}_segment_{idx:03d}.{self.output_format}"

    return self._extract_file_segments(
        entries,
        sort_key=lambda x: x.get("original_start_ms", 0),
        get_intervals=_intervals_from_timestamps,
        make_filename=_make_filename,
    )
```
Segment filename collision across batches (combos 2 & 4)
counter (combo 2) and per_speaker_count (combo 4, line 393) are local to each _extract_by_timestamps / _extract_speaker_timestamps call, so they reset to 0 on every process_batch invocation. When a single original_file produces more segments than batch_size (e.g., a long recording that generates 100 VAD segments with batch_size=64), the second batch resets the counter and silently overwrites {name}_segment_000.wav … {name}_segment_035.wav written by the first batch.
extract_from_manifest is fine because it processes all entries in one call. The bug only affects the stage-based pipeline path. Promoting the counters to instance-level state (similar to _all_metadata_rows) would fix both combos atomically.
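A minimal sketch of that fix, assuming the counter becomes stage-lifetime state created once in the constructor (the class and method names here are hypothetical):

```python
from collections import defaultdict


class SegmentCounter:
    """Lives for the lifetime of the stage, so per-file segment indices
    keep incrementing across process_batch calls instead of resetting
    to 0 on every batch."""

    def __init__(self) -> None:
        self._counts: dict[str, int] = defaultdict(int)

    def next_index(self, name: str) -> int:
        idx = self._counts[name]
        self._counts[name] += 1
        return idx


counter = SegmentCounter()
batch1 = [counter.next_index("rec") for _ in range(3)]  # first batch: 0, 1, 2
batch2 = [counter.next_index("rec") for _ in range(2)]  # next batch continues: 3, 4
```

Holding one such counter as an instance attribute (alongside `_all_metadata_rows`, as the comment suggests) fixes the overwrite in both combos at once, because filenames from a later batch can never collide with an earlier one.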
/ok to test 30d91b5
…ns to fit single GPU

Signed-off-by: shbhawsar <shbhawsar@nvidia.com>
/ok to test 12e3fdc

/ok to test bc28496
```python
for entry in file_entries:
    intervals = get_intervals(entry)
    for seg_idx, (start_ms, end_ms, dur) in enumerate(intervals):
        out_filename = make_filename(original_name, entry, seg_idx)
        output_path = os.path.join(self.output_dir, out_filename)

        try:
            audio = _read_segment(original_file, start_ms, end_ms, info.samplerate)
            sf.write(output_path, audio, info.samplerate, subtype=SOUNDFILE_FORMATS[self.output_format])
            extracted += 1
            total_dur += dur

            speaker_id = entry.get("speaker_id")
            if speaker_id:
                speaker_counts[speaker_id] += 1

            metadata_rows.append(
                _base_metadata(out_filename, original_file, entry, seg_idx, start_ms, end_ms, dur)
            )
```
segment_index is always 0 in metadata CSV for Combos 2 and 4
For Combos 2 and 4, _intervals_from_timestamps returns a single-element list, so enumerate(intervals) always yields seg_idx=0. Every row in the CSV will have segment_index=0 while the extracted filenames correctly use the globally-incrementing counter (e.g., _segment_000, _segment_001, …).
Any downstream script reading segment_index from metadata.csv to reconstruct which file corresponds to which row will get incorrect data. One fix is to return the global index from make_filename so it can be passed to _base_metadata.
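That fix could look like the following sketch (standalone, with hypothetical names): the filename helper returns the global index it consumed, so the caller can pass the same number to the metadata writer instead of the always-zero local `seg_idx`.

```python
from collections import defaultdict


def make_filename_with_index(
    counter: dict[str, int], name: str, ext: str
) -> tuple[str, int]:
    """Return both the filename and the global index baked into it,
    so the metadata row records the same number the filename uses."""
    idx = counter[name]
    counter[name] += 1
    return f"{name}_segment_{idx:03d}.{ext}", idx


counter: dict[str, int] = defaultdict(int)
fname, idx = make_filename_with_index(counter, "talk", "wav")
# fname == "talk_segment_000.wav", idx == 0
fname2, idx2 = make_filename_with_index(counter, "talk", "wav")
# fname2 == "talk_segment_001.wav", idx2 == 1
```

Passing `idx` through to `_base_metadata` keeps `segment_index` in metadata.csv in lockstep with the `_segment_NNN` suffix in the extracted filenames.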
/ok to test 10f0353
Summary
Fixes [NMFW][26.04][nightly][Curator] audio/readspeech issue: Waveform tensor leaks through the entire pipeline into JsonlWriter (Bug ID: 6057972)
Root cause: When VAD and/or SpeakerSeparation were disabled, `MonoConversionStage` added a `torch.Tensor` waveform to `task.data` that was never cleaned up before reaching `AudioToDocumentStage` → `JsonlWriter`, causing `OverflowError: Maximum recursion level reached` when ujson tried to serialize it.

Fix: Redesigned `AudioDataFilterStage.decompose()` to produce four distinct, self-consistent pipeline topologies and enhanced `TimestampMapperStage` to run in all combinations as the output normalization boundary.

Changes
- `AudioDataFilterStage`: Replaced monolithic `decompose()` with four topology builders (`_build_full_pipeline`, `_build_vad_only_pipeline`, `_build_speaker_only_pipeline`, `_build_filters_only_pipeline`). Separated VAD from quality filters (`_make_vad` + `_append_quality_filters`).
- `TimestampMapperStage`: Now runs in all 4 combos. Added whitelist-based output control (`passthrough_keys` with `_DEFAULT_PASSTHROUGH_KEYS`) and a safety net (`_NEVER_PASS_KEYS` blocks non-serializable fields). Handles `diar_segments` from SpeakerSep for combo 3 (speaker-only).
- `SpeakerSeparationStage`: Propagates diarization timestamps (`diar_segments`) from the Sortformer model to task data. Drops inherited `duration`/`num_samples` to prevent misleading values.
- `speaker_sep.py`: `get_speaker_audio_data()` now returns segment timestamps alongside audio.
- `BandFilterStage`: Added warning log for unexpected prediction values.
- `vad_segmentation.py` / `mono_conversion.py`: Standardized duration key to `"duration"` everywhere.
- `extract_segments.py`: Auto-detects pipeline combo from manifest schema, extracts audio accordingly, generates `metadata.csv`.
- `pipeline.py`: Added `--execution-mode` flag (batch/streaming).
- `pipeline.yaml`: Simplified config (`passthrough_keys` uses built-in defaults).
- `README.md`: Documented all 4 topologies, output schemas, `passthrough_keys` customization, and extraction workflow.

Pipeline topologies
- no flags → Combo 1 (filters only)
- `--enable-vad` → Combo 2 (VAD only)
- `--enable-speaker-separation` → Combo 3 (Speaker only)
- `--enable-vad --enable-speaker-separation` → Combo 4 (VAD + Speaker)

Test plan

- No `waveform`/`segments`/`audio_data` in any JSONL output