diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dfa804a7710..33906466a4d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -886,6 +886,12 @@ jobs: enable-sccache: "false" - run: cargo build --package vortex-jni + compat-check: + name: "Compat check" + uses: ./.github/workflows/compat-validation.yml + with: + mode: last + rust-publish-dry-run: name: "Rust publish dry-run" timeout-minutes: 120 diff --git a/.github/workflows/compat-validation.yml b/.github/workflows/compat-validation.yml index 16bd6b4ed2b..bdefc033964 100644 --- a/.github/workflows/compat-validation.yml +++ b/.github/workflows/compat-validation.yml @@ -3,32 +3,44 @@ name: Compat Validation on: schedule: - cron: "0 6 * * 1" # Monday 6am UTC + workflow_call: + inputs: + mode: + description: "Validation mode" + required: true + default: "all" + type: string workflow_dispatch: inputs: mode: description: "Validation mode" required: true - default: "last" + default: "all" type: choice options: - - last - all - -env: - FIXTURES_URL: https://vortex-compat-fixtures.s3.amazonaws.com + - last jobs: compat-test: - runs-on: ubuntu-latest + runs-on: >- + ${{ github.repository == 'vortex-data/vortex' + && format('runs-on={0}/runner=amd64-medium/image=ubuntu24-full-x64-pre/tag=compat-validation', github.run_id) + || 'ubuntu-latest' }} + timeout-minutes: 120 steps: - - uses: actions/checkout@v4 - - - uses: dtolnay/rust-toolchain@stable - - - uses: Swatinem/rust-cache@v2 - + - uses: runs-on/action@v2 + if: github.repository == 'vortex-data/vortex' + with: + sccache: s3 + - uses: actions/checkout@v6 + - uses: ./.github/actions/setup-prebuild + - name: Install uv + uses: spiraldb/actions/.github/actions/setup-uv@0.18.5 + with: + sync: false - name: Run compat tests run: | - MODE="${{ inputs.mode || 'last' }}" - python3 vortex-test/compat-gen/scripts/compat.py check \ + MODE="${{ inputs.mode || 'all' }}" + uv run vortex-test/compat-gen/scripts/compat.py check \ --mode "$MODE" diff --git a/vortex-test/compat-gen/scripts/compat.py b/vortex-test/compat-gen/scripts/compat.py index 9c1d6c4fc10..123779c846f 100644 --- a/vortex-test/compat-gen/scripts/compat.py +++ b/vortex-test/compat-gen/scripts/compat.py @@ -1,6 +1,9 @@ #!/usr/bin/env python3 # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright the Vortex contributors +# /// script +# dependencies = ["jsonschema"] +# /// """ Vortex backward-compatibility orchestrator. @@ -79,6 +82,7 @@ # Check all versions, or specific ones uv run compat.py check + uv run compat.py check --mode last uv run compat.py check --versions 0.62.0,0.63.0 # Inspect store contents @@ -341,7 +345,7 @@ def cmd_generate(args: argparse.Namespace) -> None: output = Path(args.output) version = _version_from_ref(args.git_ref) - _run_rust_generate(output) + _run_rust_generate(output, profile=args.profile) # Read fixtures.json (with sha256 from Rust) and write a versioned manifest. fixtures_json = json.loads((output / "fixtures.json").read_text()) @@ -387,7 +391,7 @@ def _publish_full( output = Path(tmpdir) / "fixtures" _info("generating fixtures...") - _run_rust_generate(output) + _run_rust_generate(output, profile=args.profile) fixtures_json = json.loads((output / "fixtures.json").read_text()) @@ -472,7 +476,7 @@ def _publish_update( output = Path(tmpdir) / "fixtures" _info("generating fixtures...") - _run_rust_generate(output) + _run_rust_generate(output, profile=args.profile) fixtures_json = json.loads((output / "fixtures.json").read_text()) @@ -541,10 +545,16 @@ def cmd_check(args: argparse.Namespace) -> None: """Download fixtures from store and check with Rust binary.""" store = _parse_store(args.store) + if args.versions and args.mode != "all": + print("error: --versions and --mode are mutually exclusive", file=sys.stderr) + sys.exit(1) + if args.versions: versions = [v.strip() for v in args.versions.split(",")] else: versions = store.list_versions() + if args.mode == "last" and versions: + versions = versions[-1:] if not versions: _info("no versions found in store") @@ -573,18 +583,15 @@ def cmd_check(args: argparse.Namespace) -> None: with tempfile.TemporaryDirectory() as tmpdir: tmppath = Path(tmpdir) - for entry in manifest["fixtures"]: - name = entry["name"] - data = store.read(f"{prefix}/{name}") - if data is None: - _info(f" v{version}: {name} not found at {prefix}/{name}") - all_failures.append((version, name, "fixture file not found in store")) - total_failed += 1 - continue - (tmppath / name).write_bytes(data) - _info(f" downloaded {name} ({len(data)} bytes)") - - result = _run_rust_check(tmppath, mode="subset") + _info(f" downloading {len(manifest['fixtures'])} fixtures...") + download_failures = _parallel_download(store, manifest["fixtures"], prefix, tmppath) + for name, error in download_failures: + _info(f" v{version}: {name} {error}") + all_failures.append((version, name, error)) + total_failed += 1 + + _info(f" checking v{version}...") + result = _run_rust_check(tmppath, mode="subset", profile=args.profile) passed = len(result.get("passed", [])) failed_list = result.get("failed", []) @@ -742,18 +749,78 @@ def _parallel_upload(store: Store, items: list[tuple[str, Path]], max_workers: i future.result() -def _run_rust_generate(output: Path) -> None: +def _parallel_download( + store: Store, + fixtures: list[dict], + prefix: str, + dest: Path, + max_workers: int = 8, +) -> list[tuple[str, str]]: + """Download fixture files from the store in parallel. + + Returns a list of (name, error) for any failures. + """ + failures: list[tuple[str, str]] = [] + total_bytes = 0 + + def _download_one(entry: dict) -> tuple[str, bytes | None]: + name = entry["name"] + data = store.read(f"{prefix}/{name}") + return name, data + + with ThreadPoolExecutor(max_workers=max_workers) as pool: + futures = {pool.submit(_download_one, entry): entry["name"] for entry in fixtures} + for future in as_completed(futures): + name, data = future.result() + if data is None: + failures.append((name, f"not found at {prefix}/{name}")) + else: + (dest / name).write_bytes(data) + total_bytes += len(data) + + _info(f" downloaded {len(fixtures) - len(failures)} fixtures ({total_bytes} bytes)") + return failures + + +def _build_compat_bin(profile: str = "release") -> str: + """Build vortex-compat and return the path to the binary. + + If VORTEX_COMPAT_BIN is set, skips the build and returns that path. + Otherwise runs `cargo build` with visible output, then locates the binary. + """ + bin_path = os.environ.get("VORTEX_COMPAT_BIN") + if bin_path: + return bin_path + + _info(f"building vortex-compat ({profile})...") + _run_cmd(["cargo", "build", "-p", CARGO_BIN, "--profile", profile], check=True) + + # Ask cargo where the binary is. + result = subprocess.run( + ["cargo", "metadata", "--format-version=1", "--no-deps"], + capture_output=True, + text=True, + check=True, + ) + target_dir = json.loads(result.stdout)["target_directory"] + # Cargo puts "dev" profile binaries in "debug/", all others in "/". + dir_name = "debug" if profile == "dev" else profile + bin_path = str(Path(target_dir) / dir_name / CARGO_BIN) + return bin_path + + +def _run_rust_generate(output: Path, profile: str = "release") -> None: """Run `vortex-compat generate --output `.""" - cmd = _cargo_run_cmd() + ["generate", "--output", str(output)] - _run_cmd(cmd, check=True) + bin_path = _build_compat_bin(profile) + _run_cmd([bin_path, "generate", "--output", str(output)], check=True) -def _run_rust_check(dir: Path, mode: str = "subset") -> dict: +def _run_rust_check(dir: Path, mode: str = "subset", profile: str = "release") -> dict: """Run `vortex-compat check --dir --mode ` and parse JSON stdout.""" - cmd = _cargo_run_cmd() + ["check", "--dir", str(dir), "--mode", mode] - result = subprocess.run(cmd, capture_output=True, text=True) - if result.stderr: - print(result.stderr, end="", file=sys.stderr) + bin_path = _build_compat_bin(profile) + cmd = [bin_path, "check", "--dir", str(dir), "--mode", mode] + _info(f" $ {' '.join(cmd)}") + result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=None, text=True) # noqa: UP022 if result.stdout.strip(): return json.loads(result.stdout) @@ -767,17 +834,12 @@ def _run_rust_check(dir: Path, mode: str = "subset") -> dict: return {"passed": [], "failed": [], "skipped": []} -def _cargo_run_cmd() -> list[str]: - """Build the command to invoke vortex-compat (pre-built binary or cargo run).""" - bin_path = os.environ.get("VORTEX_COMPAT_BIN") - if bin_path: - return [bin_path] - return ["cargo", "run", "-p", CARGO_BIN, "--release", "--"] - - def _run_cmd(cmd: list[str], check: bool = False, cwd: Path | None = None) -> subprocess.CompletedProcess: _info(f" $ {' '.join(cmd)}") - return subprocess.run(cmd, check=check, cwd=cwd) + result = subprocess.run(cmd, check=False, cwd=cwd) + if check and result.returncode != 0: + raise subprocess.CalledProcessError(result.returncode, cmd) + return result def _find_prev_version(versions: list[str], current: str) -> str | None: @@ -816,6 +878,11 @@ def main() -> None: epilog=EPILOG, formatter_class=argparse.RawDescriptionHelpFormatter, ) + parser.add_argument( + "--profile", + default="release", + help="Cargo build profile (default: release). Use 'dev' for faster builds.", + ) sub = parser.add_subparsers(dest="command", metavar="COMMAND") # -- generate -- @@ -902,6 +969,7 @@ def main() -> None: epilog=( "examples:\n" " uv run compat.py check\n" + " uv run compat.py check --mode last\n" " uv run compat.py check --versions 0.62.0,0.63.0\n" " uv run compat.py check --store /tmp/store" ), @@ -910,7 +978,14 @@ def main() -> None: p.add_argument("--store", default=DEFAULT_STORE, help="Store spec (default: %(default)s)") p.add_argument( "--versions", - help="Comma-separated versions to check (default: all)", + help="Comma-separated versions to check (mutually exclusive with --mode)", + ) + p.add_argument( + "--mode", + choices=["all", "last"], + default="all", + help="Which versions to check: 'all' (default) or 'last' (most recent only). " + "Mutually exclusive with --versions.", ) # -- list -- diff --git a/vortex-test/compat-gen/src/adapter.rs b/vortex-test/compat-gen/src/adapter.rs index bfdcedfd57a..1ff1b11ccf1 100644 --- a/vortex-test/compat-gen/src/adapter.rs +++ b/vortex-test/compat-gen/src/adapter.rs @@ -16,10 +16,13 @@ use vortex::file::OpenOptionsSessionExt; use vortex::file::WriteOptionsSessionExt; use vortex::io::session::RuntimeSessionExt; use vortex::layout::LayoutStrategy; +use vortex::layout::layouts::flat::Flat; use vortex::layout::layouts::flat::writer::FlatLayoutStrategy; use vortex_array::ArrayRef; use vortex_array::ArrayVisitorExt; use vortex_array::DynArray; +use vortex_array::MaskFuture; +use vortex_array::expr::root; use vortex_array::expr::stats::Stat; use vortex_array::stream::ArrayStreamAdapter; use vortex_array::stream::ArrayStreamExt; @@ -108,3 +111,38 @@ pub fn read_file(bytes: ByteBuffer) -> VortexResult { file.scan()?.into_array_stream()?.read_all().await }) } + +/// Open a `.vortex` file and fully decode every array in the layout tree, including +/// auxiliary data like zone maps and dictionaries. +/// +/// Walks the entire layout tree and for each leaf `FlatLayout`, reads the segment +/// and calls `ArrayParts::decode()` to fully deserialize the array. This exercises +/// every segment in the file — not just the data path that a plain `scan()` touches. +/// If any segment is corrupt or any array fails to decode, this will error. +pub fn read_layout_tree(bytes: ByteBuffer) -> VortexResult<()> { + runtime()?.block_on(async { + let session = VortexSession::default().with_tokio(); + let file = session.open_options().open_buffer(bytes)?; + let root_layout = file.footer().layout().clone(); + let segment_source = file.segment_source(); + + for layout_result in root_layout.depth_first_traversal() { + let layout = layout_result?; + if layout.as_opt::().is_none() { + continue; + } + let row_count = layout.row_count(); + if row_count == 0 { + continue; + } + let reader = layout.new_reader("".into(), segment_source.clone(), &session)?; + let len = + usize::try_from(row_count).map_err(|e| vortex_err!("row count overflow: {e}"))?; + reader + .projection_evaluation(&(0..row_count), &root(), MaskFuture::new_true(len))? + .await?; + } + + Ok(()) + }) +} diff --git a/vortex-test/compat-gen/src/check.rs b/vortex-test/compat-gen/src/check.rs index f8dd4e8f52d..7533ee9dab7 100644 --- a/vortex-test/compat-gen/src/check.rs +++ b/vortex-test/compat-gen/src/check.rs @@ -135,7 +135,7 @@ pub fn check(dir: &Path, mode: Mode, exclude: &[String]) -> VortexResult<()> { // Read the stored file. let stored_bytes = match std::fs::read(&stored_path) { - Ok(b) => b, + Ok(b) => ByteBuffer::from(b), Err(e) => { result.failed.push(FailedFixture { name: fresh_name.clone(), @@ -144,30 +144,42 @@ pub fn check(dir: &Path, mode: Mode, exclude: &[String]) -> VortexResult<()> { continue; } }; - let stored_array = match adapter::read_file(ByteBuffer::from(stored_bytes)) { - Ok(a) => a, + + // Read the fresh file. + let fresh_path = tmp_dir.path().join(fresh_name); + let fresh_bytes = match std::fs::read(&fresh_path) { + Ok(b) => ByteBuffer::from(b), Err(e) => { result.failed.push(FailedFixture { name: fresh_name.clone(), - error: format!("failed to decode stored vortex file: {e}"), + error: format!("failed to read fresh file: {e}"), }); continue; } }; - // Read the fresh file. - let fresh_path = tmp_dir.path().join(fresh_name); - let fresh_bytes = match std::fs::read(&fresh_path) { - Ok(b) => b, + // Validate the full layout tree of the stored file (reads every segment + // including zone maps, dictionaries, etc.). + if let Err(e) = adapter::read_layout_tree(stored_bytes.clone()) { + result.failed.push(FailedFixture { + name: fresh_name.clone(), + error: format!("stored file layout tree invalid: {e}"), + }); + continue; + } + + // Scan data arrays from both files and compare. + let stored_array = match adapter::read_file(stored_bytes) { + Ok(a) => a, Err(e) => { result.failed.push(FailedFixture { name: fresh_name.clone(), - error: format!("failed to read fresh file: {e}"), + error: format!("failed to decode stored vortex file: {e}"), }); continue; } }; - let fresh_array = match adapter::read_file(ByteBuffer::from(fresh_bytes)) { + let fresh_array = match adapter::read_file(fresh_bytes) { Ok(a) => a, Err(e) => { result.failed.push(FailedFixture { @@ -178,7 +190,6 @@ pub fn check(dir: &Path, mode: Mode, exclude: &[String]) -> VortexResult<()> { } }; - // Compare arrays. assert_arrays_eq!(stored_array, fresh_array); eprintln!(" pass {fresh_name}"); result.passed.push(fresh_name.clone());