Skip to content

Commit 876813b

Browse files
chore[compat]: more fixtures compressed (#6986)
Add extract fixtures to the backtesting system. There is another PR that will change the way we actually write the file to a store [see #6993]. Focus only on the generated files and the data from each fixture. --------- Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>
1 parent 7899299 commit 876813b

43 files changed

Lines changed: 1878 additions & 457 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vortex-test/compat-gen/Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ path = "src/validate_main.rs"
2525

2626
[dependencies]
2727
# Vortex crates
28-
vortex = { workspace = true, features = ["files", "tokio"] }
28+
vortex = { workspace = true, features = ["files", "tokio", "zstd"] }
2929
vortex-array = { workspace = true, features = ["_test-harness"] }
3030
vortex-buffer = { workspace = true }
3131
vortex-error = { workspace = true }
@@ -37,7 +37,9 @@ arrow-array = { workspace = true }
3737
tpchgen = { workspace = true }
3838
tpchgen-arrow = { workspace = true }
3939

40-
# ClickBench parquet reading
40+
# ClickBench parquet reading + writing
41+
arrow-select = { workspace = true }
42+
bytes = { workspace = true }
4143
parquet = { workspace = true }
4244

4345
# Async runtime
@@ -52,3 +54,4 @@ chrono = { workspace = true, features = ["serde"] }
5254
clap = { workspace = true, features = ["derive"] }
5355
serde = { workspace = true, features = ["derive"] }
5456
serde_json = { workspace = true }
57+
tempfile = { workspace = true }

vortex-test/compat-gen/src/adapter.rs

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use vortex::VortexSessionDefault;
1515
use vortex::file::OpenOptionsSessionExt;
1616
use vortex::file::WriteOptionsSessionExt;
1717
use vortex::io::session::RuntimeSessionExt;
18+
use vortex::layout::LayoutStrategy;
1819
use vortex::layout::layouts::flat::writer::FlatLayoutStrategy;
1920
use vortex_array::ArrayRef;
2021
use vortex_array::DynArray;
@@ -35,7 +36,7 @@ fn runtime() -> VortexResult<Runtime> {
3536
pub fn write_file(path: &Path, chunk: ArrayRef) -> VortexResult<()> {
3637
let stream = ArrayStreamAdapter::new(chunk.dtype().clone(), stream::iter([Ok(chunk)]));
3738

38-
let strategy: Arc<dyn vortex::layout::LayoutStrategy> = Arc::new(FlatLayoutStrategy::default());
39+
let strategy: Arc<dyn LayoutStrategy> = Arc::new(FlatLayoutStrategy::default());
3940

4041
runtime()?.block_on(async {
4142
let session = VortexSession::default().with_tokio();
@@ -51,6 +52,65 @@ pub fn write_file(path: &Path, chunk: ArrayRef) -> VortexResult<()> {
5152
})
5253
}
5354

55+
/// Write a sequence of array chunks to an in-memory `.vortex` byte buffer with no compression.
56+
pub fn write_file_to_bytes(chunk: ArrayRef) -> VortexResult<ByteBuffer> {
57+
let stream = ArrayStreamAdapter::new(chunk.dtype().clone(), stream::iter([Ok(chunk)]));
58+
59+
let strategy: Arc<dyn LayoutStrategy> = Arc::new(FlatLayoutStrategy::default());
60+
61+
runtime()?.block_on(async {
62+
let session = VortexSession::default().with_tokio();
63+
let mut bytes = Vec::new();
64+
let _summary = session
65+
.write_options()
66+
.with_strategy(strategy)
67+
.write(&mut bytes, stream)
68+
.await?;
69+
Ok(ByteBuffer::from(bytes))
70+
})
71+
}
72+
73+
/// Write a `.vortex` file using a caller-provided layout strategy (compressor pipeline).
74+
pub fn write_compressed(
75+
path: &Path,
76+
chunk: ArrayRef,
77+
strategy: Arc<dyn LayoutStrategy>,
78+
) -> VortexResult<()> {
79+
let stream = ArrayStreamAdapter::new(chunk.dtype().clone(), stream::iter([Ok(chunk)]));
80+
81+
runtime()?.block_on(async {
82+
let session = VortexSession::default().with_tokio();
83+
let mut file = tokio::fs::File::create(path)
84+
.await
85+
.map_err(|e| vortex_error::vortex_err!("failed to create {}: {e}", path.display()))?;
86+
let _summary = session
87+
.write_options()
88+
.with_strategy(strategy)
89+
.write(&mut file, stream)
90+
.await?;
91+
Ok(())
92+
})
93+
}
94+
95+
/// Write a `.vortex` file into an in-memory byte buffer using a caller-provided layout strategy.
96+
pub fn write_compressed_to_bytes(
97+
chunk: ArrayRef,
98+
strategy: Arc<dyn LayoutStrategy>,
99+
) -> VortexResult<ByteBuffer> {
100+
let stream = ArrayStreamAdapter::new(chunk.dtype().clone(), stream::iter([Ok(chunk)]));
101+
102+
runtime()?.block_on(async {
103+
let session = VortexSession::default().with_tokio();
104+
let mut bytes = Vec::new();
105+
let _summary = session
106+
.write_options()
107+
.with_strategy(strategy)
108+
.write(&mut bytes, stream)
109+
.await?;
110+
Ok(ByteBuffer::from(bytes))
111+
})
112+
}
113+
54114
/// Read a `.vortex` file from bytes, returning the arrays.
55115
pub fn read_file(bytes: ByteBuffer) -> VortexResult<ArrayRef> {
56116
runtime()?.block_on(async {
Lines changed: 163 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,192 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4+
use std::fs;
5+
use std::path::PathBuf;
6+
47
use arrow_array::RecordBatch;
8+
use bytes::Bytes;
59
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
610
use vortex_array::ArrayRef;
711
use vortex_array::IntoArray;
812
use vortex_array::arrays::ChunkedArray;
9-
use vortex_array::arrays::Primitive;
10-
use vortex_array::arrays::Struct;
11-
use vortex_array::arrays::VarBin;
1213
use vortex_array::arrow::FromArrowArray;
13-
use vortex_array::vtable::ArrayId;
1414
use vortex_error::VortexResult;
1515
use vortex_error::vortex_err;
1616

17-
use crate::fixtures::ArrayFixture;
17+
use crate::fixtures::DatasetFixture;
1818

19-
/// First partition of ClickBench hits, limited to 1000 rows.
19+
// TODO: Upload the pre-sampled 5k parquet to R2 and download it in a build.rs instead of
20+
// downloading the full ~112MB partition 0 at runtime.
2021
const CLICKBENCH_URL: &str =
2122
"https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_many/hits_0.parquet";
2223

23-
struct ClickBenchHits1kFixture;
24+
/// Deterministic offsets (seed=42) into clickbench hits partition 0.
25+
const SAMPLE_OFFSETS: [usize; 5] = [26225, 116739, 288389, 670487, 777572];
26+
const ROWS_PER_OFFSET: usize = 1000;
2427

25-
impl ArrayFixture for ClickBenchHits1kFixture {
26-
fn name(&self) -> &str {
27-
"clickbench_hits_1k.vortex"
28+
const MAX_RETRIES: u32 = 3;
29+
30+
/// Returns the path to `data/clickbench_hits_5k.parquet` relative to the crate root,
31+
/// downloading and sampling from the full dataset if it doesn't already exist.
32+
fn cached_clickbench_parquet() -> VortexResult<PathBuf> {
33+
let crate_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
34+
let data_dir = crate_dir.join("data");
35+
let dest = data_dir.join("clickbench_hits_5k.parquet");
36+
37+
if dest.exists() {
38+
return Ok(dest);
2839
}
2940

30-
fn description(&self) -> &str {
31-
"First 1000 rows of ClickBench hits dataset with wide schema of primitives and strings"
41+
fs::create_dir_all(&data_dir).map_err(|e| vortex_err!("failed to create data dir: {e}"))?;
42+
43+
// Download full partition 0 to a temp file.
44+
let source_bytes = download_with_retries(CLICKBENCH_URL)?;
45+
46+
// Sample 5k rows and write to dest.
47+
sample_and_write(&source_bytes, &dest)?;
48+
49+
Ok(dest)
50+
}
51+
52+
fn download_with_retries(url: &str) -> VortexResult<Bytes> {
53+
let client = reqwest::blocking::Client::builder()
54+
.timeout(std::time::Duration::from_secs(300))
55+
.build()
56+
.map_err(|e| vortex_err!("failed to build HTTP client: {e}"))?;
57+
58+
for attempt in 1..=MAX_RETRIES {
59+
match client.get(url).send() {
60+
Ok(response) if response.status().is_success() => {
61+
return response
62+
.bytes()
63+
.map_err(|e| vortex_err!("failed to read response body: {e}"));
64+
}
65+
Ok(response) if response.status().is_client_error() => {
66+
return Err(vortex_err!(
67+
"HTTP {}: failed to download {url}",
68+
response.status()
69+
));
70+
}
71+
Ok(response) => {
72+
eprintln!(
73+
"Download attempt {attempt}/{MAX_RETRIES} failed: HTTP {} for {url}",
74+
response.status()
75+
);
76+
}
77+
Err(e) => {
78+
eprintln!("Download attempt {attempt}/{MAX_RETRIES} failed: {e}");
79+
}
80+
}
81+
82+
if attempt < MAX_RETRIES {
83+
let delay = std::time::Duration::from_secs(2u64.pow(attempt));
84+
std::thread::sleep(delay);
85+
}
86+
}
87+
88+
Err(vortex_err!(
89+
"failed to download {url} after {MAX_RETRIES} attempts"
90+
))
91+
}
92+
93+
#[allow(clippy::cast_possible_truncation)]
94+
fn sample_and_write(source_bytes: &[u8], dest: &std::path::Path) -> VortexResult<()> {
95+
let source_bytes = Bytes::copy_from_slice(source_bytes);
96+
let builder = ParquetRecordBatchReaderBuilder::try_new(source_bytes.clone())
97+
.map_err(|e| vortex_err!("failed to open source parquet: {e}"))?;
98+
let metadata = builder.metadata().clone();
99+
100+
let total_rows: usize = metadata
101+
.row_groups()
102+
.iter()
103+
.map(|rg| rg.num_rows() as usize)
104+
.sum();
105+
106+
// Build (row_group_index, local_offset, count) ranges for each sample window.
107+
let mut ranges: Vec<(usize, usize, usize)> = Vec::new();
108+
for &offset in &SAMPLE_OFFSETS {
109+
let end = (offset + ROWS_PER_OFFSET).min(total_rows);
110+
let mut remaining = end - offset;
111+
let mut global_pos = 0usize;
112+
113+
for (rg_idx, rg_meta) in metadata.row_groups().iter().enumerate() {
114+
let rg_rows = rg_meta.num_rows() as usize;
115+
let rg_end = global_pos + rg_rows;
116+
117+
if offset < rg_end && global_pos < end {
118+
let local_start = offset.saturating_sub(global_pos);
119+
let local_end = (local_start + remaining).min(rg_rows);
120+
let count = local_end - local_start;
121+
if count > 0 {
122+
ranges.push((rg_idx, local_start, count));
123+
remaining -= count;
124+
}
125+
}
126+
global_pos = rg_end;
127+
if remaining == 0 {
128+
break;
129+
}
130+
}
32131
}
33132

34-
fn expected_encodings(&self) -> Vec<ArrayId> {
35-
vec![Struct::ID, Primitive::ID, VarBin::ID]
133+
// Read each range and collect batches.
134+
let mut sampled_batches: Vec<RecordBatch> = Vec::new();
135+
for &(rg_idx, local_offset, count) in &ranges {
136+
let reader = ParquetRecordBatchReaderBuilder::try_new(source_bytes.clone())
137+
.map_err(|e| vortex_err!("failed to open parquet for sampling: {e}"))?
138+
.with_row_groups(vec![rg_idx])
139+
.with_offset(local_offset)
140+
.with_limit(count)
141+
.with_batch_size(count)
142+
.build()
143+
.map_err(|e| vortex_err!("failed to build parquet reader: {e}"))?;
144+
145+
for batch in reader {
146+
sampled_batches
147+
.push(batch.map_err(|e| vortex_err!("failed to read parquet batch: {e}"))?);
148+
}
149+
}
150+
151+
// Write sampled batches to a parquet file.
152+
let schema = sampled_batches[0].schema();
153+
let combined = arrow_select::concat::concat_batches(&schema, &sampled_batches)
154+
.map_err(|e| vortex_err!("failed to concat batches: {e}"))?;
155+
156+
let file =
157+
fs::File::create(dest).map_err(|e| vortex_err!("failed to create output parquet: {e}"))?;
158+
let mut writer = parquet::arrow::ArrowWriter::try_new(file, schema, None)
159+
.map_err(|e| vortex_err!("failed to create parquet writer: {e}"))?;
160+
writer
161+
.write(&combined)
162+
.map_err(|e| vortex_err!("failed to write parquet: {e}"))?;
163+
writer
164+
.close()
165+
.map_err(|e| vortex_err!("failed to close parquet writer: {e}"))?;
166+
167+
Ok(())
168+
}
169+
170+
struct ClickBenchHits5kFixture;
171+
172+
impl DatasetFixture for ClickBenchHits5kFixture {
173+
fn name(&self) -> &str {
174+
"clickbench_hits_5k"
175+
}
176+
177+
fn description(&self) -> &str {
178+
"5000 rows (5x1000 from random offsets) of ClickBench hits dataset with wide schema of primitives and strings"
36179
}
37180

38181
fn build(&self) -> VortexResult<ArrayRef> {
39-
let bytes = reqwest::blocking::get(CLICKBENCH_URL)
40-
.map_err(|e| vortex_err!("failed to download ClickBench parquet: {e}"))?
41-
.bytes()
42-
.map_err(|e| vortex_err!("failed to read ClickBench response body: {e}"))?;
182+
let path = cached_clickbench_parquet()?;
183+
let file_bytes = fs::read(&path)
184+
.map_err(|e| vortex_err!("failed to read cached parquet at {}: {e}", path.display()))?;
185+
let bytes = Bytes::from(file_bytes);
43186

44187
let reader = ParquetRecordBatchReaderBuilder::try_new(bytes)
45188
.map_err(|e| vortex_err!("failed to open parquet: {e}"))?
46189
.with_batch_size(1000)
47-
.with_limit(1000)
48190
.build()
49191
.map_err(|e| vortex_err!("failed to build parquet reader: {e}"))?;
50192

@@ -62,6 +204,6 @@ impl ArrayFixture for ClickBenchHits1kFixture {
62204
}
63205
}
64206

65-
pub fn fixtures() -> Vec<Box<dyn ArrayFixture>> {
66-
vec![Box::new(ClickBenchHits1kFixture)]
207+
pub fn fixtures() -> Vec<Box<dyn DatasetFixture>> {
208+
vec![Box::new(ClickBenchHits5kFixture)]
67209
}

vortex-test/compat-gen/src/fixtures/arrays/datasets/mod.rs

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,49 @@ mod clickbench;
55
#[allow(clippy::cast_possible_truncation)]
66
mod tpch;
77

8-
use crate::fixtures::ArrayFixture;
8+
use crate::fixtures::DatasetFixture;
99

1010
/// All dataset-derived fixtures.
11-
pub fn fixtures() -> Vec<Box<dyn ArrayFixture>> {
11+
pub fn fixtures() -> Vec<Box<dyn DatasetFixture>> {
1212
let mut fixtures = Vec::new();
1313
fixtures.extend(tpch::fixtures());
1414
fixtures.extend(clickbench::fixtures());
1515
fixtures
1616
}
17+
18+
#[cfg(test)]
19+
mod tests {
20+
use vortex::file::WriteStrategyBuilder;
21+
22+
use super::fixtures;
23+
use crate::adapter;
24+
25+
fn is_clickbench_fixture(name: &str) -> bool {
26+
name.contains("clickbench")
27+
}
28+
29+
#[test]
30+
fn roundtrip_non_clickbench_fixtures_to_bytes() {
31+
for dataset in fixtures()
32+
.into_iter()
33+
.filter(|fixture| !is_clickbench_fixture(fixture.name()))
34+
{
35+
let array = dataset.build().unwrap();
36+
let regular_bytes = adapter::write_compressed_to_bytes(
37+
array.clone(),
38+
WriteStrategyBuilder::default().build(),
39+
)
40+
.unwrap();
41+
let _regular = adapter::read_file(regular_bytes).unwrap();
42+
43+
let compact_bytes = adapter::write_compressed_to_bytes(
44+
array,
45+
WriteStrategyBuilder::default()
46+
.with_compact_encodings()
47+
.build(),
48+
)
49+
.unwrap();
50+
let _compact = adapter::read_file(compact_bytes).unwrap();
51+
}
52+
}
53+
}

0 commit comments

Comments
 (0)