| 1 | +# NeMo Curator — Agent & AI Contributor Guide |
| 2 | + |
| 3 | +This file provides guidance for AI coding agents (Copilot, Claude Code, Cursor, etc.) and their operators contributing to NeMo Curator. Read it before writing any code or opening a PR. |
| 4 | + |
| 5 | +--- |
| 6 | + |
| 7 | +## Quick Reference |
| 8 | + |
| 9 | +```bash |
| 10 | +uv sync --extra all # install all dependencies |
| 11 | +pre-commit run --all-files # lint + format |
| 12 | +pytest -m "not gpu" # run CPU tests |
| 13 | +pytest # run all tests (requires GPU) |
| 14 | +``` |
| 15 | + |
| 16 | +--- |
| 17 | + |
| 18 | +## Environment Setup |
| 19 | + |
| 20 | +**Always use `uv`. Never use bare `pip install` or `python -m pip`.** |
| 21 | + |
| 22 | +```bash |
| 23 | +# Install uv (one-time bootstrap; an exception to the no-pip rule) |
| 24 | +pip install uv |
| 25 | + |
| 26 | +# Install project + all dev dependencies |
| 27 | +uv sync --extra all |
| 28 | + |
| 29 | +# Install just a specific modality |
| 30 | +uv sync --extra text_cuda12 |
| 31 | +uv sync --extra audio_cpu --extra video_cpu |
| 32 | +``` |
| 33 | + |
| 34 | +Available extras: `text_cpu`, `text_cuda12`, `audio_cpu`, `audio_cuda12`, `image_cpu`, `image_cuda12`, `video_cpu`, `video_cuda12`, `deduplication_cuda12`, `sdg_cpu`, `sdg_cuda12`, `interleaved_cpu`, `interleaved_cuda12`, `all`. |
| 35 | + |
| 36 | +**Set up pre-commit hooks (required before committing):** |
| 37 | + |
| 38 | +```bash |
| 39 | +uv tool install pre-commit |
| 40 | +pre-commit install --install-hooks |
| 41 | +``` |
| 42 | + |
| 43 | +--- |
| 44 | + |
| 45 | +## Running Tests |
| 46 | + |
| 47 | +```bash |
| 48 | +# CPU-only (no GPU required) |
| 49 | +pytest -m "not gpu" |
| 50 | + |
| 51 | +# Full suite (requires NVIDIA GPU, Volta or newer, CUDA 12.x) |
| 52 | +pytest |
| 53 | + |
| 54 | +# Specific folder |
| 55 | +pytest tests/stages/text/ |
| 56 | +pytest tests/stages/audio/ |
| 57 | + |
| 58 | +# With coverage (what CI runs) |
| 59 | +coverage run --branch --source=nemo_curator -m pytest -m "not gpu" |
| 60 | +coverage report |
| 61 | +``` |
| 62 | + |
| 63 | +**Test layout mirrors source layout.** New stage in `nemo_curator/stages/text/filters/` → tests in `tests/stages/text/filters/`. GPU-only tests are marked `@pytest.mark.gpu`. CPU tests must pass without any GPU. |
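
The mirroring rule above can be expressed as a small path mapping. This is a minimal sketch for illustration only; `mirrored_test_path` is a hypothetical helper, not something the repository provides.

```python
from pathlib import PurePosixPath


def mirrored_test_path(source_path: str) -> str:
    """Map a module under nemo_curator/ to the test file that mirrors it."""
    src = PurePosixPath(source_path)
    if not src.parts or src.parts[0] != "nemo_curator":
        raise ValueError(f"expected a path under nemo_curator/, got {source_path!r}")
    # Swap the package root for tests/ and keep the intermediate folders.
    mirrored_dir = PurePosixPath("tests", *src.parts[1:-1])
    return str(mirrored_dir / f"test_{src.name}")


print(mirrored_test_path("nemo_curator/stages/text/filters/my_filter.py"))
# -> tests/stages/text/filters/test_my_filter.py
```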
| 64 | + |
| 65 | +**Coverage requirement: 80% on changed lines.** CI will block PRs that don't meet this. |
| 66 | + |
| 67 | +--- |
| 68 | + |
| 69 | +## Linting and Formatting |
| 70 | + |
| 71 | +NeMo Curator uses `ruff` for both linting and formatting (line length: 119). The pre-commit hooks run it automatically on commit. |
| 72 | + |
| 73 | +```bash |
| 74 | +# Run all checks manually |
| 75 | +pre-commit run --all-files |
| 76 | + |
| 77 | +# Auto-fix what ruff can fix |
| 78 | +ruff check --fix . |
| 79 | +ruff format . |
| 80 | +``` |
| 81 | + |
| 82 | +Notable ruff config (from `pyproject.toml`): |
| 83 | +- Docstrings are **required** for all public classes and methods. |
| 84 | +- Use `loguru.logger`, not `print()`. |
| 85 | +- Full type annotations required on all functions. |
| 86 | +- `tests/` has relaxed rules (asserts, magic values allowed). |
| 87 | + |
| 88 | +--- |
| 89 | + |
| 90 | +## Architecture Overview |
| 91 | + |
| 92 | +NeMo Curator is built around five core abstractions. Understand these before writing any stage or pipeline code. |
| 93 | + |
| 94 | +### 1. Task (`nemo_curator/tasks/`) |
| 95 | + |
| 96 | +A **Task** is the unit of data flowing through a pipeline — a typed batch of data for a single processing step. |
| 97 | + |
| 98 | +```python |
| 99 | +@dataclass |
| 100 | +class Task(ABC, Generic[T]): |
| 101 | + task_id: str |
| 102 | + dataset_name: str |
| 103 | + data: T # payload (DataFrame, Table, etc.) |
| 104 | + _stage_perf: list[StagePerfStats] # auto-populated by executor |
| 105 | + _metadata: dict[str, Any] # arbitrary key-value metadata |
| 106 | + |
| 107 | + @property |
| 108 | + @abstractmethod |
| 109 | + def num_items(self) -> int: ... |
| 110 | + |
| 111 | + @abstractmethod |
| 112 | + def validate(self) -> bool: ... |
| 113 | +``` |
| 114 | + |
| 115 | +Pre-built task types: `DocumentBatch` (text), `AudioTask`, `ImageBatch`, `VideoBatch`, `InterleavedBatch`, `FileGroupTask`. |
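
To make the two abstract members concrete, here is a minimal sketch of a custom task type. It uses a simplified stand-in for `Task` so that it runs on its own; the stand-in omits `_stage_perf`, and `LineBatch` is a hypothetical type, not one shipped with NeMo Curator.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Generic, TypeVar

T = TypeVar("T")


@dataclass
class Task(ABC, Generic[T]):
    """Simplified stand-in for the real base class (assumption, not library code)."""

    task_id: str
    dataset_name: str
    data: T
    _metadata: dict[str, Any] = field(default_factory=dict)

    @property
    @abstractmethod
    def num_items(self) -> int: ...

    @abstractmethod
    def validate(self) -> bool: ...


@dataclass
class LineBatch(Task[list[str]]):
    """Hypothetical task whose payload is a list of text lines."""

    @property
    def num_items(self) -> int:
        # One item per line in the payload.
        return len(self.data)

    def validate(self) -> bool:
        # The payload is valid only if every entry is a string.
        return all(isinstance(line, str) for line in self.data)


batch = LineBatch(task_id="t0", dataset_name="demo", data=["first line", "second line"])
print(batch.num_items, batch.validate())
```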
| 116 | + |
| 117 | +### 2. ProcessingStage (`nemo_curator/stages/base.py`) |
| 118 | + |
| 119 | +A **ProcessingStage** transforms one task type into another. This is where all curation logic lives. |
| 120 | + |
| 121 | +```python |
| 122 | +class ProcessingStage(ABC, Generic[X, Y], metaclass=StageMeta): |
| 123 | + # ── Class attributes ── set these, never override as @property ── |
| 124 | + name = "MyStage" # unique string identifier (required) |
| 125 | + resources = Resources(cpus=1.0) # declare CPU/GPU needs |
| 126 | + batch_size = 1 # tasks per batch |
| 127 | + runtime_env: ClassVar[dict | None] = None # Ray runtime env |
| 128 | + |
| 129 | + @abstractmethod |
| 130 | + def process(self, task: X) -> Y | list[Y] | None: |
| 131 | + """1-to-1, 1-to-many, or filter (return None to drop task).""" |
| 132 | + |
| 133 | + # Optional overrides: |
| 134 | + def process_batch(self, tasks: list[X]) -> list[Y]: ... # vectorized processing |
| 135 | + def inputs(self) -> tuple[list[str], list[str]]: ... # declare required attrs/columns |
| 136 | + def outputs(self) -> tuple[list[str], list[str]]: ... # declare produced attrs/columns |
| 137 | + def setup(self, worker_metadata) -> None: ... # called once per worker |
| 138 | + def setup_on_node(self, node_info, worker_metadata) -> None: ... # called once per node |
| 139 | + def teardown(self) -> None: ... # cleanup after processing |
| 140 | +``` |
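
Because `process()` may return a single task, a list of tasks, or `None`, whatever calls it has to handle all three shapes. The sketch below shows one way a caller might normalize them into a list; `normalize_result` is a hypothetical helper, and the real executors may handle this differently.

```python
from __future__ import annotations

from typing import TypeVar

Y = TypeVar("Y")


def normalize_result(result: Y | list[Y] | None) -> list[Y]:
    """Turn any of the three allowed return shapes into a flat list."""
    if result is None:
        return []  # filter case: the task was dropped
    if isinstance(result, list):
        return result  # 1-to-many case
    return [result]  # 1-to-1 case


print(normalize_result(None))
print(normalize_result("task-a"))
print(normalize_result(["task-a", "task-b"]))
```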
| 141 | + |
| 142 | +**Critical rules for stages:** |
| 143 | +- Set `name`, `resources`, `batch_size`, `runtime_env` as **plain class attributes**, never as `@property`. |
| 144 | +- Never override `_name`, `_resources`, or `_batch_size`. These are `@final` properties on the base class, and overriding them raises `TypeError`. |
| 145 | +- Concrete (non-abstract) stages are **automatically registered** in `_STAGE_REGISTRY` by `StageMeta`. No manual registration needed. |
| 146 | +- All stages **must be idempotent and retry-safe**. Executors (Ray, Xenna) can preempt and reschedule tasks mid-execution. |
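
As an illustration of the idempotency rule, the sketch below shows processing logic whose output depends only on its input, so a rescheduled run produces the same result. `TextTask` and `dedupe_lines` are hypothetical stand-ins, not NeMo Curator classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TextTask:
    """Hypothetical stand-in for a task; frozen so a stage cannot mutate its input."""

    task_id: str
    lines: tuple[str, ...]


def dedupe_lines(task: TextTask) -> TextTask:
    """Return a new task with duplicate lines removed, preserving order.

    The output depends only on the input: no random IDs, timestamps,
    or appends to shared state, so a retry gives the same result.
    """
    unique = tuple(dict.fromkeys(task.lines))
    return TextTask(task_id=f"{task.task_id}_dedup", lines=unique)


task = TextTask(task_id="t0", lines=("a", "b", "a"))
first = dedupe_lines(task)
retry = dedupe_lines(task)  # simulates the executor rescheduling the same task
print(first == retry)  # True
```

Things that typically break retry safety are generating IDs with `uuid4()`, writing to a fixed output path in append mode, or mutating the input task in place.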
| 147 | + |
| 148 | +**Customizing a stage without subclassing** — use `with_()`: |
| 149 | + |
| 150 | +```python |
| 151 | +stage = MyStage().with_( |
| 152 | + resources=Resources(gpu_memory_gb=40.0), |
| 153 | + batch_size=32, |
| 154 | +) |
| 155 | +``` |
| 156 | + |
| 157 | +### 3. CompositeStage (`nemo_curator/stages/base.py`) |
| 158 | + |
| 159 | +A **CompositeStage** is a user-facing stage that decomposes into multiple execution stages at pipeline-build time. It is never executed directly. Use it when a logical operation requires multiple internal stages (e.g., identify + remove for deduplication). |
| 160 | + |
| 161 | +```python |
| 162 | +class MyCompositeStage(CompositeStage[DocumentBatch, DocumentBatch]): |
| 163 | + name = "MyCompositeStage" |
| 164 | + |
| 165 | + def decompose(self) -> list[ProcessingStage]: |
| 166 | + return [IdentifyStage(), RemoveStage()] |
| 167 | +``` |
| 168 | + |
| 169 | +### 4. Pipeline (`nemo_curator/pipeline/pipeline.py`) |
| 170 | + |
| 171 | +```python |
| 172 | +pipeline = Pipeline("name", "description") |
| 173 | +pipeline.add_stage(Stage1()).add_stage(Stage2()).add_stage(Stage3()) |
| 174 | +pipeline.build() # decomposes CompositeStages into execution stages |
| 175 | +``` |
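
The decomposition step can be pictured as flattening a list of stages. This is a rough sketch using stand-in classes: `Stage`, `Composite`, and `flatten` are hypothetical names, and support for nested composites is an assumption rather than documented behavior of `build()`.

```python
class Stage:
    """Stand-in for an execution stage."""

    name = "Stage"


class Composite(Stage):
    """Stand-in for a composite stage that expands into other stages."""

    def decompose(self) -> list[Stage]:
        raise NotImplementedError


class Reader(Stage):
    name = "Reader"


class Identify(Stage):
    name = "Identify"


class Remove(Stage):
    name = "Remove"


class Dedup(Composite):
    name = "Dedup"

    def decompose(self) -> list[Stage]:
        return [Identify(), Remove()]


def flatten(stages: list[Stage]) -> list[Stage]:
    """Replace each composite with the stages it decomposes into."""
    flat: list[Stage] = []
    for stage in stages:
        if isinstance(stage, Composite):
            flat.extend(flatten(stage.decompose()))  # recurse in case of nesting
        else:
            flat.append(stage)
    return flat


print([stage.name for stage in flatten([Reader(), Dedup()])])
# -> ['Reader', 'Identify', 'Remove']
```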
| 176 | + |
| 177 | +### 5. Executors (`nemo_curator/backends/`) |
| 178 | + |
| 179 | +Executors run pipelines on a backend. Two backends exist: |
| 180 | +- `RayDataExecutor` — Ray Data streaming pipeline (text/image/audio/video) |
| 181 | +- `XennaExecutor` — Cosmos Xenna backend for production deployments |
| 182 | + |
| 183 | +See `tutorials/quickstart.py` for a complete working example. |
| 184 | + |
| 185 | +--- |
| 186 | + |
| 187 | +## Adding a New Stage (Step-by-Step) |
| 188 | + |
| 189 | +1. **Pick the right location**: `nemo_curator/stages/{modality}/{category}/your_stage.py` |
| 190 | + - e.g., a new text filter → `nemo_curator/stages/text/filters/my_filter.py` |
| 191 | + |
| 192 | +2. **Write the stage**: |
| 193 | + |
| 194 | +```python |
| 195 | +from nemo_curator.stages.base import ProcessingStage |
| 196 | +from nemo_curator.stages.resources import Resources |
| 197 | +from nemo_curator.tasks.document import DocumentBatch |
| 198 | + |
| 199 | + |
| 200 | +class MyFilter(ProcessingStage[DocumentBatch, DocumentBatch]): |
| 201 | + """One-line summary for docs. |
| 202 | + |
| 203 | + Longer description if needed. |
| 204 | + """ |
| 205 | + |
| 206 | + name = "MyFilter" |
| 207 | + resources = Resources(cpus=1.0) |
| 208 | + |
| 209 | + def inputs(self) -> tuple[list[str], list[str]]: |
| 210 | + return [], ["text"] # requires task.data to have a "text" column |
| 211 | + |
| 212 | + def outputs(self) -> tuple[list[str], list[str]]: |
| 213 | + return [], ["text"] |
| 214 | + |
| 215 | + def process(self, task: DocumentBatch) -> DocumentBatch | None: |
| 216 | + filtered = task.data[task.data["text"].str.len() > 10] |
| 217 | + if filtered.empty: |
| 218 | + return None # drop the task |
| 219 | + return DocumentBatch( |
| 220 | + task_id=task.task_id, |
| 221 | + dataset_name=task.dataset_name, |
| 222 | + data=filtered, |
| 223 | + ) |
| 224 | +``` |
| 225 | + |
| 226 | +3. **Export it** — add to the appropriate `__init__.py` so users can import it. |
| 227 | + |
| 228 | +4. **Write tests** — mirror the source path: `tests/stages/text/filters/test_my_filter.py`. |
| 229 | + - Test CPU path without any GPU dependency. |
| 230 | + - Mark GPU-only tests with `@pytest.mark.gpu`. |
| 231 | + - Aim for ≥80% coverage of your new code. |
| 232 | + |
| 233 | +5. **Run checks before committing**: |
| 234 | + |
| 235 | +```bash |
| 236 | +pre-commit run --all-files |
| 237 | +pytest tests/stages/text/filters/test_my_filter.py |
| 238 | +``` |
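
The `inputs()` declaration in step 2 is what makes it possible to check a task before a stage runs. The sketch below shows what such a check could look like. `missing_inputs` is a hypothetical helper, a dict of lists stands in for the DataFrame payload, and reading the first list as task attribute names is an assumption.

```python
from types import SimpleNamespace
from typing import Any


def missing_inputs(task: Any, required: tuple[list[str], list[str]]) -> list[str]:
    """List every declared attribute or column that the task does not provide."""
    required_attrs, required_columns = required
    missing = [f"attr:{name}" for name in required_attrs if not hasattr(task, name)]
    # `in` checks the keys of a dict, and the column labels of a DataFrame.
    missing += [f"column:{name}" for name in required_columns if name not in task.data]
    return missing


task = SimpleNamespace(task_id="t0", data={"text": ["hello world"], "id": [1]})

print(missing_inputs(task, ([], ["text"])))
print(missing_inputs(task, (["data"], ["text", "language"])))
```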
| 239 | + |
| 240 | +--- |
| 241 | + |
| 242 | +## Declaring GPU Resources |
| 243 | + |
| 244 | +Declare GPU needs in `resources` so the executor schedules correctly: |
| 245 | + |
| 246 | +```python |
| 247 | +# Single GPU stage |
| 248 | +resources = Resources(gpu_memory_gb=40.0) |
| 249 | + |
| 250 | +# Multi-GPU stage |
| 251 | +resources = Resources(gpus=4) |
| 252 | + |
| 253 | +# CPU only (default) |
| 254 | +resources = Resources(cpus=1.0) |
| 255 | +``` |
| 256 | + |
| 257 | +--- |
| 258 | + |
| 259 | +## Logging |
| 260 | + |
| 261 | +Use `loguru.logger`. Never use `print()`. |
| 262 | + |
| 263 | +```python |
| 264 | +from loguru import logger |
| 265 | + |
| 266 | +logger.info("Processing task {}", task.task_id) |
| 267 | +logger.warning("Dropping empty batch for {}", task.dataset_name) |
| 268 | +logger.debug("Scores: {}", scores) |
| 269 | +``` |
| 270 | + |
| 271 | +--- |
| 272 | + |
| 273 | +## Commit Requirements |
| 274 | + |
| 275 | +**All commits must be signed off (DCO) and cryptographically signed:** |
| 276 | + |
| 277 | +```bash |
| 278 | +git commit -sS -m "feat: add MyFilter stage for text quality filtering" |
| 279 | +# ^^ |
| 280 | +# -s = --signoff (adds Signed-off-by trailer, required by DCO) |
| 281 | +# -S = GPG sign (required if you have commit signing configured) |
| 282 | +``` |
| 283 | + |
| 284 | +If you forget the `-s`: |
| 285 | + |
| 286 | +```bash |
| 287 | +# Amend the last commit in place, adding the sign-off and signature |
| 288 | +git commit --amend --no-edit -sS |
| 290 | +``` |
| 291 | + |
| 292 | +**Commits without `Signed-off-by` will be rejected by CI.** |
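
To check a commit message locally before pushing, one option is to look for the trailer with a regular expression. This is a minimal sketch of such a check; `has_signoff` is a hypothetical helper, and the pattern is an approximation, not the rule CI actually applies.

```python
import re

# A trailer line of the form "Signed-off-by: Name <email>".
SIGNOFF_RE = re.compile(r"^Signed-off-by: .+ <[^<>\s]+@[^<>\s]+>$", re.MULTILINE)


def has_signoff(message: str) -> bool:
    """Return True if the commit message contains a Signed-off-by trailer."""
    return SIGNOFF_RE.search(message) is not None


signed = "feat: add MyFilter stage\n\nSigned-off-by: Jane Doe <jane@example.com>\n"
unsigned = "feat: add MyFilter stage\n"

print(has_signoff(signed), has_signoff(unsigned))
```

The message of the most recent commit can be obtained with `git log -1 --format=%B`.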
| 293 | + |
| 294 | +--- |
| 295 | + |
| 296 | +## Updating Dependencies |
| 297 | + |
| 298 | +When you change `pyproject.toml`, regenerate the lock file: |
| 299 | + |
| 300 | +```bash |
| 301 | +uv lock |
| 302 | +git add pyproject.toml uv.lock |
| 303 | +git commit -sS -m "chore: update dependencies" |
| 304 | +``` |
| 305 | + |
| 306 | +If the lock file is stale, the `uv-lock` pre-commit hook regenerates it and blocks the commit. Stage the regenerated `uv.lock` and commit again. |
| 307 | + |
| 308 | +--- |
| 309 | + |
| 310 | +## Pull Request Requirements |
| 311 | + |
| 312 | +1. **One PR does one thing.** "What does this PR do?" must have a clear one-sentence answer. |
| 313 | +2. **All CI checks must pass** — linting, CPU tests, 80% coverage on changed lines. |
| 314 | +3. **Target `main`.** |
| 315 | +4. **Disclose AI assistance.** If an AI agent wrote or materially assisted with the code, include in the PR description: |
| 316 | + |
| 317 | + > *This PR was created with AI assistance (Claude Code / GitHub Copilot / etc.).* |
| 318 | + |
| 319 | + And add a co-authorship trailer to the commit: |
| 320 | + |
| 321 | + ``` |
| 322 | + Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> |
| 323 | + ``` |
| 324 | + |
| 325 | +5. **Do not open a PR for trivial formatting-only changes** (single typos, whitespace) unless they are part of a larger change. |
| 326 | + |
| 327 | +--- |
| 328 | + |
| 329 | +## What NOT to Do |
| 330 | + |
| 331 | +| Don't | Do instead | |
| 332 | +|-------|-----------| |
| 333 | +| `pip install ...` | `uv sync --extra <group>` | |
| 334 | +| `python script.py` | `uv run python script.py` | |
| 335 | +| Define `name` as `@property` on a stage | Use a plain class attribute: `name = "MyStage"` | |
| 336 | +| Override `_name`, `_resources`, or `_batch_size` | Use `name`, `resources`, `batch_size` class attributes | |
| 337 | +| Call `process()` directly on a `CompositeStage` | Let the Pipeline's `build()` decompose it | |
| 338 | +| Use `print()` for logging | Use `loguru.logger` | |
| 339 | +| Commit without `-s` (signoff) | `git commit -sS ...` | |
| 340 | +| Open a PR without tests | Write tests in `tests/` mirroring the source path | |
| 341 | +| Add heavy imports at module level for optional deps | Use lazy imports inside functions | |
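
The last row of the table, lazy imports for optional dependencies, might look like the sketch below. The standard-library `statistics` module stands in for a heavy optional package so the example runs anywhere; `mean_length` and the extra named in the error message are illustrative assumptions.

```python
import importlib


def mean_length(texts: list[str]) -> float:
    """Average text length, importing the dependency only when called."""
    try:
        # Deferred import: importing this module stays cheap, and users
        # without the optional dependency can still use everything else.
        statistics = importlib.import_module("statistics")
    except ImportError as exc:
        msg = "mean_length needs an optional dependency; try `uv sync --extra text_cpu`"
        raise ImportError(msg) from exc
    return statistics.mean(len(text) for text in texts)


print(mean_length(["a", "abc"]))
```

A plain `import` statement inside the function body works the same way; `importlib` is used here only to keep the error handling in one place.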
| 342 | + |
| 343 | +--- |
| 344 | + |
| 345 | +## Key File Locations |
| 346 | + |
| 347 | +| What | Where | |
| 348 | +|------|-------| |
| 349 | +| Stage base class | `nemo_curator/stages/base.py` | |
| 350 | +| Resources dataclass | `nemo_curator/stages/resources.py` | |
| 351 | +| Task base class | `nemo_curator/tasks/tasks.py` | |
| 352 | +| DocumentBatch (text) | `nemo_curator/tasks/document.py` | |
| 353 | +| Pipeline | `nemo_curator/pipeline/pipeline.py` | |
| 354 | +| RayData executor | `nemo_curator/backends/ray_data/executor.py` | |
| 355 | +| Xenna executor | `nemo_curator/backends/xenna/` | |
| 356 | +| Text stages | `nemo_curator/stages/text/` | |
| 357 | +| Audio stages | `nemo_curator/stages/audio/` | |
| 358 | +| Image stages | `nemo_curator/stages/image/` | |
| 359 | +| Video stages | `nemo_curator/stages/video/` | |
| 360 | +| Quickstart tutorial | `tutorials/quickstart.py` | |
| 361 | +| Ruff config | `pyproject.toml` → `[tool.ruff]` | |
| 362 | +| Pre-commit config | `.pre-commit-config.yaml` | |
| 363 | +| CI pipeline | `.github/workflows/cicd-main.yml` | |
| 364 | +| API design doc | `api-design.md` | |