From 28d43802fbe2e61eafc09d59ad72cd8d307fd66f Mon Sep 17 00:00:00 2001 From: Pei Li Date: Tue, 19 May 2026 13:03:25 +0200 Subject: [PATCH 1/9] Expose enable_sample_tables in prepare_dataset and as --no-sample-tables CLI flag The SqliteIndexWriter already supports enable_sample_tables=False to skip the samples and sample_parts tables, but the flag was not plumbed through BaseWebdatasetFactory.prepare_dataset() or the `energon prepare` CLI. On very large datasets (100M+ samples) the SQLite inserts and post-insert btree builds dominate preparation runtime; for datasets consumed purely by the integer-indexed loader (ShardInfosITarReader), the samples table is not needed at training time. Changes: - prepare_dataset() accepts enable_sample_tables=True (default), passes through to SqliteIndexWriterAggregator. - `energon prepare --no-sample-tables` flag exposes the same option. Cannot be combined with --tar-index-only (which operates on an already-prepared dataset). - append_samples/append_parts become no-ops when the writer was constructed with enable_sample_tables=False, so producers can keep emitting rows without changes. - SqliteIndexWriter always drops stale samples/sample_parts on reset_tables=True, regardless of whether the new run intends to repopulate them. This avoids leaving stale tables when re-preparing a dataset with --no-sample-tables. - Adds a unit test covering the CLI flag, table-absence, and the --no-sample-tables vs --tar-index-only conflict. 
Signed-off-by: Pei Li --- .../energon/flavors/webdataset/indexing.py | 23 ++++++- .../energon/flavors/webdataset/prepare.py | 9 +++ src/megatron/energon/tools/prepare.py | 19 ++++++ tests/test_dataset.py | 62 +++++++++++++++++++ 4 files changed, 111 insertions(+), 2 deletions(-) diff --git a/src/megatron/energon/flavors/webdataset/indexing.py b/src/megatron/energon/flavors/webdataset/indexing.py index 4e5cebcd..540176fe 100644 --- a/src/megatron/energon/flavors/webdataset/indexing.py +++ b/src/megatron/energon/flavors/webdataset/indexing.py @@ -79,6 +79,10 @@ def __init__( if self.enable_sample_tables: assert self.reset_tables, "Reset tables is required when enabling sample tables" + if self.reset_tables: + # Always drop on reset — stale tables from a previous prepare run with + # enable_sample_tables=True must not survive a re-prepare with + # enable_sample_tables=False (and vice versa). self.db.execute("DROP INDEX IF EXISTS idx_samples_sample_key") self.db.execute("DROP INDEX IF EXISTS idx_samples_by_tar_and_idx") self.db.execute("DROP TABLE IF EXISTS samples") @@ -87,6 +91,7 @@ def __init__( self.db.execute("DROP INDEX IF EXISTS idx_sample_parts_full") self.db.execute("DROP TABLE IF EXISTS sample_parts") + if self.enable_sample_tables: self.db.execute( """ CREATE TABLE IF NOT EXISTS samples ( @@ -139,10 +144,17 @@ def append_samples( self, rows: Sequence["IndexSample"], ) -> None: - """Insert multiple sample rows efficiently.""" + """Insert multiple sample rows efficiently. + + No-op when ``enable_sample_tables`` was set to False at construction time — the + ``samples`` table does not exist in that case. + """ assert self.db is not None, "Database is closed" + if not self.enable_sample_tables: + return + if len(rows) == 0: return @@ -177,10 +189,17 @@ def append_parts( self, rows: Sequence["IndexSamplePart"], ) -> None: - """Insert multiple sample part rows efficiently.""" + """Insert multiple sample part rows efficiently. 
+ + No-op when ``enable_sample_tables`` was set to False at construction time — the + ``sample_parts`` table does not exist in that case. + """ assert self.db is not None, "Database is closed" + if not self.enable_sample_tables: + return + if len(rows) == 0: return diff --git a/src/megatron/energon/flavors/webdataset/prepare.py b/src/megatron/energon/flavors/webdataset/prepare.py index d3a16223..eb311417 100644 --- a/src/megatron/energon/flavors/webdataset/prepare.py +++ b/src/megatron/energon/flavors/webdataset/prepare.py @@ -489,6 +489,7 @@ def prepare_dataset( tar_index_only: bool = False, media_filter: Optional[MediaFilterConfig] = None, fix_duplicates: bool = False, + enable_sample_tables: bool = True, ) -> Tuple[Set[str], List[Tuple[str, int]]]: """ Preprocess the shards and write the split config. Preprocessing is done in parallel. @@ -507,6 +508,13 @@ def prepare_dataset( tar_index_only: Only create tar-index, then exit media_filter: Media filter configuration fix_duplicates: If True, fix duplicate keys in the dataset by renaming the files in the shards. + enable_sample_tables: If True (default), populate the ``samples`` and ``sample_parts`` + tables in the SQLite index. Set to False to skip these tables and their post-insert + btree builds — only the per-tar ``.tar.idx`` files, ``.info.json`` and split config + are produced. Use this for datasets consumed purely by the integer-indexed loader + (``ShardInfosITarReader``); sample-key lookups, polylithic joins and media-metadata + filtering will not work. Substantially reduces preparation time on very large + datasets (100M+ samples) where the SQLite inserts and index builds dominate runtime. Returns: The set of all parts found in the shards. But at most 50. 
@@ -566,6 +574,7 @@ def prepare_dataset( parent_path / MAIN_FOLDER_NAME / INDEX_SQLITE_FILENAME, total_tasks=len(paths), progress_fn=progress_fn, + enable_sample_tables=enable_sample_tables, enable_media_metadata=media_filter is not None, media_filter=media_filter, ) diff --git a/src/megatron/energon/tools/prepare.py b/src/megatron/energon/tools/prepare.py index 5e6643cf..dd24e73b 100644 --- a/src/megatron/energon/tools/prepare.py +++ b/src/megatron/energon/tools/prepare.py @@ -123,6 +123,17 @@ def printify_json(data: Any) -> Any: help="Only (re)generate the tar-index", is_flag=True, ) +@click.option( + "--no-sample-tables", + help=( + "Skip populating the SQLite samples and sample_parts tables. Only the per-tar " + ".tar.idx files, .info.json and split config are produced. Use for datasets " + "consumed purely by the integer-indexed loader; sample-key lookups, polylithic " + "joins and media-metadata filtering will not work. Substantially reduces " + "preparation time on very large datasets." + ), + is_flag=True, +) @click.option( "--shuffle-tars", help="If set, the tar files will be shuffled before splitting.", @@ -191,6 +202,7 @@ def command( exclude: str, num_workers: int, tar_index_only: bool, + no_sample_tables: bool, shuffle_tars: bool, media_metadata_by_glob: str | None, media_metadata_by_header: bool, @@ -220,6 +232,12 @@ def command( if do_media_metadata and tar_index_only: raise click.UsageError("--media-metadata-by-... 
cannot be combined with --tar-index-only") + if no_sample_tables and tar_index_only: + raise click.UsageError( + "--no-sample-tables cannot be combined with --tar-index-only " + "(--tar-index-only operates on an already-prepared dataset)" + ) + media_filter_config = ( MediaFilterConfig.parse( media_metadata_by_glob, media_metadata_by_header, media_metadata_by_extension @@ -348,6 +366,7 @@ def progress_fn(els, length=None): workers=num_workers, media_filter=media_filter_config, fix_duplicates=fix_duplicates, + enable_sample_tables=not no_sample_tables, ) found_types = list(found_types) diff --git a/tests/test_dataset.py b/tests/test_dataset.py index 01aa74ee..314f613b 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -22,6 +22,7 @@ import numpy as np import torch import webdataset as wds +import click from click.testing import CliRunner from PIL import Image @@ -1803,6 +1804,67 @@ def test_prepare_dataset_noninteractive_crude(self): content = f.read() assert "CrudeWebdataset" in content + def test_prepare_dataset_no_sample_tables(self): + """`--no-sample-tables` skips the SQLite samples/sample_parts tables. + + Verifies that: + - Prepare still succeeds and emits .info.json + the per-tar .tar.idx files. + - The SQLite database exists but does not contain the `samples` or `sample_parts` + tables (the bulk of the SQLite cost on large datasets). + - The flag rejects being combined with `--tar-index-only`. + """ + + import sqlite3 + + runner = CliRunner() + result = runner.invoke( + prepare_command, + [ + str(self.dataset_path), + "--non-interactive", + "--force-overwrite", + "--split-ratio=1,0,0", + "--sample-type=CrudeWebdataset", + "--no-sample-tables", + ], + catch_exceptions=False, + ) + assert result.exit_code == 0, f"Prepare failed: {result.stdout}" + + # .info.json and per-tar .tar.idx files must still exist. 
+ assert (self.dataset_path / MAIN_FOLDER_NAME / ".info.json").is_file() + tar_idx_files = list(self.dataset_path.glob("**/*.tar.idx")) + assert len(tar_idx_files) > 0, "Expected per-tar .tar.idx files to be produced" + + # SQLite file exists, but the samples / sample_parts tables must NOT be created. + sqlite_path = self.dataset_path / MAIN_FOLDER_NAME / "index.sqlite" + assert sqlite_path.is_file() + with sqlite3.connect(str(sqlite_path)) as conn: + tables = { + row[0] + for row in conn.execute( + "SELECT name FROM sqlite_master WHERE type='table'" + ) + } + assert "samples" not in tables, f"unexpected samples table: {tables}" + assert "sample_parts" not in tables, f"unexpected sample_parts table: {tables}" + + # --no-sample-tables + --tar-index-only must be rejected. + result = runner.invoke( + prepare_command, + [ + str(self.dataset_path), + "--non-interactive", + "--no-sample-tables", + "--tar-index-only", + ], + catch_exceptions=True, + ) + assert result.exit_code != 0 + assert "--no-sample-tables cannot be combined with --tar-index-only" in ( + result.stdout + (result.stderr or "") + ) or isinstance(result.exception, click.UsageError) + def test_preview_captioning_dataset(self): runner = CliRunner() result = runner.invoke( From 99998e662e9d3b77cd7ee01cf249f9f68582ad2d Mon Sep 17 00:00:00 2001 From: Pei Li Date: Tue, 19 May 2026 14:38:22 +0200 Subject: [PATCH 2/9] docs: document --no-sample-tables and enable_sample_tables=False Adds user-facing docs for the new option to skip the SQLite samples and sample_parts tables during prepare: - docs/source/basic/data_prep.md: subsection under "index.sqlite and index.uuid" explaining when and why to use `energon prepare --no-sample-tables`, including the trade-off (polylithic joins, FileStore access, and `energon mount` won't work). 
- docs/source/advanced/data_prep_api.md: section showing the equivalent `BaseWebdatasetFactory.prepare_dataset(..., enable_sample_tables=False)` programmatic usage with the same trade-off note. Signed-off-by: Pei Li --- docs/source/advanced/data_prep_api.md | 22 ++++++++++++++++++++++ docs/source/basic/data_prep.md | 22 ++++++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/docs/source/advanced/data_prep_api.md b/docs/source/advanced/data_prep_api.md index 8768f7ea..996a4725 100644 --- a/docs/source/advanced/data_prep_api.md +++ b/docs/source/advanced/data_prep_api.md @@ -68,3 +68,25 @@ Then, run the script: if __name__ == "__main__": prepare_one_dataset(Path("/path/to/dataset"), 16, Path("/path/to/template_dir")) ``` + +## Skipping the SQLite samples tables for very large datasets + +`prepare_dataset` accepts an `enable_sample_tables` kwarg (default `True`). On very large datasets (100M+ samples) the SQLite inserts and post-load btree builds for the `samples` and `sample_parts` tables can dominate preparation runtime. If the dataset is consumed purely sequentially via the integer-indexed loader (`ShardInfosITarReader`), those tables are never queried at training time, and you can skip populating them by passing `enable_sample_tables=False`: + +```python +BaseWebdatasetFactory.prepare_dataset( + path, + all_tars, + split_parts_ratio=split_parts_ratio, + progress_fn=progress_fn, + workers=num_workers, + enable_sample_tables=False, +) +``` + +`.tar.idx`, `.info.json` and the split config are still produced. + +```{admonition} Trade-off +:class: warning +With `enable_sample_tables=False`, sample-key lookups are unavailable. This breaks polylithic dataset joins (built via SQL `JOIN` over the `samples` tables), `as_file_store()` / `WebdatasetFileStore` access (used for aux-data on crude datasets and by `energon mount`), and any direct `SqliteIndexReader` queries. Failures are loud (`sqlite3.OperationalError: no such table: samples`). 
+``` diff --git a/docs/source/basic/data_prep.md b/docs/source/basic/data_prep.md index 73629696..e173bed3 100644 --- a/docs/source/basic/data_prep.md +++ b/docs/source/basic/data_prep.md @@ -686,6 +686,28 @@ The `media_metadata` table is used to store the media metadata for the selected | 00002.mp4 | ... | ... | +#### Skipping the samples tables for very large datasets + +On very large datasets (100M+ samples) the `samples` and `sample_parts` table inserts, combined with the post-load btree index builds over the unique `sample_key`, can dominate `energon prepare` runtime. For datasets that are consumed purely sequentially via the integer-indexed loader (`ShardInfosITarReader` — the default loader for monolithic webdatasets), those tables are never queried at training time: shard cumulative counts come from `.info.json`, and per-sample byte offsets come from the per-tar `.tar.idx` files. Checkpoint/resume uses integer `SliceState` offsets resolved through the same path. + +If you do not need sample-key lookups, you can skip populating the SQLite sample tables: + +```sh +> energon prepare --no-sample-tables /path/to/dataset +``` + +This still produces `.tar.idx`, `.info.json` and the split config — everything the integer-indexed loader needs. + +```{admonition} Trade-off +:class: warning +Datasets prepared with `--no-sample-tables` cannot be used with features that look samples up by `sample_key`. In particular: + +- [Polylithic datasets](aux-data) — the join is built by an SQL `JOIN` over the `samples` tables. +- The {py:meth}`as_file_store ` method and the {py:class}`WebdatasetFileStore `, which are used for aux-data access on [crude datasets](crude-data) and by [`energon mount`](energon-mount). + +Failures are loud (`sqlite3.OperationalError: no such table: samples`), not silent. The flag cannot be combined with `--tar-index-only` (which operates on an already-prepared dataset). 
+``` + (data-on-disk-jsonl)= ## Dataset Format on Disk for JSONL Datasets From 6c0cf886bc55ea6f01e36cd658030eed9374d7bf Mon Sep 17 00:00:00 2001 From: Pei Li Date: Tue, 19 May 2026 17:02:45 +0200 Subject: [PATCH 3/9] docs: shorten --no-sample-tables sections per review feedback Drop the `:class: warning` admonition (no other doc in the repo uses it that way) and trim both new subsections to focus on the single consequence that matters to users: a dataset prepared with --no-sample-tables / enable_sample_tables=False cannot be used as an auxiliary dataset. Matches the length and style of neighboring subsections. Signed-off-by: Pei Li --- docs/source/advanced/data_prep_api.md | 20 +------------------- docs/source/basic/data_prep.md | 16 ++-------------- 2 files changed, 3 insertions(+), 33 deletions(-) diff --git a/docs/source/advanced/data_prep_api.md b/docs/source/advanced/data_prep_api.md index 996a4725..24a578cd 100644 --- a/docs/source/advanced/data_prep_api.md +++ b/docs/source/advanced/data_prep_api.md @@ -71,22 +71,4 @@ if __name__ == "__main__": ## Skipping the SQLite samples tables for very large datasets -`prepare_dataset` accepts an `enable_sample_tables` kwarg (default `True`). On very large datasets (100M+ samples) the SQLite inserts and post-load btree builds for the `samples` and `sample_parts` tables can dominate preparation runtime. If the dataset is consumed purely sequentially via the integer-indexed loader (`ShardInfosITarReader`), those tables are never queried at training time, and you can skip populating them by passing `enable_sample_tables=False`: - -```python -BaseWebdatasetFactory.prepare_dataset( - path, - all_tars, - split_parts_ratio=split_parts_ratio, - progress_fn=progress_fn, - workers=num_workers, - enable_sample_tables=False, -) -``` - -`.tar.idx`, `.info.json` and the split config are still produced. - -```{admonition} Trade-off -:class: warning -With `enable_sample_tables=False`, sample-key lookups are unavailable. 
This breaks polylithic dataset joins (built via SQL `JOIN` over the `samples` tables), `as_file_store()` / `WebdatasetFileStore` access (used for aux-data on crude datasets and by `energon mount`), and any direct `SqliteIndexReader` queries. Failures are loud (`sqlite3.OperationalError: no such table: samples`). -``` +Pass `enable_sample_tables=False` to `prepare_dataset` (default `True`) to skip populating the `samples` and `sample_parts` tables. Useful when preparation is bottlenecked by SQLite indexing on very large datasets. A dataset prepared this way cannot be used as an [auxiliary dataset](aux-data). diff --git a/docs/source/basic/data_prep.md b/docs/source/basic/data_prep.md index e173bed3..982b7663 100644 --- a/docs/source/basic/data_prep.md +++ b/docs/source/basic/data_prep.md @@ -688,25 +688,13 @@ The `media_metadata` table is used to store the media metadata for the selected #### Skipping the samples tables for very large datasets -On very large datasets (100M+ samples) the `samples` and `sample_parts` table inserts, combined with the post-load btree index builds over the unique `sample_key`, can dominate `energon prepare` runtime. For datasets that are consumed purely sequentially via the integer-indexed loader (`ShardInfosITarReader` — the default loader for monolithic webdatasets), those tables are never queried at training time: shard cumulative counts come from `.info.json`, and per-sample byte offsets come from the per-tar `.tar.idx` files. Checkpoint/resume uses integer `SliceState` offsets resolved through the same path. - -If you do not need sample-key lookups, you can skip populating the SQLite sample tables: +For very large datasets (100M+ samples), populating the `samples` and `sample_parts` tables can dominate `energon prepare` runtime. 
If the dataset will only be consumed sequentially (not as an auxiliary dataset), you can skip these tables: ```sh > energon prepare --no-sample-tables /path/to/dataset ``` -This still produces `.tar.idx`, `.info.json` and the split config — everything the integer-indexed loader needs. - -```{admonition} Trade-off -:class: warning -Datasets prepared with `--no-sample-tables` cannot be used with features that look samples up by `sample_key`. In particular: - -- [Polylithic datasets](aux-data) — the join is built by an SQL `JOIN` over the `samples` tables. -- The {py:meth}`as_file_store ` method and the {py:class}`WebdatasetFileStore `, which are used for aux-data access on [crude datasets](crude-data) and by [`energon mount`](energon-mount). - -Failures are loud (`sqlite3.OperationalError: no such table: samples`), not silent. The flag cannot be combined with `--tar-index-only` (which operates on an already-prepared dataset). -``` +A dataset prepared this way cannot be used as an [auxiliary dataset](aux-data). (data-on-disk-jsonl)= ## Dataset Format on Disk for JSONL Datasets From e71d662ea66e3ee18b84a06175de1a4724a10b3a Mon Sep 17 00:00:00 2001 From: Pei Li Date: Tue, 19 May 2026 17:03:11 +0200 Subject: [PATCH 4/9] Raise MissingSamplesTableError on sample-key lookups against no-sample-tables datasets Without this, code paths that load a dataset prepared with --no-sample-tables as auxiliary data (or any other use of the SQLite sample-key index) would fail with the unhelpful raw `sqlite3.OperationalError: no such table: samples`. This adds a new MissingSamplesTableError that names the cause and points to the fix (re-prepare without --no-sample-tables). Every SqliteIndexReader method that queries the samples / sample_parts tables runs a one-time check on first use and raises the descriptive error if the table is absent. get_media_metadata is unaffected since the media_metadata table is independent. 
Extends the test to verify the error type and message for get_sample_pointer_by_key, get_sample_count, and get_sample_part. Signed-off-by: Pei Li --- .../energon/flavors/webdataset/indexing.py | 46 +++++++++++++++++++ tests/test_dataset.py | 27 +++++++++-- 2 files changed, 68 insertions(+), 5 deletions(-) diff --git a/src/megatron/energon/flavors/webdataset/indexing.py b/src/megatron/energon/flavors/webdataset/indexing.py index 540176fe..29304c3c 100644 --- a/src/megatron/energon/flavors/webdataset/indexing.py +++ b/src/megatron/energon/flavors/webdataset/indexing.py @@ -27,6 +27,25 @@ def __init__(self, sample_key: str) -> None: self.sample_key = sample_key +class MissingSamplesTableError(RuntimeError): + """Raised when a sample-key operation is attempted on a dataset prepared without sample tables. + + Datasets prepared with ``energon prepare --no-sample-tables`` (or programmatically with + ``prepare_dataset(..., enable_sample_tables=False)``) do not have the ``samples`` / + ``sample_parts`` tables in their SQLite index. Such datasets cannot be used as auxiliary + datasets and do not support sample-key lookups. + """ + + def __init__(self, sqlite_path: "EPath") -> None: + super().__init__( + f"Dataset at {sqlite_path} was prepared without the SQLite samples tables " + f"(`energon prepare --no-sample-tables` / `enable_sample_tables=False`). " + f"Re-prepare the dataset without that option to use it as an auxiliary dataset " + f"or for any sample-key lookup." + ) + self.sqlite_path = sqlite_path + + class SqliteIndexWriter: sqlite_path: EPath db: Optional[sqlite3.Connection] @@ -378,6 +397,7 @@ class SqliteIndexReader: sqlite_path: EPath db: ThreadLocalSqlite + _samples_table_checked: bool def __init__(self, sqlite_path: EPath): """Initialize the SQLite database reader. 
@@ -393,6 +413,25 @@ def __init__(self, sqlite_path: EPath): path = f"file:{path}?mode=ro&immutable=1" self.db = ThreadLocalSqlite(path, is_uri=True) + self._samples_table_checked = False + + def _check_samples_table(self) -> None: + """Verify the ``samples`` table is present, raising a clear error if not. + + Called by every method that queries the ``samples`` / ``sample_parts`` tables, so callers + accessing a dataset prepared with ``--no-sample-tables`` get a descriptive error instead + of a raw ``sqlite3.OperationalError: no such table: samples``. The check runs once per + reader instance. + """ + if self._samples_table_checked: + return + assert self.db is not None, "Database is closed" + row = self.db.select_one( + "SELECT name FROM sqlite_master WHERE type='table' AND name='samples'" + ) + if row is None: + raise MissingSamplesTableError(self.sqlite_path) + self._samples_table_checked = True def db_has_sample_parts(self) -> bool: """Check if the database has a sample_parts table. 
@@ -427,6 +466,7 @@ def list_all_samples(self) -> Generator[Tuple[str, int, int], None, None]: """ assert self.db is not None, "Database is closed" + self._check_samples_table() for row in self.db.select_all("SELECT sample_key, byte_size, tar_file_id FROM samples"): yield row[0], row[1], row[2] @@ -439,6 +479,7 @@ def list_all_sample_parts(self) -> Generator[Tuple[str, int, int], None, None]: """ assert self.db is not None, "Database is closed" + self._check_samples_table() # Select all parts (sorted by tar_file_id, sample_index) but joined with the sample_key names for row in self.db.select_all( @@ -464,6 +505,7 @@ def list_sample_parts(self, sample_key: str) -> Generator[Tuple[str, int, int], """ assert self.db is not None, "Database is closed" + self._check_samples_table() # Select all parts (sorted by tar_file_id, sample_index) but joined with the sample_key names for row in self.db.select_all( @@ -483,6 +525,7 @@ def list_sample_parts(self, sample_key: str) -> Generator[Tuple[str, int, int], def get_total_size(self) -> int: """Get the total size of all samples in the database.""" assert self.db is not None, "Database is closed" + self._check_samples_table() count = self.db.select_one("SELECT SUM(byte_size) FROM samples") return count[0] if count else 0 @@ -490,6 +533,7 @@ def get_total_size(self) -> int: def get_sample_count(self) -> int: """Get the total number of samples in the database.""" assert self.db is not None, "Database is closed" + self._check_samples_table() count = self.db.select_one("SELECT COUNT(*) FROM samples") return count[0] if count else 0 @@ -505,6 +549,7 @@ def get_sample_part(self, key: str, part_name: str) -> ITarRawSamplePartPointer: Pointer to the sample part raw data. 
""" assert self.db is not None, "Database is closed" + self._check_samples_table() row = self.db.select_one( "SELECT sp.tar_file_id, sp.content_byte_offset, sp.content_byte_size " @@ -534,6 +579,7 @@ def get_sample_pointer_by_key(self, key: str) -> ITarSamplePointer: Tuple of (tar_file_id, sample_key, sample_index, byte_offset, byte_size) """ assert self.db is not None, "Database is closed" + self._check_samples_table() sample = self.db.select_one( "SELECT tar_file_id, sample_key, sample_index, byte_offset, byte_size " diff --git a/tests/test_dataset.py b/tests/test_dataset.py index 314f613b..5bef8a3a 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -19,10 +19,10 @@ from pathlib import Path from typing import Hashable, List, Tuple, Type, Union +import click import numpy as np import torch import webdataset as wds -import click from click.testing import CliRunner from PIL import Image @@ -1841,10 +1841,7 @@ def test_prepare_dataset_no_sample_tables(self): assert sqlite_path.is_file() with sqlite3.connect(str(sqlite_path)) as conn: tables = { - row[0] - for row in conn.execute( - "SELECT name FROM sqlite_master WHERE type='table'" - ) + row[0] for row in conn.execute("SELECT name FROM sqlite_master WHERE type='table'") } assert "samples" not in tables, f"unexpected samples table: {tables}" assert "sample_parts" not in tables, f"unexpected sample_parts table: {tables}" @@ -1865,6 +1862,26 @@ def test_prepare_dataset_no_sample_tables(self): result.stdout + (result.stderr or "") ) or isinstance(result.exception, click.UsageError) + # Sample-key operations against the SQLite reader must raise a descriptive error, + # not the raw `sqlite3.OperationalError: no such table: samples`. 
+ from megatron.energon.epathlib import EPath + from megatron.energon.flavors.webdataset.indexing import ( + MissingSamplesTableError, + SqliteIndexReader, + ) + + reader = SqliteIndexReader(EPath(str(sqlite_path))) + try: + with self.assertRaises(MissingSamplesTableError) as ctx: + reader.get_sample_pointer_by_key("any-key") + assert "no-sample-tables" in str(ctx.exception) + with self.assertRaises(MissingSamplesTableError): + reader.get_sample_count() + with self.assertRaises(MissingSamplesTableError): + reader.get_sample_part("any-key", "txt") + finally: + reader.close() + def test_preview_captioning_dataset(self): runner = CliRunner() result = runner.invoke( From 88245f2daf84645f13d6c4c36ef28de1850b3d9d Mon Sep 17 00:00:00 2001 From: Pei Li Date: Tue, 19 May 2026 17:29:23 +0200 Subject: [PATCH 5/9] test: save/restore round-trip on a --no-sample-tables dataset Empirically verifies that the integer-indexed loader's checkpoint / resume path works on a dataset prepared with --no-sample-tables. ShardInfosITarReader and SliceState never touch the SQLite samples tables, so the load-bearing claim of the flag is that training-time save/restore still produces the same sample sequence. This test exercises the round-trip: 1. Reference: an uninterrupted iteration of 20 samples. 2. Capture state mid-stream (after 10 samples) via save_state_rank(). 3. Continue iterating to capture the next 10 samples (post_save). 4. Build a fresh loader, restore_state_rank(state), iterate 10 samples (post_restore). 5. Assert first_half + post_save == reference (no divergence from the reference run) and post_restore == post_save (resumed iteration matches continued iteration). Re-prepares the test fixture as CaptioningSample + --no-sample-tables so get_train_dataset returns decodable samples. 
Signed-off-by: Pei Li --- tests/test_dataset.py | 66 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/tests/test_dataset.py b/tests/test_dataset.py index 5bef8a3a..c686a7b9 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -1882,6 +1882,72 @@ def test_prepare_dataset_no_sample_tables(self): finally: reader.close() + def test_prepare_dataset_no_sample_tables_save_restore(self): + """Resume after a mid-iteration checkpoint must reach the same samples in the + same order on a dataset prepared with ``--no-sample-tables``. + + This is the load-bearing claim of the flag: the integer-indexed loader + (``ShardInfosITarReader``) and the savable state (``SliceState``) do not + touch the SQLite samples tables, so save/restore must work without them. + We re-prepare the fixture with ``--no-sample-tables`` plus a captioning + field-map (so ``get_train_dataset`` yields decodable samples), then compare + a save/restore round-trip against a reference run. + """ + + from megatron.energon import get_savable_loader, get_train_dataset + + runner = CliRunner() + result = runner.invoke( + prepare_command, + [ + str(self.dataset_path), + "--non-interactive", + "--force-overwrite", + "--split-ratio=1,0,0", + "--sample-type=CaptioningSample", + '--field-map={"image": "png", "caption": "txt"}', + "--no-sample-tables", + ], + catch_exceptions=False, + ) + assert result.exit_code == 0, f"Prepare failed: {result.stdout}" + + def loader_factory(): + return get_savable_loader( + get_train_dataset( + self.dataset_path, + batch_size=2, + worker_config=no_worker_config, + shuffle_buffer_size=20, + max_samples_per_sequence=10, + ) + ) + + def keys_from(loader, n): + return [tuple(batch.__key__) for _, batch in zip(range(n), loader)] + + # Reference: a single uninterrupted run, used as the ground truth. + reference = keys_from(loader_factory(), 20) + + # Capture state mid-stream. 
+ loader = loader_factory() + first_half = keys_from(loader, 10) + state = loader.save_state_rank() + post_save = keys_from(loader, 10) + + # Restore into a fresh loader and continue. The resumed sequence must + # match what the original loader produced after `save_state_rank()`. + resumed = loader_factory() + resumed.restore_state_rank(state) + post_restore = keys_from(resumed, 10) + + assert first_half + post_save == reference, ( + f"Uninterrupted iteration diverges from reference: {first_half + post_save} != {reference}" + ) + assert post_restore == post_save, ( + f"Resume diverged from continued iteration: {post_restore} != {post_save}" + ) + def test_preview_captioning_dataset(self): runner = CliRunner() result = runner.invoke( From e35c68dc8ac1b5b7564a032dd845501425124ddd Mon Sep 17 00:00:00 2001 From: Pei Li Date: Tue, 19 May 2026 19:08:22 +0200 Subject: [PATCH 6/9] Address second review round: clearer docs, eager check in SqliteITarEntryReader MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - data_prep.md: replace "consumed sequentially" with the precise constraint ("not used as auxiliary or mounted"). - SqliteIndexReader: add a db_has_samples() method (mirrors db_has_sample_parts()); drop the per-method _check_samples_table guard. - SqliteITarEntryReader: store db_has_samples at __init__ and raise MissingSamplesTableError there when the samples table is missing — fail fast at the boundary that actually requires it. - Test updated to assert at SqliteITarEntryReader construction. 
Signed-off-by: Pei Li --- docs/source/basic/data_prep.md | 4 +-- .../energon/flavors/webdataset/indexing.py | 33 ++++++------------ .../energon/flavors/webdataset/itar_reader.py | 12 +++++-- tests/test_dataset.py | 34 ++++++------------- 4 files changed, 33 insertions(+), 50 deletions(-) diff --git a/docs/source/basic/data_prep.md b/docs/source/basic/data_prep.md index 982b7663..1edb62b5 100644 --- a/docs/source/basic/data_prep.md +++ b/docs/source/basic/data_prep.md @@ -688,13 +688,13 @@ The `media_metadata` table is used to store the media metadata for the selected #### Skipping the samples tables for very large datasets -For very large datasets (100M+ samples), populating the `samples` and `sample_parts` tables can dominate `energon prepare` runtime. If the dataset will only be consumed sequentially (not as an auxiliary dataset), you can skip these tables: +For very large datasets (100M+ samples), populating the `samples` and `sample_parts` tables can dominate `energon prepare` runtime. If the dataset will not be used as an auxiliary dataset or mounted, you can skip these tables: ```sh > energon prepare --no-sample-tables /path/to/dataset ``` -A dataset prepared this way cannot be used as an [auxiliary dataset](aux-data). +A dataset prepared this way cannot be used as an [auxiliary dataset](aux-data) or with `energon mount`. (data-on-disk-jsonl)= ## Dataset Format on Disk for JSONL Datasets diff --git a/src/megatron/energon/flavors/webdataset/indexing.py b/src/megatron/energon/flavors/webdataset/indexing.py index 29304c3c..c698a5f8 100644 --- a/src/megatron/energon/flavors/webdataset/indexing.py +++ b/src/megatron/energon/flavors/webdataset/indexing.py @@ -36,9 +36,9 @@ class MissingSamplesTableError(RuntimeError): datasets and do not support sample-key lookups. 
""" - def __init__(self, sqlite_path: "EPath") -> None: + def __init__(self, sqlite_path: EPath) -> None: super().__init__( - f"Dataset at {sqlite_path} was prepared without the SQLite samples tables " + f"Dataset at {sqlite_path.parent.parent} was prepared without the SQLite samples tables " f"(`energon prepare --no-sample-tables` / `enable_sample_tables=False`). " f"Re-prepare the dataset without that option to use it as an auxiliary dataset " f"or for any sample-key lookup." @@ -397,7 +397,6 @@ class SqliteIndexReader: sqlite_path: EPath db: ThreadLocalSqlite - _samples_table_checked: bool def __init__(self, sqlite_path: EPath): """Initialize the SQLite database reader. @@ -413,25 +412,20 @@ def __init__(self, sqlite_path: EPath): path = f"file:{path}?mode=ro&immutable=1" self.db = ThreadLocalSqlite(path, is_uri=True) - self._samples_table_checked = False - def _check_samples_table(self) -> None: - """Verify the ``samples`` table is present, raising a clear error if not. + def db_has_samples(self) -> bool: + """Check if the database has a samples table. - Called by every method that queries the ``samples`` / ``sample_parts`` tables, so callers - accessing a dataset prepared with ``--no-sample-tables`` get a descriptive error instead - of a raw ``sqlite3.OperationalError: no such table: samples``. The check runs once per - reader instance. + Returns: + True if samples table exists, False otherwise. """ - if self._samples_table_checked: - return assert self.db is not None, "Database is closed" - row = self.db.select_one( + + db_exists = self.db.select_one( "SELECT name FROM sqlite_master WHERE type='table' AND name='samples'" ) - if row is None: - raise MissingSamplesTableError(self.sqlite_path) - self._samples_table_checked = True + self.db.thread_close() + return db_exists is not None def db_has_sample_parts(self) -> bool: """Check if the database has a sample_parts table. 
@@ -466,7 +460,6 @@ def list_all_samples(self) -> Generator[Tuple[str, int, int], None, None]: """ assert self.db is not None, "Database is closed" - self._check_samples_table() for row in self.db.select_all("SELECT sample_key, byte_size, tar_file_id FROM samples"): yield row[0], row[1], row[2] @@ -479,7 +472,6 @@ def list_all_sample_parts(self) -> Generator[Tuple[str, int, int], None, None]: """ assert self.db is not None, "Database is closed" - self._check_samples_table() # Select all parts (sorted by tar_file_id, sample_index) but joined with the sample_key names for row in self.db.select_all( @@ -505,7 +497,6 @@ def list_sample_parts(self, sample_key: str) -> Generator[Tuple[str, int, int], """ assert self.db is not None, "Database is closed" - self._check_samples_table() # Select all parts (sorted by tar_file_id, sample_index) but joined with the sample_key names for row in self.db.select_all( @@ -525,7 +516,6 @@ def list_sample_parts(self, sample_key: str) -> Generator[Tuple[str, int, int], def get_total_size(self) -> int: """Get the total size of all samples in the database.""" assert self.db is not None, "Database is closed" - self._check_samples_table() count = self.db.select_one("SELECT SUM(byte_size) FROM samples") return count[0] if count else 0 @@ -533,7 +523,6 @@ def get_total_size(self) -> int: def get_sample_count(self) -> int: """Get the total number of samples in the database.""" assert self.db is not None, "Database is closed" - self._check_samples_table() count = self.db.select_one("SELECT COUNT(*) FROM samples") return count[0] if count else 0 @@ -549,7 +538,6 @@ def get_sample_part(self, key: str, part_name: str) -> ITarRawSamplePartPointer: Pointer to the sample part raw data. 
""" assert self.db is not None, "Database is closed" - self._check_samples_table() row = self.db.select_one( "SELECT sp.tar_file_id, sp.content_byte_offset, sp.content_byte_size " @@ -579,7 +567,6 @@ def get_sample_pointer_by_key(self, key: str) -> ITarSamplePointer: Tuple of (tar_file_id, sample_key, sample_index, byte_offset, byte_size) """ assert self.db is not None, "Database is closed" - self._check_samples_table() sample = self.db.select_one( "SELECT tar_file_id, sample_key, sample_index, byte_offset, byte_size " diff --git a/src/megatron/energon/flavors/webdataset/itar_reader.py b/src/megatron/energon/flavors/webdataset/itar_reader.py index b6c837aa..a45fca69 100644 --- a/src/megatron/energon/flavors/webdataset/itar_reader.py +++ b/src/megatron/energon/flavors/webdataset/itar_reader.py @@ -484,7 +484,8 @@ class SqliteITarEntryReader(ITarReader[str]): """ sqlite_reader: SqliteIndexReader - db_has_sample_parts: int + db_has_sample_parts: bool + db_has_samples: bool def __init__( self, @@ -496,7 +497,10 @@ def __init__( disable_cache: bool = False, ): from megatron.energon.flavors.webdataset.config import MAIN_FOLDER_NAME - from megatron.energon.flavors.webdataset.indexing import SqliteIndexReader + from megatron.energon.flavors.webdataset.indexing import ( + MissingSamplesTableError, + SqliteIndexReader, + ) # shard_name_to_info_idx = {name: i for i, name in enumerate(wds_meta.info_shard_files)} tar_filenames = get_info_shard_files(base_path) @@ -507,6 +511,10 @@ def __init__( self.sqlite_reader = SqliteIndexReader(sqlite_path) self.db_has_sample_parts = self.sqlite_reader.db_has_sample_parts() + self.db_has_samples = self.sqlite_reader.db_has_samples() + + if not self.db_has_samples: + raise MissingSamplesTableError(sqlite_path) self.key_is_full_entryname = key_is_full_entryname diff --git a/tests/test_dataset.py b/tests/test_dataset.py index c686a7b9..907e0c90 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -10,6 +10,7 @@ import logging 
import math import random +import sqlite3 import sys import tempfile import unittest @@ -48,8 +49,11 @@ ) from megatron.energon.dataset_config import get_dataset_from_config from megatron.energon.edataclass import edataclass +from megatron.energon.epathlib import EPath from megatron.energon.flavors import BaseWebdatasetFactory from megatron.energon.flavors.webdataset.config import MAIN_FOLDER_NAME +from megatron.energon.flavors.webdataset.indexing import MissingSamplesTableError, SqliteIndexReader +from megatron.energon.flavors.webdataset.itar_reader import SqliteITarEntryReader from megatron.energon.task_encoder.base import stateless from megatron.energon.tools.analyze_debug import command as analyze_debug_command from megatron.energon.tools.info import command as info_command @@ -1814,8 +1818,6 @@ def test_prepare_dataset_no_sample_tables(self): - The flag rejects being combined with `--tar-index-only`. """ - import sqlite3 - runner = CliRunner() result = runner.invoke( prepare_command, @@ -1862,25 +1864,13 @@ def test_prepare_dataset_no_sample_tables(self): result.stdout + (result.stderr or "") ) or isinstance(result.exception, click.UsageError) - # Sample-key operations against the SQLite reader must raise a descriptive error, - # not the raw `sqlite3.OperationalError: no such table: samples`. - from megatron.energon.epathlib import EPath - from megatron.energon.flavors.webdataset.indexing import ( - MissingSamplesTableError, - SqliteIndexReader, - ) - - reader = SqliteIndexReader(EPath(str(sqlite_path))) - try: - with self.assertRaises(MissingSamplesTableError) as ctx: - reader.get_sample_pointer_by_key("any-key") - assert "no-sample-tables" in str(ctx.exception) - with self.assertRaises(MissingSamplesTableError): - reader.get_sample_count() - with self.assertRaises(MissingSamplesTableError): - reader.get_sample_part("any-key", "txt") - finally: - reader.close() + # SqliteIndexReader exposes db_has_samples() as part of its public surface. 
+ index_reader = SqliteIndexReader(EPath(str(sqlite_path))) + assert index_reader.db_has_samples() is False + + with self.assertRaises(MissingSamplesTableError) as ctx: + SqliteITarEntryReader(EPath(str(self.dataset_path))) + assert "no-sample-tables" in str(ctx.exception) def test_prepare_dataset_no_sample_tables_save_restore(self): """Resume after a mid-iteration checkpoint must reach the same samples in the @@ -1894,8 +1884,6 @@ def test_prepare_dataset_no_sample_tables_save_restore(self): a save/restore round-trip against a reference run. """ - from megatron.energon import get_savable_loader, get_train_dataset - runner = CliRunner() result = runner.invoke( prepare_command, From 3a7644ab11ba085d3eea9bf5492125df35f87fb8 Mon Sep 17 00:00:00 2001 From: pei-li-hedgehog Date: Wed, 20 May 2026 09:58:55 +0200 Subject: [PATCH 7/9] Update src/megatron/energon/flavors/webdataset/itar_reader.py Co-authored-by: Lukas Voegtle <5764745+voegtlel@users.noreply.github.com> Signed-off-by: pei-li-hedgehog --- src/megatron/energon/flavors/webdataset/itar_reader.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/megatron/energon/flavors/webdataset/itar_reader.py b/src/megatron/energon/flavors/webdataset/itar_reader.py index a45fca69..d321e9cf 100644 --- a/src/megatron/energon/flavors/webdataset/itar_reader.py +++ b/src/megatron/energon/flavors/webdataset/itar_reader.py @@ -511,9 +511,7 @@ def __init__( self.sqlite_reader = SqliteIndexReader(sqlite_path) self.db_has_sample_parts = self.sqlite_reader.db_has_sample_parts() - self.db_has_samples = self.sqlite_reader.db_has_samples() - - if not self.db_has_samples: + if not self.sqlite_reader.db_has_samples(): raise MissingSamplesTableError(sqlite_path) self.key_is_full_entryname = key_is_full_entryname From 250686531f0df35e8112f2555ccfff4fc996f791 Mon Sep 17 00:00:00 2001 From: pei-li-hedgehog Date: Wed, 20 May 2026 09:59:23 +0200 Subject: [PATCH 8/9] Update src/megatron/energon/flavors/webdataset/itar_reader.py 
Co-authored-by: Lukas Voegtle <5764745+voegtlel@users.noreply.github.com> Signed-off-by: pei-li-hedgehog --- src/megatron/energon/flavors/webdataset/itar_reader.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/megatron/energon/flavors/webdataset/itar_reader.py b/src/megatron/energon/flavors/webdataset/itar_reader.py index d321e9cf..3b8ae3c1 100644 --- a/src/megatron/energon/flavors/webdataset/itar_reader.py +++ b/src/megatron/energon/flavors/webdataset/itar_reader.py @@ -485,7 +485,6 @@ class SqliteITarEntryReader(ITarReader[str]): sqlite_reader: SqliteIndexReader db_has_sample_parts: bool - db_has_samples: bool def __init__( self, From 09148620113a4459d71f6c75f173f7373a417e78 Mon Sep 17 00:00:00 2001 From: Pei Li Date: Wed, 20 May 2026 10:18:05 +0200 Subject: [PATCH 9/9] fix test failure for test_prepare_dataset_no_sample_tables Signed-off-by: Pei Li --- tests/test_dataset.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/test_dataset.py b/tests/test_dataset.py index 907e0c90..3cc9efb6 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -20,7 +20,6 @@ from pathlib import Path from typing import Hashable, List, Tuple, Type, Union -import click import numpy as np import torch import webdataset as wds @@ -1860,9 +1859,7 @@ def test_prepare_dataset_no_sample_tables(self): catch_exceptions=True, ) assert result.exit_code != 0 - assert "--no-sample-tables cannot be combined with --tar-index-only" in ( - result.stdout + (result.stderr or "") - ) or isinstance(result.exception, click.UsageError) + assert "--no-sample-tables cannot be combined with --tar-index-only" in result.output # SqliteIndexReader exposes db_has_samples() as part of its public surface. index_reader = SqliteIndexReader(EPath(str(sqlite_path)))