@@ -9,7 +9,7 @@ use anyhow::{Context as _, Result};
99use clap:: { Arg , ArgMatches , Command } ;
1010use console:: style;
1111use itertools:: Itertools as _;
12- use log:: { debug, info , warn} ;
12+ use log:: { debug, warn} ;
1313use objectstore_client:: { ClientBuilder , ExpirationPolicy , Usecase } ;
1414use rayon:: prelude:: * ;
1515use secrecy:: ExposeSecret as _;
@@ -303,6 +303,11 @@ fn read_sidecar_metadata(image_path: &Path) -> Result<HashMap<String, Value>> {
303303 } )
304304}
305305
306+ struct PreparedImage {
307+ path : PathBuf ,
308+ key : String ,
309+ }
310+
306311fn upload_images (
307312 images : Vec < ImageInfo > ,
308313 org : & str ,
@@ -318,30 +323,28 @@ fn upload_images(
318323 let client = ClientBuilder :: new ( options. objectstore . url )
319324 . token ( {
320325 // TODO: replace with auth from `ObjectstoreUploadOptions` when appropriate
321- let auth = match authenticated_api. auth ( ) {
326+ match authenticated_api. auth ( ) {
322327 Auth :: Token ( token) => token. raw ( ) . expose_secret ( ) . to_owned ( ) ,
323- } ;
324- auth
328+ }
325329 } )
326330 . configure_reqwest ( |r| r. connect_timeout ( Duration :: from_secs ( 10 ) ) )
327331 . build ( ) ?;
328332
333+ let scopes = options. objectstore . scopes ;
334+
335+ let find_scope = |name : & str | {
336+ scopes
337+ . iter ( )
338+ . find ( |( k, _) | k == name)
339+ . map ( |( _, v) | v. clone ( ) )
340+ } ;
341+ let org_id = find_scope ( "org" ) . context ( "Missing org in UploadOptions scope" ) ?;
342+ let project_id = find_scope ( "project" ) . context ( "Missing project in UploadOptions scope" ) ?;
343+
329344 let mut scope = Usecase :: new ( "preprod" ) . scope ( ) ;
330- let ( mut org_id, mut project_id) : ( Option < String > , Option < String > ) = ( None , None ) ;
331- for ( key, value) in options. objectstore . scopes . into_iter ( ) {
332- scope = scope. push ( & key, value. clone ( ) ) ;
333- if key == "org" {
334- org_id = Some ( value) ;
335- } else if key == "project" {
336- project_id = Some ( value) ;
337- }
345+ for ( key, value) in scopes {
346+ scope = scope. push ( & key, value) ;
338347 }
339- let Some ( org_id) = org_id else {
340- anyhow:: bail!( "Missing org in UploadOptions scope" ) ;
341- } ;
342- let Some ( project_id) = project_id else {
343- anyhow:: bail!( "Missing project in UploadOptions scope" ) ;
344- } ;
345348
346349 let session = scope. session ( & client) ?;
347350
@@ -350,10 +353,10 @@ fn upload_images(
350353 . build ( )
351354 . context ( "Failed to create tokio runtime" ) ?;
352355
353- let mut many_builder = session. many ( ) ;
354356 let mut manifest_entries = HashMap :: new ( ) ;
355357 let mut collisions: HashMap < String , Vec < String > > = HashMap :: new ( ) ;
356358 let mut kept_paths = HashMap :: new ( ) ;
359+ let mut uploads = Vec :: with_capacity ( images. len ( ) ) ;
357360
358361 let hashed_images: Vec < _ > = images
359362 . into_par_iter ( )
@@ -381,21 +384,7 @@ fn upload_images(
381384 continue ;
382385 }
383386
384- let file = runtime
385- . block_on ( tokio:: fs:: File :: open ( & image. path ) )
386- . with_context ( || {
387- format ! ( "Failed to open image for upload: {}" , image. path. display( ) )
388- } ) ?;
389-
390387 let key = format ! ( "{org_id}/{project_id}/{hash}" ) ;
391- info ! ( "Queueing {} as {key}" , image. relative_path. display( ) ) ;
392-
393- many_builder = many_builder. push (
394- session
395- . put_file ( file)
396- . key ( & key)
397- . expiration_policy ( expiration) ,
398- ) ;
399388
400389 let mut extra = read_sidecar_metadata ( & image. path ) . unwrap_or_else ( |err| {
401390 warn ! ( "Error reading sidecar metadata, ignoring it instead: {err:#}" ) ;
@@ -404,47 +393,60 @@ fn upload_images(
404393 extra. insert ( "content_hash" . to_owned ( ) , serde_json:: Value :: String ( hash) ) ;
405394
406395 kept_paths. insert ( image_file_name. clone ( ) , relative_path) ;
396+ uploads. push ( PreparedImage {
397+ path : image. path ,
398+ key,
399+ } ) ;
407400 manifest_entries. insert (
408401 image_file_name,
409402 ImageMetadata :: new ( image. width , image. height , extra) ,
410403 ) ;
411404 }
412405
413406 if !collisions. is_empty ( ) {
414- let mut details = String :: new ( ) ;
415- for ( name, excluded_paths) in & collisions {
416- let mut all_paths = vec ! [ kept_paths[ name] . as_str( ) ] ;
417- all_paths. extend ( excluded_paths. iter ( ) . map ( |s| s. as_str ( ) ) ) ;
418- details. push_str ( & format ! ( "\n {name}: {}" , all_paths. join( ", " ) ) ) ;
419- }
407+ let details: String = collisions
408+ . iter ( )
409+ . map ( |( name, excluded) | {
410+ let kept = & kept_paths[ name] ;
411+ let all = std:: iter:: once ( kept. as_str ( ) )
412+ . chain ( excluded. iter ( ) . map ( |s| s. as_str ( ) ) )
413+ . join ( ", " ) ;
414+ format ! ( "\n {name}: {all}" )
415+ } )
416+ . collect ( ) ;
420417 warn ! ( "Some images share identical file names. Only the first occurrence of each is included:{details}" ) ;
421418 }
422419
423- let result = runtime . block_on ( async { many_builder . send ( ) . error_for_failures ( ) . await } ) ;
420+ let total_count = uploads . len ( ) ;
424421
425- let uploaded_count = manifest_entries. len ( ) ;
422+ let mut many_builder = session. many ( ) ;
423+ for prepared in uploads {
424+ many_builder = many_builder. push (
425+ session
426+ . put_path ( prepared. path . clone ( ) )
427+ . key ( & prepared. key )
428+ . expiration_policy ( expiration) ,
429+ ) ;
430+ }
426431
427- match result {
428- Ok ( ( ) ) => {
429- println ! (
430- "{} Uploaded {} image {}" ,
431- style( ">" ) . dim( ) ,
432- style( uploaded_count) . yellow( ) ,
433- if uploaded_count == 1 { "file" } else { "files" }
434- ) ;
435- Ok ( manifest_entries)
436- }
437- Err ( errors) => {
438- eprintln ! ( "There were errors uploading images:" ) ;
439- let mut error_count = 0 ;
440- for error in errors {
441- let error = anyhow:: Error :: new ( error) ;
442- eprintln ! ( " {}" , style( format!( "{error:#}" ) ) . red( ) ) ;
443- error_count += 1 ;
444- }
445- anyhow:: bail!( "Failed to upload {error_count} out of {uploaded_count} images" )
432+ let result = runtime. block_on ( async { many_builder. send ( ) . await . error_for_failures ( ) . await } ) ;
433+ if let Err ( errors) = result {
434+ let errors: Vec < _ > = errors. collect ( ) ;
435+ eprintln ! ( "There were errors uploading images:" ) ;
436+ for error in & errors {
437+ eprintln ! ( " {}" , style( format!( "{error:#}" ) ) . red( ) ) ;
446438 }
439+ let error_count = errors. len ( ) ;
440+ anyhow:: bail!( "Failed to upload {error_count} images" ) ;
447441 }
442+
443+ println ! (
444+ "{} Uploaded {} image {}" ,
445+ style( ">" ) . dim( ) ,
446+ style( total_count) . yellow( ) ,
447+ if total_count == 1 { "file" } else { "files" }
448+ ) ;
449+ Ok ( manifest_entries)
448450}
449451
450452#[ cfg( test) ]
0 commit comments