Skip to content

Commit 5e5475a

Browse files
authored
Unify benchmark file download logic (#7462)
Stop repeating the same download logic all over the codebase. There's still a slight variation of this logic in the statpopgen vcf download. Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent 216499f commit 5e5475a

5 files changed

Lines changed: 104 additions & 208 deletions

File tree

vortex-bench/src/clickbench/benchmark.rs

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,6 @@ use std::fs;
66
use std::path::Path;
77

88
use anyhow::Result;
9-
use reqwest::Client;
109
use url::Url;
1110
use vortex::error::VortexExpect;
1211

@@ -89,7 +88,7 @@ impl Benchmark for ClickBenchBenchmark {
8988
}
9089

9190
let basepath = clickbench_flavor(self.flavor).to_data_path();
92-
self.flavor.download(Client::default(), basepath).await?;
91+
self.flavor.download(basepath).await?;
9392

9493
Ok(())
9594
}

vortex-bench/src/clickbench/data.rs

Lines changed: 8 additions & 95 deletions
Original file line number | Diff line number | Diff line change
@@ -6,31 +6,22 @@ use std::fmt::Display;
66
use std::path::Path;
77
use std::str::FromStr;
88
use std::sync::LazyLock;
9-
use std::time::Duration;
109

1110
use arrow_schema::DataType;
1211
use arrow_schema::Field;
1312
use arrow_schema::Schema;
1413
use arrow_schema::TimeUnit;
15-
use bytes::Bytes;
1614
use clap::ValueEnum;
17-
use futures::StreamExt;
18-
use indicatif::ProgressBar;
19-
use indicatif::ProgressStyle;
20-
use reqwest::IntoUrl;
2115
use serde::Deserialize;
2216
use serde::Serialize;
23-
use tokio::fs::File;
24-
use tokio::io::AsyncWriteExt;
2517
use tokio::task::JoinSet;
2618
use tracing::info;
27-
use tracing::warn;
2819
use vortex::error::VortexExpect;
2920

3021
use crate::Format;
3122
// Re-export for use by clickbench_benchmark
3223
pub use crate::conversions::convert_parquet_directory_to_vortex;
33-
use crate::idempotent_async;
24+
use crate::datasets::data_downloads::download_data;
3425

