Skip to content

Commit 31c6346

Browse files
fix(snapshots): Chunk image uploads to avoid fd exhaustion and 413 errors
Uploading hundreds of images fails in two ways: all file handles are opened upfront causing EMFILE (too many open files), and the objectstore client batches everything into one HTTP request causing 413 Payload Too Large from the server. Split upload_images() into two phases: pre-process all images without opening files (collision detection, hashing, metadata), then upload in chunks of 100. This bounds both open file descriptors and HTTP request payload size regardless of total image count. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 3c123e0 commit 31c6346

File tree

1 file changed

+79
-55
lines changed

1 file changed

+79
-55
lines changed

src/commands/build/snapshots.rs

Lines changed: 79 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use anyhow::{Context as _, Result};
99
use clap::{Arg, ArgMatches, Command};
1010
use console::style;
1111
use itertools::Itertools as _;
12-
use log::{debug, info, warn};
12+
use log::{debug, warn};
1313
use objectstore_client::{ClientBuilder, ExpirationPolicy, Usecase};
1414
use rayon::prelude::*;
1515
use secrecy::ExposeSecret as _;
@@ -29,6 +29,7 @@ const EXPERIMENTAL_WARNING: &str =
2929

3030
const IMAGE_EXTENSIONS: &[&str] = &["png", "jpg", "jpeg"];
3131
const MAX_PIXELS_PER_IMAGE: u64 = 40_000_000;
32+
const UPLOAD_BATCH_SIZE: usize = 100;
3233

3334
pub fn make_command(command: Command) -> Command {
3435
command
@@ -283,6 +284,11 @@ fn read_sidecar_metadata(image_path: &Path) -> Result<HashMap<String, Value>> {
283284
})
284285
}
285286

