diff --git a/examples/speculative_decoding/guides/CR2_eagle_config.json b/examples/speculative_decoding/guides/CR2_eagle_config.json new file mode 100644 index 00000000000..9150b747aac --- /dev/null +++ b/examples/speculative_decoding/guides/CR2_eagle_config.json @@ -0,0 +1,15 @@ +{ + "draft_vocab_size": 32000, + "initializer_range": 0.02, + "rms_norm_eps": 1e-06, + "_attn_implementation": "flex_attention", + "rope_scaling": { + "beta_fast": 32.0, + "beta_slow": 1.0, + "factor": 32.0, + "original_max_position_embeddings": 8192, + "rope_type": "yarn", + "truncate": false + }, + "rope_theta": 150000 +} diff --git a/examples/speculative_decoding/guides/nemotron_mapping.bin b/examples/speculative_decoding/guides/nemotron_mapping.bin new file mode 100644 index 00000000000..d260fd43b6f Binary files /dev/null and b/examples/speculative_decoding/guides/nemotron_mapping.bin differ diff --git a/examples/speculative_decoding/guides/train_eagle_head_cosmos_reason2.ipynb b/examples/speculative_decoding/guides/train_eagle_head_cosmos_reason2.ipynb new file mode 100644 index 00000000000..f5fb5bf2ee1 --- /dev/null +++ b/examples/speculative_decoding/guides/train_eagle_head_cosmos_reason2.ipynb @@ -0,0 +1,260 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Training an EAGLE3 Draft Head for Cosmos-Reason2\n", + "\n", + "This notebook walks through the full workflow for training an EAGLE3 speculative-decoding draft head on top of [nvidia/Cosmos-Reason2-8B](https://huggingface.co/nvidia/Cosmos-Reason2-8B).\n", + "\n", + "**Workflow overview**\n", + "\n", + "| Step | Description |\n", + "| :---: | :--- |\n", + "| 1 | Install dependencies |\n", + "| 2 | Authenticate with Hugging Face |\n", + "| 3 | Prepare training data from the Nemotron dataset |\n", + "| 4 | Calibrate the draft vocabulary |\n", + "| 5 | Launch training |\n", + "| 6 | Export checkpoint for deployment |\n", + "\n", + "> **Hardware requirement** – Cosmos-Reason2-8B requires at least one 80 GB GPU (e.g. H100/A100).\n", + "> Multi-GPU training is supported automatically via FSDP2 when more than one GPU is available." + ], + "id": "efe23925" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 1 – Install Dependencies" + ], + "id": "e64d39b5" + }, + { + "cell_type": "code", + "metadata": {}, + "source": [ + "%%bash\n", + "pip install -U nvidia-modelopt[hf]\n", + "pip install -r ../requirements.txt" + ], + "execution_count": null, + "outputs": [], + "id": "f0049171" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 2 – Authenticate with Hugging Face\n", + "\n", + "Both `nvidia/Cosmos-Reason2-8B` and `nvidia/Nemotron-Post-Training-Dataset-v2` require accepting\n", + "their licence agreements on the Hub. Run the cell below and follow the interactive prompt to log in:" + ], + "id": "fe68982a" + }, + { + "cell_type": "code", + "metadata": {}, + "source": [ + "%%bash\n", + "hf auth login" + ], + "execution_count": null, + "outputs": [], + "id": "b62417b6" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 3 – Prepare Training Data\n", + "\n", + "We use a curated subset of [nvidia/Nemotron-Post-Training-Dataset-v2](https://huggingface.co/datasets/nvidia/Nemotron-Post-Training-Dataset-v2)\n", + "(chat split) for training. The `nemotron_mapping.bin` file (bundled alongside this notebook) selects the specific rows to use.\n", + "It stores 0-based dataset row indices as packed `int32` values (little-endian, produced by `numpy.ndarray.tofile`).\n", + "\n", + "The script streams only the required parquet shards and writes a conversation file in the\n", + "standard `jsonl` format expected by `launch_train.sh`." + ], + "id": "cdd4d470" + }, + { + "cell_type": "code", + "metadata": {}, + "source": [ + "%%bash\n", + "python ../prepare_input_conversations/add_nemotron_chat.py \\\n", + " --mapping-file nemotron_mapping.bin" + ], + "execution_count": null, + "outputs": [], + "id": "32259e23" + }, + { + "cell_type": "code", + "metadata": {}, + "source": [ + "%%bash\n", + "# Expect exactly 89511 conversations.\n", + "count=$(wc -l < input_conversations/nemotron-chat.jsonl)\n", + "echo \"${count} conversations in ../input_conversations/nemotron-chat.jsonl\"\n", + "[ \"$count\" -eq 89511 ] || { echo \"ERROR: expected 89511, got ${count}\"; exit 1; }" + ], + "execution_count": null, + "outputs": [], + "id": "d05b97d3" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 4 – Calibrate the Draft Vocabulary\n", + "\n", + "`CR2_eagle_config.json` sets `\"draft_vocab_size\": 32000`. Using a compressed vocabulary\n", + "speeds up training and inference, but requires a one-time calibration step that produces a\n", + "token-mapping file (`d2t.pt`)." + ], + "id": "09717fcc" + }, + { + "cell_type": "code", + "metadata": {}, + "source": [ + "%%bash\n", + "python ../scripts/calibrate_draft_vocab.py \\\n", + " --model nvidia/Cosmos-Reason2-8B \\\n", + " --data input_conversations/nemotron-chat.jsonl \\\n", + " --draft_vocab_size 32000 \\\n", + " --save_dir draft_vocab_cache" + ], + "execution_count": null, + "outputs": [], + "id": "388f6897" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 5 – Train the EAGLE3 Draft Head\n", + "\n", + "Training is launched via `launch_train.sh`, which internally calls `accelerate launch main.py`\n", + "and sets up FSDP2 automatically when multiple GPUs are available.\n", + "\n", + "Key arguments used for Cosmos-Reason2:\n", + "\n", + "| Argument | Value | Notes |\n", + "| :--- | :--- | :--- |\n", + "| `--model` | `nvidia/Cosmos-Reason2-8B` | Target VLM |\n", + "| `--data` | `guides/input_conversations/nemotron-chat.jsonl` | Training conversations |\n", + "| `--eagle_config` | `guides/CR2_eagle_config.json` | Draft-head architecture |\n", + "| `--draft_vocab_cache` | `guides/draft_vocab_cache/Cosmos-Reason2-8B/d2t.pt` | Token-mapping from Step 4 |\n", + "| `--vlm_processor` | `nvidia/Cosmos-Reason2-8B` | VLM image processor |\n", + "| `--vlm_img_dir` | `data/` | Directory containing referenced images |\n", + "| `--training_seq_len` | `16384` | Max token length per sample (lower to save GPU memory or speed up training) |\n", + "| `--lr` | `1.5e-4` | Learning rate |\n", + "| `--num_epochs` | `20` | Training epochs |\n", + "| `--train_bs` | `1` | Per-device batch size |\n", + "| `--save_steps` | `1000` | Checkpoint frequency |\n", + "| `--ar_validate_steps` | `1000000` | Effectively disables in-training AR validation |\n", + "\n", + "> **Tip** – Set `--ar_validate_steps` to a smaller value (e.g. `500`) to periodically measure\n", + "> acceptance rate on MT-Bench during training." + ], + "id": "336c43b9" + }, + { + "cell_type": "code", + "metadata": {}, + "source": [ + "%%bash\n", + "export WANDB_MODE=disabled\n", + "OUTPUT_DIR=ckpts/cosmos-reason2-8b-eagle3\n", + "EAGLE_CONFIG=guides/CR2_eagle_config.json\n", + "DRAFT_VOCAB_CACHE=guides/draft_vocab_cache/Cosmos-Reason2-8B/d2t.pt\n", + "\n", + "\n", + "# 20 epochs on 89k samples (4xB100): ~24 hours.\n", + "cd ..; OUTPUT_DIR=$OUTPUT_DIR ./launch_train.sh \\\n", + " --model nvidia/Cosmos-Reason2-8B \\\n", + " --output_dir $OUTPUT_DIR \\\n", + " --data guides/input_conversations/nemotron-chat.jsonl \\\n", + " --lr 1.5e-4 \\\n", + " --num_epochs 20 \\\n", + " --train_bs 1 \\\n", + " --eagle_config $EAGLE_CONFIG \\\n", + " --draft_vocab_cache $DRAFT_VOCAB_CACHE \\\n", + " --training_seq_len 16384 \\\n", + " --save_steps 1000 \\\n", + " --ar_validate_steps 1000000 \\\n", + " --vlm_processor nvidia/Cosmos-Reason2-8B \\\n", + " --vlm_img_dir data/" + ], + "execution_count": null, + "outputs": [], + "id": "0380f773" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 6 – Export Checkpoint for Deployment\n", + "\n", + "After training completes, convert the ModelOpt checkpoint to the Hugging Face–compatible\n", + "format expected by vLLM. Point `--model_path` to the desired checkpoint subdirectory\n", + "(e.g. `checkpoint-110000`)." + ], + "id": "98e0f8c4" + }, + { + "cell_type": "code", + "metadata": {}, + "source": [ + "%%bash\n", + "CKPT_DIR=ckpts/cosmos-reason2-8b-eagle3/checkpoint-110000\n", + "EXPORT_PATH=export/cosmos-reason2-8b-eagle3\n", + "\n", + "python scripts/export_hf_checkpoint.py \\\n", + " --model_path $CKPT_DIR \\\n", + " --export_path $EXPORT_PATH" + ], + "execution_count": null, + "outputs": [], + "id": "63880f67" + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Deployment\n", + "\n", + "The exported checkpoint can be served directly with **vLLM**:\n", + "\n", + "```bash\n", + "vllm serve nvidia/Cosmos-Reason2-8B \\\n", + " --host 0.0.0.0 \\\n", + " --port 8000 \\\n", + " --speculative_config '{\"method\": \"eagle3\", \"model\": \"export/cosmos-reason2-8b-eagle3\", \"num_speculative_tokens\": 3}'\n", + "```\n", + "\n", + "Refer to the [vLLM speculative decoding docs](https://docs.vllm.ai/en/latest/features/spec_decode/) for the full list of options." + ], + "id": "413c4275" + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.10.0" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} \ No newline at end of file diff --git a/examples/speculative_decoding/prepare_input_conversations/add_nemotron_chat.py b/examples/speculative_decoding/prepare_input_conversations/add_nemotron_chat.py new file mode 100644 index 00000000000..b293e6c4d87 --- /dev/null +++ b/examples/speculative_decoding/prepare_input_conversations/add_nemotron_chat.py @@ -0,0 +1,241 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Add Nemotron-Post-Training-Dataset-v2 chat conversations to a conversation dataset. + +Dataset: https://huggingface.co/datasets/nvidia/Nemotron-Post-Training-Dataset-v2 + +Note: This dataset requires agreeing to the terms of use on Hugging Face. + Make sure you are logged in with `huggingface-cli login` before running this script. +""" + +from __future__ import annotations + +import argparse +import itertools +from pathlib import Path + +import numpy as np +from datasets import load_dataset +from huggingface_hub import hf_hub_url, list_repo_files +from tqdm import tqdm +from utils import ( + dataset_splits_explanation, + id_for_conversation, + update_dataset_file_with_conversations, +) + +NEMOTRON_DATASET_ID = "nvidia/Nemotron-Post-Training-Dataset-v2" +NEMOTRON_CHAT_SPLIT = "chat" + + +def get_split_parquet_urls(dataset_id: str, split_name: str) -> list[str]: + """Return the HuggingFace Hub download URLs for parquet shards of *split_name* only. + + Files in this dataset follow the flat naming pattern: + ``data/{split_name}-XXXXX-of-XXXXX.parquet`` (no per-split subdirectory). + We resolve them to full Hub URLs so they can be passed to + ``load_dataset("parquet", data_files=…)`` which skips the repo's split + metadata validation and downloads only the requested shards. + + Raises: + ValueError: If no parquet files are found for the requested split. + """ + all_files = list(list_repo_files(dataset_id, repo_type="dataset")) + split_files = sorted( + f for f in all_files if f.endswith(".parquet") and Path(f).name.startswith(f"{split_name}-") + ) + if not split_files: + err_msg = ( + f"No parquet files found for split '{split_name}' in {dataset_id}. " + f"Available files: {[f for f in all_files if f.endswith('.parquet')]}" + ) + raise ValueError(err_msg) + return [hf_hub_url(dataset_id, f, repo_type="dataset") for f in split_files] + + +def parse_args() -> argparse.Namespace: + """Parse command-line arguments.""" + parser = argparse.ArgumentParser( + description="Load Nemotron-Post-Training-Dataset-v2 chat conversations." + ) + + parser.add_argument( + "--max-samples", + type=int, + default=None, + help=( + "Maximum number of samples to load from the dataset. " + "The chat split contains ~627,720 samples. " + "If not provided, all samples are loaded." + ), + ) + + parser.add_argument( + "--mapping-file", + type=Path, + default=None, + help=( + "Path to a binary file containing 0-based dataset row indices stored as int32 " + "(produced by numpy.ndarray.tofile with dtype='int32'). " + "Rows are loaded in the order they appear in the file. " + "When provided, the dataset is downloaded (not streamed) to allow random access " + "and --max-samples is ignored." + ), + ) + + parser.add_argument( + "--output-split-name", + type=str, + default="nemotron-chat", + help=dataset_splits_explanation("nemotron-chat"), + ) + + parser.add_argument( + "--output-dir", + type=Path, + default=Path("input_conversations/"), + help="Path to save the conversations file(s) into. Default is 'input_conversations/'.", + ) + + return parser.parse_args() + + +def parse_nemotron_conversation(raw_conversations: list) -> list[dict] | None: + """Parse a Nemotron conversation into a list of messages with standardized roles. + + Args: + raw_conversations: List of message dicts from the Nemotron dataset. + + Returns: + List of parsed message dicts with 'role' and 'content' keys, or None if the + conversation should be skipped. + """ + msgs = [] + for msg in raw_conversations: + # Resolve role field (datasets may use "from" or "role") + raw_role = msg.get("from") or msg.get("role") + if not isinstance(raw_role, str): + continue + role = raw_role.lower() + + # Normalize role names to standard values + if role in ("human", "user"): + role = "user" + elif role in ("gpt", "assistant"): + role = "assistant" + elif role == "system": + # Skip system messages; they are metadata not part of the conversation turns + continue + else: + # Skip unrecognized roles rather than failing, as the dataset may evolve + continue + + # Resolve content field + if "value" in msg: + content = msg["value"] + elif "content" in msg: + content = msg["content"] + elif "text" in msg: + content = msg["text"] + else: + continue + + content = content.strip() + if content: + msgs.append({"role": role, "content": content}) + + return msgs if msgs else None + + +async def main(args: argparse.Namespace) -> None: + # Resolve Hub download URLs for chat parquet shards only. + # Using load_dataset("parquet", data_files=urls) bypasses the repo's split + # metadata validation, so only the chat shards are ever downloaded. + print(f"Resolving chat parquet URLs in {NEMOTRON_DATASET_ID}...") + chat_parquet_urls = get_split_parquet_urls(NEMOTRON_DATASET_ID, NEMOTRON_CHAT_SPLIT) + print(f"Found {len(chat_parquet_urls)} parquet shard(s) for the '{NEMOTRON_CHAT_SPLIT}' split.") + + if args.mapping_file is not None: + # --- Mapping mode: download (and cache) only the chat parquet shards, + # then use ds.select() for fast index-based random access. + # The chat data is cached after the first run; ds.select() is near-instant + # on subsequent runs. --- + if not args.mapping_file.exists(): + err_msg = f"Mapping file {args.mapping_file} does not exist." + raise FileNotFoundError(err_msg) + + ordered_source_indices: list[int] = np.fromfile(args.mapping_file, dtype="int32").tolist() + print(f"Mapping file loaded: {len(ordered_source_indices)} entries.") + + ds = load_dataset( + "parquet", + data_files={"train": chat_parquet_urls}, + split="train", + streaming=False, + ) + # ds.select() preserves the given order of indices and operates on the + # locally cached Arrow data, so it is fast even for large index lists. + ds_subset = ds.select(ordered_source_indices) + iterable = enumerate( + tqdm(ds_subset, desc="Processing mapped samples", total=len(ordered_source_indices)) + ) + else: + # --- Streaming mode: fetch chat shards on demand, no upfront download --- + print(f"Streaming {NEMOTRON_DATASET_ID} (split={NEMOTRON_CHAT_SPLIT})...") + ds = load_dataset( + "parquet", + data_files={"train": chat_parquet_urls}, + split="train", + streaming=True, + ) + stream = itertools.islice(ds, args.max_samples) # islice(ds, None) = full split + iterable = enumerate(tqdm(stream, desc="Loading Nemotron chat", total=args.max_samples)) + + input_conversations: list[dict] = [] + skipped = 0 + for i, sample in iterable: + raw_conversations = sample.get("conversations") or sample.get("messages") or [] + if not raw_conversations: + skipped += 1 + continue + + msgs = parse_nemotron_conversation(raw_conversations) + if not msgs: + skipped += 1 + continue + + # Build a unique conversation ID, incorporating any source ID if available. + source_id = sample.get("id") or sample.get("conversation_id") or f"{i:06}" + conv_hash = id_for_conversation(msgs) + cid = f"nemotron-chat-{source_id}_{conv_hash}" + + input_conversations.append({"conversation_id": cid, "conversations": msgs}) + + print( + f"Loaded {len(input_conversations)} conversations from Nemotron chat " + f"(skipped {skipped} empty/invalid entries)." + ) + + update_dataset_file_with_conversations( + input_conversations, args.output_dir, args.output_split_name + ) + + +if __name__ == "__main__": + import asyncio + + args = parse_args() + asyncio.run(main(args))