Skip to content

Commit 86c377c

Browse files
fix(snapshots): Chunk image uploads to avoid fd exhaustion and 413 errors
Uploading hundreds of images fails in two ways: all file handles are opened upfront causing EMFILE (too many open files), and the objectstore client batches everything into one HTTP request causing 413 Payload Too Large from the server. Split upload_images() into two phases: pre-process all images without opening files (collision detection, hashing, metadata), then upload in chunks of 100. This bounds both open file descriptors and HTTP request payload size regardless of total image count. Co-Authored-By: Claude <noreply@anthropic.com>
1 parent e744afd commit 86c377c

1 file changed

Lines changed: 81 additions & 56 deletions

File tree

src/commands/build/snapshots.rs

Lines changed: 81 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use anyhow::{Context as _, Result};
99
use clap::{Arg, ArgMatches, Command};
1010
use console::style;
1111
use itertools::Itertools as _;
12-
use log::{debug, info, warn};
12+
use log::{debug, warn};
1313
use objectstore_client::{ClientBuilder, ExpirationPolicy, Usecase};
1414
use secrecy::ExposeSecret as _;
1515
use serde_json::Value;
@@ -28,6 +28,7 @@ const EXPERIMENTAL_WARNING: &str =
2828

2929
const IMAGE_EXTENSIONS: &[&str] = &["png", "jpg", "jpeg"];
3030
const MAX_PIXELS_PER_IMAGE: u64 = 40_000_000;
31+
const UPLOAD_BATCH_SIZE: usize = 100;
3132

3233
pub fn make_command(command: Command) -> Command {
3334
command
@@ -282,6 +283,11 @@ fn read_sidecar_metadata(image_path: &Path) -> Result<HashMap<String, Value>> {
282283
})
283284
}
284285

