
Fix waveform tensor leak in audio pipeline and redesign AudioDataFilterStage for all VAD/Speaker combinations #1765

Merged

sarahyurick merged 25 commits into NVIDIA-NeMo:main from shubhamNvidia:ADV_bug/6057972 on Apr 23, 2026
Conversation

@shubhamNvidia
Contributor

@shubhamNvidia shubhamNvidia commented Apr 8, 2026

Summary

Fixes [NMFW][26.04][nightly][Curator] audio/readspeech issue: Waveform tensor leaks through the entire pipeline into JsonlWriter (Bug ID: 6057972)

Root cause: When VAD and/or SpeakerSeparation were disabled, MonoConversionStage added a torch.Tensor waveform to task.data that was never cleaned up before reaching AudioToDocumentStage and JsonlWriter, causing OverflowError: Maximum recursion level reached when ujson tried to serialize it.

Fix: Redesigned AudioDataFilterStage.decompose() to produce four distinct, self-consistent pipeline topologies and enhanced TimestampMapperStage to run in all combinations as the output normalization boundary.
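For context, a minimal sketch of the serialization guard this class of fix implies, with illustrative key and function names (the real cleanup is spread across the stages described in this PR, not a single helper):

```python
# Hypothetical sketch only: drop keys whose values ujson cannot serialize
# (e.g. a torch.Tensor waveform) before a row reaches the JSONL writer.
NON_SERIALIZABLE_KEYS = ("waveform", "audio_data", "segments")

def sanitize(row: dict) -> dict:
    """Return a copy of the row with non-serializable keys removed."""
    return {k: v for k, v in row.items() if k not in NON_SERIALIZABLE_KEYS}
```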

Changes

  • AudioDataFilterStage: Replaced monolithic decompose() with four topology builders (_build_full_pipeline, _build_vad_only_pipeline, _build_speaker_only_pipeline, _build_filters_only_pipeline). Separated VAD from quality filters (_make_vad + _append_quality_filters).
  • TimestampMapperStage: Now runs in all 4 combos. Added whitelist-based output control (passthrough_keys with _DEFAULT_PASSTHROUGH_KEYS) and safety net (_NEVER_PASS_KEYS blocks non-serializable fields). Handles diar_segments from SpeakerSep for combo 3 (speaker-only).
  • SpeakerSeparationStage: Propagates diarization timestamps (diar_segments) from the Sortformer model to task data. Drops inherited duration/num_samples to prevent misleading values.
  • speaker_sep.py: get_speaker_audio_data() now returns segment timestamps alongside audio.
  • BandFilterStage: Added warning log for unexpected prediction values.
  • vad_segmentation.py/mono_conversion.py: Standardized duration key to "duration" everywhere.
  • extract_segments.py: Auto-detects pipeline combo from manifest schema, extracts audio accordingly, generates metadata.csv.
  • pipeline.py: Added --execution-mode flag (batch/streaming).
  • pipeline.yaml: Simplified config (passthrough_keys uses built-in defaults).
  • README.md: Documented all 4 topologies, output schemas, passthrough_keys customization, and extraction workflow.
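The two-layer output control described above can be sketched roughly as follows; the constant and method names mirror the PR description, but the default key sets here are invented for illustration:

```python
# Sketch of whitelist-based output control with a safety net, assuming
# hypothetical default key sets; the real logic lives in TimestampMapperStage.
_NEVER_PASS_KEYS = {"waveform", "segments", "audio_data"}        # never serialized
_DEFAULT_PASSTHROUGH_KEYS = {"original_file", "sample_rate", "language"}

def copy_passthrough(item: dict, result: dict, passthrough_keys=None) -> None:
    """Copy whitelisted keys from the task item into the output row.

    The safety net wins over the whitelist: even a user-supplied
    passthrough_keys cannot leak a blocked key into the output.
    """
    allowed = set(passthrough_keys or _DEFAULT_PASSTHROUGH_KEYS) - _NEVER_PASS_KEYS
    for key in allowed:
        if key in item and key not in result:
            result[key] = item[key]
```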

Pipeline topologies

| Combo | Flags | Pipeline | Output |
|---|---|---|---|
| 1 | (none) | Mono → Filters → TimestampMapper | 1 row/file |
| 2 | `--enable-vad` | Mono → VAD (fan-out) → Filters → TimestampMapper | N rows (segments) |
| 3 | `--enable-speaker-separation` | Mono → Filters → SpeakerSep → Filters → TimestampMapper | K rows (speakers + diar_segments) |
| 4 | `--enable-vad --enable-speaker-separation` | Full pipeline (unchanged) | K×M rows (speaker × segment) |
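The topology-selector pattern behind this table can be sketched as below; the builder names follow the PR description, while the stage lists are simplified placeholders rather than the actual stage objects:

```python
# Illustrative topology selector: one builder per VAD/Speaker combination,
# every topology ending with TimestampMapper as the output boundary.
def _build_filters_only_pipeline() -> list[str]:
    return ["Mono", "Filters", "TimestampMapper"]

def _build_vad_only_pipeline() -> list[str]:
    return ["Mono", "VAD", "Filters", "TimestampMapper"]

def _build_speaker_only_pipeline() -> list[str]:
    return ["Mono", "Filters", "SpeakerSep", "Filters", "TimestampMapper"]

def _build_full_pipeline() -> list[str]:
    return ["Mono", "VAD", "Filters", "SegmentConcat",
            "SpeakerSep", "VAD", "Filters", "TimestampMapper"]

def decompose(enable_vad: bool, enable_speaker_separation: bool) -> list[str]:
    if enable_vad and enable_speaker_separation:
        return _build_full_pipeline()
    if enable_vad:
        return _build_vad_only_pipeline()
    if enable_speaker_separation:
        return _build_speaker_only_pipeline()
    return _build_filters_only_pipeline()
```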

Test plan

  • 60 unit tests pass (TimestampMapper, SpeakerSep, VAD, MonoConversion, AudioToDocumentStage)
  • Tensor leak verification: no waveform/segments/audio_data in any JSONL output
  • Duration consistency verification: all timestamps and durations valid
  • Schema verification: all expected keys present per combo
  • Extract segments works for all 4 combos with metadata.csv generation
  • Ruff lint + format clean

…D/Speaker combos

Fixes the OverflowError crash in JsonlWriter caused by torch.Tensor
waveform leaking through the pipeline into serialization.

Root cause: When VAD and SpeakerSeparation were disabled, no cleanup
stage ran before AudioToDocumentStage, so MonoConversionStage's
waveform tensor leaked into the DataFrame and crashed ujson.

Changes:

- Redesign AudioDataFilterStage.decompose() as a topology selector
  with 4 distinct pipeline builders (one per VAD/Speaker combo),
  each producing a self-consistent stage sequence ending with
  TimestampMapper.

- Separate VAD (_make_vad) from quality filters (_append_quality_filters)
  for clarity: VAD is a segmentation stage, not a filter.

- Fix TimestampMapper to handle all 4 combos:
  1. segment_mappings (full pipeline with SegmentConcat)
  2. start_ms/end_ms (VAD fan-out)
  3. diar_segments (SpeakerSep with speaker timing)
  4. duration fallback (filters only, whole file)

- Replace confusing _STRIP_KEYS blacklist with clean two-layer output
  control: _NEVER_PASS_KEYS safety net (blocks tensors always) +
  passthrough_keys whitelist (user-configurable, sensible default).

- Fix SpeakerSeparation to propagate diar_segments from diarization
  model so Combo 3 output has meaningful speaker timing data.

- Standardize duration key to "duration" across all stages.

- Add BandFilter warning on unexpected prediction values.

- Update extract_segments.py to handle all 4 combos with auto-detection,
  metadata.csv generation, and generic score extraction.

- Add --execution-mode flag to pipeline.py for batch/streaming control.

- Update README.md and pipeline.yaml with topology documentation.
@shubhamNvidia shubhamNvidia requested a review from a team as a code owner April 8, 2026 14:58
@shubhamNvidia shubhamNvidia requested review from abhinavg4 and removed request for a team April 8, 2026 14:58
@copy-pr-bot

copy-pr-bot Bot commented Apr 8, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@greptile-apps
Contributor

greptile-apps Bot commented Apr 8, 2026

Greptile Summary

This PR fixes the waveform tensor leak by adding a _sanitize() guard in AudioToDocumentStage and redesigns AudioDataFilterStage.decompose() into four self-consistent topology builders (Combo 1–4), with TimestampMapperStage now serving as the serialization boundary for all combinations. The instance-level _segment_counter / _speaker_segment_counter in SegmentExtractionStage correctly resolves the prior-thread filename-collision bug, and the adoption of SpeakerResult NamedTuple in tests resolves the prior-thread AttributeError concern. Remaining prior-thread items (segment_index always 0 in metadata CSV for Combos 2/4, Combo 1 detection in the tutorial script) are still open but do not affect the core pipeline correctness.

Confidence Score: 4/5

Safe to merge with awareness of the two open prior-thread data-integrity items (segment_index in CSV, Combo 1 detection in tutorial); core pipeline fix is solid.

The primary bug (tensor leak → ujson OverflowError) is definitively fixed. The four-topology redesign is coherent and the test suite covers all combos. Three remaining findings are P2: incomplete outputs() declaration, BandFilter warning noise, and O(n²) CSV rewrites. The still-open prior-thread items (segment_index=0 in metadata CSV, Combo 1 extraction in tutorial) are data-integrity concerns in the extraction utility, not in the pipeline itself.

timestamp_mapper.py (outputs() declaration), extract_segments.py (segment_index always 0 in metadata CSV for Combos 2/4), tutorials/audio/readspeech/extract_segments.py (Combo 1 unreachable detect_combo branch)

Important Files Changed

| Filename | Overview |
|---|---|
| nemo_curator/stages/audio/advanced_pipelines/audio_data_filter/audio_data_filter.py | Core refactor replacing monolithic decompose() with four topology builder methods; factory methods are clean and well-scoped. |
| nemo_curator/stages/audio/postprocessing/timestamp_mapper.py | New stage that normalises the pipeline output boundary; outputs() declaration is incomplete—diar_segments and speaking_duration emitted for Combo 3 are not listed. |
| nemo_curator/stages/audio/io/extract_segments.py | New file with instance-level counters (fixing prior-thread batch reset bug); segment_index in metadata CSV is always 0 for Combos 2/4 (prior thread, still present). |
| nemo_curator/stages/audio/segmentation/speaker_separation.py | Propagates diar_segments from SpeakerResult; drops inherited duration/num_samples correctly; waveform cleanup in finally block is correct. |
| nemo_curator/stages/audio/segmentation/speaker_separation_module/speaker_sep.py | Adds SpeakerResult NamedTuple with audio/duration/diar_segments; get_speaker_audio_data returns it correctly; temp file lifecycle is safe. |
| nemo_curator/stages/audio/io/convert.py | Adds _sanitize() to strip non-serializable keys (tensors, audio arrays) before DataFrame conversion—directly fixes the tensor-leak root cause. |
| nemo_curator/stages/audio/filtering/band.py | Added else-branch warning for unexpected predictions; warning also fires on expected "Error: ..." predictor returns, which may be noisy. |
| nemo_curator/stages/audio/segmentation/vad_segmentation.py | Standardised duration key to "duration"; waveform cleanup in nested mode (del task.data[waveform_key]) is correct and pre-existing. |
| tests/stages/audio/segmentation/test_speaker_separation.py | Correctly imports and uses SpeakerResult NamedTuple in all mocks, resolving the prior-thread AttributeError concern. |
| tests/stages/audio/postprocessing/test_timestamp_mapper.py | Comprehensive tests for _translate_to_original and TimestampMapperStage covering all four fallback paths. |

Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    Input([AudioTask])
    Mono[MonoConversionStage]
    Input --> Mono

    Mono --> C1{VAD + Speaker?}

    subgraph Combo1["Combo 1: no VAD, no Speaker"]
        QF1[Quality Filters] --> TM1[TimestampMapper]
    end

    subgraph Combo2["Combo 2: VAD only"]
        VAD2[VADSegmentation fan-out] --> QF2[Quality Filters] --> TM2[TimestampMapper]
    end

    subgraph Combo3["Combo 3: Speaker only"]
        QF3a[Quality Filters] --> SS3[SpeakerSeparation fan-out] --> QF3b[Quality Filters] --> TM3[TimestampMapper]
    end

    subgraph Combo4["Combo 4: VAD + Speaker"]
        VAD4a[VADSegmentation nested] --> QF4a[Quality Filters] --> SC4[SegmentConcat] --> SS4[SpeakerSeparation] --> VAD4b[VADSegmentation fan-out] --> QF4b[Quality Filters] --> TM4[TimestampMapper]
    end

    C1 -->|off, no spk| Combo1
    C1 -->|on, no spk| Combo2
    C1 -->|off, spk on| Combo3
    C1 -->|on, spk on| Combo4

    TM1 & TM2 & TM3 & TM4 --> Sanitize[AudioToDocumentStage _sanitize removes tensors]
    Sanitize --> JSONL([JSONL Output])
```

Reviews (25): Last reviewed commit: "Merge branch 'main' into ADV_bug/6057972"

Comment on lines +152 to +171

```python
def detect_combo(entries: list) -> int:
    """Detect which pipeline combo produced the manifest.

    Returns 1, 2, 3, or 4.
    """
    if not entries:
        return 1

    first = entries[0]
    has_speaker = "speaker_id" in first
    has_diar = "diar_segments" in first
    has_timestamps = "original_start_ms" in first and "original_end_ms" in first

    if has_speaker and has_diar:
        return 3
    if has_speaker and has_timestamps:
        return 4
    if has_timestamps and not has_speaker:
        return 2
    return 1
```

P1 detect_combo() Combo 1 branch is unreachable

TimestampMapperStage always emits original_start_ms and original_end_ms — even for Combo 1 (duration-fallback path in _build_output_item_no_mapping, lines 230–239 of timestamp_mapper.py). This means has_timestamps is always True for any manifest produced by the new pipeline, so the third guard (if has_timestamps and not has_speaker) fires for Combo 1 manifests, misidentifying them as Combo 2 and making return 1 unreachable.

Consequence: Combo 1 runs are extracted via extract_combo2 instead of extract_combo1, producing {name}_segment_000.wav files (re-encoded through soundfile) rather than direct copies of the original file.

One fix is to emit a small metadata key (e.g. "pipeline_combo") from TimestampMapperStage so detect_combo can read it unambiguously. Alternatively, add a heuristic — e.g. Combo 1 outputs always have original_start_ms == 0 and a single entry per original_file, so detect by counting entries per file:

```python
def detect_combo(entries: list) -> int:
    if not entries:
        return 1

    first = entries[0]
    has_speaker = "speaker_id" in first
    has_diar = "diar_segments" in first
    has_timestamps = "original_start_ms" in first and "original_end_ms" in first

    if has_speaker and has_diar:
        return 3
    if has_speaker and has_timestamps:
        return 4
    if has_timestamps and not has_speaker:
        # Distinguish Combo 2 (multiple segments per file) from Combo 1 (whole file)
        files_seen = {e.get("original_file") for e in entries}
        entries_per_file = len(entries) / max(len(files_seen), 1)
        if entries_per_file > 1.0:
            return 2
        # Single entry per file with start=0 is Combo 1
        if first.get("original_start_ms", -1) == 0:
            return 1
        return 2
    return 1
```

Note: even this heuristic can be fooled (e.g. a Combo 2 run where every file produced exactly one segment). Emitting a "pipeline_combo" marker is the robust solution.

Comment on lines +214 to +226

```python
diar_segments = item.get("diar_segments")
if diar_segments and len(diar_segments) > 0:
    first_start = diar_segments[0][0]
    last_end = diar_segments[-1][1]
    result["original_start_ms"] = int(first_start * 1000)
    result["original_end_ms"] = int(last_end * 1000)
    result["duration_ms"] = int((last_end - first_start) * 1000)
    result["duration"] = last_end - first_start
    speaking = sum(end - start for start, end in diar_segments)
    result["speaking_duration"] = round(speaking, 3)
    result["diar_segments"] = [[round(s, 3), round(e, 3)] for s, e in diar_segments]
    self._copy_passthrough(item, result)
    return result
```

P2 diar_segments/speaking_duration bypass passthrough_keys control

Both speaking_duration (line 223) and diar_segments (line 224) are written directly into result, so they are always present in Combo 3 output regardless of what the user sets in passthrough_keys. This is inconsistent with the documented contract — users who restrict passthrough_keys to a small set would still get these fields injected.

If these are intended to be mandatory Combo 3 output fields (which is reasonable), a brief note in the docstring clarifying that diar_segments and speaking_duration are always emitted when diar_segments is available would set correct expectations.

Comment on lines +222 to +231

```python
first_start = diar_segments[0][0]
last_end = diar_segments[-1][1]
result["original_start_ms"] = int(first_start * 1000)
result["original_end_ms"] = int(last_end * 1000)
result["duration_ms"] = int((last_end - first_start) * 1000)
result["duration"] = last_end - first_start
speaking = sum(end - start for start, end in diar_segments)
result["speaking_duration"] = round(speaking, 3)
result["diar_segments"] = [[round(s, 3), round(e, 3)] for s, e in diar_segments]
self._copy_passthrough(item, result)
```

P1 Unsorted diar_segments produces wrong span

This block uses diar_segments[0][0] as first_start and diar_segments[-1][1] as last_end, implicitly assuming the list is time-ordered. Neither the Sortformer post-processing stage nor the speaker-task builder guarantees chronological ordering. If a segment is out of order, original_start_ms, original_end_ms, and duration_ms in the manifest will be wrong — silently, since no error is raised.

The downstream extraction code in extract_segments.py already handles this correctly by explicitly sorting before iterating. The same sort should be applied here before computing the span to keep the manifest values consistent.
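The suggested fix amounts to sorting before computing the span. A simplified sketch (the field handling here is reduced relative to the real TimestampMapperStage, and the function name is hypothetical):

```python
# Sort diar_segments chronologically before deriving the span, so an
# out-of-order segment cannot silently corrupt start/end/duration.
def span_from_diar_segments(diar_segments: list[tuple[float, float]]) -> dict:
    ordered = sorted(diar_segments, key=lambda seg: seg[0])
    first_start, last_end = ordered[0][0], ordered[-1][1]
    return {
        "original_start_ms": int(first_start * 1000),
        "original_end_ms": int(last_end * 1000),
        "duration_ms": int((last_end - first_start) * 1000),
        "speaking_duration": round(sum(e - s for s, e in ordered), 3),
    }
```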

@shubhamNvidia shubhamNvidia force-pushed the ADV_bug/6057972 branch 2 times, most recently from e437b78 to 77e0bab Compare April 9, 2026 05:57
…assthrough fields

- Remove unreachable extract_combo1 (TimestampMapper always sets timestamps)
- Merge combos 1 & 2 into extract_segments_by_timestamps
- Rename extraction functions to be descriptive
- Document diar_segments/speaking_duration as core fields in TimestampMapper docstring
@sarahyurick sarahyurick added the r1.2.0 Pick this label for auto cherry-picking into r1.2.0 label Apr 13, 2026
@shubhamNvidia
Copy link
Copy Markdown
Contributor Author

/ok to test dca8b9c

@shubhamNvidia
Copy link
Copy Markdown
Contributor Author

/ok to test 0f9319d

- Replace fragile tuple return type in get_speaker_audio_data with
  SpeakerResult NamedTuple for self-documenting named field access.
- Update caller in speaker_separation.py to use result.audio,
  result.duration, result.diar_segments instead of positional unpacking.
- Document _INHERITED_DROP_KEYS explaining why duration/num_samples
  are dropped from parent tasks during speaker separation.
- Add comments in pipeline.py and pipeline.yaml documenting the
  default passthrough_keys and how to restrict output columns.
- Add speaker separation note to README explaining dropped keys.
@shubhamNvidia
Copy link
Copy Markdown
Contributor Author

/ok to test 55a389c

Comment on lines 45 to 48

```diff
 speaker_data = {
-    "speaker_0": (_make_audio_segment(3000), 3.0),
-    "speaker_1": (_make_audio_segment(4000), 4.0),
+    "speaker_0": (_make_audio_segment(3000), 3.0, [(0.0, 3.0)]),
+    "speaker_1": (_make_audio_segment(4000), 4.0, [(0.0, 4.0)]),
 }
```

P1 Test mocks use plain tuples instead of SpeakerResult NamedTuples

_build_speaker_tasks now accesses result.audio, result.duration, and result.diar_segments as named attributes (speaker_separation.py lines 177–188). Plain tuples don't have these attributes, so every test that calls stage.process() here will raise AttributeError: 'tuple' object has no attribute 'duration'. The same pattern appears in test_process_output_keys and test_min_duration_filters_short_speakers on the same file.

Suggested change

```diff
 speaker_data = {
-    "speaker_0": (_make_audio_segment(3000), 3.0, [(0.0, 3.0)]),
-    "speaker_1": (_make_audio_segment(4000), 4.0, [(0.0, 4.0)]),
+    "speaker_0": SpeakerResult(_make_audio_segment(3000), 3.0, [(0.0, 3.0)]),
+    "speaker_1": SpeakerResult(_make_audio_segment(4000), 4.0, [(0.0, 4.0)]),
 }
```

Add the following import at the top of the test file:

```python
from nemo_curator.stages.audio.segmentation.speaker_separation_module.speaker_sep import SpeakerResult
```

The same fix is needed on lines 68–70 and 88–91 in this file.
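For readers following along, the shape of the SpeakerResult NamedTuple referenced here is roughly as below; the actual definition lives in speaker_sep.py, and the audio type is simplified (np.ndarray in the real code):

```python
# Hypothetical sketch of the SpeakerResult NamedTuple: named fields replace
# the fragile positional tuple, so callers use result.duration etc.
from typing import NamedTuple

class SpeakerResult(NamedTuple):
    audio: object                              # np.ndarray in the real code
    duration: float
    diar_segments: list[tuple[float, float]]
```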

Update test_speaker_separation.py to construct SpeakerResult
instead of plain tuples, matching the named attribute access
introduced in speaker_separation.py.
@shubhamNvidia
Copy link
Copy Markdown
Contributor Author

/ok to test b5a43d2

@shubhamNvidia
Copy link
Copy Markdown
Contributor Author

/ok to test ddbcbfd

Move reusable segment extraction functions from the tutorial script
into the nemo_curator package as a proper ProcessingStage. The tutorial
becomes a thin CLI wrapper importing from the package.

- Add nemo_curator/stages/audio/io/extract_segments.py with
  SegmentExtractionStage (ProcessingStage[AudioTask, AudioTask])
- Slim tutorials/audio/readspeech/extract_segments.py to CLI wrapper
- Add tests/stages/audio/io/test_extract_segments.py with clean imports
@shubhamNvidia
Copy link
Copy Markdown
Contributor Author

/ok to test 9448042

@shubhamNvidia
Copy link
Copy Markdown
Contributor Author

/ok to test efeb66d

Comment on lines +355 to +371

```python
def _extract_by_timestamps(
    self, entries: list[dict],
) -> tuple[int, float, dict[str, int], list[dict]]:
    """Combo 2: extract by original_start_ms / original_end_ms."""
    counter: dict[str, int] = defaultdict(int)

    def _make_filename(name: str, _entry: dict, _seg_idx: int) -> str:
        idx = counter[name]
        counter[name] += 1
        return f"{name}_segment_{idx:03d}.{self.output_format}"

    return self._extract_file_segments(
        entries,
        sort_key=lambda x: x.get("original_start_ms", 0),
        get_intervals=_intervals_from_timestamps,
        make_filename=_make_filename,
    )
```

P1 Segment filename collision across batches (combos 2 & 4)

counter (combo 2) and per_speaker_count (combo 4, line 393) are local to each _extract_by_timestamps / _extract_speaker_timestamps call, so they reset to 0 on every process_batch invocation. When a single original_file produces more segments than batch_size (e.g., a long recording that generates 100 VAD segments with batch_size=64), the second batch resets the counter and silently overwrites {name}_segment_000.wav through {name}_segment_035.wav written by the first batch.

extract_from_manifest is fine because it processes all entries in one call. The bug only affects the stage-based pipeline path. Promoting the counters to instance-level state (similar to _all_metadata_rows) would fix both combos atomically.
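The proposed fix in miniature, assuming an invented class name (the real stage is SegmentExtractionStage): hold the counter on the instance so numbering continues across process_batch calls.

```python
# Instance-level counter survives across batch invocations, so filenames
# keep incrementing instead of resetting to _000 on each new batch.
from collections import defaultdict

class SegmentNamer:
    def __init__(self, output_format: str = "wav") -> None:
        self.output_format = output_format
        self._segment_counter: dict[str, int] = defaultdict(int)

    def make_filename(self, name: str) -> str:
        idx = self._segment_counter[name]
        self._segment_counter[name] += 1
        return f"{name}_segment_{idx:03d}.{self.output_format}"
```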

@shubhamNvidia
Copy link
Copy Markdown
Contributor Author

/ok to test 30d91b5

…ns to fit single GPU

Signed-off-by: shbhawsar <shbhawsar@nvidia.com>
@shubhamNvidia
Copy link
Copy Markdown
Contributor Author

/ok to test 12e3fdc

@shubhamNvidia
Copy link
Copy Markdown
Contributor Author

/ok to test bc28496

Comment on lines +448 to +466

```python
for entry in file_entries:
    intervals = get_intervals(entry)
    for seg_idx, (start_ms, end_ms, dur) in enumerate(intervals):
        out_filename = make_filename(original_name, entry, seg_idx)
        output_path = os.path.join(self.output_dir, out_filename)

        try:
            audio = _read_segment(original_file, start_ms, end_ms, info.samplerate)
            sf.write(output_path, audio, info.samplerate, subtype=SOUNDFILE_FORMATS[self.output_format])
            extracted += 1
            total_dur += dur

            speaker_id = entry.get("speaker_id")
            if speaker_id:
                speaker_counts[speaker_id] += 1

            metadata_rows.append(
                _base_metadata(out_filename, original_file, entry, seg_idx, start_ms, end_ms, dur)
            )
```

P1 segment_index is always 0 in metadata CSV for Combos 2 and 4

For Combos 2 and 4, _intervals_from_timestamps returns a single-element list, so enumerate(intervals) always yields seg_idx=0. Every row in the CSV will have segment_index=0 while the extracted filenames correctly use the globally-incrementing counter (e.g., _segment_000, _segment_001, …).

Any downstream script reading segment_index from metadata.csv to reconstruct which file corresponds to which row will get incorrect data. One fix is to return the global index from make_filename so it can be passed to _base_metadata.
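One way to thread the global index through, sketched with hypothetical helper names (the real code would pass the index from make_filename into _base_metadata):

```python
# Return the global index alongside the filename so the same value reaches
# both the written file and its metadata row.
from collections import defaultdict

_counter: dict[str, int] = defaultdict(int)

def make_filename_and_index(name: str, ext: str = "wav") -> tuple[str, int]:
    idx = _counter[name]
    _counter[name] += 1
    return f"{name}_segment_{idx:03d}.{ext}", idx

def base_metadata(name: str, start_ms: int, end_ms: int) -> dict:
    filename, idx = make_filename_and_index(name)
    return {"filename": filename, "segment_index": idx,
            "start_ms": start_ms, "end_ms": end_ms}
```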

@sarahyurick
Copy link
Copy Markdown
Contributor

/ok to test 10f0353


Labels

r1.2.0 Pick this label for auto cherry-picking into r1.2.0

4 participants