287+
struct PreparedImage {
288+
path: PathBuf,
289+
key: String,
290+
}
291+
286292
fn upload_images(
287293
images: Vec<ImageInfo>,
288294
org: &str,
@@ -298,30 +304,28 @@ fn upload_images(
298304
let client = ClientBuilder::new(options.objectstore.url)
299305
.token({
300306
// TODO: replace with auth from `ObjectstoreUploadOptions` when appropriate
301-
let auth = match authenticated_api.auth() {
307+
match authenticated_api.auth() {
302308
Auth::Token(token) => token.raw().expose_secret().to_owned(),
303-
};
304-
auth
309+
}
305310
})
306311
.configure_reqwest(|r| r.connect_timeout(Duration::from_secs(10)))
307312
.build()?;
308313

314+
let scopes = options.objectstore.scopes;
315+
316+
let find_scope = |name: &str| {
317+
scopes
318+
.iter()
319+
.find(|(k, _)| k == name)
320+
.map(|(_, v)| v.clone())
321+
};
322+
let org_id = find_scope("org").context("Missing org in UploadOptions scope")?;
323+
let project_id = find_scope("project").context("Missing project in UploadOptions scope")?;
324+
309325
let mut scope = Usecase::new("preprod").scope();
310-
let (mut org_id, mut project_id): (Option<String>, Option<String>) = (None, None);
311-
for (key, value) in options.objectstore.scopes.into_iter() {
312-
scope = scope.push(&key, value.clone());
313-
if key == "org" {
314-
org_id = Some(value);
315-
} else if key == "project" {
316-
project_id = Some(value);
317-
}
326+
for (key, value) in scopes {
327+
scope = scope.push(&key, value);
318328
}
319-
let Some(org_id) = org_id else {
320-
anyhow::bail!("Missing org in UploadOptions scope");
321-
};
322-
let Some(project_id) = project_id else {
323-
anyhow::bail!("Missing project in UploadOptions scope");
324-
};
325329

326330
let session = scope.session(&client)?;
327331

@@ -330,10 +334,10 @@ fn upload_images(
330334
.build()
331335
.context("Failed to create tokio runtime")?;
332336

333-
let mut many_builder = session.many();
334337
let mut manifest_entries = HashMap::new();
335338
let mut collisions: HashMap<String, Vec<String>> = HashMap::new();
336339
let mut kept_paths = HashMap::new();
340+
let mut uploads = Vec::with_capacity(images.len());
337341

338342
let hashed_images: Vec<_> = images
339343
.into_par_iter()
@@ -361,21 +365,7 @@ fn upload_images(
361365
continue;
362366
}
363367

364-
let file = runtime
365-
.block_on(tokio::fs::File::open(&image.path))
366-
.with_context(|| {
367-
format!("Failed to open image for upload: {}", image.path.display())
368-
})?;
369-
370368
let key = format!("{org_id}/{project_id}/{hash}");
371-
info!("Queueing {} as {key}", image.relative_path.display());
372-
373-
many_builder = many_builder.push(
374-
session
375-
.put_file(file)
376-
.key(&key)
377-
.expiration_policy(expiration),
378-
);
379369

380370
let mut extra = read_sidecar_metadata(&image.path).unwrap_or_else(|err| {
381371
warn!("Error reading sidecar metadata, ignoring it instead: {err:#}");
@@ -384,47 +374,81 @@ fn upload_images(
384374
extra.insert("content_hash".to_owned(), serde_json::Value::String(hash));
385375

386376
kept_paths.insert(image_file_name.clone(), relative_path);
377+
uploads.push(PreparedImage {
378+
path: image.path,
379+
key,
380+
});
387381
manifest_entries.insert(
388382
image_file_name,
389383
ImageMetadata::new(image.width, image.height, extra),
390384
);
391385
}
392386

393387
if !collisions.is_empty() {
394-
let mut details = String::new();
395-
for (name, excluded_paths) in &collisions {
396-
let mut all_paths = vec![kept_paths[name].as_str()];
397-
all_paths.extend(excluded_paths.iter().map(|s| s.as_str()));
398-
details.push_str(&format!("\n {name}: {}", all_paths.join(", ")));
399-
}
388+
let details: String = collisions
389+
.iter()
390+
.map(|(name, excluded)| {
391+
let kept = &kept_paths[name];
392+
let all = std::iter::once(kept.as_str())
393+
.chain(excluded.iter().map(|s| s.as_str()))
394+
.join(", ");
395+
format!("\n {name}: {all}")
396+
})
397+
.collect();
400398
warn!("Some images share identical file names. Only the first occurrence of each is included:{details}");
401399
}
402400

403-
let result = runtime.block_on(async { many_builder.send().error_for_failures().await });
401+
let total_count = uploads.len();
402+
let total_batches = total_count.div_ceil(UPLOAD_BATCH_SIZE);
404403

405-
let uploaded_count = manifest_entries.len();
404+
for (batch_idx, chunk) in uploads.chunks(UPLOAD_BATCH_SIZE).enumerate() {
405+
debug!(
406+
"Uploading batch {}/{total_batches} ({} images)",
407+
batch_idx + 1,
408+
chunk.len()
409+
);
406410

407-
match result {
408-
Ok(()) => {
409-
println!(
410-
"{} Uploaded {} image {}",
411-
style(">").dim(),
412-
style(uploaded_count).yellow(),
413-
if uploaded_count == 1 { "file" } else { "files" }
411+
let mut many_builder = session.many();
412+
for prepared in chunk {
413+
let file = runtime
414+
.block_on(tokio::fs::File::open(&prepared.path))
415+
.with_context(|| {
416+
format!(
417+
"Failed to open image for upload: {}",
418+
prepared.path.display()
419+
)
420+
})?;
421+
422+
many_builder = many_builder.push(
423+
session
424+
.put_file(file)
425+
.key(&prepared.key)
426+
.expiration_policy(expiration),
414427
);
415-
Ok(manifest_entries)
416428
}
417-
Err(errors) => {
429+
430+
let result = runtime.block_on(async { many_builder.send().error_for_failures().await });
431+
if let Err(errors) = result {
432+
let errors: Vec<_> = errors.collect();
418433
eprintln!("There were errors uploading images:");
419-
let mut error_count = 0;
420-
for error in errors {
421-
let error = anyhow::Error::new(error);
434+
for error in &errors {
422435
eprintln!(" {}", style(format!("{error:#}")).red());
423-
error_count += 1;
424436
}
425-
anyhow::bail!("Failed to upload {error_count} out of {uploaded_count} images")
437+
let error_count = errors.len();
438+
let batch_num = batch_idx + 1;
439+
anyhow::bail!(
440+
"Failed to upload {error_count} images in batch {batch_num}/{total_batches}"
441+
);
426442
}
427443
}
444+
445+
println!(
446+
"{} Uploaded {} image {}",
447+
style(">").dim(),
448+
style(total_count).yellow(),
449+
if total_count == 1 { "file" } else { "files" }
450+
);
451+
Ok(manifest_entries)
428452
}
429453

430454
#[cfg(test)]

0 commit comments

Comments
 (0)