Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
f44d39c
JsonlAudioReaderStage and TarredAudioManifestReader
ssh-meister Apr 9, 2026
df28b5c
Fix GitHub Actions / ruff errors
ssh-meister Apr 9, 2026
7ef229b
Fix GitHub Actions / ruff errors
ssh-meister Apr 9, 2026
f2f7065
Fix GitHub Actions / ruff errors
ssh-meister Apr 9, 2026
8d9ff06
fix tarred audio cleanup and task metadata handling
ssh-meister Apr 9, 2026
133fc4f
Merge remote-tracking branch 'origin/main' into ameister/reader
ssh-meister Apr 9, 2026
cb90c3a
fix tarred audio segmentation and reader typing
ssh-meister Apr 9, 2026
64630e7
fix tarred audio segmentation and reader typing
ssh-meister Apr 9, 2026
76dd039
Fix GitHub Actions / ruff errors
ssh-meister Apr 9, 2026
e1f41bb
Merge remote-tracking branch 'origin/main' into ameister/reader
ssh-meister Apr 16, 2026
8bbd25b
Added stable and optional durable materialization
ssh-meister Apr 16, 2026
41bfc0e
fix linter errors
ssh-meister Apr 16, 2026
589b548
fix linter errors
ssh-meister Apr 16, 2026
b57c979
fix _PipeStream.__exit__ raises on SIGPIPE after early break
ssh-meister Apr 16, 2026
fab3340
Merge remote-tracking branch 'origin/main' into ameister/reader
ssh-meister Apr 16, 2026
a365731
Merge remote-tracking branch 'origin/main' into ameister/reader
ssh-meister Apr 24, 2026
0b45679
Refactor remote audio file handling into generic file IO stages, add …
ssh-meister Apr 30, 2026
5dc6cdb
merge origin/main
ssh-meister Apr 30, 2026
6aba05b
fix linter errros
ssh-meister Apr 30, 2026
8ff6d36
fix strict=True zip raises ValueError when blank lines are skipped
ssh-meister Apr 30, 2026
df24763
CleanupTemporaryAudioStage imported from wrong module fix
ssh-meister Apr 30, 2026
c7b00fc
Wrong monkeypatch target fix
ssh-meister Apr 30, 2026
3487b00
Computed sample key is lost during serialization fix
ssh-meister Apr 30, 2026
f0f825b
Test assertion will fail — sample_key is injected into filtered data fix
ssh-meister Apr 30, 2026
4f40be7
removed double tar traversal for remote shards
ssh-meister Apr 30, 2026
409557e
Raise error for duration <= 0 and double file materialization prevention
ssh-meister Apr 30, 2026
35f8b5a
sample_key declaration
ssh-meister Apr 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions nemo_curator/stages/audio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,14 @@
SIGMOSFilterStage,
UTMOSFilterStage,
)
from nemo_curator.stages.audio.postprocessing import (
TimestampMapperStage,
from nemo_curator.stages.audio.io import (
AudioManifestReader,
AudioToDocumentStage,
CleanupTemporaryAudioStage,
MaterializeTarredAudioStage,
TarredAudioManifestReader,
)
from nemo_curator.stages.audio.postprocessing import TimestampMapperStage
from nemo_curator.stages.audio.preprocessing import (
MonoConversionStage,
SegmentConcatenationStage,
Expand All @@ -52,15 +57,20 @@
"ALMDataBuilderStage",
"ALMDataOverlapStage",
"AudioDataFilterStage",
"AudioManifestReader",
"AudioToDocumentStage",
"BandFilterStage",
"CleanupTemporaryAudioStage",
"GetAudioDurationStage",
"ManifestReader",
"ManifestWriterStage",
"MaterializeTarredAudioStage",
"MonoConversionStage",
"PreserveByValueStage",
"SIGMOSFilterStage",
"SegmentConcatenationStage",
"SpeakerSeparationStage",
"TarredAudioManifestReader",
"TimestampMapperStage",
"UTMOSFilterStage",
"VADSegmentationStage",
Expand Down
34 changes: 34 additions & 0 deletions nemo_curator/stages/audio/io/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from nemo_curator.stages.audio.io.convert import AudioToDocumentStage
from nemo_curator.stages.audio.io.manifest import AudioManifestReader, AudioManifestReaderStage
from nemo_curator.stages.audio.io.materialize import CleanupTemporaryAudioStage
from nemo_curator.stages.audio.io.tarred import (
MaterializeTarredAudioStage,
TarredAudioManifestPartitionStage,
TarredAudioManifestReader,
TarredAudioManifestReaderStage,
)

# Public API of the ``nemo_curator.stages.audio.io`` package: each name imported
# above from the convert, manifest, materialize and tarred submodules is
# re-exported here.
__all__ = [
"AudioManifestReader",
"AudioManifestReaderStage",
"AudioToDocumentStage",
"CleanupTemporaryAudioStage",
"MaterializeTarredAudioStage",
"TarredAudioManifestPartitionStage",
"TarredAudioManifestReader",
"TarredAudioManifestReaderStage",
]
123 changes: 123 additions & 0 deletions nemo_curator/stages/audio/io/manifest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import json
from dataclasses import dataclass, field
from typing import Any, Literal

from nemo_curator.stages.base import CompositeStage, ProcessingStage
from nemo_curator.stages.file_partitioning import FilePartitioningStage
from nemo_curator.tasks import AudioTask, FileGroupTask, _EmptyTask
from nemo_curator.tasks.audio_task import build_audio_sample_key
from nemo_curator.utils.remote_io import open_text_stream


@dataclass
class AudioManifestReaderStage(ProcessingStage[FileGroupTask, AudioTask]):
    """Read JSON-lines audio manifests and emit one ``AudioTask`` per entry.

    Every path in the incoming ``FileGroupTask.data`` is opened with
    ``open_text_stream`` and parsed line by line. Blank lines are skipped.
    Each parsed entry is optionally reduced to ``fields``, then tagged with
    the manifest it came from and a source-type marker.
    """

    # Optional allow-list of manifest keys to keep; ``None`` keeps every key.
    fields: list[str] | None = None
    # Forwarded unchanged to ``open_text_stream``.
    storage_options: dict[str, Any] | None = None
    # Forwarded unchanged to ``open_text_stream``.
    transport: Literal["auto", "fsspec", "pipe"] = "auto"
    # Entry key holding the audio path; retained even when ``fields`` omits it.
    audio_filepath_key: str = "audio_filepath"
    # Entry key under which the originating manifest path is stored.
    manifest_path_key: str = "_manifest_path"
    # Entry key/value pair written to every entry to tag its origin.
    source_type_key: str = "_audio_source_type"
    source_type_value: str = "manifest"
    name: str = "audio_manifest_reader"

    def inputs(self) -> tuple[list[str], list[str]]:
        """Declare no required input attributes or data fields."""
        return [], []

    def outputs(self) -> tuple[list[str], list[str]]:
        """Declare ``sample_key`` plus the data keys written by ``process``.

        NOTE(review): the tuple is presumably (task attributes, data keys) as
        defined by ``ProcessingStage`` -- confirm against the base class.
        """
        output_fields = list(self.fields or [])
        if self.audio_filepath_key not in output_fields:
            output_fields.append(self.audio_filepath_key)
        output_fields.extend([self.manifest_path_key, self.source_type_key])
        return ["sample_key"], output_fields

    def _build_entry(self, raw_entry: dict[str, Any], manifest_path: str) -> dict[str, Any]:
        """Return the filtered and tagged copy of ``raw_entry`` stored on the task."""
        entry = dict(raw_entry)
        if self.fields is not None:
            # ``key`` (not ``field``) so the imported ``dataclasses.field`` is not shadowed.
            entry = {key: entry[key] for key in self.fields if key in entry}
            # The audio path survives filtering even if ``fields`` does not list it.
            if self.audio_filepath_key in raw_entry and self.audio_filepath_key not in entry:
                entry[self.audio_filepath_key] = raw_entry[self.audio_filepath_key]
        entry[self.manifest_path_key] = manifest_path
        entry[self.source_type_key] = self.source_type_value
        return entry

    def process(self, task: FileGroupTask) -> list[AudioTask]:
        """Parse every manifest listed in ``task.data`` into ``AudioTask`` objects.

        Args:
            task: File group whose ``data`` is iterated as manifest paths.

        Returns:
            One ``AudioTask`` per non-blank manifest line, in file order.

        Raises:
            json.JSONDecodeError: If a non-blank line is not valid JSON. The
                message names the manifest path and the 1-based line number.
        """
        results: list[AudioTask] = []
        for manifest_index, manifest_path in enumerate(task.data):
            with open_text_stream(
                manifest_path,
                storage_options=self.storage_options,
                transport=self.transport,
            ) as fin:
                # ``entry_index`` is the raw 0-based line index, so skipped blank
                # lines leave gaps in the generated task ids.
                for entry_index, line in enumerate(fin):
                    if not line.strip():
                        continue
                    try:
                        raw_entry = json.loads(line)
                    except json.JSONDecodeError as err:
                        # Same exception type, but with enough context to find
                        # the offending line among many manifests.
                        msg = f"{err.msg} (manifest {manifest_path!r}, line {entry_index + 1})"
                        raise json.JSONDecodeError(msg, err.doc, err.pos) from err
                    # The sample key is computed from the unfiltered entry.
                    sample_key = build_audio_sample_key(raw_entry, dataset_name=task.dataset_name)
                    results.append(
                        AudioTask(
                            task_id=f"{task.task_id}_{manifest_index}_{entry_index}",
                            dataset_name=task.dataset_name,
                            data=self._build_entry(raw_entry, manifest_path),
                            sample_key=sample_key,
                            # ``_metadata`` is passed by reference to every task;
                            # ``_stage_perf`` is copied per task.
                            _metadata=task._metadata,
                            _stage_perf=list(task._stage_perf),
                        )
                    )
        return results


@dataclass
class AudioManifestReader(CompositeStage[_EmptyTask, AudioTask]):
    """Composite stage that partitions manifest files and reads them as audio tasks.

    ``decompose`` expands it into a ``FilePartitioningStage`` followed by an
    ``AudioManifestReaderStage``.
    """

    # NOTE(review): a reviewer asked on this class: "Can we use the existing
    # manifest reader?" -- no answer is recorded alongside the code.

    # Manifest file(s) handed to the partitioning stage; must be non-empty.
    manifest_paths: str | list[str]
    # Partitioning options forwarded to ``FilePartitioningStage``.
    files_per_partition: int | None = 1
    blocksize: int | str | None = None
    file_extensions: list[str] = field(default_factory=lambda: [".jsonl", ".json"])
    # Forwarded to ``AudioManifestReaderStage``.
    fields: list[str] | None = None
    # Forwarded to both sub-stages.
    storage_options: dict[str, Any] | None = None
    # Forwarded to ``AudioManifestReaderStage``.
    transport: Literal["auto", "fsspec", "pipe"] = "auto"
    # Forwarded to ``FilePartitioningStage``.
    limit: int | None = None
    name: str = "audio_manifest_reader"

    def __post_init__(self) -> None:
        """Initialise the base stage, then reject an empty ``manifest_paths``.

        Raises:
            ValueError: If ``manifest_paths`` is empty or otherwise falsy.
        """
        super().__init__()
        if self.manifest_paths:
            return
        msg = "manifest_paths is required for AudioManifestReader"
        raise ValueError(msg)

    def decompose(self) -> list[ProcessingStage]:
        """Return the partitioning stage followed by the manifest reader stage."""
        partitioner = FilePartitioningStage(
            file_paths=self.manifest_paths,
            files_per_partition=self.files_per_partition,
            blocksize=self.blocksize,
            file_extensions=self.file_extensions,
            storage_options=self.storage_options,
            limit=self.limit,
        )
        reader = AudioManifestReaderStage(
            fields=self.fields,
            storage_options=self.storage_options,
            transport=self.transport,
        )
        return [partitioner, reader]

    def get_description(self) -> str:
        """Describe the manifests being read and the partitioning in effect.

        ``files_per_partition`` takes precedence over ``blocksize``; when both
        are falsy only the base sentence is returned.
        """
        headline = f"Read audio manifests from {self.manifest_paths}"
        if self.files_per_partition:
            detail = f"with {self.files_per_partition} files per partition"
        elif self.blocksize:
            detail = f"with target blocksize {self.blocksize}"
        else:
            return headline
        return ", ".join((headline, detail))
Loading
Loading