Skip to content

Commit 8577d92

Browse files
committed
unify download management for benchmarks
Signed-off-by: Connor Tsui <connor.tsui20@gmail.com>
1 parent 91b4c75 commit 8577d92

8 files changed

Lines changed: 167 additions & 143 deletions

File tree

vortex-bench/src/clickbench/benchmark.rs

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@ use std::path::Path;
77

88
use anyhow::Result;
99
use url::Url;
10-
use vortex::error::VortexExpect;
1110

1211
use crate::Benchmark;
1312
use crate::BenchmarkDataset;
1413
use crate::IdempotentPath;
1514
use crate::TableSpec;
1615
use crate::clickbench::*;
16+
use crate::utils::file::resolve_data_url;
1717

1818
/// ClickBench benchmark implementation
1919
pub struct ClickBenchBenchmark {
@@ -37,31 +37,7 @@ impl ClickBenchBenchmark {
3737
}
3838

3939
fn create_data_url(remote_data_dir: &Option<String>, flavor: Flavor) -> Result<Url> {
40-
match remote_data_dir {
41-
None => {
42-
let basepath = format!("clickbench_{flavor}").to_data_path();
43-
Ok(Url::parse(&format!(
44-
"file:{}/",
45-
basepath.to_str().vortex_expect("path should be utf8")
46-
))?)
47-
}
48-
Some(remote_data_dir) => {
49-
if !remote_data_dir.ends_with("/") {
50-
tracing::warn!(
51-
"Supply a --use-remote-data-dir argument which ends in a slash e.g. s3://vortex-bench-dev-eu/parquet/"
52-
);
53-
}
54-
tracing::info!(
55-
concat!(
56-
"Assuming data already exists at this remote (e.g. S3, GCS) URL: {}.\n",
57-
"If it does not, you should kill this command, locally generate the files (by running without\n",
58-
"--use-remote-data-dir) and upload data/clickbench/ to some remote location.",
59-
),
60-
remote_data_dir,
61-
);
62-
Ok(Url::parse(remote_data_dir)?)
63-
}
64-
}
40+
resolve_data_url(remote_data_dir.as_deref(), &format!("clickbench_{flavor}"))
6541
}
6642
}
6743

vortex-bench/src/clickbench/data.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@ use arrow_schema::TimeUnit;
1414
use clap::ValueEnum;
1515
use serde::Deserialize;
1616
use serde::Serialize;
17-
use tokio::task::JoinSet;
1817
use tracing::info;
1918
use vortex::error::VortexExpect;
2019

2120
use crate::Format;
2221
// Re-export for use by clickbench_benchmark
2322
pub use crate::conversions::convert_parquet_directory_to_vortex;
23+
use crate::datasets::data_downloads::DEFAULT_DOWNLOAD_CONCURRENCY;
2424
use crate::datasets::data_downloads::download_data;
25+
use crate::datasets::data_downloads::download_many;
2526

