From 92a3145b94f745a5740379c221e1d70923bf6931 Mon Sep 17 00:00:00 2001 From: AjAnubolu Date: Mon, 2 Mar 2026 17:17:01 -0800 Subject: [PATCH 1/5] [CI] add preprocessing pipeline tests --- .buildkite/pipeline.yml | 15 ++ .buildkite/scripts/pr_test.sh | 4 + fastvideo/tests/modal/pr_test.py | 14 ++ fastvideo/tests/preprocessing/__init__.py | 0 .../preprocessing/test_preprocessing_t2v.py | 156 ++++++++++++++++++ 5 files changed, 189 insertions(+) create mode 100644 fastvideo/tests/preprocessing/__init__.py create mode 100644 fastvideo/tests/preprocessing/test_preprocessing_t2v.py diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index a88e80a9c0..9ef416ef64 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -171,6 +171,21 @@ steps: - TEST_TYPE=unit_test agents: queue: "default" + - path: + - "fastvideo/pipelines/preprocess/**" + - "fastvideo/workflow/preprocess/**" + - "fastvideo/dataset/preprocessing_datasets.py" + - "fastvideo/dataset/dataloader/schema.py" + - "fastvideo/tests/preprocessing/**" + - "pyproject.toml" + - "docker/Dockerfile.python3.12" + config: + command: "timeout 20m .buildkite/scripts/pr_test.sh" + label: "Preprocessing Tests" + env: + - TEST_TYPE=preprocessing + agents: + queue: "default" # - path: # - "scripts/lora_extraction/**" # - "pyproject.toml" diff --git a/.buildkite/scripts/pr_test.sh b/.buildkite/scripts/pr_test.sh index 278b1e64f7..e65baaf26e 100755 --- a/.buildkite/scripts/pr_test.sh +++ b/.buildkite/scripts/pr_test.sh @@ -119,6 +119,10 @@ case "$TEST_TYPE" in log "Running LoRA extraction tests..." MODAL_COMMAND="$MODAL_ENV HF_API_KEY=$HF_API_KEY python3 -m modal run $MODAL_TEST_FILE::run_lora_extraction_tests" ;; + "preprocessing") + log "Running preprocessing tests..." + MODAL_COMMAND="$MODAL_ENV HF_API_KEY=$HF_API_KEY python3 -m modal run $MODAL_TEST_FILE::run_preprocessing_tests" + ;; *) log "Error: Unknown test type: $TEST_TYPE" exit 1 diff --git a/fastvideo/tests/modal/pr_test.py b/fastvideo/tests/modal/pr_test.py index dd25e6a003..7ea4726ecb 100644 --- a/fastvideo/tests/modal/pr_test.py +++ b/fastvideo/tests/modal/pr_test.py @@ -138,6 +138,20 @@ def run_self_forcing_tests(): def run_unit_test(): run_test("pytest ./fastvideo/tests/dataset/ ./fastvideo/tests/workflow/ ./fastvideo/tests/entrypoints/ -vs") +@app.function( + gpu="L40S:1", + image=image, + timeout=1200, + secrets=[modal.Secret.from_dict( + {"HF_API_KEY": os.environ.get("HF_API_KEY", "")})], + volumes={"/root/data": model_vol}) +def run_preprocessing_tests(): + run_test( + "export HF_HOME='/root/data/.cache' && " + "hf auth login --token $HF_API_KEY && " + "pytest ./fastvideo/tests/preprocessing/ -vs" + ) + @app.function(gpu="L40S:1", image=image, timeout=3600, secrets=[modal.Secret.from_dict({"HF_API_KEY": os.environ.get("HF_API_KEY", "")})]) def run_lora_extraction_tests(): run_test("hf auth login --token $HF_API_KEY && pytest ./fastvideo/tests/lora_extraction/test_lora_extraction.py") diff --git a/fastvideo/tests/preprocessing/__init__.py b/fastvideo/tests/preprocessing/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/fastvideo/tests/preprocessing/test_preprocessing_t2v.py b/fastvideo/tests/preprocessing/test_preprocessing_t2v.py new file mode 100644 index 0000000000..75f037352d --- /dev/null +++ b/fastvideo/tests/preprocessing/test_preprocessing_t2v.py @@ -0,0 +1,156 @@ +"""End-to-end test for T2V preprocessing pipeline. + +Downloads a small test dataset, runs the preprocessing pipeline via +torchrun, and validates the output parquet files structurally. +""" + +import os +import shutil +import subprocess +from pathlib import Path + +import pyarrow.parquet as pq +from huggingface_hub import snapshot_download + +NUM_NODES = "1" +NUM_GPUS_PER_NODE = "1" +MODEL_PATH = "Wan-AI/Wan2.1-T2V-1.3B-Diffusers" +PREPROCESSING_ENTRY_FILE = ("fastvideo/pipelines/preprocess/v1_preprocess.py") +DATA_DIR = Path("data") +RAW_DATA_DIR = DATA_DIR / "cats" +PREPROCESSED_DATA_DIR = DATA_DIR / "cats_preprocessed_t2v" + +EXPECTED_T2V_COLUMNS = { + "id", + "file_name", + "caption", + "media_type", + "width", + "height", + "num_frames", + "duration_sec", + "fps", + "vae_latent_bytes", + "vae_latent_shape", + "vae_latent_dtype", + "text_embedding_bytes", + "text_embedding_shape", + "text_embedding_dtype", +} + + +def _download_data(): + """Download the small cats overfit dataset from HuggingFace.""" + os.makedirs(DATA_DIR, exist_ok=True) + snapshot_download( + repo_id="wlsaidhi/cats-overfit-merged", + local_dir=str(RAW_DATA_DIR), + repo_type="dataset", + resume_download=True, + token=os.environ.get("HF_TOKEN"), + ) + assert RAW_DATA_DIR.exists(), ( + f"Download failed: {RAW_DATA_DIR} does not exist") + + +def _run_preprocessing(): + """Run the T2V preprocessing pipeline via torchrun.""" + if PREPROCESSED_DATA_DIR.exists(): + shutil.rmtree(PREPROCESSED_DATA_DIR) + + cmd = [ + "torchrun", + "--nnodes", + NUM_NODES, + "--nproc_per_node", + NUM_GPUS_PER_NODE, + PREPROCESSING_ENTRY_FILE, + "--model_path", + MODEL_PATH, + "--data_merge_path", + os.path.join(RAW_DATA_DIR, "merge_1_sample.txt"), + "--preprocess_video_batch_size", + "1", + "--max_height", + "480", + "--max_width", + "832", + "--num_frames", + "77", + "--dataloader_num_workers", + "0", + "--output_dir", + str(PREPROCESSED_DATA_DIR), + "--train_fps", + "16", + "--samples_per_file", + "1", + "--flush_frequency", + "1", + "--video_length_tolerance_range", + "5", + "--preprocess_task", + "t2v", + ] # fmt: skip + + subprocess.run(cmd, check=True) + + +def test_preprocessing_t2v(): + """Run T2V preprocessing and validate output parquet files.""" + _download_data() + _run_preprocessing() + + parquet_dir = PREPROCESSED_DATA_DIR / "combined_parquet_dataset" + assert parquet_dir.exists(), ( + f"Expected output dir not found: {parquet_dir}") + + parquet_files = sorted(parquet_dir.glob("*.parquet")) + assert len(parquet_files) >= 1, "No parquet files produced" + + table = pq.read_table(str(parquet_files[0])) + + # -- schema validation -- + actual_columns = set(table.schema.names) + missing = EXPECTED_T2V_COLUMNS - actual_columns + assert not missing, f"Missing columns in parquet: {missing}" + + # -- row count -- + assert table.num_rows >= 1, "Parquet file has zero rows" + + # -- per-row content validation -- + for i in range(table.num_rows): + row = { + col: table.column(col)[i].as_py() + for col in EXPECTED_T2V_COLUMNS + } + + # VAE latent + assert len(row["vae_latent_bytes"]) > 0, ( + f"Row {i}: vae_latent_bytes is empty") + assert len(row["vae_latent_shape"]) == 4, ( + f"Row {i}: vae_latent_shape should have 4 elements " + f"(C,T,H,W), got {row['vae_latent_shape']}") + + # Text embedding + assert len(row["text_embedding_bytes"]) > 0, ( + f"Row {i}: text_embedding_bytes is empty") + + # Caption + assert isinstance(row["caption"], str) and row["caption"], ( + f"Row {i}: caption is empty or not a string") + + # Media type + assert row["media_type"] == "video", ( + f"Row {i}: expected media_type='video', " + f"got '{row['media_type']}'") + + # Dimensions + assert row["width"] > 0, ( + f"Row {i}: width must be positive, got {row['width']}") + assert row["height"] > 0, ( + f"Row {i}: height must be positive, got {row['height']}") + + +if __name__ == "__main__": + test_preprocessing_t2v() From 4d05c5ecf418f10afc709eb74b5e81960241dfb5 Mon Sep 17 00:00:00 2001 From: AjAnubolu Date: Mon, 2 Mar 2026 17:24:56 -0800 Subject: [PATCH 2/5] fix yapf formatting --- fastvideo/tests/modal/pr_test.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/fastvideo/tests/modal/pr_test.py b/fastvideo/tests/modal/pr_test.py index f3cf4c6d1d..ebe15ea35b 100644 --- a/fastvideo/tests/modal/pr_test.py +++ b/fastvideo/tests/modal/pr_test.py @@ -219,10 +219,9 @@ def run_unit_test(): ], volumes={"/root/data": model_vol}) def run_preprocessing_tests(): - run_test( - "export HF_HOME='/root/data/.cache' && " - "hf auth login --token $HF_API_KEY && " - "pytest ./fastvideo/tests/preprocessing/ -vs") + run_test("export HF_HOME='/root/data/.cache' && " + "hf auth login --token $HF_API_KEY && " + "pytest ./fastvideo/tests/preprocessing/ -vs") @app.function(gpu="L40S:1", From 422a8578301928e5ac7a34402832c417764f0ab3 Mon Sep 17 00:00:00 2001 From: AjAnubolu Date: Mon, 2 Mar 2026 22:06:53 -0800 Subject: [PATCH 3/5] [CI] add new preprocessing pipeline tests and comparison test --- .../preprocessing/test_preprocessing_t2v.py | 198 ++++++++++++++---- 1 file changed, 155 insertions(+), 43 deletions(-) diff --git a/fastvideo/tests/preprocessing/test_preprocessing_t2v.py b/fastvideo/tests/preprocessing/test_preprocessing_t2v.py index 75f037352d..88151ca708 100644 --- a/fastvideo/tests/preprocessing/test_preprocessing_t2v.py +++ b/fastvideo/tests/preprocessing/test_preprocessing_t2v.py @@ -1,7 +1,7 @@ -"""End-to-end test for T2V preprocessing pipeline. +"""End-to-end tests for T2V preprocessing pipelines. -Downloads a small test dataset, runs the preprocessing pipeline via -torchrun, and validates the output parquet files structurally. +Downloads a small test dataset, runs old and new preprocessing pipelines +via torchrun, and validates the output parquet files structurally. """ import os @@ -15,10 +15,12 @@ NUM_NODES = "1" NUM_GPUS_PER_NODE = "1" MODEL_PATH = "Wan-AI/Wan2.1-T2V-1.3B-Diffusers" -PREPROCESSING_ENTRY_FILE = ("fastvideo/pipelines/preprocess/v1_preprocess.py") +OLD_ENTRY_FILE = "fastvideo/pipelines/preprocess/v1_preprocess.py" +NEW_ENTRY_FILE = ("fastvideo/pipelines/preprocess/v1_preprocessing_new.py") DATA_DIR = Path("data") RAW_DATA_DIR = DATA_DIR / "cats" -PREPROCESSED_DATA_DIR = DATA_DIR / "cats_preprocessed_t2v" +PREPROCESSED_DIR_OLD = DATA_DIR / "cats_preprocessed_t2v" +PREPROCESSED_DIR_NEW = DATA_DIR / "cats_preprocessed_t2v_new" EXPECTED_T2V_COLUMNS = { "id", @@ -53,10 +55,10 @@ def _download_data(): f"Download failed: {RAW_DATA_DIR} does not exist") -def _run_preprocessing(): - """Run the T2V preprocessing pipeline via torchrun.""" - if PREPROCESSED_DATA_DIR.exists(): - shutil.rmtree(PREPROCESSED_DATA_DIR) +def _run_old_preprocessing(): + """Run the old T2V preprocessing pipeline via torchrun.""" + if PREPROCESSED_DIR_OLD.exists(): + shutil.rmtree(PREPROCESSED_DIR_OLD) cmd = [ "torchrun", @@ -64,7 +66,7 @@ def _run_preprocessing(): NUM_NODES, "--nproc_per_node", NUM_GPUS_PER_NODE, - PREPROCESSING_ENTRY_FILE, + OLD_ENTRY_FILE, "--model_path", MODEL_PATH, "--data_merge_path", @@ -80,7 +82,7 @@ def _run_preprocessing(): "--dataloader_num_workers", "0", "--output_dir", - str(PREPROCESSED_DATA_DIR), + str(PREPROCESSED_DIR_OLD), "--train_fps", "16", "--samples_per_file", @@ -96,61 +98,171 @@ def _run_preprocessing(): subprocess.run(cmd, check=True) -def test_preprocessing_t2v(): - """Run T2V preprocessing and validate output parquet files.""" - _download_data() - _run_preprocessing() +def _run_new_preprocessing(): + """Run the new T2V preprocessing pipeline via torchrun.""" + if PREPROCESSED_DIR_NEW.exists(): + shutil.rmtree(PREPROCESSED_DIR_NEW) + + cmd = [ + "torchrun", + "--nnodes", + NUM_NODES, + "--nproc_per_node", + NUM_GPUS_PER_NODE, + NEW_ENTRY_FILE, + "--model-path", + MODEL_PATH, + "--mode", + "preprocess", + "--workload-type", + "t2v", + "--preprocess.dataset-path", + str(RAW_DATA_DIR), + "--preprocess.dataset-type", + "merged", + "--preprocess.dataset-output-dir", + str(PREPROCESSED_DIR_NEW), + "--preprocess.video-loader-type", + "torchvision", + "--preprocess.preprocess-video-batch-size", + "1", + "--preprocess.max-height", + "480", + "--preprocess.max-width", + "832", + "--preprocess.num-frames", + "77", + "--preprocess.dataloader-num-workers", + "0", + "--preprocess.train-fps", + "16", + "--preprocess.samples-per-file", + "1", + "--preprocess.flush-frequency", + "1", + "--preprocess.video-length-tolerance-range", + "5", + ] # fmt: skip + + subprocess.run(cmd, check=True) + - parquet_dir = PREPROCESSED_DATA_DIR / "combined_parquet_dataset" +def _read_first_parquet(parquet_dir): + """Read and return the first parquet table from a directory.""" assert parquet_dir.exists(), ( f"Expected output dir not found: {parquet_dir}") - parquet_files = sorted(parquet_dir.glob("*.parquet")) - assert len(parquet_files) >= 1, "No parquet files produced" + assert len(parquet_files) >= 1, (f"No parquet files in {parquet_dir}") + return pq.read_table(str(parquet_files[0])) - table = pq.read_table(str(parquet_files[0])) - # -- schema validation -- +def _validate_parquet_t2v(table, label=""): + """Validate a parquet table has the expected T2V schema and content.""" + prefix = f"[{label}] " if label else "" + + # Schema actual_columns = set(table.schema.names) missing = EXPECTED_T2V_COLUMNS - actual_columns - assert not missing, f"Missing columns in parquet: {missing}" + assert not missing, f"{prefix}Missing columns: {missing}" - # -- row count -- - assert table.num_rows >= 1, "Parquet file has zero rows" + # Row count + assert table.num_rows >= 1, f"{prefix}Parquet has zero rows" - # -- per-row content validation -- + # Per-row validation for i in range(table.num_rows): row = { col: table.column(col)[i].as_py() for col in EXPECTED_T2V_COLUMNS } - # VAE latent assert len(row["vae_latent_bytes"]) > 0, ( - f"Row {i}: vae_latent_bytes is empty") + f"{prefix}Row {i}: vae_latent_bytes is empty") assert len(row["vae_latent_shape"]) == 4, ( - f"Row {i}: vae_latent_shape should have 4 elements " - f"(C,T,H,W), got {row['vae_latent_shape']}") - - # Text embedding + f"{prefix}Row {i}: vae_latent_shape should have " + f"4 elements (C,T,H,W), got {row['vae_latent_shape']}") assert len(row["text_embedding_bytes"]) > 0, ( - f"Row {i}: text_embedding_bytes is empty") - - # Caption + f"{prefix}Row {i}: text_embedding_bytes is empty") assert isinstance(row["caption"], str) and row["caption"], ( - f"Row {i}: caption is empty or not a string") - - # Media type + f"{prefix}Row {i}: caption is empty or not a string") assert row["media_type"] == "video", ( - f"Row {i}: expected media_type='video', " + f"{prefix}Row {i}: expected media_type='video', " f"got '{row['media_type']}'") + assert row["width"] > 0, (f"{prefix}Row {i}: width must be positive") + assert row["height"] > 0, (f"{prefix}Row {i}: height must be positive") + + +def test_preprocessing_t2v_old(): + """Run old T2V preprocessing and validate output parquet files.""" + _download_data() + _run_old_preprocessing() + + parquet_dir = PREPROCESSED_DIR_OLD / "combined_parquet_dataset" + table = _read_first_parquet(parquet_dir) + _validate_parquet_t2v(table, label="old pipeline") + + +def test_preprocessing_t2v_new(): + """Run new T2V preprocessing and validate output parquet files.""" + _download_data() + _run_new_preprocessing() + + parquet_dir = PREPROCESSED_DIR_NEW / "training_dataset" / "worker_0" + table = _read_first_parquet(parquet_dir) + _validate_parquet_t2v(table, label="new pipeline") + + +def test_preprocessing_pipelines_match(): + """Compare old and new pipeline outputs structurally.""" + _download_data() + _run_old_preprocessing() + _run_new_preprocessing() + + old_dir = PREPROCESSED_DIR_OLD / "combined_parquet_dataset" + new_dir = PREPROCESSED_DIR_NEW / "training_dataset" / "worker_0" + old_table = _read_first_parquet(old_dir) + new_table = _read_first_parquet(new_dir) + + # Both must have the expected columns + for name, tbl in [("old", old_table), ("new", new_table)]: + actual = set(tbl.schema.names) + missing = EXPECTED_T2V_COLUMNS - actual + assert not missing, f"{name} pipeline missing columns: {missing}" + + # Same number of rows + assert old_table.num_rows == new_table.num_rows, ( + f"Row count mismatch: old={old_table.num_rows}, " + f"new={new_table.num_rows}") + + # Per-row structural comparison (not exact bytes — fp diffs expected) + for i in range(old_table.num_rows): + old_row = { + col: old_table.column(col)[i].as_py() + for col in EXPECTED_T2V_COLUMNS + } + new_row = { + col: new_table.column(col)[i].as_py() + for col in EXPECTED_T2V_COLUMNS + } - # Dimensions - assert row["width"] > 0, ( - f"Row {i}: width must be positive, got {row['width']}") - assert row["height"] > 0, ( - f"Row {i}: height must be positive, got {row['height']}") + assert old_row["vae_latent_shape"] == new_row["vae_latent_shape"], ( + f"Row {i}: vae_latent_shape mismatch: " + f"old={old_row['vae_latent_shape']}, " + f"new={new_row['vae_latent_shape']}") + assert ( + old_row["text_embedding_shape"] == new_row["text_embedding_shape"] + ), (f"Row {i}: text_embedding_shape mismatch: " + f"old={old_row['text_embedding_shape']}, " + f"new={new_row['text_embedding_shape']}") + assert old_row["vae_latent_dtype"] == new_row["vae_latent_dtype"], ( + f"Row {i}: vae_latent_dtype mismatch") + assert ( + old_row["text_embedding_dtype"] == new_row["text_embedding_dtype"] + ), (f"Row {i}: text_embedding_dtype mismatch") + assert old_row["media_type"] == new_row["media_type"], ( + f"Row {i}: media_type mismatch") if __name__ == "__main__": - test_preprocessing_t2v() + test_preprocessing_t2v_old() + test_preprocessing_t2v_new() + test_preprocessing_pipelines_match() From ec7cf3f52dec052ca70e6ed384f51d93c9adb9f0 Mon Sep 17 00:00:00 2001 From: AjAnubolu Date: Tue, 3 Mar 2026 00:27:09 -0800 Subject: [PATCH 4/5] fix pymarkdown lint in architecture.md --- docs/inference/architecture.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/inference/architecture.md b/docs/inference/architecture.md index 085d47b779..bdfd1e8c33 100644 --- a/docs/inference/architecture.md +++ b/docs/inference/architecture.md @@ -268,6 +268,7 @@ Specialized variants: `CausalDenoisingStage`, `LTX2DenoisingStage`, `positive_int_divisible(divisor)`, etc. `VerificationResult` collects check results: + ```python result = VerificationResult() result.add_check("height", batch.height, V.positive_int_divisible(8)) From d72e41a694d10c0af83978e12089859d2fc1e6b2 Mon Sep 17 00:00:00 2001 From: AjAnubolu Date: Tue, 3 Mar 2026 00:29:34 -0800 Subject: [PATCH 5/5] fix HF_TOKEN env var and use pathlib consistently --- fastvideo/tests/preprocessing/test_preprocessing_t2v.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fastvideo/tests/preprocessing/test_preprocessing_t2v.py b/fastvideo/tests/preprocessing/test_preprocessing_t2v.py index 88151ca708..a04df0253b 100644 --- a/fastvideo/tests/preprocessing/test_preprocessing_t2v.py +++ b/fastvideo/tests/preprocessing/test_preprocessing_t2v.py @@ -49,7 +49,7 @@ def _download_data(): local_dir=str(RAW_DATA_DIR), repo_type="dataset", resume_download=True, - token=os.environ.get("HF_TOKEN"), + token=os.environ.get("HF_API_KEY"), ) assert RAW_DATA_DIR.exists(), ( f"Download failed: {RAW_DATA_DIR} does not exist") @@ -70,7 +70,7 @@ def _run_old_preprocessing(): "--model_path", MODEL_PATH, "--data_merge_path", - os.path.join(RAW_DATA_DIR, "merge_1_sample.txt"), + str(RAW_DATA_DIR / "merge_1_sample.txt"), "--preprocess_video_batch_size", "1", "--max_height",