286+
struct PreparedImage {
287+
path: PathBuf,
288+
key: String,
289+
}
290+
285291
fn upload_images(
286292
images: Vec<ImageInfo>,
287293
org: &str,
@@ -297,30 +303,28 @@ fn upload_images(
297303
let client = ClientBuilder::new(options.objectstore.url)
298304
.token({
299305
// TODO: replace with auth from `ObjectstoreUploadOptions` when appropriate
300-
let auth = match authenticated_api.auth() {
306+
match authenticated_api.auth() {
301307
Auth::Token(token) => token.raw().expose_secret().to_owned(),
302-
};
303-
auth
308+
}
304309
})
305310
.configure_reqwest(|r| r.connect_timeout(Duration::from_secs(10)))
306311
.build()?;
307312

313+
let scopes = options.objectstore.scopes;
314+
315+
let find_scope = |name: &str| {
316+
scopes
317+
.iter()
318+
.find(|(k, _)| k == name)
319+
.map(|(_, v)| v.clone())
320+
};
321+
let org_id = find_scope("org").context("Missing org in UploadOptions scope")?;
322+
let project_id = find_scope("project").context("Missing project in UploadOptions scope")?;
323+
308324
let mut scope = Usecase::new("preprod").scope();
309-
let (mut org_id, mut project_id): (Option<String>, Option<String>) = (None, None);
310-
for (key, value) in options.objectstore.scopes.into_iter() {
311-
scope = scope.push(&key, value.clone());
312-
if key == "org" {
313-
org_id = Some(value);
314-
} else if key == "project" {
315-
project_id = Some(value);
316-
}
325+
for (key, value) in scopes {
326+
scope = scope.push(&key, value);
317327
}
318-
let Some(org_id) = org_id else {
319-
anyhow::bail!("Missing org in UploadOptions scope");
320-
};
321-
let Some(project_id) = project_id else {
322-
anyhow::bail!("Missing project in UploadOptions scope");
323-
};
324328

325329
let session = scope.session(&client)?;
326330

@@ -329,10 +333,11 @@ fn upload_images(
329333
.build()
330334
.context("Failed to create tokio runtime")?;
331335

332-
let mut many_builder = session.many();
333336
let mut manifest_entries = HashMap::new();
334337
let mut collisions: HashMap<String, Vec<String>> = HashMap::new();
335-
let mut kept_paths: HashMap<String, String> = HashMap::new();
338+
let mut kept_paths = HashMap::new();
339+
let mut uploads = Vec::with_capacity(images.len());
340+
336341
for image in images {
337342
debug!("Processing image: {}", image.path.display());
338343

@@ -354,21 +359,7 @@ fn upload_images(
354359
}
355360

356361
let hash = compute_sha256_hash(&image.path)?;
357-
let file = runtime
358-
.block_on(tokio::fs::File::open(&image.path))
359-
.with_context(|| {
360-
format!("Failed to open image for upload: {}", image.path.display())
361-
})?;
362-
363362
let key = format!("{org_id}/{project_id}/{hash}");
364-
info!("Queueing {} as {key}", image.relative_path.display());
365-
366-
many_builder = many_builder.push(
367-
session
368-
.put_file(file)
369-
.key(&key)
370-
.expiration_policy(expiration),
371-
);
372363

373364
let mut extra = read_sidecar_metadata(&image.path).unwrap_or_else(|err| {
374365
warn!("Error reading sidecar metadata, ignoring it instead: {err:#}");
@@ -377,47 +368,81 @@ fn upload_images(
377368
extra.insert("content_hash".to_owned(), serde_json::Value::String(hash));
378369

379370
kept_paths.insert(image_file_name.clone(), relative_path);
371+
uploads.push(PreparedImage {
372+
path: image.path,
373+
key,
374+
});
380375
manifest_entries.insert(
381376
image_file_name,
382377
ImageMetadata::new(image.width, image.height, extra),
383378
);
384379
}
385380

386381
if !collisions.is_empty() {
387-
let mut details = String::new();
388-
for (name, excluded_paths) in &collisions {
389-
let mut all_paths = vec![kept_paths[name].as_str()];
390-
all_paths.extend(excluded_paths.iter().map(|s| s.as_str()));
391-
details.push_str(&format!("\n {name}: {}", all_paths.join(", ")));
392-
}
382+
let details: String = collisions
383+
.iter()
384+
.map(|(name, excluded)| {
385+
let kept = &kept_paths[name];
386+
let all = std::iter::once(kept.as_str())
387+
.chain(excluded.iter().map(|s| s.as_str()))
388+
.join(", ");
389+
format!("\n {name}: {all}")
390+
})
391+
.collect();
393392
warn!("Some images share identical file names. Only the first occurrence of each is included:{details}");
394393
}
395394

396-
let result = runtime.block_on(async { many_builder.send().error_for_failures().await });
395+
let total_count = uploads.len();
396+
let total_batches = total_count.div_ceil(UPLOAD_BATCH_SIZE);
397397

398-
let uploaded_count = manifest_entries.len();
398+
for (batch_idx, chunk) in uploads.chunks(UPLOAD_BATCH_SIZE).enumerate() {
399+
debug!(
400+
"Uploading batch {}/{total_batches} ({} images)",
401+
batch_idx + 1,
402+
chunk.len()
403+
);
399404

400-
match result {
401-
Ok(()) => {
402-
println!(
403-
"{} Uploaded {} image {}",
404-
style(">").dim(),
405-
style(uploaded_count).yellow(),
406-
if uploaded_count == 1 { "file" } else { "files" }
405+
let mut many_builder = session.many();
406+
for prepared in chunk {
407+
let file = runtime
408+
.block_on(tokio::fs::File::open(&prepared.path))
409+
.with_context(|| {
410+
format!(
411+
"Failed to open image for upload: {}",
412+
prepared.path.display()
413+
)
414+
})?;
415+
416+
many_builder = many_builder.push(
417+
session
418+
.put_file(file)
419+
.key(&prepared.key)
420+
.expiration_policy(expiration),
407421
);
408-
Ok(manifest_entries)
409422
}
410-
Err(errors) => {
423+
424+
let result = runtime.block_on(async { many_builder.send().error_for_failures().await });
425+
if let Err(errors) = result {
426+
let errors: Vec<_> = errors.collect();
411427
eprintln!("There were errors uploading images:");
412-
let mut error_count = 0;
413-
for error in errors {
414-
let error = anyhow::Error::new(error);
428+
for error in &errors {
415429
eprintln!(" {}", style(format!("{error:#}")).red());
416-
error_count += 1;
417430
}
418-
anyhow::bail!("Failed to upload {error_count} out of {uploaded_count} images")
431+
let error_count = errors.len();
432+
let batch_num = batch_idx + 1;
433+
anyhow::bail!(
434+
"Failed to upload {error_count} images in batch {batch_num}/{total_batches}"
435+
);
419436
}
420437
}
438+
439+
println!(
440+
"{} Uploaded {} image {}",
441+
style(">").dim(),
442+
style(total_count).yellow(),
443+
if total_count == 1 { "file" } else { "files" }
444+
);
445+
Ok(manifest_entries)
421446
}
422447

423448
#[cfg(test)]

0 commit comments

Comments
 (0)