From 00b111da647f0c3e50aa9479c84f261428da4739 Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Thu, 16 Apr 2026 10:26:32 -0400 Subject: [PATCH 1/3] unify download management for benchmarks Signed-off-by: Connor Tsui --- vortex-bench/src/clickbench/benchmark.rs | 28 +----- vortex-bench/src/clickbench/data.rs | 21 ++--- vortex-bench/src/datasets/data_downloads.rs | 94 +++++++++++++++++++-- vortex-bench/src/fineweb/mod.rs | 52 +----------- vortex-bench/src/public_bi.rs | 14 ++- vortex-bench/src/realnest/gharchive.rs | 27 +----- vortex-bench/src/utils/file.rs | 42 +++++++++ vortex-bench/src/vector_dataset/download.rs | 32 +++---- 8 files changed, 167 insertions(+), 143 deletions(-) diff --git a/vortex-bench/src/clickbench/benchmark.rs b/vortex-bench/src/clickbench/benchmark.rs index 2e197bef8e1..9cfafb8832e 100644 --- a/vortex-bench/src/clickbench/benchmark.rs +++ b/vortex-bench/src/clickbench/benchmark.rs @@ -7,13 +7,13 @@ use std::path::Path; use anyhow::Result; use url::Url; -use vortex::error::VortexExpect; use crate::Benchmark; use crate::BenchmarkDataset; use crate::IdempotentPath; use crate::TableSpec; use crate::clickbench::*; +use crate::utils::file::resolve_data_url; /// ClickBench benchmark implementation pub struct ClickBenchBenchmark { @@ -37,31 +37,7 @@ impl ClickBenchBenchmark { } fn create_data_url(remote_data_dir: &Option, flavor: Flavor) -> Result { - match remote_data_dir { - None => { - let basepath = format!("clickbench_{flavor}").to_data_path(); - Ok(Url::parse(&format!( - "file:{}/", - basepath.to_str().vortex_expect("path should be utf8") - ))?) - } - Some(remote_data_dir) => { - if !remote_data_dir.ends_with("/") { - tracing::warn!( - "Supply a --use-remote-data-dir argument which ends in a slash e.g. s3://vortex-bench-dev-eu/parquet/" - ); - } - tracing::info!( - concat!( - "Assuming data already exists at this remote (e.g. 
S3, GCS) URL: {}.\\n", - "If it does not, you should kill this command, locally generate the files (by running without\\n", - "--use-remote-data-dir) and upload data/clickbench/ to some remote location.", - ), - remote_data_dir, - ); - Ok(Url::parse(remote_data_dir)?) - } - } + resolve_data_url(remote_data_dir.as_deref(), &format!("clickbench_{flavor}")) } } diff --git a/vortex-bench/src/clickbench/data.rs b/vortex-bench/src/clickbench/data.rs index af11e468d45..d8d3d31d595 100644 --- a/vortex-bench/src/clickbench/data.rs +++ b/vortex-bench/src/clickbench/data.rs @@ -14,14 +14,15 @@ use arrow_schema::TimeUnit; use clap::ValueEnum; use serde::Deserialize; use serde::Serialize; -use tokio::task::JoinSet; use tracing::info; use vortex::error::VortexExpect; use crate::Format; // Re-export for use by clickbench_benchmark pub use crate::conversions::convert_parquet_directory_to_vortex; +use crate::datasets::data_downloads::DEFAULT_DOWNLOAD_CONCURRENCY; use crate::datasets::data_downloads::download_data; +use crate::datasets::data_downloads::download_many; pub static HITS_SCHEMA: LazyLock = LazyLock::new(|| { use DataType::*; @@ -193,18 +194,14 @@ impl Flavor { Flavor::Partitioned => { // The clickbench-provided file is missing some higher-level type info, so we reprocess it // to add that info, see https://github.com/ClickHouse/ClickBench/issues/7. 
- - let mut tasks = (0_u32..100).map(|idx| { - let output_path = basepath.join(Format::Parquet.name()).join(format!("hits_{idx}.parquet")); - - info!("Downloading file {idx}"); + info!("Downloading 100 ClickBench parquet shards"); + let parquet_dir = basepath.join(Format::Parquet.name()); + let downloads = (0_u32..100).map(|idx| { + let output_path = parquet_dir.join(format!("hits_{idx}.parquet")); let url = format!("https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_many/hits_{idx}.parquet"); - download_data(output_path, url) - }).collect::>(); - - while let Some(task) = tasks.join_next().await { - task??; - } + (output_path, url) + }); + download_many(downloads, DEFAULT_DOWNLOAD_CONCURRENCY).await?; } } Ok(()) diff --git a/vortex-bench/src/datasets/data_downloads.rs b/vortex-bench/src/datasets/data_downloads.rs index a603436b458..848e81b45b6 100644 --- a/vortex-bench/src/datasets/data_downloads.rs +++ b/vortex-bench/src/datasets/data_downloads.rs @@ -5,6 +5,7 @@ use std::fs::File; use std::io::Read; use std::io::Write; use std::path::PathBuf; +use std::sync::LazyLock; use std::time::Duration; use anyhow::Context; @@ -12,6 +13,7 @@ use anyhow::Error; use anyhow::Result; use bzip2::read::BzDecoder; use futures::StreamExt; +use futures::stream; use indicatif::ProgressBar; use indicatif::ProgressStyle; use parking_lot::RwLock; @@ -25,6 +27,29 @@ use tracing::warn; use crate::utils::file::idempotent; use crate::utils::file::idempotent_async; +/// Default concurrency limit for bulk downloads. Keeps us polite to the upstream while still +/// saturating a typical 10 Gb link on a parquet-per-shard benchmark. +pub const DEFAULT_DOWNLOAD_CONCURRENCY: usize = 16; + +/// Shared HTTP client used by every dataset download. +/// +/// Reusing a single client gives us connection pooling, DNS caching, and consistent timeouts +/// across all callers. 
Each benchmark used to build its own `reqwest::Client` on every download, +/// which both wasted TLS handshakes and made it hard to reason about total in-flight concurrency. +static HTTP_CLIENT: LazyLock = LazyLock::new(|| { + Client::builder() + .read_timeout(Duration::from_secs(60)) + .timeout(Duration::from_secs(60 * 15)) + .build() + .expect("failed to build shared benchmark HTTP client") +}); + +/// Access the shared HTTP client. Exposed for callers that need custom request shapes +/// (e.g. streaming VCF parsing) while still benefitting from pooled connections. +pub fn http_client() -> &'static Client { + &HTTP_CLIENT +} + async fn retry_get>, R: Fn() -> F>( make_req: R, tmp_path: PathBuf, @@ -97,12 +122,13 @@ async fn retry_get>, R: Fn() -> F>( Err(last_err.unwrap_or_else(|| anyhow::anyhow!("retry_get exhausted with no recorded error"))) } +/// Idempotently download a single URL to `fname`. +/// +/// Uses the shared HTTP client, a 3-attempt exponential backoff retry loop, and an `indicatif` +/// progress bar. If `fname` already exists, the download is skipped. +#[tracing::instrument(skip_all, fields(url = %data_url.as_ref(), path = %fname.display()))] pub async fn download_data(fname: PathBuf, data_url: impl AsRef) -> Result { - let client = Client::builder() - .read_timeout(Duration::from_secs(60)) - .timeout(Duration::from_secs(60 * 15)) - .build() - .context("Failed to build HTTP client")?; + let client = http_client(); idempotent_async(&fname, async |path| { let url = data_url.as_ref(); @@ -123,6 +149,64 @@ pub async fn download_data(fname: PathBuf, data_url: impl AsRef) -> Result< .await } +/// Idempotently download many `(path, url)` pairs with bounded parallelism. +/// +/// This is the preferred way to fetch multi-shard datasets (ClickBench partitioned, vector +/// dataset train shards, Public BI tables, etc.) 
because it: +/// +/// - caps in-flight HTTP requests at `max_concurrency` so we don't overwhelm the upstream +/// or our own network stack, +/// - reuses the shared HTTP client across every shard, +/// - short-circuits on the first error (the remaining in-flight downloads are dropped when +/// the returned future is dropped), +/// - returns the resolved on-disk paths in the same order they were submitted. +/// +/// Pass `0` as `max_concurrency` to use [`DEFAULT_DOWNLOAD_CONCURRENCY`]. +#[tracing::instrument(skip_all, fields(count = tracing::field::Empty, max_concurrency))] +pub async fn download_many(downloads: I, max_concurrency: usize) -> Result> +where + I: IntoIterator, + I::Item: IntoDownload, +{ + let downloads: Vec<(PathBuf, String)> = downloads + .into_iter() + .map(IntoDownload::into_download) + .collect(); + tracing::Span::current().record("count", downloads.len()); + + let concurrency = if max_concurrency == 0 { + DEFAULT_DOWNLOAD_CONCURRENCY + } else { + max_concurrency + }; + + let results: Vec> = stream::iter(downloads) + .map(|(path, url)| async move { download_data(path, url).await }) + .buffered(concurrency) + .collect() + .await; + + results.into_iter().collect() +} + +/// Anything that can be described as a `(target_path, url)` pair accepted by [`download_many`]. 
+pub trait IntoDownload { + fn into_download(self) -> (PathBuf, String); +} + +impl IntoDownload for (PathBuf, String) { + fn into_download(self) -> (PathBuf, String) { + self + } +} + +impl IntoDownload for (PathBuf, &str) { + fn into_download(self) -> (PathBuf, String) { + (self.0, self.1.to_owned()) + } +} + +#[tracing::instrument(skip_all, fields(input = %input_path.display(), output = %output_path.display()))] pub fn decompress_bz2(input_path: PathBuf, output_path: PathBuf) -> Result { idempotent(&output_path, |path| { info!( diff --git a/vortex-bench/src/fineweb/mod.rs b/vortex-bench/src/fineweb/mod.rs index 743e7737313..5acb57f64e9 100644 --- a/vortex-bench/src/fineweb/mod.rs +++ b/vortex-bench/src/fineweb/mod.rs @@ -3,15 +3,14 @@ use std::path::PathBuf; -use tokio::io::AsyncWriteExt; use tracing::info; use url::Url; use crate::Benchmark; use crate::BenchmarkDataset; -use crate::IdempotentPath; use crate::TableSpec; -use crate::idempotent_async; +use crate::datasets::data_downloads::download_data; +use crate::utils::file::resolve_data_url; /// URL to the sample file const SAMPLE_URL: &str = "https://huggingface.co/datasets/HuggingFaceFW/fineweb/resolve/v1.4.0/sample/10BT/001_00000.parquet"; @@ -56,30 +55,7 @@ impl FinewebBenchmark { } fn create_data_url(remote_data_dir: &Option) -> anyhow::Result { - match remote_data_dir { - None => { - let data_dir = "fineweb".to_data_path(); - Url::from_directory_path(&data_dir).map_err(|_| { - anyhow::anyhow!("Failed to create URL from directory path: {:?}", &data_dir) - }) - } - Some(remote_data_dir) => { - if !remote_data_dir.ends_with("/") { - tracing::warn!( - "Supply a --use-remote-data-dir argument which ends in a slash e.g. s3://vortex-bench-dev-eu/develop/12345/fineweb/" - ); - } - tracing::info!( - concat!( - "Assuming data already exists at this remote (e.g. 
S3, GCS) URL: {}.\n", - "If it does not, you should kill this command, locally generate the files (by running without\n", - "--use-remote-data-dir) and upload data/fineweb/ to some remote location.", - ), - remote_data_dir, - ); - Ok(Url::parse(remote_data_dir)?) - } - } + resolve_data_url(remote_data_dir.as_deref(), "fineweb") } } @@ -104,27 +80,7 @@ impl Benchmark for FinewebBenchmark { return Ok(()); } - let parquet = idempotent_async(&self.parquet_path()?, |parquet_path| async move { - info!("Downloading FineWeb Parquet source from HuggingFace"); - - let response = reqwest::get(SAMPLE_URL) - .await? - .error_for_status() - .map_err(|err| { - anyhow::anyhow!("error fetching fineweb sample from HuggingFace: {err}") - })?; - - let bytes = response.bytes().await?; - let mut w = tokio::fs::File::create(parquet_path).await?; - - w.write_all(&bytes).await?; - - w.flush().await?; - - Ok(()) - }) - .await?; - + let parquet = download_data(self.parquet_path()?, SAMPLE_URL).await?; info!("fineweb base data generated in {}", parquet.display()); Ok(()) diff --git a/vortex-bench/src/public_bi.rs b/vortex-bench/src/public_bi.rs index 3602cd2cd33..29c0d01a960 100644 --- a/vortex-bench/src/public_bi.rs +++ b/vortex-bench/src/public_bi.rs @@ -42,8 +42,9 @@ use crate::SESSION; use crate::TableSpec; use crate::conversions::parquet_to_vortex_chunks; use crate::datasets::Dataset; +use crate::datasets::data_downloads::DEFAULT_DOWNLOAD_CONCURRENCY; use crate::datasets::data_downloads::decompress_bz2; -use crate::datasets::data_downloads::download_data; +use crate::datasets::data_downloads::download_many; use crate::idempotent_async; use crate::workspace_root; @@ -289,16 +290,13 @@ pub struct PBIData { impl PBIData { async fn download_bzips(&self) -> anyhow::Result<()> { - let download_futures = self.tables.iter().map(|table| { - download_data( + let downloads = self.tables.iter().map(|table| { + ( self.get_file_path(&table.name, FileType::CsvBzip2), - table.data_url.as_str(), + 
table.data_url.as_str().to_owned(), ) }); - let results = join_all(download_futures).await; - for result in results { - result?; - } + download_many(downloads, DEFAULT_DOWNLOAD_CONCURRENCY).await?; Ok(()) } diff --git a/vortex-bench/src/realnest/gharchive.rs b/vortex-bench/src/realnest/gharchive.rs index 154cf945185..86b981a9fbe 100644 --- a/vortex-bench/src/realnest/gharchive.rs +++ b/vortex-bench/src/realnest/gharchive.rs @@ -14,10 +14,10 @@ use url::Url; use crate::Benchmark; use crate::BenchmarkDataset; -use crate::IdempotentPath; use crate::TableSpec; use crate::idempotent; use crate::idempotent_async; +use crate::utils::file::resolve_data_url; /// Template URL for raw JSON dataset fn raw_json_url(hour: usize) -> String { @@ -48,30 +48,7 @@ impl GithubArchiveBenchmark { } fn create_data_url(remote_data_dir: &Option) -> anyhow::Result { - match remote_data_dir { - None => { - let data_dir = "gharchive".to_data_path(); - Url::from_directory_path(&data_dir).map_err(|_| { - anyhow::anyhow!("Failed to create URL from directory path: {:?}", &data_dir) - }) - } - Some(remote_data_dir) => { - if !remote_data_dir.ends_with("/") { - tracing::warn!( - "Supply a --use-remote-data-dir argument which ends in a slash e.g. s3://vortex-bench-dev-eu/develop/12345/gharchive/" - ); - } - tracing::info!( - concat!( - "Assuming data already exists at this remote (e.g. S3, GCS) URL: {}.\n", - "If it does not, you should kill this command, locally generate the files (by running without\n", - "--use-remote-data-dir) and upload data/gharchive/ to some remote location.", - ), - remote_data_dir, - ); - Ok(Url::parse(remote_data_dir)?) 
- } - } + resolve_data_url(remote_data_dir.as_deref(), "gharchive") } } diff --git a/vortex-bench/src/utils/file.rs b/vortex-bench/src/utils/file.rs index b6d202bb29a..c8916d68398 100644 --- a/vortex-bench/src/utils/file.rs +++ b/vortex-bench/src/utils/file.rs @@ -110,6 +110,48 @@ impl IdempotentPath for &Path { } } +/// Resolve the `--use-remote-data-dir` CLI option to a `Url` for a named dataset. +/// +/// When `remote_data_dir` is `None`, returns a `file://` URL pointing at the dataset's local cache +/// directory (`//`). +/// +/// When `remote_data_dir` is `Some(...)`, parses it as a remote URL (typically `s3://` or `gs://`). +/// The user must have pre-uploaded the expected data layout; a warning is logged if the URL does +/// not end in `/`, and an informational message describes the expected layout. +/// +/// This helper replaces the boilerplate `create_data_url()` that used to be duplicated across every +/// benchmark that supports remote data directories (ClickBench, Fineweb, GhArchive, ...). +pub fn resolve_data_url(remote_data_dir: Option<&str>, local_subdir: &str) -> Result { + match remote_data_dir { + None => { + let data_dir = data_dir().join(local_subdir); + Url::from_directory_path(&data_dir).map_err(|_| { + anyhow::anyhow!("Failed to create URL from directory path: {:?}", &data_dir) + }) + } + Some(remote_data_dir) => { + if !remote_data_dir.ends_with('/') { + tracing::warn!( + "Supply a --use-remote-data-dir argument which ends in a slash \ + e.g. s3://vortex-bench-dev-eu/develop/12345/{}/", + local_subdir, + ); + } + tracing::info!( + concat!( + "Assuming data already exists at this remote (e.g. S3, GCS) URL: {}.\n", + "If it does not, you should kill this command, locally generate the files ", + "(by running without\n", + "--use-remote-data-dir) and upload data/{}/ to some remote location.", + ), + remote_data_dir, + local_subdir, + ); + Ok(Url::parse(remote_data_dir)?) 
+ } + } +} + /// Convert a URL scheme to a storage type string /// /// Maps URL schemes (s3, gcs, file) to storage type identifiers diff --git a/vortex-bench/src/vector_dataset/download.rs b/vortex-bench/src/vector_dataset/download.rs index 228c51db771..e8a871ac52a 100644 --- a/vortex-bench/src/vector_dataset/download.rs +++ b/vortex-bench/src/vector_dataset/download.rs @@ -17,11 +17,10 @@ use std::path::PathBuf; use anyhow::Context; use anyhow::Result; -use tokio::task::JoinSet; -use tracing::info; +use crate::datasets::data_downloads::DEFAULT_DOWNLOAD_CONCURRENCY; use crate::datasets::data_downloads::download_data; -use crate::utils::file::idempotent_async; +use crate::datasets::data_downloads::download_many; use crate::vector_dataset::catalog::VectorDataset; use crate::vector_dataset::layout::LayoutSpec; use crate::vector_dataset::layout::TrainLayout; @@ -95,28 +94,23 @@ pub struct DatasetPaths { /// This has idempotent semantics, so files already present on disk are skipped, and re-runs only /// pay for new files. /// -/// Train shards download in parallel using a shared HTTP client; the small `test.parquet` and -/// `neighbors.parquet` files use the simple [`download_data`] helper. +/// Train shards download via [`download_many`] with bounded parallelism (up to +/// [`DEFAULT_DOWNLOAD_CONCURRENCY`]); the small `test.parquet` and `neighbors.parquet` files use +/// the simple [`download_data`] helper. All HTTP requests share a single pooled client. 
pub async fn download(ds: VectorDataset, layout: TrainLayout) -> Result { let spec = ds.validate_layout(layout)?; let urls = train_urls(ds, spec); let train_targets = paths::train_files(ds, layout, spec.num_files()); debug_assert_eq!(urls.len(), train_targets.len()); - let mut tasks: JoinSet> = JoinSet::new(); - for (url, target) in urls.into_iter().zip(train_targets.iter().cloned()) { - tasks.spawn(async move { - idempotent_async(target, |tmp| async move { - info!("downloading {}", url); - download_data(tmp, &url).await - }) - .await?; - Ok(()) - }); - } - while let Some(joined) = tasks.join_next().await { - joined.context("train download task panicked")??; - } + let train_downloads: Vec<(PathBuf, String)> = train_targets + .iter() + .cloned() + .zip(urls.into_iter()) + .collect(); + download_many(train_downloads, DEFAULT_DOWNLOAD_CONCURRENCY) + .await + .with_context(|| format!("download train shards for {}", ds.name()))?; let test = download_data(paths::test_path(ds, layout), &test_url(ds)) .await From 1f0b7098cefd9df8d6b897b52dcb5727795a36d6 Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Thu, 16 Apr 2026 11:36:52 -0400 Subject: [PATCH 2/3] better progress bar and download workers Signed-off-by: Connor Tsui --- vortex-bench/src/datasets/data_downloads.rs | 620 +++++++++++++++----- 1 file changed, 467 insertions(+), 153 deletions(-) diff --git a/vortex-bench/src/datasets/data_downloads.rs b/vortex-bench/src/datasets/data_downloads.rs index 848e81b45b6..70ffa2a16f7 100644 --- a/vortex-bench/src/datasets/data_downloads.rs +++ b/vortex-bench/src/datasets/data_downloads.rs @@ -1,11 +1,15 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::cmp::Ordering; use std::fs::File; -use std::io::Read; -use std::io::Write; +use std::io; +use std::path::Path; use std::path::PathBuf; +use std::sync::Arc; use std::sync::LazyLock; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering as AtomicOrdering; 
use std::time::Duration; use anyhow::Context; @@ -14,152 +18,71 @@ use anyhow::Result; use bzip2::read::BzDecoder; use futures::StreamExt; use futures::stream; +use indicatif::MultiProgress; use indicatif::ProgressBar; use indicatif::ProgressStyle; -use parking_lot::RwLock; +use parking_lot::Mutex; use reqwest::Client; -use reqwest::Response; use tokio::fs::File as TokioFile; use tokio::io::AsyncWriteExt; +use tokio::sync::OwnedSemaphorePermit; +use tokio::sync::Semaphore; use tracing::info; use tracing::warn; use crate::utils::file::idempotent; use crate::utils::file::idempotent_async; -/// Default concurrency limit for bulk downloads. Keeps us polite to the upstream while still -/// saturating a typical 10 Gb link on a parquet-per-shard benchmark. -pub const DEFAULT_DOWNLOAD_CONCURRENCY: usize = 16; +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Public API +//////////////////////////////////////////////////////////////////////////////////////////////////// -/// Shared HTTP client used by every dataset download. -/// -/// Reusing a single client gives us connection pooling, DNS caching, and consistent timeouts -/// across all callers. Each benchmark used to build its own `reqwest::Client` on every download, -/// which both wasted TLS handshakes and made it hard to reason about total in-flight concurrency. -static HTTP_CLIENT: LazyLock = LazyLock::new(|| { - Client::builder() - .read_timeout(Duration::from_secs(60)) - .timeout(Duration::from_secs(60 * 15)) - .build() - .expect("failed to build shared benchmark HTTP client") -}); +/// Default concurrency limit for bulk downloads. Keeps us polite to the upstream while +/// still saturating a typical 10 Gb link on a parquet-per-shard benchmark. +pub const DEFAULT_DOWNLOAD_CONCURRENCY: usize = 16; -/// Access the shared HTTP client. Exposed for callers that need custom request shapes -/// (e.g. 
streaming VCF parsing) while still benefitting from pooled connections. -pub fn http_client() -> &'static Client { - &HTTP_CLIENT +/// Anything that can be described as a `(target_path, url)` pair accepted by +/// [`download_many`]. +pub trait IntoDownload { + fn into_download(self) -> (PathBuf, String); } -async fn retry_get>, R: Fn() -> F>( - make_req: R, - tmp_path: PathBuf, -) -> Result<()> { - const MAX_ATTEMPTS: u32 = 3; - let mut last_err: Option = None; - let progress = RwLock::new(None::); - - let retry = async || -> Result<()> { - let mut file = TokioFile::create(tmp_path) - .await - .context("Failed to create file")?; - let response = make_req() - .await - .context("Failed to send HTTP request")? - .error_for_status() - .context("HTTP request returned error status")?; - - *progress.write() = response.content_length().map(|total| { - let progress = ProgressBar::new(total); - progress.set_style( - ProgressStyle::with_template( - "[{elapsed_precise}] {bar:40.cyan/blue} {bytes}/{total_bytes} ({bytes_per_sec})", - ) - .expect("valid template"), - ); - progress - }); - - let mut stream = response.bytes_stream(); - while let Some(chunk) = stream.next().await { - let chunk = chunk?; - AsyncWriteExt::write_all(&mut file, &chunk) - .await - .context("Failed to write to file")?; - if let Some(p) = progress.write().as_mut() { - p.inc(chunk.len() as u64) - } - } - - AsyncWriteExt::flush(&mut file).await?; - Ok(()) - }; - - for attempt in 0..MAX_ATTEMPTS { - let outcome = retry.clone()().await; - - match outcome { - Ok(_) => { - if let Some(p) = progress.write().take() { - p.finish_and_clear() - } - return Ok(()); - } - Err(e) => { - if let Some(p) = progress.write().take() { - p.abandon() - } - last_err = Some(e) - } - } - let backoff = Duration::from_secs(1u64 << attempt); - warn!( - "download attempt {} failed; retrying in {:?}", - attempt + 1, - backoff - ); - tokio::time::sleep(backoff).await; +impl IntoDownload for (P, S) +where + P: Into, + S: Into, +{ + fn 
into_download(self) -> (PathBuf, String) { + (self.0.into(), self.1.into()) } - Err(last_err.unwrap_or_else(|| anyhow::anyhow!("retry_get exhausted with no recorded error"))) } /// Idempotently download a single URL to `fname`. /// -/// Uses the shared HTTP client, a 3-attempt exponential backoff retry loop, and an `indicatif` -/// progress bar. If `fname` already exists, the download is skipped. +/// Uses the shared HTTP client, a 3-attempt exponential backoff retry loop with jitter, +/// and an [`indicatif::ProgressBar`]. If `fname` already exists, the download is +/// skipped. #[tracing::instrument(skip_all, fields(url = %data_url.as_ref(), path = %fname.display()))] pub async fn download_data(fname: PathBuf, data_url: impl AsRef) -> Result { - let client = http_client(); - - idempotent_async(&fname, async |path| { - let url = data_url.as_ref(); - info!( - "Downloading {} from {}", - fname.to_str().context("Failed to convert path to string")?, - url - ); - retry_get( - async || { - let res = client.get(url).send().await?; - Ok(res) - }, - path, - ) - .await - }) - .await + download_one(fname, data_url.as_ref(), None).await } /// Idempotently download many `(path, url)` pairs with bounded parallelism. /// -/// This is the preferred way to fetch multi-shard datasets (ClickBench partitioned, vector -/// dataset train shards, Public BI tables, etc.) because it: +/// This is the preferred way to fetch multi-shard datasets (ClickBench partitioned, +/// vector dataset train shards, Public BI tables, etc.) 
because it: /// -/// - caps in-flight HTTP requests at `max_concurrency` so we don't overwhelm the upstream -/// or our own network stack, +/// - caps in-flight HTTP requests at `max_concurrency` so we don't overwhelm the +/// upstream or our own network stack, /// - reuses the shared HTTP client across every shard, -/// - short-circuits on the first error (the remaining in-flight downloads are dropped when -/// the returned future is dropped), -/// - returns the resolved on-disk paths in the same order they were submitted. +/// - renders a top-of-block `N/total` bar plus a fixed number of reusable slot bars via +/// a shared [`MultiProgress`]: the terminal block size stays constant for the entire +/// run, so nothing "jumps" as shards cycle, +/// - keeps the worker pool continuously full via `buffer_unordered`: as soon as any +/// shard finishes, the next queued shard reuses the freed slot, +/// - short-circuits on the first error (the remaining in-flight downloads are dropped +/// when the returned future is dropped), +/// - returns the resolved on-disk paths in completion order (not submission order). /// /// Pass `0` as `max_concurrency` to use [`DEFAULT_DOWNLOAD_CONCURRENCY`]. 
#[tracing::instrument(skip_all, fields(count = tracing::field::Empty, max_concurrency))] @@ -174,64 +97,455 @@ where .collect(); tracing::Span::current().record("count", downloads.len()); + if downloads.is_empty() { + return Ok(Vec::new()); + } + let concurrency = if max_concurrency == 0 { DEFAULT_DOWNLOAD_CONCURRENCY } else { max_concurrency }; + let num_slots = downloads.len().min(concurrency); + + let batch = BatchProgress::new(downloads.len() as u64, num_slots, num_slots); let results: Vec> = stream::iter(downloads) - .map(|(path, url)| async move { download_data(path, url).await }) - .buffered(concurrency) + .map(|(path, url)| { + let batch = batch.clone(); + async move { + let result = download_one(path, &url, Some(&batch)).await; + if result.is_ok() { + batch.advance(); + } + result + } + }) + .buffer_unordered(num_slots) .collect() .await; - results.into_iter().collect() -} - -/// Anything that can be described as a `(target_path, url)` pair accepted by [`download_many`]. -pub trait IntoDownload { - fn into_download(self) -> (PathBuf, String); -} + batch.finish(); -impl IntoDownload for (PathBuf, String) { - fn into_download(self) -> (PathBuf, String) { - self - } -} - -impl IntoDownload for (PathBuf, &str) { - fn into_download(self) -> (PathBuf, String) { - (self.0, self.1.to_owned()) - } + results.into_iter().collect() } +/// Idempotently decompress a bzip2 file into `output_path`, streaming the decompressed bytes +/// straight to disk so memory stays bounded. +/// +/// This is used for the public BI dataset. #[tracing::instrument(skip_all, fields(input = %input_path.display(), output = %output_path.display()))] pub fn decompress_bz2(input_path: PathBuf, output_path: PathBuf) -> Result { idempotent(&output_path, |path| { info!( "Decompressing bzip from {} to {}", - input_path - .to_str() - .context("Failed to convert input path to string")?, - output_path - .to_str() - .context("Failed to convert output path to string")? 
+ input_path.display(), + output_path.display() ); let input_file = File::open(&input_path) .with_context(|| format!("Failed to open input file: {:?}", input_path))?; let mut decoder = BzDecoder::new(input_file); - let mut buffer = Vec::new(); - decoder - .read_to_end(&mut buffer) - .context("Failed to decompress bzip2 data")?; - let mut output_file = File::create(path) .with_context(|| format!("Failed to create output file: {:?}", path))?; - output_file - .write_all(&buffer) - .context("Failed to write decompressed data")?; + io::copy(&mut decoder, &mut output_file).context("Failed to decompress bzip2 stream")?; Ok(output_path.clone()) }) } + +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Shared HTTP client +//////////////////////////////////////////////////////////////////////////////////////////////////// + +/// Shared HTTP client used by every dataset download. +/// +/// Reusing a single client gives us connection pooling, DNS caching, and consistent +/// timeouts across all callers. Each benchmark used to build its own +/// [`reqwest::Client`] on every download, which both wasted TLS handshakes and made it +/// hard to reason about total in-flight concurrency. +static HTTP_CLIENT: LazyLock = LazyLock::new(|| { + Client::builder() + .read_timeout(Duration::from_secs(60)) + .timeout(Duration::from_secs(60 * 15)) + .build() + .expect("failed to build shared benchmark HTTP client") +}); + +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Progress-bar templates +//////////////////////////////////////////////////////////////////////////////////////////////////// + +/// Template for a slot that currently holds no download. `{msg}` with an empty message +/// renders as a blank line while still reserving the terminal row. 
+const IDLE_TEMPLATE: &str = "{msg}"; + +/// Template for a slot that has acquired a download but has not yet received response +/// headers. Shows the filename plus an animated spinner so the user can see we are +/// still making forward progress during TLS + request + first-byte latency. +const CONNECTING_TEMPLATE: &str = "{prefix:>28!} {spinner} connecting..."; + +/// Template for an active download when the response advertised a `Content-Length`. +const KNOWN_SIZE_TEMPLATE: &str = + "{prefix:>28!} [{bar:30.cyan/blue}] {bytes:>9}/{total_bytes:>9} ({bytes_per_sec})"; + +/// Template for an active download when the response size is unknown. +const UNKNOWN_SIZE_TEMPLATE: &str = "{prefix:>28!} {spinner} {bytes} ({bytes_per_sec})"; + +/// Template for the top-of-block `N/total` summary bar rendered by [`download_many`]. +const SHARDS_TEMPLATE: &str = "[{elapsed_precise}] shards [{bar:30.green/white}] {pos}/{len}"; + +/// How often slot spinners redraw. Fast enough to feel alive; slow enough that stderr +/// writes sneaking past `MultiProgress` do not constantly fight for cursor position. +const SLOT_TICK: Duration = Duration::from_millis(80); + +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Batch download internals +//////////////////////////////////////////////////////////////////////////////////////////////////// + +/// Shared rendering state for a batched download. +/// +/// Layout is built once at construction: the shards bar is registered first (top row), +/// then `num_slots` slot bars are registered below it. None of these are ever added to +/// or removed from the [`MultiProgress`] again. The block size on the terminal is +/// exactly `num_slots + 1` rows for the entire run of [`download_many`]. +/// +/// Per-download lifecycle reuses a single slot bar by swapping its style between an +/// idle placeholder and the active progress-bar / spinner variants. 
A +/// [`tokio::sync::Semaphore`] gates how many slots can be in use at any instant, which +/// lets a future controller adjust concurrency at runtime via +/// [`BatchProgress::set_max_in_flight`] without ever touching the MP layout. +#[derive(Clone)] +struct BatchProgress { + inner: Arc, +} + +struct BatchInner { + shards_bar: ProgressBar, + free: Mutex>, + in_flight: Arc, + current_in_flight: AtomicUsize, + num_slots: usize, + // The MP is kept alive alongside the Arc so bars stay registered and rendered. + // Once the last BatchProgress clone drops, the MP drops and clears the block. + _mp: MultiProgress, +} + +impl BatchProgress { + fn new(total: u64, num_slots: usize, initial_in_flight: usize) -> Self { + let initial_in_flight = initial_in_flight.min(num_slots); + let mp = MultiProgress::new(); + + let shards_bar = mp.add(ProgressBar::new(total)); + shards_bar + .set_style(ProgressStyle::with_template(SHARDS_TEMPLATE).expect("valid template")); + + let idle_style = ProgressStyle::with_template(IDLE_TEMPLATE).expect("valid template"); + let mut slots = Vec::with_capacity(num_slots); + for _ in 0..num_slots { + let bar = mp.add(ProgressBar::new(0)); + bar.set_style(idle_style.clone()); + bar.set_message(""); + bar.enable_steady_tick(SLOT_TICK); + slots.push(bar); + } + + let inner = BatchInner { + shards_bar, + free: Mutex::new(slots), + in_flight: Arc::new(Semaphore::new(initial_in_flight)), + current_in_flight: AtomicUsize::new(initial_in_flight), + num_slots, + _mp: mp, + }; + Self { + inner: Arc::new(inner), + } + } + + /// Wait for an in-flight permit, then claim an idle slot and switch it to the + /// [`CONNECTING_TEMPLATE`] style. The returned guard releases both the permit and + /// the slot on drop. 
+ async fn acquire(&self, prefix: &str) -> SlotGuard { + let permit = Arc::clone(&self.inner.in_flight) + .acquire_owned() + .await + .expect("batch semaphore is never closed while a download is in flight"); + let bar = self + .inner + .free + .lock() + .pop() + .expect("slot free list invariant broken: permits outnumber pre-allocated slots"); + bar.set_style(ProgressStyle::with_template(CONNECTING_TEMPLATE).expect("valid template")); + bar.set_prefix(prefix.to_owned()); + bar.set_message(""); + bar.set_length(0); + bar.reset(); + SlotGuard { + bar, + owner: Arc::clone(&self.inner), + _permit: permit, + } + } + + fn advance(&self) { + self.inner.shards_bar.inc(1); + } + + fn finish(&self) { + self.inner.shards_bar.finish_and_clear(); + } + + /// Adjust how many downloads may run concurrently. Clamped to the pre-allocated + /// slot count. Raising the limit returns immediately; lowering spawns a background + /// task that acquires and forgets the delta so the limit takes effect as active + /// downloads complete naturally, never interrupting an in-flight transfer. + /// + /// The mechanism is in place but no policy currently calls it. A future adaptive + /// controller (error-rate backoff, throughput watchdog, explicit CLI flag) can drop + /// in without any further changes to this module. + #[allow(dead_code)] + pub(crate) fn set_max_in_flight(&self, target: usize) { + let target = target.min(self.inner.num_slots); + let prev = self + .inner + .current_in_flight + .swap(target, AtomicOrdering::Relaxed); + match target.cmp(&prev) { + Ordering::Greater => { + self.inner.in_flight.add_permits(target - prev); + } + Ordering::Less => { + let delta = u32::try_from(prev - target).expect("delta fits in u32"); + let sem = Arc::clone(&self.inner.in_flight); + tokio::spawn(async move { + if let Ok(permit) = sem.acquire_many_owned(delta).await { + permit.forget(); + } + }); + } + Ordering::Equal => {} + } + } +} + +/// RAII handle for a borrowed slot bar. 
Drives the bar through its active lifecycle and
+/// resets it back to the idle placeholder on drop.
+struct SlotGuard {
+    bar: ProgressBar,
+    owner: Arc<BatchInner>,
+    _permit: OwnedSemaphorePermit,
+}
+
+impl SlotGuard {
+    fn activate_known(&self, total: u64) {
+        self.bar
+            .set_style(ProgressStyle::with_template(KNOWN_SIZE_TEMPLATE).expect("valid template"));
+        self.bar.set_length(total);
+        self.bar.reset();
+    }
+
+    fn activate_unknown(&self) {
+        self.bar.set_style(
+            ProgressStyle::with_template(UNKNOWN_SIZE_TEMPLATE).expect("valid template"),
+        );
+        self.bar.set_length(0);
+        self.bar.reset();
+    }
+
+    fn inc(&self, n: u64) {
+        self.bar.inc(n);
+    }
+
+    fn reset_for_retry(&self) {
+        self.bar
+            .set_style(ProgressStyle::with_template(CONNECTING_TEMPLATE).expect("valid template"));
+        self.bar.set_length(0);
+        self.bar.reset();
+    }
+}
+
+impl Drop for SlotGuard {
+    fn drop(&mut self) {
+        let bar = self.bar.clone();
+        bar.set_style(ProgressStyle::with_template(IDLE_TEMPLATE).expect("valid template"));
+        bar.set_prefix("");
+        bar.set_message("");
+        bar.set_length(0);
+        bar.reset();
+        self.owner.free.lock().push(bar);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////
+// Core download implementation
+////////////////////////////////////////////////////////////////////////////////////////////////////
+
+/// Core download implementation shared by [`download_data`] and [`download_many`].
+///
+/// When `batch` is `Some`, the download reuses one of the pre-allocated slot bars from
+/// the batch's [`MultiProgress`]. When `batch` is `None` the download renders its own
+/// standalone bar.
+async fn download_one(fname: PathBuf, url: &str, batch: Option<&BatchProgress>) -> Result<PathBuf> {
+    let display_name = fname
+        .file_name()
+        .and_then(|n| n.to_str())
+        .unwrap_or("")
+        .to_owned();
+    idempotent_async(&fname, async move |tmp_path| {
+        retry_get(&HTTP_CLIENT, url, &tmp_path, &display_name, batch).await
+    })
+    .await
+}
+
+/// Perform an HTTP GET into `tmp_path`, retrying up to three times with exponential
+/// backoff and a small jitter to avoid lockstep retries across concurrent shards. A
+/// partial temp file from an exhausted retry loop is removed before returning the final
+/// error.
+///
+/// When `batch` is `Some`, a pre-allocated slot bar is reused across retries. When
+/// `batch` is `None`, a standalone [`ProgressBar`] is created and cleared at the end.
+async fn retry_get(
+    client: &Client,
+    url: &str,
+    tmp_path: &Path,
+    display_name: &str,
+    batch: Option<&BatchProgress>,
+) -> Result<()> {
+    const MAX_ATTEMPTS: u32 = 3;
+    let mut last_err: Option<Error> = None;
+
+    let slot = match batch {
+        Some(b) => Some(b.acquire(display_name).await),
+        None => None,
+    };
+    let standalone = slot.is_none().then(|| new_standalone_bar(display_name));
+
+    for attempt in 0..MAX_ATTEMPTS {
+        if attempt > 0 {
+            reset_progress_for_retry(slot.as_ref(), standalone.as_ref());
+        }
+        let outcome: Result<()> = async {
+            let mut file = TokioFile::create(tmp_path)
+                .await
+                .context("Failed to create file")?;
+            let response = client
+                .get(url)
+                .send()
+                .await
+                .context("Failed to send HTTP request")?
+ .error_for_status() + .context("HTTP request returned error status")?; + + activate_progress( + slot.as_ref(), + standalone.as_ref(), + response.content_length(), + ); + + let mut stream = response.bytes_stream(); + while let Some(chunk) = stream.next().await { + let chunk = chunk?; + AsyncWriteExt::write_all(&mut file, &chunk) + .await + .context("Failed to write to file")?; + advance_progress(slot.as_ref(), standalone.as_ref(), chunk.len() as u64); + } + + AsyncWriteExt::flush(&mut file).await?; + Ok(()) + } + .await; + + match outcome { + Ok(()) => { + if let Some(bar) = standalone.as_ref() { + bar.finish_and_clear(); + } + // `slot` drops here, resetting its bar to idle. + return Ok(()); + } + Err(e) => last_err = Some(e), + } + + if attempt + 1 < MAX_ATTEMPTS { + let jitter = Duration::from_millis(rand::random::() % 500); + let backoff = Duration::from_secs(1u64 << attempt) + jitter; + warn!( + "download attempt {} failed; retrying in {:?}", + attempt + 1, + backoff + ); + tokio::time::sleep(backoff).await; + } + } + + if let Some(bar) = standalone.as_ref() { + bar.finish_and_clear(); + } + + if let Err(err) = std::fs::remove_file(tmp_path) { + warn!( + "failed to remove leftover temp download {}: {}", + tmp_path.display(), + err + ); + } + Err(last_err.unwrap_or_else(|| anyhow::anyhow!("retry_get exhausted with no recorded error"))) +} + +fn new_standalone_bar(display_name: &str) -> ProgressBar { + let bar = ProgressBar::new(0); + bar.set_style(ProgressStyle::with_template(CONNECTING_TEMPLATE).expect("valid template")); + bar.set_prefix(display_name.to_owned()); + bar.enable_steady_tick(SLOT_TICK); + bar +} + +fn reset_progress_for_retry(slot: Option<&SlotGuard>, standalone: Option<&ProgressBar>) { + if let Some(slot) = slot { + slot.reset_for_retry(); + } else if let Some(bar) = standalone { + bar.set_style(ProgressStyle::with_template(CONNECTING_TEMPLATE).expect("valid template")); + bar.set_length(0); + bar.reset(); + } +} + +fn activate_progress( + 
slot: Option<&SlotGuard>, + standalone: Option<&ProgressBar>, + content_length: Option, +) { + match (slot, standalone) { + (Some(slot), _) => match content_length { + Some(total) => slot.activate_known(total), + None => slot.activate_unknown(), + }, + (None, Some(bar)) => match content_length { + Some(total) => { + bar.set_style( + ProgressStyle::with_template(KNOWN_SIZE_TEMPLATE).expect("valid template"), + ); + bar.set_length(total); + bar.reset(); + } + None => { + bar.set_style( + ProgressStyle::with_template(UNKNOWN_SIZE_TEMPLATE).expect("valid template"), + ); + bar.set_length(0); + bar.reset(); + } + }, + (None, None) => {} + } +} + +fn advance_progress(slot: Option<&SlotGuard>, standalone: Option<&ProgressBar>, n: u64) { + if let Some(slot) = slot { + slot.inc(n); + } else if let Some(bar) = standalone { + bar.inc(n); + } +} From 61ea8c7f7243eeb077781ff5691f22e4a9fd036a Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Thu, 16 Apr 2026 12:24:10 -0400 Subject: [PATCH 3/3] AIMD retry Signed-off-by: Connor Tsui --- vortex-bench/src/clickbench/data.rs | 3 +- vortex-bench/src/datasets/data_downloads.rs | 450 ++++++++++++++------ vortex-bench/src/public_bi.rs | 3 +- vortex-bench/src/vector_dataset/download.rs | 9 +- 4 files changed, 337 insertions(+), 128 deletions(-) diff --git a/vortex-bench/src/clickbench/data.rs b/vortex-bench/src/clickbench/data.rs index d8d3d31d595..ba2a2104192 100644 --- a/vortex-bench/src/clickbench/data.rs +++ b/vortex-bench/src/clickbench/data.rs @@ -20,7 +20,6 @@ use vortex::error::VortexExpect; use crate::Format; // Re-export for use by clickbench_benchmark pub use crate::conversions::convert_parquet_directory_to_vortex; -use crate::datasets::data_downloads::DEFAULT_DOWNLOAD_CONCURRENCY; use crate::datasets::data_downloads::download_data; use crate::datasets::data_downloads::download_many; @@ -201,7 +200,7 @@ impl Flavor { let url = 
format!("https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_many/hits_{idx}.parquet"); (output_path, url) }); - download_many(downloads, DEFAULT_DOWNLOAD_CONCURRENCY).await?; + download_many(downloads).await?; } } Ok(()) diff --git a/vortex-bench/src/datasets/data_downloads.rs b/vortex-bench/src/datasets/data_downloads.rs index 70ffa2a16f7..ae1281f8366 100644 --- a/vortex-bench/src/datasets/data_downloads.rs +++ b/vortex-bench/src/datasets/data_downloads.rs @@ -8,9 +8,12 @@ use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use std::sync::LazyLock; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering as AtomicOrdering; use std::time::Duration; +use std::time::Instant; use anyhow::Context; use anyhow::Error; @@ -37,10 +40,6 @@ use crate::utils::file::idempotent_async; // Public API //////////////////////////////////////////////////////////////////////////////////////////////////// -/// Default concurrency limit for bulk downloads. Keeps us polite to the upstream while -/// still saturating a typical 10 Gb link on a parquet-per-shard benchmark. -pub const DEFAULT_DOWNLOAD_CONCURRENCY: usize = 16; - /// Anything that can be described as a `(target_path, url)` pair accepted by /// [`download_many`]. pub trait IntoDownload { @@ -67,26 +66,23 @@ pub async fn download_data(fname: PathBuf, data_url: impl AsRef) -> Result< download_one(fname, data_url.as_ref(), None).await } -/// Idempotently download many `(path, url)` pairs with bounded parallelism. +/// Idempotently download many `(path, url)` pairs with adaptive parallelism. /// /// This is the preferred way to fetch multi-shard datasets (ClickBench partitioned, /// vector dataset train shards, Public BI tables, etc.) 
because it: /// -/// - caps in-flight HTTP requests at `max_concurrency` so we don't overwhelm the -/// upstream or our own network stack, +/// - starts at `INITIAL_IN_FLIGHT` concurrent downloads and ramps up to +/// `MAX_IN_FLIGHT` as clean completions come in (TCP-style slow-start), then +/// halves on retries to back off from upstream rate limits, /// - reuses the shared HTTP client across every shard, /// - renders a top-of-block `N/total` bar plus a fixed number of reusable slot bars via /// a shared [`MultiProgress`]: the terminal block size stays constant for the entire /// run, so nothing "jumps" as shards cycle, -/// - keeps the worker pool continuously full via `buffer_unordered`: as soon as any -/// shard finishes, the next queued shard reuses the freed slot, /// - short-circuits on the first error (the remaining in-flight downloads are dropped /// when the returned future is dropped), /// - returns the resolved on-disk paths in completion order (not submission order). -/// -/// Pass `0` as `max_concurrency` to use [`DEFAULT_DOWNLOAD_CONCURRENCY`]. 
-#[tracing::instrument(skip_all, fields(count = tracing::field::Empty, max_concurrency))] -pub async fn download_many(downloads: I, max_concurrency: usize) -> Result> +#[tracing::instrument(skip_all, fields(count = tracing::field::Empty))] +pub async fn download_many(downloads: I) -> Result> where I: IntoIterator, I::Item: IntoDownload, @@ -101,14 +97,9 @@ where return Ok(Vec::new()); } - let concurrency = if max_concurrency == 0 { - DEFAULT_DOWNLOAD_CONCURRENCY - } else { - max_concurrency - }; - let num_slots = downloads.len().min(concurrency); - - let batch = BatchProgress::new(downloads.len() as u64, num_slots, num_slots); + let num_slots = downloads.len().min(MAX_IN_FLIGHT); + let initial_in_flight = INITIAL_IN_FLIGHT.min(num_slots); + let batch = BatchProgress::new(downloads.len() as u64, num_slots, initial_in_flight); let results: Vec> = stream::iter(downloads) .map(|(path, url)| { @@ -191,13 +182,73 @@ const KNOWN_SIZE_TEMPLATE: &str = /// Template for an active download when the response size is unknown. const UNKNOWN_SIZE_TEMPLATE: &str = "{prefix:>28!} {spinner} {bytes} ({bytes_per_sec})"; -/// Template for the top-of-block `N/total` summary bar rendered by [`download_many`]. -const SHARDS_TEMPLATE: &str = "[{elapsed_precise}] shards [{bar:30.green/white}] {pos}/{len}"; +/// Template for the top-of-block summary bar rendered by [`download_many`]. `{pos}/{len}` +/// tracks completed-of-total; `{msg}` is updated on every slot acquire / release to show +/// how many downloads are currently in flight. +const SHARDS_TEMPLATE: &str = + "[{elapsed_precise}] shards [{bar:30.green/white}] {pos}/{len} {msg}"; /// How often slot spinners redraw. Fast enough to feel alive; slow enough that stderr /// writes sneaking past `MultiProgress` do not constantly fight for cursor position. 
const SLOT_TICK: Duration = Duration::from_millis(80); +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Dynamic concurrency controller +//////////////////////////////////////////////////////////////////////////////////////////////////// + +/// Number of in-flight downloads to start at. Matches TCP-style slow-start: start small so +/// we don't hammer the upstream on the very first connection, and double from there. +const INITIAL_IN_FLIGHT: usize = 4; + +/// Upper bound on the number of concurrent downloads the controller can ramp up to, and +/// the number of slot rows pre-allocated in the [`MultiProgress`] block. Chosen large +/// enough that the retry-based controller is the effective ceiling, not this constant. +/// The trade-off is that on large batches the MP block will exceed a typical local +/// terminal height — indicatif handles this by drawing the most recent rows plus the +/// top shards bar — but on CI there is no TTY so the visual overflow does not apply. +const MAX_IN_FLIGHT: usize = 256; + +/// Never let the controller drive concurrency below this floor on a flaky network. +/// A value of `1` means the fallback is serial downloads. +const MIN_IN_FLIGHT: usize = 1; + +/// Minimum time between successive halves, in milliseconds. Coalesces simultaneous +/// retries from one upstream hiccup into a single reaction, preventing over-halving. +const HALVE_COOLDOWN_MS: u64 = 1000; + +/// Decide the next in-flight limit after a clean (no-retry) download completes. +/// +/// Returns `Some(new_limit)` if the limit should change, or `None` if it is already at +/// the cap or the computed move would be a no-op. 
+fn decide_on_success(current: usize, in_slow_start: bool) -> Option<usize> {
+    if current >= MAX_IN_FLIGHT {
+        return None;
+    }
+    let new = if in_slow_start {
+        current.saturating_mul(2)
+    } else {
+        current.saturating_add(1)
+    }
+    .min(MAX_IN_FLIGHT);
+    (new != current).then_some(new)
+}
+
+/// Decide the next in-flight limit after a failed download attempt.
+///
+/// Returns `Some(new_limit)` if the limit should be halved now. Returns `None` if the
+/// halve is debounced (another halve fired within [`HALVE_COOLDOWN_MS`]) or we are
+/// already at the [`MIN_IN_FLIGHT`] floor.
+fn decide_on_retry(current: usize, now_ms: u64, last_halve_ms: u64) -> Option<usize> {
+    if now_ms.saturating_sub(last_halve_ms) < HALVE_COOLDOWN_MS {
+        return None;
+    }
+    if current <= MIN_IN_FLIGHT {
+        return None;
+    }
+    let new = (current / 2).max(MIN_IN_FLIGHT);
+    (new != current).then_some(new)
+}
+
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 // Batch download internals
 ////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -223,13 +274,37 @@ struct BatchInner {
     shards_bar: ProgressBar,
     free: Mutex<Vec<ProgressBar>>,
     in_flight: Arc<Semaphore>,
+    /// Current concurrency limit — the source of truth read by the controller and
+    /// written via [`BatchProgress::set_max_in_flight`].
     current_in_flight: AtomicUsize,
     num_slots: usize,
+    /// Controller state: are we still in slow-start (double on success) or have we
+    /// dropped into additive-increase (`+=1` on success) after the first retry?
+    in_slow_start: AtomicBool,
+    /// Millis since [`BatchInner::created_at`] of the most recent halve event, used to
+    /// debounce bursts of retries from a single upstream hiccup.
+    last_halve_at_ms: AtomicU64,
+    created_at: Instant,
     // The MP is kept alive alongside the Arc so bars stay registered and rendered.
     // Once the last BatchProgress clone drops, the MP drops and clears the block.
_mp: MultiProgress, } +impl BatchInner { + fn elapsed_ms(&self) -> u64 { + u64::try_from(self.created_at.elapsed().as_millis()).unwrap_or(u64::MAX) + } + + /// Refresh the shards bar message to reflect the current in-flight count. Called on + /// every slot acquire / release and from `set_max_in_flight`. + fn refresh_shards_message(&self) { + let active = self.num_slots - self.free.lock().len(); + let limit = self.current_in_flight.load(AtomicOrdering::Relaxed); + self.shards_bar + .set_message(format!("({active} active, limit {limit})")); + } +} + impl BatchProgress { fn new(total: u64, num_slots: usize, initial_in_flight: usize) -> Self { let initial_in_flight = initial_in_flight.min(num_slots); @@ -255,8 +330,12 @@ impl BatchProgress { in_flight: Arc::new(Semaphore::new(initial_in_flight)), current_in_flight: AtomicUsize::new(initial_in_flight), num_slots, + in_slow_start: AtomicBool::new(true), + last_halve_at_ms: AtomicU64::new(0), + created_at: Instant::now(), _mp: mp, }; + inner.refresh_shards_message(); Self { inner: Arc::new(inner), } @@ -281,6 +360,7 @@ impl BatchProgress { bar.set_message(""); bar.set_length(0); bar.reset(); + self.inner.refresh_shards_message(); SlotGuard { bar, owner: Arc::clone(&self.inner), @@ -296,16 +376,39 @@ impl BatchProgress { self.inner.shards_bar.finish_and_clear(); } + /// Called when a download completed on its first attempt (no retries). Drives the + /// slow-start / additive-increase side of AIMD. + fn report_clean_success(&self) { + let current = self.inner.current_in_flight.load(AtomicOrdering::Relaxed); + let in_slow_start = self.inner.in_slow_start.load(AtomicOrdering::Relaxed); + if let Some(new) = decide_on_success(current, in_slow_start) { + self.set_max_in_flight(new); + } + } + + /// Called when a download attempt failed. Drives the halving side of AIMD, with an + /// internal cooldown so a burst of simultaneous retries from one upstream hiccup + /// halves the limit at most once. 
+ fn report_retry(&self) { + let now_ms = self.inner.elapsed_ms(); + let current = self.inner.current_in_flight.load(AtomicOrdering::Relaxed); + let last_halve_ms = self.inner.last_halve_at_ms.load(AtomicOrdering::Relaxed); + if let Some(new) = decide_on_retry(current, now_ms, last_halve_ms) { + self.inner + .in_slow_start + .store(false, AtomicOrdering::Relaxed); + self.inner + .last_halve_at_ms + .store(now_ms, AtomicOrdering::Relaxed); + self.set_max_in_flight(new); + } + } + /// Adjust how many downloads may run concurrently. Clamped to the pre-allocated /// slot count. Raising the limit returns immediately; lowering spawns a background /// task that acquires and forgets the delta so the limit takes effect as active /// downloads complete naturally, never interrupting an in-flight transfer. - /// - /// The mechanism is in place but no policy currently calls it. A future adaptive - /// controller (error-rate backoff, throughput watchdog, explicit CLI flag) can drop - /// in without any further changes to this module. 
- #[allow(dead_code)] - pub(crate) fn set_max_in_flight(&self, target: usize) { + fn set_max_in_flight(&self, target: usize) { let target = target.min(self.inner.num_slots); let prev = self .inner @@ -326,6 +429,7 @@ impl BatchProgress { } Ordering::Equal => {} } + self.inner.refresh_shards_message(); } } @@ -374,6 +478,7 @@ impl Drop for SlotGuard { bar.set_length(0); bar.reset(); self.owner.free.lock().push(bar); + self.owner.refresh_shards_message(); } } @@ -413,77 +518,91 @@ async fn retry_get( batch: Option<&BatchProgress>, ) -> Result<()> { const MAX_ATTEMPTS: u32 = 3; + let progress = DownloadProgress::new(batch, display_name).await; let mut last_err: Option = None; - let slot = match batch { - Some(b) => Some(b.acquire(display_name).await), - None => None, - }; - let standalone = slot.is_none().then(|| new_standalone_bar(display_name)); - for attempt in 0..MAX_ATTEMPTS { if attempt > 0 { - reset_progress_for_retry(slot.as_ref(), standalone.as_ref()); + progress.reset_for_retry(); } - let outcome: Result<()> = async { - let mut file = TokioFile::create(tmp_path) - .await - .context("Failed to create file")?; - let response = client - .get(url) - .send() - .await - .context("Failed to send HTTP request")? 
- .error_for_status() - .context("HTTP request returned error status")?; - - activate_progress( - slot.as_ref(), - standalone.as_ref(), - response.content_length(), - ); - - let mut stream = response.bytes_stream(); - while let Some(chunk) = stream.next().await { - let chunk = chunk?; - AsyncWriteExt::write_all(&mut file, &chunk) - .await - .context("Failed to write to file")?; - advance_progress(slot.as_ref(), standalone.as_ref(), chunk.len() as u64); - } - - AsyncWriteExt::flush(&mut file).await?; - Ok(()) - } - .await; - - match outcome { + match single_attempt(client, url, tmp_path, &progress).await { Ok(()) => { - if let Some(bar) = standalone.as_ref() { - bar.finish_and_clear(); + if attempt == 0 + && let Some(b) = batch + { + b.report_clean_success(); } - // `slot` drops here, resetting its bar to idle. + progress.finalize(); return Ok(()); } - Err(e) => last_err = Some(e), + Err(e) => { + if let Some(b) = batch { + b.report_retry(); + } + last_err = Some(e); + } } - if attempt + 1 < MAX_ATTEMPTS { - let jitter = Duration::from_millis(rand::random::() % 500); - let backoff = Duration::from_secs(1u64 << attempt) + jitter; - warn!( - "download attempt {} failed; retrying in {:?}", - attempt + 1, - backoff - ); - tokio::time::sleep(backoff).await; + sleep_with_jitter(attempt).await; } } - if let Some(bar) = standalone.as_ref() { - bar.finish_and_clear(); + progress.finalize(); + cleanup_partial_temp(tmp_path); + Err(last_err.unwrap_or_else(|| anyhow::anyhow!("retry_get exhausted with no recorded error"))) +} + +/// Perform one download attempt end to end: create the temp file, issue the GET, stream +/// bytes to disk while advancing the progress bar. Returns on the first error so the +/// retry loop in [`retry_get`] can decide whether to try again. 
+async fn single_attempt( + client: &Client, + url: &str, + tmp_path: &Path, + progress: &DownloadProgress, +) -> Result<()> { + let mut file = TokioFile::create(tmp_path) + .await + .context("Failed to create file")?; + let response = client + .get(url) + .send() + .await + .context("Failed to send HTTP request")? + .error_for_status() + .context("HTTP request returned error status")?; + + progress.activate(response.content_length()); + + let mut stream = response.bytes_stream(); + while let Some(chunk) = stream.next().await { + let chunk = chunk?; + AsyncWriteExt::write_all(&mut file, &chunk) + .await + .context("Failed to write to file")?; + progress.inc(chunk.len() as u64); } + AsyncWriteExt::flush(&mut file).await?; + Ok(()) +} + +/// Sleep `2^attempt` seconds plus 0-500 ms of jitter before the next retry. +async fn sleep_with_jitter(attempt: u32) { + let jitter = Duration::from_millis(rand::random::() % 500); + let backoff = Duration::from_secs(1u64 << attempt) + jitter; + warn!( + "download attempt {} failed; retrying in {:?}", + attempt + 1, + backoff + ); + tokio::time::sleep(backoff).await; +} + +/// Best-effort removal of a partial temp file left behind when every retry attempt +/// failed. The UUID-named temp lives under `target/`; leaking it would be mostly +/// harmless but adds up over many CI runs. +fn cleanup_partial_temp(tmp_path: &Path) { if let Err(err) = std::fs::remove_file(tmp_path) { warn!( "failed to remove leftover temp download {}: {}", @@ -491,61 +610,154 @@ async fn retry_get( err ); } - Err(last_err.unwrap_or_else(|| anyhow::anyhow!("retry_get exhausted with no recorded error"))) } -fn new_standalone_bar(display_name: &str) -> ProgressBar { - let bar = ProgressBar::new(0); - bar.set_style(ProgressStyle::with_template(CONNECTING_TEMPLATE).expect("valid template")); - bar.set_prefix(display_name.to_owned()); - bar.enable_steady_tick(SLOT_TICK); - bar +/// Unified progress handle for a single download. 
Hides the split between pooled slot +/// bars (batched path) and one-off standalone bars (single-download path) so +/// [`retry_get`] does not have to branch on every update. +enum DownloadProgress { + Slot(SlotGuard), + Standalone(ProgressBar), } -fn reset_progress_for_retry(slot: Option<&SlotGuard>, standalone: Option<&ProgressBar>) { - if let Some(slot) = slot { - slot.reset_for_retry(); - } else if let Some(bar) = standalone { - bar.set_style(ProgressStyle::with_template(CONNECTING_TEMPLATE).expect("valid template")); - bar.set_length(0); - bar.reset(); +impl DownloadProgress { + async fn new(batch: Option<&BatchProgress>, display_name: &str) -> Self { + match batch { + Some(b) => Self::Slot(b.acquire(display_name).await), + None => Self::Standalone(new_standalone_bar(display_name)), + } } -} -fn activate_progress( - slot: Option<&SlotGuard>, - standalone: Option<&ProgressBar>, - content_length: Option, -) { - match (slot, standalone) { - (Some(slot), _) => match content_length { - Some(total) => slot.activate_known(total), - None => slot.activate_unknown(), - }, - (None, Some(bar)) => match content_length { - Some(total) => { + fn reset_for_retry(&self) { + match self { + Self::Slot(s) => s.reset_for_retry(), + Self::Standalone(bar) => { + bar.set_style( + ProgressStyle::with_template(CONNECTING_TEMPLATE).expect("valid template"), + ); + bar.set_length(0); + bar.reset(); + } + } + } + + fn activate(&self, content_length: Option) { + match (self, content_length) { + (Self::Slot(s), Some(total)) => s.activate_known(total), + (Self::Slot(s), None) => s.activate_unknown(), + (Self::Standalone(bar), Some(total)) => { bar.set_style( ProgressStyle::with_template(KNOWN_SIZE_TEMPLATE).expect("valid template"), ); bar.set_length(total); bar.reset(); } - None => { + (Self::Standalone(bar), None) => { bar.set_style( ProgressStyle::with_template(UNKNOWN_SIZE_TEMPLATE).expect("valid template"), ); bar.set_length(0); bar.reset(); } - }, - (None, None) => {} + } + } + + fn 
inc(&self, n: u64) { + match self { + Self::Slot(s) => s.inc(n), + Self::Standalone(bar) => bar.inc(n), + } + } + + /// Tear down any visible state. Standalone bars are explicitly cleared here; + /// slot bars clean themselves up when their [`SlotGuard`] drops. + fn finalize(&self) { + if let Self::Standalone(bar) = self { + bar.finish_and_clear(); + } } } -fn advance_progress(slot: Option<&SlotGuard>, standalone: Option<&ProgressBar>, n: u64) { - if let Some(slot) = slot { - slot.inc(n); - } else if let Some(bar) = standalone { - bar.inc(n); +fn new_standalone_bar(display_name: &str) -> ProgressBar { + let bar = ProgressBar::new(0); + bar.set_style(ProgressStyle::with_template(CONNECTING_TEMPLATE).expect("valid template")); + bar.set_prefix(display_name.to_owned()); + bar.enable_steady_tick(SLOT_TICK); + bar +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Tests +//////////////////////////////////////////////////////////////////////////////////////////////////// + +#[cfg(test)] +mod tests { + use super::*; + + const COOLDOWN_MS: u64 = HALVE_COOLDOWN_MS; + + #[test] + fn ramp_up_doubles_in_slow_start() { + // Start at INITIAL (4). Each clean success in slow-start doubles until MAX. + let mut cur = INITIAL_IN_FLIGHT; + let expected = [8, 16, 32, 64, 128, 256]; + for want in expected { + let next = decide_on_success(cur, true).expect("should ramp"); + assert_eq!(next, want); + cur = next; + } + // At MAX, further successes are no-ops. + assert_eq!(cur, MAX_IN_FLIGHT); + assert_eq!(decide_on_success(cur, true), None); + assert_eq!(decide_on_success(cur, false), None); + } + + #[test] + fn additive_increase_after_slow_start_exits() { + // Once out of slow-start, successes add 1 instead of doubling. + assert_eq!(decide_on_success(16, false), Some(17)); + assert_eq!(decide_on_success(17, false), Some(18)); + } + + #[test] + fn retry_halves() { + // At 64, a single retry (past the cooldown) halves to 32. 
+ assert_eq!(decide_on_retry(64, COOLDOWN_MS + 1, 0), Some(32)); + assert_eq!(decide_on_retry(32, COOLDOWN_MS + 1, 0), Some(16)); + assert_eq!(decide_on_retry(2, COOLDOWN_MS + 1, 0), Some(1)); + } + + #[test] + fn halve_is_debounced() { + // Three retries at t=100, t=200, t=500 (all within the 1 s cooldown after the + // first halve at t=100) only produce one halve. + let last_halve = 100; + assert_eq!(decide_on_retry(64, 200, last_halve), None); + assert_eq!(decide_on_retry(64, 500, last_halve), None); + // A retry past the cooldown halves again. + assert_eq!( + decide_on_retry(64, last_halve + COOLDOWN_MS + 1, last_halve), + Some(32) + ); + } + + #[test] + fn halve_respects_min_floor() { + // At MIN (1), retries are no-ops — we never go below 1. + assert_eq!(decide_on_retry(MIN_IN_FLIGHT, COOLDOWN_MS + 1, 0), None); + // At 2, halving to 1 is the last step. + assert_eq!(decide_on_retry(2, COOLDOWN_MS + 1, 0), Some(1)); + } + + #[test] + fn ramp_up_respects_max_cap() { + // Even from a large `current`, we never exceed MAX. + assert_eq!( + decide_on_success(MAX_IN_FLIGHT - 1, true), + Some(MAX_IN_FLIGHT) + ); + assert_eq!(decide_on_success(MAX_IN_FLIGHT, true), None); + // Additive at the cap is also a no-op. 
+ assert_eq!(decide_on_success(MAX_IN_FLIGHT, false), None); } } diff --git a/vortex-bench/src/public_bi.rs b/vortex-bench/src/public_bi.rs index 29c0d01a960..fe937876275 100644 --- a/vortex-bench/src/public_bi.rs +++ b/vortex-bench/src/public_bi.rs @@ -42,7 +42,6 @@ use crate::SESSION; use crate::TableSpec; use crate::conversions::parquet_to_vortex_chunks; use crate::datasets::Dataset; -use crate::datasets::data_downloads::DEFAULT_DOWNLOAD_CONCURRENCY; use crate::datasets::data_downloads::decompress_bz2; use crate::datasets::data_downloads::download_many; use crate::idempotent_async; @@ -296,7 +295,7 @@ impl PBIData { table.data_url.as_str().to_owned(), ) }); - download_many(downloads, DEFAULT_DOWNLOAD_CONCURRENCY).await?; + download_many(downloads).await?; Ok(()) } diff --git a/vortex-bench/src/vector_dataset/download.rs b/vortex-bench/src/vector_dataset/download.rs index e8a871ac52a..a7b7d8c6e89 100644 --- a/vortex-bench/src/vector_dataset/download.rs +++ b/vortex-bench/src/vector_dataset/download.rs @@ -18,7 +18,6 @@ use std::path::PathBuf; use anyhow::Context; use anyhow::Result; -use crate::datasets::data_downloads::DEFAULT_DOWNLOAD_CONCURRENCY; use crate::datasets::data_downloads::download_data; use crate::datasets::data_downloads::download_many; use crate::vector_dataset::catalog::VectorDataset; @@ -94,9 +93,9 @@ pub struct DatasetPaths { /// This has idempotent semantics, so files already present on disk are skipped, and re-runs only /// pay for new files. /// -/// Train shards download via [`download_many`] with bounded parallelism (up to -/// [`DEFAULT_DOWNLOAD_CONCURRENCY`]); the small `test.parquet` and `neighbors.parquet` files use -/// the simple [`download_data`] helper. All HTTP requests share a single pooled client. +/// Train shards download via [`download_many`] with adaptive parallelism; the small +/// `test.parquet` and `neighbors.parquet` files use the simple [`download_data`] helper. +/// All HTTP requests share a single pooled client. 
pub async fn download(ds: VectorDataset, layout: TrainLayout) -> Result { let spec = ds.validate_layout(layout)?; let urls = train_urls(ds, spec); @@ -108,7 +107,7 @@ pub async fn download(ds: VectorDataset, layout: TrainLayout) -> Result