Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
507b8c4
Fix waveform tensor leak and redesign AudioDataFilterStage for all VA…
shubhamNvidia Apr 8, 2026
77e0bab
Address PR review: fix unreachable combo detection, clarify core vs p…
shubhamNvidia Apr 9, 2026
dca8b9c
Merge branch 'main' into ADV_bug/6057972
shubhamNvidia Apr 15, 2026
dbe4813
Fix test_decompose_all_disabled_except_mono to expect TimestampMapper…
shubhamNvidia Apr 15, 2026
221a094
Merge branch 'main' into ADV_bug/6057972
shubhamNvidia Apr 15, 2026
6e72c7c
Merge branch 'main' into ADV_bug/6057972
shubhamNvidia Apr 16, 2026
b73652b
Merge branch 'main' into ADV_bug/6057972
sarahyurick Apr 16, 2026
dc010ad
Merge branch 'main' into ADV_bug/6057972
shubhamNvidia Apr 17, 2026
bc9eb9a
Add non-serializable key sanitization and refactor extract_segments
shubhamNvidia Apr 17, 2026
a18002f
Merge remote-tracking branch 'upstream/main' into ADV_bug/6057972
shubhamNvidia Apr 17, 2026
0f9319d
Fix ruff lint errors in test_timestamp_mapper and extract_segments
shubhamNvidia Apr 17, 2026
55a389c
Address PR review: SpeakerResult NamedTuple, docs, passthrough_keys
shubhamNvidia Apr 17, 2026
b5a43d2
Fix test mocks to use SpeakerResult NamedTuple
shubhamNvidia Apr 17, 2026
ddbcbfd
Merge branch 'main' into ADV_bug/6057972
shubhamNvidia Apr 19, 2026
9448042
refactor: move extract_segments logic into SegmentExtractionStage
shubhamNvidia Apr 20, 2026
efeb66d
fix: accumulate metadata rows across batches in SegmentExtractionStage
shubhamNvidia Apr 20, 2026
30d91b5
fix: persistent counters for all combos and single-worker constraint
shubhamNvidia Apr 20, 2026
12e3fdc
fix: NameError in run.py, lazy executor factory, reduce GPU allocatio…
shubhamNvidia Apr 20, 2026
1e8cb59
Merge branch 'main' into ADV_bug/6057972
shubhamNvidia Apr 21, 2026
bc28496
Merge branch 'main' into ADV_bug/6057972
shubhamNvidia Apr 21, 2026
2d8a779
Merge branch 'main' into ADV_bug/6057972
shubhamNvidia Apr 22, 2026
cec9095
Merge branch 'main' into ADV_bug/6057972
ayushdg Apr 22, 2026
4f4c4a8
Merge branch 'main' into ADV_bug/6057972
shubhamNvidia Apr 22, 2026
886c19e
Merge branch 'main' into ADV_bug/6057972
shubhamNvidia Apr 22, 2026
10f0353
Merge branch 'main' into ADV_bug/6057972
shubhamNvidia Apr 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ class AudioDataFilterStage(CompositeStage):
cross-file parallelism. Each stage owns its own default resource
Comment thread
mohammadaaftabv marked this conversation as resolved.
allocation. Use ``.with_()`` to override individual stage resources.

Supports four pipeline topologies based on which features are enabled:

- **Combo 1** (VAD=off, Speaker=off): MonoConversion → Filters → TimestampMapper
- **Combo 2** (VAD=on, Speaker=off): MonoConversion → VAD(fan-out) → Filters → TimestampMapper
- **Combo 3** (VAD=off, Speaker=on): MonoConversion → Filters → SpeakerSep → Filters → TimestampMapper
- **Combo 4** (VAD=on, Speaker=on): Full pipeline with SegmentConcat + TimestampMapper