3526
pub static HITS_SCHEMA: LazyLock<Schema> = LazyLock::new(|| {
3627
use DataType::*;
@@ -190,40 +181,25 @@ impl Display for Flavor {
190181

191182
impl Flavor {
192183
// TODO(joe): move these elsewhere.
193-
pub async fn download(
194-
&self,
195-
client: reqwest::Client,
196-
basepath: impl AsRef<Path>,
197-
) -> anyhow::Result<()> {
184+
pub async fn download(&self, basepath: impl AsRef<Path>) -> anyhow::Result<()> {
198185
let basepath = basepath.as_ref();
199186
match self {
200187
Flavor::Single => {
201188
let output_path = basepath.join(Format::Parquet.name()).join("hits.parquet");
202-
idempotent_async(output_path.as_path(), |output_path| async move {
203-
info!("Downloading single clickbench file");
204-
let url = "https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_single/hits.parquet";
205-
download_large_file(&client, url, &output_path).await?;
206-
anyhow::Ok(())
207-
})
208-
.await?;
189+
info!("Downloading single clickbench file");
190+
let url = "https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_single/hits.parquet";
191+
download_data(output_path, url).await?;
209192
}
210193
Flavor::Partitioned => {
211194
// The clickbench-provided file is missing some higher-level type info, so we reprocess it
212195
// to add that info, see https://github.com/ClickHouse/ClickBench/issues/7.
213196

214197
let mut tasks = (0_u32..100).map(|idx| {
215198
let output_path = basepath.join(Format::Parquet.name()).join(format!("hits_{idx}.parquet"));
216-
let client = client.clone();
217199

218-
idempotent_async(output_path, move|output_path| async move {
219-
info!("Downloading file {idx}");
220-
let url = format!("https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_many/hits_{idx}.parquet");
221-
let body = retry_get(&client, url).await?;
222-
let mut file = File::create(output_path).await?;
223-
file.write_all(&body).await?;
224-
225-
anyhow::Ok(())
226-
})
200+
info!("Downloading file {idx}");
201+
let url = format!("https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_many/hits_{idx}.parquet");
202+
download_data(output_path, url)
227203
}).collect::<JoinSet<_>>();
228204

229205
while let Some(task) = tasks.join_next().await {
@@ -234,66 +210,3 @@ impl Flavor {
234210
Ok(())
235211
}
236212
}
237-
238-
/// Downloads a large file with streaming and progress indication.
239-
async fn download_large_file(
240-
client: &reqwest::Client,
241-
url: &str,
242-
output_path: &Path,
243-
) -> anyhow::Result<()> {
244-
let response = client.get(url).send().await?.error_for_status()?;
245-
246-
let total_size = response.content_length().unwrap_or(0);
247-
248-
let progress = ProgressBar::new(total_size);
249-
progress.set_style(
250-
ProgressStyle::with_template(
251-
"[{elapsed_precise}] {bar:40.cyan/blue} {bytes}/{total_bytes} ({bytes_per_sec})",
252-
)
253-
.expect("valid template"),
254-
);
255-
256-
let mut file = File::create(output_path).await?;
257-
let mut stream = response.bytes_stream();
258-
259-
while let Some(chunk) = stream.next().await {
260-
let chunk = chunk?;
261-
file.write_all(&chunk).await?;
262-
progress.inc(chunk.len() as u64);
263-
}
264-
265-
progress.finish();
266-
Ok(())
267-
}
268-
269-
async fn retry_get(client: &reqwest::Client, url: impl IntoUrl) -> anyhow::Result<Bytes> {
270-
let url = url.as_str();
271-
let make_req = || async { client.get(url).send().await };
272-
273-
let mut body = None;
274-
275-
for attempt in 0..3 {
276-
match make_req().await.and_then(|r| r.error_for_status()) {
277-
Ok(r) => match r.bytes().await {
278-
Ok(b) => {
279-
body = Some(b);
280-
break;
281-
}
282-
Err(e) => {
283-
warn!("Request errored with {e}, retying for the {attempt} time");
284-
}
285-
},
286-
Err(e) => {
287-
warn!("Request errored with {e}, retying for the {attempt} time");
288-
}
289-
}
290-
291-
// Very basic backoff mechanism
292-
tokio::time::sleep(Duration::from_secs(attempt + 1)).await;
293-
}
294-
295-
match body {
296-
Some(v) => Ok(v),
297-
None => anyhow::bail!("Exahusted retry attempts for {url}"),
298-
}
299-
}

vortex-bench/src/datasets/data_downloads.rs

Lines changed: 89 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -8,48 +8,117 @@ use std::path::PathBuf;
88
use std::time::Duration;
99

1010
use anyhow::Context;
11+
use anyhow::Error;
1112
use anyhow::Result;
1213
use bzip2::read::BzDecoder;
14+
use futures::StreamExt;
15+
use indicatif::ProgressBar;
16+
use indicatif::ProgressStyle;
17+
use parking_lot::RwLock;
1318
use reqwest::Client;
19+
use reqwest::Response;
1420
use tokio::fs::File as TokioFile;
1521
use tokio::io::AsyncWriteExt;
1622
use tracing::info;
23+
use tracing::warn;
1724

1825
use crate::utils::file::idempotent;
1926
use crate::utils::file::idempotent_async;
2027

21-
pub async fn download_data(fname: PathBuf, data_url: &str) -> Result<PathBuf> {
22-
idempotent_async(&fname, async |path| {
23-
info!(
24-
"Downloading {} from {}",
25-
fname.to_str().context("Failed to convert path to string")?,
26-
data_url
27-
);
28-
let mut file = TokioFile::create(path)
28+
async fn retry_get<F: Future<Output = Result<Response>>, R: Fn() -> F>(
29+
make_req: R,
30+
tmp_path: PathBuf,
31+
) -> Result<()> {
32+
const MAX_ATTEMPTS: u32 = 3;
33+
let mut last_err: Option<Error> = None;
34+
let progress = RwLock::new(None::<ProgressBar>);
35+
36+
let retry = async || -> Result<()> {
37+
let mut file = TokioFile::create(tmp_path)
2938
.await
3039
.context("Failed to create file")?;
31-
let mut response = Client::builder()
32-
.read_timeout(Duration::from_secs(60))
33-
.timeout(Duration::from_secs(60 * 15))
34-
.build()
35-
.context("Failed to build HTTP client")?
36-
.get(data_url)
37-
.send()
40+
let response = make_req()
3841
.await
3942
.context("Failed to send HTTP request")?
4043
.error_for_status()
4144
.context("HTTP request returned error status")?;
4245

43-
while let Some(chunk) = response
44-
.chunk()
45-
.await
46-
.context("Failed to read response chunk")?
47-
{
46+
*progress.write() = response.content_length().map(|total| {
47+
let progress = ProgressBar::new(total);
48+
progress.set_style(
49+
ProgressStyle::with_template(
50+
"[{elapsed_precise}] {bar:40.cyan/blue} {bytes}/{total_bytes} ({bytes_per_sec})",
51+
)
52+
.expect("valid template"),
53+
);
54+
progress
55+
});
56+
57+
let mut stream = response.bytes_stream();
58+
while let Some(chunk) = stream.next().await {
59+
let chunk = chunk?;
4860
AsyncWriteExt::write_all(&mut file, &chunk)
4961
.await
5062
.context("Failed to write to file")?;
63+
if let Some(p) = progress.write().as_mut() {
64+
p.inc(chunk.len() as u64)
65+
}
5166
}
67+
68+
AsyncWriteExt::flush(&mut file).await?;
5269
Ok(())
70+
};
71+
72+
for attempt in 0..MAX_ATTEMPTS {
73+
let outcome = retry.clone()().await;
74+
75+
match outcome {
76+
Ok(_) => {
77+
if let Some(p) = progress.write().take() {
78+
p.finish_and_clear()
79+
}
80+
return Ok(());
81+
}
82+
Err(e) => {
83+
if let Some(p) = progress.write().take() {
84+
p.abandon()
85+
}
86+
last_err = Some(e)
87+
}
88+
}
89+
let backoff = Duration::from_secs(1u64 << attempt);
90+
warn!(
91+
"download attempt {} failed; retrying in {:?}",
92+
attempt + 1,
93+
backoff
94+
);
95+
tokio::time::sleep(backoff).await;
96+
}
97+
Err(last_err.unwrap_or_else(|| anyhow::anyhow!("retry_get exhausted with no recorded error")))
98+
}
99+
100+
pub async fn download_data(fname: PathBuf, data_url: impl AsRef<str>) -> Result<PathBuf> {
101+
let client = Client::builder()
102+
.read_timeout(Duration::from_secs(60))
103+
.timeout(Duration::from_secs(60 * 15))
104+
.build()
105+
.context("Failed to build HTTP client")?;
106+
107+
idempotent_async(&fname, async |path| {
108+
let url = data_url.as_ref();
109+
info!(
110+
"Downloading {} from {}",
111+
fname.to_str().context("Failed to convert path to string")?,
112+
url
113+
);
114+
retry_get(
115+
async || {
116+
let res = client.get(url).send().await?;
117+
Ok(res)
118+
},
119+
path,
120+
)
121+
.await
53122
})
54123
.await
55124
}

vortex-bench/src/utils/file.rs

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use std::fs::create_dir_all;
4+
use std::fs;
55
use std::future::Future;
66
use std::path::Path;
77
use std::path::PathBuf;
@@ -25,10 +25,10 @@ pub fn idempotent<T, P: IdempotentPath + ?Sized>(
2525
if !data_path.exists() {
2626
// Ensure parent directory exists
2727
if let Some(parent) = data_path.parent() {
28-
create_dir_all(parent).context("Failed to create parent directories")?;
28+
fs::create_dir_all(parent).context("Failed to create parent directories")?;
2929
}
3030
f(temp_path.as_path())?;
31-
std::fs::rename(temp_path, &data_path).context("Failed to rename temp file")?;
31+
fs::rename(temp_path, &data_path).context("Failed to rename temp file")?;
3232
}
3333
Ok(data_path)
3434
}
@@ -44,10 +44,10 @@ where
4444
if !data_path.exists() {
4545
// Ensure parent directory exists
4646
if let Some(parent) = data_path.parent() {
47-
create_dir_all(parent).context("Failed to create parent directories")?;
47+
fs::create_dir_all(parent).context("Failed to create parent directories")?;
4848
}
4949
f(temp_path.clone()).await?;
50-
std::fs::rename(temp_path, &data_path).context("Failed to rename temp file")?;
50+
fs::rename(temp_path, &data_path).context("Failed to rename temp file")?;
5151
}
5252
Ok(data_path)
5353
}

0 commit comments

Comments
 (0)