Add audio JSONL reader and tarred ASR dataset support #1780
Open
ssh-meister wants to merge 27 commits into main from ameister/reader
27 commits
f44d39c JsonlAudioReaderStage and TarredAudioManifestReader (ssh-meister)
df28b5c Fix GitHub Actions / ruff errors (ssh-meister)
7ef229b Fix GitHub Actions / ruff errors (ssh-meister)
f2f7065 Fix GitHub Actions / ruff errors (ssh-meister)
8d9ff06 fix tarred audio cleanup and task metadata handling (ssh-meister)
133fc4f Merge remote-tracking branch 'origin/main' into ameister/reader (ssh-meister)
cb90c3a fix tarred audio segmentation and reader typing (ssh-meister)
64630e7 fix tarred audio segmentation and reader typing (ssh-meister)
76dd039 Fix GitHub Actions / ruff errors (ssh-meister)
e1f41bb Merge remote-tracking branch 'origin/main' into ameister/reader (ssh-meister)
8bbd25b Added stable and optional durable materialization (ssh-meister)
41bfc0e fix linter errors (ssh-meister)
589b548 fix linter errors (ssh-meister)
b57c979 fix _PipeStream.__exit__ raises on SIGPIPE after early break (ssh-meister)
fab3340 Merge remote-tracking branch 'origin/main' into ameister/reader (ssh-meister)
a365731 Merge remote-tracking branch 'origin/main' into ameister/reader (ssh-meister)
0b45679 Refactor remote audio file handling into generic file IO stages, add … (ssh-meister)
5dc6cdb merge origin/main (ssh-meister)
6aba05b fix linter errros (ssh-meister)
8ff6d36 fix strict=True zip raises ValueError when blank lines are skipped (ssh-meister)
df24763 CleanupTemporaryAudioStage imported from wrong module fix (ssh-meister)
c7b00fc Wrong monkeypatch target fix (ssh-meister)
3487b00 Computed sample key is lost during serialization fix (ssh-meister)
f0f825b Test assertion will fail — sample_key is injected into filtered data fix (ssh-meister)
4f40be7 removed double tar traversal for remote shards (ssh-meister)
409557e Raise error for duration <= 0 and double file materialization prevention (ssh-meister)
35f8b5a sample_key declaration (ssh-meister)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| # Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| | ||
| from nemo_curator.stages.audio.io.convert import AudioToDocumentStage | ||
| from nemo_curator.stages.audio.io.manifest import AudioManifestReader, AudioManifestReaderStage | ||
| from nemo_curator.stages.audio.io.materialize import CleanupTemporaryAudioStage | ||
| from nemo_curator.stages.audio.io.tarred import ( | ||
|     MaterializeTarredAudioStage, | ||
|     TarredAudioManifestPartitionStage, | ||
|     TarredAudioManifestReader, | ||
|     TarredAudioManifestReaderStage, | ||
| ) | ||
| | ||
| __all__ = [ | ||
|     "AudioManifestReader", | ||
|     "AudioManifestReaderStage", | ||
|     "AudioToDocumentStage", | ||
|     "CleanupTemporaryAudioStage", | ||
|     "MaterializeTarredAudioStage", | ||
|     "TarredAudioManifestPartitionStage", | ||
|     "TarredAudioManifestReader", | ||
|     "TarredAudioManifestReaderStage", | ||
| ] | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,123 @@ | ||
| # Copyright (c) 2026, NVIDIA CORPORATION. All rights reserved. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| | ||
| from __future__ import annotations | ||
| | ||
| import json | ||
| from dataclasses import dataclass, field | ||
| from typing import Any, Literal | ||
| | ||
| from nemo_curator.stages.base import CompositeStage, ProcessingStage | ||
| from nemo_curator.stages.file_partitioning import FilePartitioningStage | ||
| from nemo_curator.tasks import AudioTask, FileGroupTask, _EmptyTask | ||
| from nemo_curator.tasks.audio_task import build_audio_sample_key | ||
| from nemo_curator.utils.remote_io import open_text_stream | ||
| | ||
| | ||
| @dataclass | ||
| class AudioManifestReaderStage(ProcessingStage[FileGroupTask, AudioTask]): | ||
|     fields: list[str] | None = None | ||
|     storage_options: dict[str, Any] | None = None | ||
|     transport: Literal["auto", "fsspec", "pipe"] = "auto" | ||
|     audio_filepath_key: str = "audio_filepath" | ||
|     manifest_path_key: str = "_manifest_path" | ||
|     source_type_key: str = "_audio_source_type" | ||
|     source_type_value: str = "manifest" | ||
|     name: str = "audio_manifest_reader" | ||
| | ||
|     def inputs(self) -> tuple[list[str], list[str]]: | ||
|         return [], [] | ||
| | ||
|     def outputs(self) -> tuple[list[str], list[str]]: | ||
|         output_fields = list(self.fields or []) | ||
|         if self.audio_filepath_key not in output_fields: | ||
|             output_fields.append(self.audio_filepath_key) | ||
|         output_fields.extend([self.manifest_path_key, self.source_type_key]) | ||
|         return ["sample_key"], output_fields | ||
| | ||
|     def process(self, task: FileGroupTask) -> list[AudioTask]: | ||
|         results: list[AudioTask] = [] | ||
|         for manifest_index, manifest_path in enumerate(task.data): | ||
|             with open_text_stream( | ||
|                 manifest_path, | ||
|                 storage_options=self.storage_options, | ||
|                 transport=self.transport, | ||
|             ) as fin: | ||
|                 for entry_index, line in enumerate(fin): | ||
|                     if not line.strip(): | ||
|                         continue | ||
|                     raw_entry = json.loads(line) | ||
|                     sample_key = build_audio_sample_key(raw_entry, dataset_name=task.dataset_name) | ||
|                     entry = dict(raw_entry) | ||
|                     if self.fields is not None: | ||
|                         entry = {field: entry[field] for field in self.fields if field in entry} | ||
|                         if self.audio_filepath_key in raw_entry and self.audio_filepath_key not in entry: | ||
|                             entry[self.audio_filepath_key] = raw_entry[self.audio_filepath_key] | ||
|                     entry[self.manifest_path_key] = manifest_path | ||
|                     entry[self.source_type_key] = self.source_type_value | ||
|                     results.append( | ||
|                         AudioTask( | ||
|                             task_id=f"{task.task_id}_{manifest_index}_{entry_index}", | ||
|                             dataset_name=task.dataset_name, | ||
|                             data=entry, | ||
|                             sample_key=sample_key, | ||
|                             _metadata=task._metadata, | ||
|                             _stage_perf=list(task._stage_perf), | ||
|                         ) | ||
|                     ) | ||
|         return results | ||
| | ||
| | ||
| @dataclass | ||
| class AudioManifestReader(CompositeStage[_EmptyTask, AudioTask]): | ||
|     manifest_paths: str | list[str] | ||
|     files_per_partition: int | None = 1 | ||
|     blocksize: int | str | None = None | ||
|     file_extensions: list[str] = field(default_factory=lambda: [".jsonl", ".json"]) | ||
|     fields: list[str] | None = None | ||
|     storage_options: dict[str, Any] | None = None | ||
|     transport: Literal["auto", "fsspec", "pipe"] = "auto" | ||
|     limit: int | None = None | ||
|     name: str = "audio_manifest_reader" | ||
| | ||
|     def __post_init__(self) -> None: | ||
|         super().__init__() | ||
|         if not self.manifest_paths: | ||
|             msg = "manifest_paths is required for AudioManifestReader" | ||
|             raise ValueError(msg) | ||
| | ||
|     def decompose(self) -> list[ProcessingStage]: | ||
|         return [ | ||
|             FilePartitioningStage( | ||
|                 file_paths=self.manifest_paths, | ||
|                 files_per_partition=self.files_per_partition, | ||
|                 blocksize=self.blocksize, | ||
|                 file_extensions=self.file_extensions, | ||
|                 storage_options=self.storage_options, | ||
|                 limit=self.limit, | ||
|             ), | ||
|             AudioManifestReaderStage( | ||
|                 fields=self.fields, | ||
|                 storage_options=self.storage_options, | ||
|                 transport=self.transport, | ||
|             ), | ||
|         ] | ||
| | ||
|     def get_description(self) -> str: | ||
|         parts = [f"Read audio manifests from {self.manifest_paths}"] | ||
|         if self.files_per_partition: | ||
|             parts.append(f"with {self.files_per_partition} files per partition") | ||
|         elif self.blocksize: | ||
|             parts.append(f"with target blocksize {self.blocksize}") | ||
|         return ", ".join(parts) | ||
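For anyone who wants to try the per-line behavior of `AudioManifestReaderStage.process` without a NeMo Curator install, here is a rough standalone sketch that mirrors the logic in the diff above. It is not code from the PR: `read_manifest_entries` is a hypothetical helper, an in-memory `io.StringIO` stands in for `open_text_stream`, and the `AudioTask` and `build_audio_sample_key` parts are left out. It only illustrates three things: blank lines are skipped, the `fields` filter always retains the audio path, and the two bookkeeping keys are appended.

```python
from __future__ import annotations

import io
import json
from collections.abc import Iterable, Iterator
from typing import Any


def read_manifest_entries(  # hypothetical helper, not part of the PR
    lines: Iterable[str],
    manifest_path: str,
    fields: list[str] | None = None,
    audio_filepath_key: str = "audio_filepath",
) -> Iterator[tuple[int, dict[str, Any]]]:
    """Yield (line index, filtered entry) pairs from JSONL manifest lines."""
    for entry_index, line in enumerate(lines):
        # Blank lines are skipped but still consume an index, as in the diff.
        if not line.strip():
            continue
        raw_entry = json.loads(line)
        entry = dict(raw_entry)
        if fields is not None:
            # Keep only the requested fields that are actually present.
            entry = {name: entry[name] for name in fields if name in entry}
            # The audio path is retained even when it was not requested.
            if audio_filepath_key in raw_entry and audio_filepath_key not in entry:
                entry[audio_filepath_key] = raw_entry[audio_filepath_key]
        # Bookkeeping keys, using the default key names from the diff.
        entry["_manifest_path"] = manifest_path
        entry["_audio_source_type"] = "manifest"
        yield entry_index, entry


# In-memory stand-in for a manifest file; note the blank second line.
manifest = io.StringIO(
    '{"audio_filepath": "a.wav", "duration": 1.5, "text": "hello"}\n'
    "\n"
    '{"audio_filepath": "b.wav", "duration": 2.0, "text": "world"}\n'
)
entries = list(read_manifest_entries(manifest, "demo.jsonl", fields=["text"]))
for index, entry in entries:
    print(index, entry)
```

Because the index comes from `enumerate` over raw lines, entry indices (and therefore the `task_id` suffix in the diff) can have gaps when a manifest contains blank lines.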
Can we use the existing manifest reader?