Args:
config_path: Path to a YAML config file. When *None* the
built-in ``default_config.yaml`` is used.
Expand All @@ -84,136 +91,160 @@ def __init__(
self._cfg = _deep_merge(self._cfg, config)

def decompose(self) -> list[ProcessingStage]:
"""Build a self-consistent pipeline topology based on enabled features."""
cfg = self._cfg
stages: list[ProcessingStage] = []

mc = cfg.get("mono_conversion", {})
enable_vad = cfg.get("vad", {}).get("enable", True)
enable_speaker = cfg.get("speaker_separation", {}).get("enable", True)

if enable_vad and enable_speaker:
stages = self._build_full_pipeline(cfg)
elif enable_vad:
stages = self._build_vad_only_pipeline(cfg)
elif enable_speaker:
stages = self._build_speaker_only_pipeline(cfg)
else:
stages = self._build_filters_only_pipeline(cfg)

enabled = get_enabled_stages(cfg)
logger.info(
f"AudioDataFilterStage decomposed into {len(stages)} stages "
f"(enabled: {enabled}, speaker_sep: {enable_speaker})"
)
return stages

# ------------------------------------------------------------------
# Topology builders (one per feature combination)
# ------------------------------------------------------------------

def _build_full_pipeline(self, cfg: dict) -> list[ProcessingStage]:
"""Combo 4: VAD=on, Speaker=on. Identical to the original design."""
stages: list[ProcessingStage] = [self._make_mono(cfg)]

stages.append(self._make_vad(cfg, suffix="", nested=True))
self._append_quality_filters(stages, cfg, suffix="")

concat = cfg.get("concatenation", {})
stages.append(
MonoConversionStage(
output_sample_rate=mc.get("output_sample_rate", 48000),
strict_sample_rate=mc.get("strict_sample_rate", True),
name="MonoConversion",
resources=Resources(cpus=mc.get("cpus", 1.0)),
SegmentConcatenationStage(
silence_duration_sec=concat.get("silence_duration_sec", 0.5),
name="SegmentConcat",
resources=Resources(cpus=concat.get("cpus", 1.0)),
)
)

vad = cfg.get("vad", {})
band = cfg.get("band_filter", {})
utmos = cfg.get("utmos", {})
sigmos = cfg.get("sigmos", {})
speaker = cfg.get("speaker_separation", {})
concat = cfg.get("concatenation", {})
ts = cfg.get("timestamp_mapper", {})
stages.append(self._make_speaker_sep(cfg))

enable_vad = vad.get("enable", True)
enable_band = band.get("enable", True)
enable_utmos = utmos.get("enable", True)
enable_sigmos = sigmos.get("enable", True)
enable_speaker = speaker.get("enable", True)

self._append_filter_stages(
stages,
vad,
band,
utmos,
sigmos,
enable_vad,
enable_band,
enable_utmos,
enable_sigmos,
suffix="",
)
stages.append(self._make_vad(cfg, suffix="_Speaker", nested=False))
self._append_quality_filters(stages, cfg, suffix="_Speaker")

if enable_speaker:
if enable_vad:
stages.append(
SegmentConcatenationStage(
silence_duration_sec=concat.get("silence_duration_sec", 0.5),
name="SegmentConcat",
resources=Resources(cpus=concat.get("cpus", 1.0)),
)
)
stages.append(self._make_timestamp_mapper(cfg))
return stages

stages.append(
SpeakerSeparationStage(
exclude_overlaps=speaker.get("exclude_overlaps", True),
min_duration=speaker.get("min_duration", 0.8),
gap_threshold=speaker.get("gap_threshold", 0.1),
buffer_time=speaker.get("buffer_time", 0.5),
name="SpeakerSeparation",
resources=Resources(
cpus=speaker.get("cpus", 1.0),
gpus=speaker.get("gpus", 1.0),
),
)
)
def _build_vad_only_pipeline(self, cfg: dict) -> list[ProcessingStage]:
"""Combo 2: VAD=on, Speaker=off. VAD fans out, OutputNormalizer cleans up."""
stages: list[ProcessingStage] = [self._make_mono(cfg)]

self._append_filter_stages(
stages,
vad,
band,
utmos,
sigmos,
enable_vad,
enable_band,
enable_utmos,
enable_sigmos,
suffix="_Speaker",
)
stages.append(self._make_vad(cfg, suffix="", nested=False))
self._append_quality_filters(stages, cfg, suffix="")

if enable_vad or enable_speaker:
stages.append(
TimestampMapperStage(
passthrough_keys=ts.get("passthrough_keys"),
name="TimestampMapper",
resources=Resources(cpus=ts.get("cpus", 1.0)),
)
)
stages.append(self._make_timestamp_mapper(cfg))
return stages

enabled = get_enabled_stages(cfg)
logger.info(
f"AudioDataFilterStage decomposed into {len(stages)} stages "
f"(enabled: {enabled}, speaker_sep: {enable_speaker})"
)
def _build_speaker_only_pipeline(self, cfg: dict) -> list[ProcessingStage]:
"""Combo 3: VAD=off, Speaker=on. SpeakerSep fans out with diar_segments."""
stages: list[ProcessingStage] = [self._make_mono(cfg)]

self._append_quality_filters(stages, cfg, suffix="")

stages.append(self._make_speaker_sep(cfg))

self._append_quality_filters(stages, cfg, suffix="_Speaker")

stages.append(self._make_timestamp_mapper(cfg))
return stages

def _build_filters_only_pipeline(self, cfg: dict) -> list[ProcessingStage]:
"""Combo 1: VAD=off, Speaker=off. Filters only, TimestampMapper cleans up."""
stages: list[ProcessingStage] = [self._make_mono(cfg)]

self._append_quality_filters(stages, cfg, suffix="")

stages.append(self._make_timestamp_mapper(cfg))
return stages

# ------------------------------------------------------------------
# Stage factories
# ------------------------------------------------------------------

@staticmethod
def _append_filter_stages( # noqa: PLR0913
def _make_mono(cfg: dict) -> MonoConversionStage:
mc = cfg.get("mono_conversion", {})
return MonoConversionStage(
output_sample_rate=mc.get("output_sample_rate", 48000),
strict_sample_rate=mc.get("strict_sample_rate", True),
name="MonoConversion",
resources=Resources(cpus=mc.get("cpus", 1.0)),
)

@staticmethod
def _make_vad(cfg: dict, *, suffix: str, nested: bool) -> VADSegmentationStage:
vad = cfg.get("vad", {})
return VADSegmentationStage(
min_duration_sec=vad.get("min_duration_sec", 2.0),
max_duration_sec=vad.get("max_duration_sec", 60.0),
threshold=vad.get("threshold", 0.5),
min_interval_ms=vad.get("min_interval_ms", 500),
speech_pad_ms=vad.get("speech_pad_ms", 300),
nested=nested,
name=f"VAD{suffix}",
resources=Resources(
cpus=vad.get("cpus", 1.0),
gpus=vad.get("gpus", 0.3),
),
)

@staticmethod
def _make_speaker_sep(cfg: dict) -> SpeakerSeparationStage:
speaker = cfg.get("speaker_separation", {})
return SpeakerSeparationStage(
exclude_overlaps=speaker.get("exclude_overlaps", True),
min_duration=speaker.get("min_duration", 0.8),
gap_threshold=speaker.get("gap_threshold", 0.1),
buffer_time=speaker.get("buffer_time", 0.5),
name="SpeakerSeparation",
resources=Resources(
cpus=speaker.get("cpus", 1.0),
gpus=speaker.get("gpus", 1.0),
),
)

@staticmethod
def _make_timestamp_mapper(cfg: dict) -> TimestampMapperStage:
ts = cfg.get("timestamp_mapper", {})
return TimestampMapperStage(
passthrough_keys=ts.get("passthrough_keys"),
name="TimestampMapper",
resources=Resources(cpus=ts.get("cpus", 1.0)),
)

# ------------------------------------------------------------------
# Quality filter helpers
# ------------------------------------------------------------------

@staticmethod
def _append_quality_filters(
stages: list[ProcessingStage],
vad: dict,
band: dict,
utmos: dict,
sigmos: dict,
enable_vad: bool,
enable_band: bool,
enable_utmos: bool,
enable_sigmos: bool,
cfg: dict,
*,
suffix: str,
) -> None:
"""Append VAD + quality filter stages to *stages* list."""
if enable_vad:
# Pre-speaker pass (suffix==""): nested=True so VAD stores segments
# inside the task for SegmentConcatenation to merge.
# Post-speaker pass (suffix=="_Speaker"): nested=False so VAD fans
# out into separate tasks for independent downstream processing.
stages.append(
VADSegmentationStage(
min_duration_sec=vad.get("min_duration_sec", 2.0),
max_duration_sec=vad.get("max_duration_sec", 60.0),
threshold=vad.get("threshold", 0.5),
min_interval_ms=vad.get("min_interval_ms", 500),
speech_pad_ms=vad.get("speech_pad_ms", 300),
nested=(suffix == ""),
name=f"VAD{suffix}",
resources=Resources(
cpus=vad.get("cpus", 1.0),
gpus=vad.get("gpus", 0.3),
),
)
)
"""Append quality filter stages (Band, UTMOS, SIGMOS) to *stages*."""
band = cfg.get("band_filter", {})
utmos = cfg.get("utmos", {})
sigmos = cfg.get("sigmos", {})

if enable_band:
if band.get("enable", True):
stages.append(
BandFilterStage(
band_value=band.get("band_value", "full_band"),
Expand All @@ -225,7 +256,7 @@ def _append_filter_stages( # noqa: PLR0913
)
)

if enable_utmos:
if utmos.get("enable", True):
stages.append(
UTMOSFilterStage(
mos_threshold=utmos.get("mos_threshold", 3.5),
Expand All @@ -237,7 +268,7 @@ def _append_filter_stages( # noqa: PLR0913
)
)

if enable_sigmos:
if sigmos.get("enable", True):
stages.append(
SIGMOSFilterStage(
noise_threshold=sigmos.get("noise_threshold", 4.0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ vad:
min_interval_ms: 500
speech_pad_ms: 300
cpus: 1.0
gpus: 0.3
gpus: 0.1

band_filter:
enable: true
Expand All @@ -31,7 +31,7 @@ utmos:
enable: true
mos_threshold: 3.4
cpus: 1.0
gpus: 0.5
gpus: 0.2

sigmos:
enable: true
Expand All @@ -43,7 +43,7 @@ sigmos:
loud_threshold: null
reverb_threshold: null
cpus: 1.0
gpus: 0.5
gpus: 0.2

concatenation:
silence_duration_sec: 0.5
Expand All @@ -56,7 +56,7 @@ speaker_separation:
gap_threshold: 0.1
buffer_time: 0.5
cpus: 1.0
gpus: 1.0
gpus: 0.4

timestamp_mapper:
passthrough_keys: null
Expand Down
2 changes: 2 additions & 0 deletions nemo_curator/stages/audio/filtering/band.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ def _process_single(self, task: AudioTask) -> AudioTask | None:
pred = self._predictor.predict_audio(waveform, sample_rate)
if isinstance(pred, str) and not pred.startswith("Error") and pred in ("full_band", "narrow_band"):
task.data["band_prediction"] = pred
else:
logger.warning(f"[{task.task_id}] BandFilter: unexpected prediction value: {pred!r}")
except Exception as e: # noqa: BLE001
logger.exception(f"[BandFilter] Prediction error: {e}")
return None
Expand Down
Loading
Loading