Skip to content

Commit 4d26af1

Browse files
committed
fix
Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>
1 parent ab83928 commit 4d26af1

3 files changed

Lines changed: 161 additions & 5 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vortex-test/compat-gen/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ arrow-array = { workspace = true }
3737
tpchgen = { workspace = true }
3838
tpchgen-arrow = { workspace = true }
3939

40-
# ClickBench parquet reading
40+
# ClickBench parquet reading + writing
41+
arrow-select = { workspace = true }
4142
bytes = { workspace = true }
4243
parquet = { workspace = true }
4344

@@ -54,3 +55,4 @@ clap = { workspace = true, features = ["derive"] }
5455
serde = { workspace = true, features = ["derive"] }
5556
serde_json = { workspace = true }
5657
tempfile = { workspace = true }
58+

vortex-test/compat-gen/src/fixtures/arrays/datasets/clickbench.rs

Lines changed: 157 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4+
use std::fs;
5+
use std::path::PathBuf;
6+
47
use arrow_array::RecordBatch;
58
use bytes::Bytes;
69
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
@@ -13,9 +16,156 @@ use vortex_error::vortex_err;
1316

1417
use crate::fixtures::DatasetFixture;
1518

16-
/// 5×1000 rows sampled from deterministic random offsets in ClickBench hits partition 0.
17-
/// Offsets (seed=42): [26225, 116739, 288389, 670487, 777572].
18-
const CLICKBENCH_PARQUET: &[u8] = include_bytes!("../../../../data/clickbench_hits_5k.parquet");
19+
// TODO: Upload the pre-sampled 5k parquet to R2 and download it in a build.rs instead of
20+
// downloading the full ~112MB partition 0 at runtime.
21+
const CLICKBENCH_URL: &str =
22+
"https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_many/hits_0.parquet";
23+
24+
/// Deterministic offsets (seed=42) into clickbench hits partition 0.
25+
const SAMPLE_OFFSETS: [usize; 5] = [26225, 116739, 288389, 670487, 777572];
26+
const ROWS_PER_OFFSET: usize = 1000;
27+
28+
const MAX_RETRIES: u32 = 3;
29+
30+
/// Returns the path to `data/clickbench_hits_5k.parquet` relative to the crate root,
31+
/// downloading and sampling from the full dataset if it doesn't already exist.
32+
fn cached_clickbench_parquet() -> VortexResult<PathBuf> {
33+
let crate_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
34+
let data_dir = crate_dir.join("data");
35+
let dest = data_dir.join("clickbench_hits_5k.parquet");
36+
37+
if dest.exists() {
38+
return Ok(dest);
39+
}
40+
41+
fs::create_dir_all(&data_dir).map_err(|e| vortex_err!("failed to create data dir: {e}"))?;
42+
43+
// Download full partition 0 to a temp file.
44+
let source_bytes = download_with_retries(CLICKBENCH_URL)?;
45+
46+
// Sample 5k rows and write to dest.
47+
sample_and_write(&source_bytes, &dest)?;
48+
49+
Ok(dest)
50+
}
51+
52+
fn download_with_retries(url: &str) -> VortexResult<Bytes> {
53+
let client = reqwest::blocking::Client::builder()
54+
.timeout(std::time::Duration::from_secs(300))
55+
.build()
56+
.map_err(|e| vortex_err!("failed to build HTTP client: {e}"))?;
57+
58+
for attempt in 1..=MAX_RETRIES {
59+
match client.get(url).send() {
60+
Ok(response) if response.status().is_success() => {
61+
return response
62+
.bytes()
63+
.map_err(|e| vortex_err!("failed to read response body: {e}"));
64+
}
65+
Ok(response) if response.status().is_client_error() => {
66+
return Err(vortex_err!(
67+
"HTTP {}: failed to download {url}",
68+
response.status()
69+
));
70+
}
71+
Ok(response) => {
72+
eprintln!(
73+
"Download attempt {attempt}/{MAX_RETRIES} failed: HTTP {} for {url}",
74+
response.status()
75+
);
76+
}
77+
Err(e) => {
78+
eprintln!("Download attempt {attempt}/{MAX_RETRIES} failed: {e}");
79+
}
80+
}
81+
82+
if attempt < MAX_RETRIES {
83+
let delay = std::time::Duration::from_secs(2u64.pow(attempt));
84+
std::thread::sleep(delay);
85+
}
86+
}
87+
88+
Err(vortex_err!(
89+
"failed to download {url} after {MAX_RETRIES} attempts"
90+
))
91+
}
92+
93+
#[allow(clippy::cast_possible_truncation)]
94+
fn sample_and_write(source_bytes: &[u8], dest: &std::path::Path) -> VortexResult<()> {
95+
let source_bytes = Bytes::copy_from_slice(source_bytes);
96+
let builder = ParquetRecordBatchReaderBuilder::try_new(source_bytes.clone())
97+
.map_err(|e| vortex_err!("failed to open source parquet: {e}"))?;
98+
let metadata = builder.metadata().clone();
99+
100+
let total_rows: usize = metadata
101+
.row_groups()
102+
.iter()
103+
.map(|rg| rg.num_rows() as usize)
104+
.sum();
105+
106+
// Build (row_group_index, local_offset, count) ranges for each sample window.
107+
let mut ranges: Vec<(usize, usize, usize)> = Vec::new();
108+
for &offset in &SAMPLE_OFFSETS {
109+
let end = (offset + ROWS_PER_OFFSET).min(total_rows);
110+
let mut remaining = end - offset;
111+
let mut global_pos = 0usize;
112+
113+
for (rg_idx, rg_meta) in metadata.row_groups().iter().enumerate() {
114+
let rg_rows = rg_meta.num_rows() as usize;
115+
let rg_end = global_pos + rg_rows;
116+
117+
if offset < rg_end && global_pos < end {
118+
let local_start = offset.saturating_sub(global_pos);
119+
let local_end = (local_start + remaining).min(rg_rows);
120+
let count = local_end - local_start;
121+
if count > 0 {
122+
ranges.push((rg_idx, local_start, count));
123+
remaining -= count;
124+
}
125+
}
126+
global_pos = rg_end;
127+
if remaining == 0 {
128+
break;
129+
}
130+
}
131+
}
132+
133+
// Read each range and collect batches.
134+
let mut sampled_batches: Vec<RecordBatch> = Vec::new();
135+
for &(rg_idx, local_offset, count) in &ranges {
136+
let reader = ParquetRecordBatchReaderBuilder::try_new(source_bytes.clone())
137+
.map_err(|e| vortex_err!("failed to open parquet for sampling: {e}"))?
138+
.with_row_groups(vec![rg_idx])
139+
.with_offset(local_offset)
140+
.with_limit(count)
141+
.with_batch_size(count)
142+
.build()
143+
.map_err(|e| vortex_err!("failed to build parquet reader: {e}"))?;
144+
145+
for batch in reader {
146+
sampled_batches
147+
.push(batch.map_err(|e| vortex_err!("failed to read parquet batch: {e}"))?);
148+
}
149+
}
150+
151+
// Write sampled batches to a parquet file.
152+
let schema = sampled_batches[0].schema();
153+
let combined = arrow_select::concat::concat_batches(&schema, &sampled_batches)
154+
.map_err(|e| vortex_err!("failed to concat batches: {e}"))?;
155+
156+
let file =
157+
fs::File::create(dest).map_err(|e| vortex_err!("failed to create output parquet: {e}"))?;
158+
let mut writer = parquet::arrow::ArrowWriter::try_new(file, schema, None)
159+
.map_err(|e| vortex_err!("failed to create parquet writer: {e}"))?;
160+
writer
161+
.write(&combined)
162+
.map_err(|e| vortex_err!("failed to write parquet: {e}"))?;
163+
writer
164+
.close()
165+
.map_err(|e| vortex_err!("failed to close parquet writer: {e}"))?;
166+
167+
Ok(())
168+
}
19169

20170
struct ClickBenchHits5kFixture;
21171

@@ -29,7 +179,10 @@ impl DatasetFixture for ClickBenchHits5kFixture {
29179
}
30180

31181
fn build(&self) -> VortexResult<ArrayRef> {
32-
let bytes = Bytes::from_static(CLICKBENCH_PARQUET);
182+
let path = cached_clickbench_parquet()?;
183+
let file_bytes = fs::read(&path)
184+
.map_err(|e| vortex_err!("failed to read cached parquet at {}: {e}", path.display()))?;
185+
let bytes = Bytes::from(file_bytes);
33186

34187
let reader = ParquetRecordBatchReaderBuilder::try_new(bytes)
35188
.map_err(|e| vortex_err!("failed to open parquet: {e}"))?

0 commit comments

Comments
 (0)