Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions vortex-bench/src/clickbench/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::fs;
use std::path::Path;

use anyhow::Result;
use reqwest::Client;
use url::Url;
use vortex::error::VortexExpect;

Expand Down Expand Up @@ -89,7 +88,7 @@ impl Benchmark for ClickBenchBenchmark {
}

let basepath = clickbench_flavor(self.flavor).to_data_path();
self.flavor.download(Client::default(), basepath).await?;
self.flavor.download(basepath).await?;

Ok(())
}
Expand Down
103 changes: 8 additions & 95 deletions vortex-bench/src/clickbench/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,22 @@ use std::fmt::Display;
use std::path::Path;
use std::str::FromStr;
use std::sync::LazyLock;
use std::time::Duration;

use arrow_schema::DataType;
use arrow_schema::Field;
use arrow_schema::Schema;
use arrow_schema::TimeUnit;
use bytes::Bytes;
use clap::ValueEnum;
use futures::StreamExt;
use indicatif::ProgressBar;
use indicatif::ProgressStyle;
use reqwest::IntoUrl;
use serde::Deserialize;
use serde::Serialize;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::task::JoinSet;
use tracing::info;
use tracing::warn;
use vortex::error::VortexExpect;

use crate::Format;
// Re-export for use by clickbench_benchmark
pub use crate::conversions::convert_parquet_directory_to_vortex;
use crate::idempotent_async;
use crate::datasets::data_downloads::download_data;