2627
pub static HITS_SCHEMA: LazyLock<Schema> = LazyLock::new(|| {
2728
use DataType::*;
@@ -193,18 +194,14 @@ impl Flavor {
193194
Flavor::Partitioned => {
194195
// The clickbench-provided file is missing some higher-level type info, so we reprocess it
195196
// to add that info, see https://github.com/ClickHouse/ClickBench/issues/7.
196-
197-
let mut tasks = (0_u32..100).map(|idx| {
198-
let output_path = basepath.join(Format::Parquet.name()).join(format!("hits_{idx}.parquet"));
199-
200-
info!("Downloading file {idx}");
197+
info!("Downloading 100 ClickBench parquet shards");
198+
let parquet_dir = basepath.join(Format::Parquet.name());
199+
let downloads = (0_u32..100).map(|idx| {
200+
let output_path = parquet_dir.join(format!("hits_{idx}.parquet"));
201201
let url = format!("https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_many/hits_{idx}.parquet");
202-
download_data(output_path, url)
203-
}).collect::<JoinSet<_>>();
204-
205-
while let Some(task) = tasks.join_next().await {
206-
task??;
207-
}
202+
(output_path, url)
203+
});
204+
download_many(downloads, DEFAULT_DOWNLOAD_CONCURRENCY).await?;
208205
}
209206
}
210207
Ok(())

vortex-bench/src/datasets/data_downloads.rs

Lines changed: 89 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ use std::fs::File;
55
use std::io::Read;
66
use std::io::Write;
77
use std::path::PathBuf;
8+
use std::sync::LazyLock;
89
use std::time::Duration;
910

1011
use anyhow::Context;
1112
use anyhow::Error;
1213
use anyhow::Result;
1314
use bzip2::read::BzDecoder;
1415
use futures::StreamExt;
16+
use futures::stream;
1517
use indicatif::ProgressBar;
1618
use indicatif::ProgressStyle;
1719
use parking_lot::RwLock;
@@ -25,6 +27,29 @@ use tracing::warn;
2527
use crate::utils::file::idempotent;
2628
use crate::utils::file::idempotent_async;
2729

30+
/// Default concurrency limit for bulk downloads. Keeps us polite to the upstream while still
31+
/// saturating a typical 10 Gb link on a parquet-per-shard benchmark.
32+
pub const DEFAULT_DOWNLOAD_CONCURRENCY: usize = 16;
33+
34+
/// Shared HTTP client used by every dataset download.
35+
///
36+
/// Reusing a single client gives us connection pooling, DNS caching, and consistent timeouts
37+
/// across all callers. Each benchmark used to build its own `reqwest::Client` on every download,
38+
/// which both wasted TLS handshakes and made it hard to reason about total in-flight concurrency.
39+
static HTTP_CLIENT: LazyLock<Client> = LazyLock::new(|| {
40+
Client::builder()
41+
.read_timeout(Duration::from_secs(60))
42+
.timeout(Duration::from_secs(60 * 15))
43+
.build()
44+
.expect("failed to build shared benchmark HTTP client")
45+
});
46+
47+
/// Access the shared HTTP client. Exposed for callers that need custom request shapes
48+
/// (e.g. streaming VCF parsing) while still benefitting from pooled connections.
49+
pub fn http_client() -> &'static Client {
50+
&HTTP_CLIENT
51+
}
52+
2853
async fn retry_get<F: Future<Output = Result<Response>>, R: Fn() -> F>(
2954
make_req: R,
3055
tmp_path: PathBuf,
@@ -97,12 +122,13 @@ async fn retry_get<F: Future<Output = Result<Response>>, R: Fn() -> F>(
97122
Err(last_err.unwrap_or_else(|| anyhow::anyhow!("retry_get exhausted with no recorded error")))
98123
}
99124

125+
/// Idempotently download a single URL to `fname`.
126+
///
127+
/// Uses the shared HTTP client, a 3-attempt exponential backoff retry loop, and an `indicatif`
128+
/// progress bar. If `fname` already exists, the download is skipped.
129+
#[tracing::instrument(skip_all, fields(url = %data_url.as_ref(), path = %fname.display()))]
100130
pub async fn download_data(fname: PathBuf, data_url: impl AsRef<str>) -> Result<PathBuf> {
101-
let client = Client::builder()
102-
.read_timeout(Duration::from_secs(60))
103-
.timeout(Duration::from_secs(60 * 15))
104-
.build()
105-
.context("Failed to build HTTP client")?;
131+
let client = http_client();
106132

107133
idempotent_async(&fname, async |path| {
108134
let url = data_url.as_ref();
@@ -123,6 +149,64 @@ pub async fn download_data(fname: PathBuf, data_url: impl AsRef<str>) -> Result<
123149
.await
124150
}
125151

152+
/// Idempotently download many `(path, url)` pairs with bounded parallelism.
153+
///
154+
/// This is the preferred way to fetch multi-shard datasets (ClickBench partitioned, vector
155+
/// dataset train shards, Public BI tables, etc.) because it:
156+
///
157+
/// - caps in-flight HTTP requests at `max_concurrency` so we don't overwhelm the upstream
158+
/// or our own network stack,
159+
/// - reuses the shared HTTP client across every shard,
160+
/// - short-circuits on the first error (the remaining in-flight downloads are dropped when
161+
/// the returned future is dropped),
162+
/// - returns the resolved on-disk paths in the same order they were submitted.
163+
///
164+
/// Pass `0` as `max_concurrency` to use [`DEFAULT_DOWNLOAD_CONCURRENCY`].
165+
#[tracing::instrument(skip_all, fields(count = tracing::field::Empty, max_concurrency))]
166+
pub async fn download_many<I>(downloads: I, max_concurrency: usize) -> Result<Vec<PathBuf>>
167+
where
168+
I: IntoIterator,
169+
I::Item: IntoDownload,
170+
{
171+
let downloads: Vec<(PathBuf, String)> = downloads
172+
.into_iter()
173+
.map(IntoDownload::into_download)
174+
.collect();
175+
tracing::Span::current().record("count", downloads.len());
176+
177+
let concurrency = if max_concurrency == 0 {
178+
DEFAULT_DOWNLOAD_CONCURRENCY
179+
} else {
180+
max_concurrency
181+
};
182+
183+
let results: Vec<Result<PathBuf>> = stream::iter(downloads)
184+
.map(|(path, url)| async move { download_data(path, url).await })
185+
.buffered(concurrency)
186+
.collect()
187+
.await;
188+
189+
results.into_iter().collect()
190+
}
191+
192+
/// Anything that can be described as a `(target_path, url)` pair accepted by [`download_many`].
193+
pub trait IntoDownload {
194+
fn into_download(self) -> (PathBuf, String);
195+
}
196+
197+
impl IntoDownload for (PathBuf, String) {
198+
fn into_download(self) -> (PathBuf, String) {
199+
self
200+
}
201+
}
202+
203+
impl IntoDownload for (PathBuf, &str) {
204+
fn into_download(self) -> (PathBuf, String) {
205+
(self.0, self.1.to_owned())
206+
}
207+
}
208+
209+
#[tracing::instrument(skip_all, fields(input = %input_path.display(), output = %output_path.display()))]
126210
pub fn decompress_bz2(input_path: PathBuf, output_path: PathBuf) -> Result<PathBuf> {
127211
idempotent(&output_path, |path| {
128212
info!(

vortex-bench/src/fineweb/mod.rs

Lines changed: 4 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,14 @@
33

44
use std::path::PathBuf;
55

6-
use tokio::io::AsyncWriteExt;
76
use tracing::info;
87
use url::Url;
98

109
use crate::Benchmark;
1110
use crate::BenchmarkDataset;
12-
use crate::IdempotentPath;
1311
use crate::TableSpec;
14-
use crate::idempotent_async;
12+
use crate::datasets::data_downloads::download_data;
13+
use crate::utils::file::resolve_data_url;
1514

1615
/// URL to the sample file
1716
const SAMPLE_URL: &str = "https://huggingface.co/datasets/HuggingFaceFW/fineweb/resolve/v1.4.0/sample/10BT/001_00000.parquet";
@@ -56,30 +55,7 @@ impl FinewebBenchmark {
5655
}
5756

5857
fn create_data_url(remote_data_dir: &Option<String>) -> anyhow::Result<Url> {
59-
match remote_data_dir {
60-
None => {
61-
let data_dir = "fineweb".to_data_path();
62-
Url::from_directory_path(&data_dir).map_err(|_| {
63-
anyhow::anyhow!("Failed to create URL from directory path: {:?}", &data_dir)
64-
})
65-
}
66-
Some(remote_data_dir) => {
67-
if !remote_data_dir.ends_with("/") {
68-
tracing::warn!(
69-
"Supply a --use-remote-data-dir argument which ends in a slash e.g. s3://vortex-bench-dev-eu/develop/12345/fineweb/"
70-
);
71-
}
72-
tracing::info!(
73-
concat!(
74-
"Assuming data already exists at this remote (e.g. S3, GCS) URL: {}.\n",
75-
"If it does not, you should kill this command, locally generate the files (by running without\n",
76-
"--use-remote-data-dir) and upload data/fineweb/ to some remote location.",
77-
),
78-
remote_data_dir,
79-
);
80-
Ok(Url::parse(remote_data_dir)?)
81-
}
82-
}
58+
resolve_data_url(remote_data_dir.as_deref(), "fineweb")
8359
}
8460
}
8561

@@ -104,27 +80,7 @@ impl Benchmark for FinewebBenchmark {
10480
return Ok(());
10581
}
10682

107-
let parquet = idempotent_async(&self.parquet_path()?, |parquet_path| async move {
108-
info!("Downloading FineWeb Parquet source from HuggingFace");
109-
110-
let response = reqwest::get(SAMPLE_URL)
111-
.await?
112-
.error_for_status()
113-
.map_err(|err| {
114-
anyhow::anyhow!("error fetching fineweb sample from HuggingFace: {err}")
115-
})?;
116-
117-
let bytes = response.bytes().await?;
118-
let mut w = tokio::fs::File::create(parquet_path).await?;
119-
120-
w.write_all(&bytes).await?;
121-
122-
w.flush().await?;
123-
124-
Ok(())
125-
})
126-
.await?;
127-
83+
let parquet = download_data(self.parquet_path()?, SAMPLE_URL).await?;
12884
info!("fineweb base data generated in {}", parquet.display());
12985

13086
Ok(())

vortex-bench/src/public_bi.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@ use crate::SESSION;
4242
use crate::TableSpec;
4343
use crate::conversions::parquet_to_vortex_chunks;
4444
use crate::datasets::Dataset;
45+
use crate::datasets::data_downloads::DEFAULT_DOWNLOAD_CONCURRENCY;
4546
use crate::datasets::data_downloads::decompress_bz2;
46-
use crate::datasets::data_downloads::download_data;
47+
use crate::datasets::data_downloads::download_many;
4748
use crate::idempotent_async;
4849
use crate::workspace_root;
4950

@@ -289,16 +290,13 @@ pub struct PBIData {
289290

290291
impl PBIData {
291292
async fn download_bzips(&self) -> anyhow::Result<()> {
292-
let download_futures = self.tables.iter().map(|table| {
293-
download_data(
293+
let downloads = self.tables.iter().map(|table| {
294+
(
294295
self.get_file_path(&table.name, FileType::CsvBzip2),
295-
table.data_url.as_str(),
296+
table.data_url.as_str().to_owned(),
296297
)
297298
});
298-
let results = join_all(download_futures).await;
299-
for result in results {
300-
result?;
301-
}
299+
download_many(downloads, DEFAULT_DOWNLOAD_CONCURRENCY).await?;
302300
Ok(())
303301
}
304302

vortex-bench/src/realnest/gharchive.rs

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ use url::Url;
1414

1515
use crate::Benchmark;
1616
use crate::BenchmarkDataset;
17-
use crate::IdempotentPath;
1817
use crate::TableSpec;
1918
use crate::idempotent;
2019
use crate::idempotent_async;
20+
use crate::utils::file::resolve_data_url;
2121

2222
/// Template URL for raw JSON dataset
2323
fn raw_json_url(hour: usize) -> String {
@@ -48,30 +48,7 @@ impl GithubArchiveBenchmark {
4848
}
4949

5050
fn create_data_url(remote_data_dir: &Option<String>) -> anyhow::Result<Url> {
51-
match remote_data_dir {
52-
None => {
53-
let data_dir = "gharchive".to_data_path();
54-
Url::from_directory_path(&data_dir).map_err(|_| {
55-
anyhow::anyhow!("Failed to create URL from directory path: {:?}", &data_dir)
56-
})
57-
}
58-
Some(remote_data_dir) => {
59-
if !remote_data_dir.ends_with("/") {
60-
tracing::warn!(
61-
"Supply a --use-remote-data-dir argument which ends in a slash e.g. s3://vortex-bench-dev-eu/develop/12345/gharchive/"
62-
);
63-
}
64-
tracing::info!(
65-
concat!(
66-
"Assuming data already exists at this remote (e.g. S3, GCS) URL: {}.\n",
67-
"If it does not, you should kill this command, locally generate the files (by running without\n",
68-
"--use-remote-data-dir) and upload data/gharchive/ to some remote location.",
69-
),
70-
remote_data_dir,
71-
);
72-
Ok(Url::parse(remote_data_dir)?)
73-
}
74-
}
51+
resolve_data_url(remote_data_dir.as_deref(), "gharchive")
7552
}
7653
}
7754

0 commit comments

Comments (0)