@@ -6,31 +6,22 @@ use std::fmt::Display;
66use std:: path:: Path ;
77use std:: str:: FromStr ;
88use std:: sync:: LazyLock ;
9- use std:: time:: Duration ;
109
1110use arrow_schema:: DataType ;
1211use arrow_schema:: Field ;
1312use arrow_schema:: Schema ;
1413use arrow_schema:: TimeUnit ;
15- use bytes:: Bytes ;
1614use clap:: ValueEnum ;
17- use futures:: StreamExt ;
18- use indicatif:: ProgressBar ;
19- use indicatif:: ProgressStyle ;
20- use reqwest:: IntoUrl ;
2115use serde:: Deserialize ;
2216use serde:: Serialize ;
23- use tokio:: fs:: File ;
24- use tokio:: io:: AsyncWriteExt ;
2517use tokio:: task:: JoinSet ;
2618use tracing:: info;
27- use tracing:: warn;
2819use vortex:: error:: VortexExpect ;
2920
3021use crate :: Format ;
3122// Re-export for use by clickbench_benchmark
3223pub use crate :: conversions:: convert_parquet_directory_to_vortex;
33- use crate :: idempotent_async ;
24+ use crate :: datasets :: data_downloads :: download_data ;
3425
3526pub static HITS_SCHEMA : LazyLock < Schema > = LazyLock :: new ( || {
3627 use DataType :: * ;
@@ -190,40 +181,25 @@ impl Display for Flavor {
190181
191182impl Flavor {
192183 // TODO(joe): move these elsewhere.
193- pub async fn download (
194- & self ,
195- client : reqwest:: Client ,
196- basepath : impl AsRef < Path > ,
197- ) -> anyhow:: Result < ( ) > {
184+ pub async fn download ( & self , basepath : impl AsRef < Path > ) -> anyhow:: Result < ( ) > {
198185 let basepath = basepath. as_ref ( ) ;
199186 match self {
200187 Flavor :: Single => {
201188 let output_path = basepath. join ( Format :: Parquet . name ( ) ) . join ( "hits.parquet" ) ;
202- idempotent_async ( output_path. as_path ( ) , |output_path| async move {
203- info ! ( "Downloading single clickbench file" ) ;
204- let url = "https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_single/hits.parquet" ;
205- download_large_file ( & client, url, & output_path) . await ?;
206- anyhow:: Ok ( ( ) )
207- } )
208- . await ?;
189+ info ! ( "Downloading single clickbench file" ) ;
190+ let url = "https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_single/hits.parquet" ;
191+ download_data ( output_path, url) . await ?;
209192 }
210193 Flavor :: Partitioned => {
211194 // The clickbench-provided file is missing some higher-level type info, so we reprocess it
212195 // to add that info, see https://github.com/ClickHouse/ClickBench/issues/7.
213196
214197 let mut tasks = ( 0_u32 ..100 ) . map ( |idx| {
215198 let output_path = basepath. join ( Format :: Parquet . name ( ) ) . join ( format ! ( "hits_{idx}.parquet" ) ) ;
216- let client = client. clone ( ) ;
217199
218- idempotent_async ( output_path, move |output_path| async move {
219- info ! ( "Downloading file {idx}" ) ;
220- let url = format ! ( "https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_many/hits_{idx}.parquet" ) ;
221- let body = retry_get ( & client, url) . await ?;
222- let mut file = File :: create ( output_path) . await ?;
223- file. write_all ( & body) . await ?;
224-
225- anyhow:: Ok ( ( ) )
226- } )
200+ info ! ( "Downloading file {idx}" ) ;
201+ let url = format ! ( "https://pub-3ba949c0f0354ac18db1f0f14f0a2c52.r2.dev/clickbench/parquet_many/hits_{idx}.parquet" ) ;
202+ download_data ( output_path, url)
227203 } ) . collect :: < JoinSet < _ > > ( ) ;
228204
229205 while let Some ( task) = tasks. join_next ( ) . await {
@@ -234,66 +210,3 @@ impl Flavor {
234210 Ok ( ( ) )
235211 }
236212}
237-
238- /// Downloads a large file with streaming and progress indication.
239- async fn download_large_file (
240- client : & reqwest:: Client ,
241- url : & str ,
242- output_path : & Path ,
243- ) -> anyhow:: Result < ( ) > {
244- let response = client. get ( url) . send ( ) . await ?. error_for_status ( ) ?;
245-
246- let total_size = response. content_length ( ) . unwrap_or ( 0 ) ;
247-
248- let progress = ProgressBar :: new ( total_size) ;
249- progress. set_style (
250- ProgressStyle :: with_template (
251- "[{elapsed_precise}] {bar:40.cyan/blue} {bytes}/{total_bytes} ({bytes_per_sec})" ,
252- )
253- . expect ( "valid template" ) ,
254- ) ;
255-
256- let mut file = File :: create ( output_path) . await ?;
257- let mut stream = response. bytes_stream ( ) ;
258-
259- while let Some ( chunk) = stream. next ( ) . await {
260- let chunk = chunk?;
261- file. write_all ( & chunk) . await ?;
262- progress. inc ( chunk. len ( ) as u64 ) ;
263- }
264-
265- progress. finish ( ) ;
266- Ok ( ( ) )
267- }
268-
269- async fn retry_get ( client : & reqwest:: Client , url : impl IntoUrl ) -> anyhow:: Result < Bytes > {
270- let url = url. as_str ( ) ;
271- let make_req = || async { client. get ( url) . send ( ) . await } ;
272-
273- let mut body = None ;
274-
275- for attempt in 0 ..3 {
276- match make_req ( ) . await . and_then ( |r| r. error_for_status ( ) ) {
277- Ok ( r) => match r. bytes ( ) . await {
278- Ok ( b) => {
279- body = Some ( b) ;
280- break ;
281- }
282- Err ( e) => {
283- warn ! ( "Request errored with {e}, retying for the {attempt} time" ) ;
284- }
285- } ,
286- Err ( e) => {
287- warn ! ( "Request errored with {e}, retying for the {attempt} time" ) ;
288- }
289- }
290-
291- // Very basic backoff mechanism
292- tokio:: time:: sleep ( Duration :: from_secs ( attempt + 1 ) ) . await ;
293- }
294-
295- match body {
296- Some ( v) => Ok ( v) ,
297- None => anyhow:: bail!( "Exahusted retry attempts for {url}" ) ,
298- }
299- }
0 commit comments