pub static HITS_SCHEMA: LazyLock<Schema> = LazyLock::new(|| {
use DataType::*;
Expand Down Expand Up @@ -190,40 +181,25 @@ impl Display for Flavor {

impl Flavor {
// TODO(joe): move these elsewhere.
pub async fn download(
&self,
client: reqwest::Client,
basepath: impl AsRef<Path>,
) -> anyhow::Result<()> {
pub async fn download(&self, basepath: impl AsRef<Path>) -> anyhow::Result<()> {
let basepath = basepath.as_ref();
match self {
Flavor::Single => {
let output_path = basepath.join(Format::Parquet.name()).join("hits.parquet");
idempotent_async(output_path.as_path(), |output_path| async move {
info!("Downloading single clickbench file");
let url = "https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_single/hits.parquet";
download_large_file(&client, url, &output_path).await?;
anyhow::Ok(())
})
.await?;
info!("Downloading single clickbench file");
let url = "https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_single/hits.parquet";
download_data(output_path, url).await?;
}
Flavor::Partitioned => {
// The clickbench-provided file is missing some higher-level type info, so we reprocess it
// to add that info, see https://github.com/ClickHouse/ClickBench/issues/7.

let mut tasks = (0_u32..100).map(|idx| {
let output_path = basepath.join(Format::Parquet.name()).join(format!("hits_{idx}.parquet"));
let client = client.clone();

idempotent_async(output_path, move|output_path| async move {
info!("Downloading file {idx}");
let url = format!("https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_many/hits_{idx}.parquet");
let body = retry_get(&client, url).await?;
let mut file = File::create(output_path).await?;
file.write_all(&body).await?;

anyhow::Ok(())
})
info!("Downloading file {idx}");
let url = format!("https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_many/hits_{idx}.parquet");
download_data(output_path, url)
}).collect::<JoinSet<_>>();

while let Some(task) = tasks.join_next().await {
Expand All @@ -234,66 +210,3 @@ impl Flavor {
Ok(())
}
}

/// Downloads a large file with streaming and progress indication.
async fn download_large_file(
    client: &reqwest::Client,
    url: &str,
    output_path: &Path,
) -> anyhow::Result<()> {
    // Fail fast on non-2xx status codes before streaming the body.
    let response = client.get(url).send().await?.error_for_status()?;

    // An unknown Content-Length renders as a zero-total bar; byte counters still tick.
    let progress = ProgressBar::new(response.content_length().unwrap_or(0));
    let style = ProgressStyle::with_template(
        "[{elapsed_precise}] {bar:40.cyan/blue} {bytes}/{total_bytes} ({bytes_per_sec})",
    )
    .expect("valid template");
    progress.set_style(style);

    let mut file = File::create(output_path).await?;
    let mut stream = response.bytes_stream();

    // Write chunk-by-chunk so the whole payload is never buffered in memory.
    while let Some(chunk) = stream.next().await {
        let bytes = chunk?;
        file.write_all(&bytes).await?;
        progress.inc(bytes.len() as u64);
    }

    progress.finish();
    Ok(())
}

/// GETs `url`, retrying up to three times with a very basic linear backoff.
///
/// Both sending the request (including a non-2xx status) and reading the
/// response body count as transient failures that trigger a retry. Returns
/// the response bytes on the first success, or an error once all attempts
/// are exhausted.
async fn retry_get(client: &reqwest::Client, url: impl IntoUrl) -> anyhow::Result<Bytes> {
    const MAX_ATTEMPTS: u64 = 3;
    let url = url.as_str();

    for attempt in 0..MAX_ATTEMPTS {
        match client
            .get(url)
            .send()
            .await
            .and_then(|r| r.error_for_status())
        {
            Ok(response) => match response.bytes().await {
                // First success short-circuits the retry loop.
                Ok(body) => return Ok(body),
                Err(e) => warn!("Request errored with {e}, retrying (attempt {attempt})"),
            },
            Err(e) => warn!("Request errored with {e}, retrying (attempt {attempt})"),
        }

        // Very basic backoff mechanism; don't sleep after the final attempt,
        // since there is nothing left to wait for.
        if attempt + 1 < MAX_ATTEMPTS {
            tokio::time::sleep(Duration::from_secs(attempt + 1)).await;
        }
    }

    anyhow::bail!("Exhausted retry attempts for {url}")
}
109 changes: 89 additions & 20 deletions vortex-bench/src/datasets/data_downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,48 +8,117 @@ use std::path::PathBuf;
use std::time::Duration;

use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use bzip2::read::BzDecoder;
use futures::StreamExt;
use indicatif::ProgressBar;
use indicatif::ProgressStyle;
use parking_lot::RwLock;
use reqwest::Client;
use reqwest::Response;
use tokio::fs::File as TokioFile;
use tokio::io::AsyncWriteExt;
use tracing::info;
use tracing::warn;

use crate::utils::file::idempotent;
use crate::utils::file::idempotent_async;

pub async fn download_data(fname: PathBuf, data_url: &str) -> Result<PathBuf> {
idempotent_async(&fname, async |path| {
info!(
"Downloading {} from {}",
fname.to_str().context("Failed to convert path to string")?,
data_url
);
let mut file = TokioFile::create(path)
async fn retry_get<F: Future<Output = Result<Response>>, R: Fn() -> F>(
make_req: R,
tmp_path: PathBuf,
) -> Result<()> {
const MAX_ATTEMPTS: u32 = 3;
let mut last_err: Option<Error> = None;
let progress = RwLock::new(None::<ProgressBar>);

let retry = async || -> Result<()> {
let mut file = TokioFile::create(tmp_path)
.await
.context("Failed to create file")?;
let mut response = Client::builder()
.read_timeout(Duration::from_secs(60))
.timeout(Duration::from_secs(60 * 15))
.build()
.context("Failed to build HTTP client")?
.get(data_url)
.send()
let response = make_req()
.await
.context("Failed to send HTTP request")?
.error_for_status()
.context("HTTP request returned error status")?;

while let Some(chunk) = response
.chunk()
.await
.context("Failed to read response chunk")?
{
*progress.write() = response.content_length().map(|total| {
let progress = ProgressBar::new(total);
progress.set_style(
ProgressStyle::with_template(
"[{elapsed_precise}] {bar:40.cyan/blue} {bytes}/{total_bytes} ({bytes_per_sec})",
)
.expect("valid template"),
);
progress
});

let mut stream = response.bytes_stream();
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
AsyncWriteExt::write_all(&mut file, &chunk)
.await
.context("Failed to write to file")?;
if let Some(p) = progress.write().as_mut() {
p.inc(chunk.len() as u64)
}
}

AsyncWriteExt::flush(&mut file).await?;
Ok(())
};

for attempt in 0..MAX_ATTEMPTS {
let outcome = retry.clone()().await;

match outcome {
Ok(_) => {
if let Some(p) = progress.write().take() {
p.finish_and_clear()
}
return Ok(());
}
Err(e) => {
if let Some(p) = progress.write().take() {
p.abandon()
}
last_err = Some(e)
}
}
let backoff = Duration::from_secs(1u64 << attempt);
warn!(
"download attempt {} failed; retrying in {:?}",
attempt + 1,
backoff
);
tokio::time::sleep(backoff).await;
}
Err(last_err.unwrap_or_else(|| anyhow::anyhow!("retry_get exhausted with no recorded error")))
}

pub async fn download_data(fname: PathBuf, data_url: impl AsRef<str>) -> Result<PathBuf> {
let client = Client::builder()
.read_timeout(Duration::from_secs(60))
.timeout(Duration::from_secs(60 * 15))
.build()
.context("Failed to build HTTP client")?;

idempotent_async(&fname, async |path| {
let url = data_url.as_ref();
info!(
"Downloading {} from {}",
fname.to_str().context("Failed to convert path to string")?,
url
);
retry_get(
async || {
let res = client.get(url).send().await?;
Ok(res)
},
path,
)
.await
})
.await
}
Expand Down
10 changes: 5 additions & 5 deletions vortex-bench/src/utils/file.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::fs::create_dir_all;
use std::fs;
use std::future::Future;
use std::path::Path;
use std::path::PathBuf;
Expand All @@ -25,10 +25,10 @@ pub fn idempotent<T, P: IdempotentPath + ?Sized>(
if !data_path.exists() {
// Ensure parent directory exists
if let Some(parent) = data_path.parent() {
create_dir_all(parent).context("Failed to create parent directories")?;
fs::create_dir_all(parent).context("Failed to create parent directories")?;
}
f(temp_path.as_path())?;
std::fs::rename(temp_path, &data_path).context("Failed to rename temp file")?;
fs::rename(temp_path, &data_path).context("Failed to rename temp file")?;
}
Ok(data_path)
}
Expand All @@ -44,10 +44,10 @@ where
if !data_path.exists() {
// Ensure parent directory exists
if let Some(parent) = data_path.parent() {
create_dir_all(parent).context("Failed to create parent directories")?;
fs::create_dir_all(parent).context("Failed to create parent directories")?;
}
f(temp_path.clone()).await?;
std::fs::rename(temp_path, &data_path).context("Failed to rename temp file")?;
fs::rename(temp_path, &data_path).context("Failed to rename temp file")?;
}
Ok(data_path)
}
Expand Down
Loading
Loading