Commit 425c826

axpnet, aeroftp[bot], and claude committed
feat(transfer): converge B2 large-file upload on shared part engine
Introduce run_concurrent_part_upload in providers/multi_thread.rs: the upload-side mirror of the shared concurrent-range download engine. It plans fixed-size parts over a local file (plan_fixed_size_parts: only the last part may be shorter, part size grown to respect the 10000-part protocol cap), reads each part on demand after a slot frees so peak RAM stays bounded to max_parallel * part_size, uploads with a true sliding window (a freed slot starts the next part, no per-batch barrier), aggregates progress, is cancellable, and aborts every sibling on the first error. The transport-specific part is a single closure returning the provider's part identifier; the engine returns them sorted, ready for the finalising call.

Converge B2 upload_large_file from a sequential single-URL loop onto this engine. B2 requires an independent upload-part-URL per in-flight part, so the closure acquires one per slot. Session lifecycle stays in the provider, so the failure path is unchanged. B2 now advertises the multipart capability honestly (supports_multipart, parallel parts); the download path stays single-stream so no range capability is overclaimed.

S3's native multipart path is deliberately left untouched: it is already live-validated byte-identical and converging it would risk regressing a proven path for no measured gain (tracked, not done here: the same no-churn-on-a-validated-path principle the download side applied to S3's native range path).

Add six unit tests for the part planner (even split, uneven final part, zero inputs, protocol-cap part growth, single part, B2-realistic sizing) and a live B2 round-trip test asserting honest capability advertisement plus byte-identical reassembly with an uneven final part.

Co-Authored-By: aeroftp[bot] <aeroftp[bot]@users.noreply.github.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent fab94a3 commit 425c826

3 files changed

Lines changed: 431 additions & 47 deletions
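
The diff for providers/multi_thread.rs is not reproduced on this page, so the following is only a minimal sketch of how the part planner described in the commit message might behave. The name `plan_fixed_size_parts` comes from the commit message; the signature, the `PlannedPart` struct, and the `ceil_div` helper are assumptions made for illustration and may differ from the project's actual code.

```rust
/// Hypothetical description of one planned part (not the project's type).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PlannedPart {
    /// 1-based part number, as multipart protocols usually expect.
    pub part_number: u32,
    /// Byte offset of the part within the local file.
    pub offset: u64,
    /// Length of the part in bytes.
    pub len: u64,
}

/// Integer division rounded up; `b` must be non-zero.
fn ceil_div(a: u64, b: u64) -> u64 {
    a / b + u64::from(a % b != 0)
}

/// Sketch of a fixed-size planner (assumed signature): every part has the
/// same size except possibly the last, and the part size is grown when the
/// requested size would need more than `max_parts` parts.
pub fn plan_fixed_size_parts(total_size: u64, part_size: u64, max_parts: u64) -> Vec<PlannedPart> {
    // Zero inputs yield an empty plan instead of dividing by zero.
    if total_size == 0 || part_size == 0 || max_parts == 0 {
        return Vec::new();
    }
    // Smallest part size that still fits the file into `max_parts` parts.
    let min_for_cap = ceil_div(total_size, max_parts);
    let part_size = part_size.max(min_for_cap);
    let count = ceil_div(total_size, part_size);
    (0..count)
        .map(|i| {
            let offset = i * part_size;
            PlannedPart {
                part_number: (i + 1) as u32,
                offset,
                // Only the final part can come out shorter than `part_size`.
                len: part_size.min(total_size - offset),
            }
        })
        .collect()
}
```

Under these assumptions, a 250 MiB file with 100 MiB parts plans as 100 + 100 + 50 MiB, and a file that would otherwise exceed `max_parts` gets a larger part size rather than an error.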


src-tauri/src/providers/b2.rs

Lines changed: 80 additions & 46 deletions
@@ -26,6 +26,13 @@ const AUTHORIZE_URL: &str = "https://api.backblazeb2.com/b2api/v4/b2_authorize_a
 const SINGLE_UPLOAD_RECOMMENDED_MAX: u64 = 200 * 1024 * 1024; // 200 MB; above this use large-file workflow
 const LARGE_FILE_PART_SIZE: u64 = 100 * 1024 * 1024; // 100 MB per part (recommended)
 const LARGE_FILE_MIN_PART_SIZE: u64 = 5 * 1024 * 1024; // B2 minimum (last part may be smaller)
+/// Concurrent large-file part uploads. B2 requires an independent
+/// upload-part-URL per in-flight part (an upload URL is single-threaded), so
+/// each slot fetches its own. Peak transient RAM is bounded to
+/// `LARGE_FILE_MAX_PARALLEL * LARGE_FILE_PART_SIZE`; 4 keeps that at ~400 MB
+/// on a >200 MB upload, the same order of magnitude as comparable clients,
+/// and matches the in-repo S3 multipart fan-out for consistency.
+const LARGE_FILE_MAX_PARALLEL: usize = 4;
 const COPY_MAX_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5 GB hard limit on b2_copy_file
 const COPY_PART_MAX_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5 GB per b2_copy_part call
 const MAX_FILE_SIZE: u64 = 10 * 1024 * 1024 * 1024 * 1024; // 10 TB practical ceiling
@@ -42,6 +49,7 @@ fn b2_log(_msg: &str) {}
 
 // ─── Configuration ───
 
+#[derive(Clone)]
 pub struct B2Config {
     pub application_key_id: String,
     pub application_key: SecretString,
@@ -288,6 +296,11 @@ pub struct B2UnfinishedUpload {
 
 // ─── Provider ───
 
+/// `Clone` is cheap: `reqwest::Client` is internally `Arc`, the rest are
+/// owned `String`/`SecretString` config. It exists so concurrent part-upload
+/// workers each get an independent handle (the same pattern S3 uses for its
+/// native multipart path), never to duplicate a live connection.
+#[derive(Clone)]
 pub struct B2Provider {
     config: B2Config,
     client: reqwest::Client,
@@ -637,61 +650,67 @@ impl B2Provider {
             .map_err(|e| ProviderError::ServerError(format!("finish_large_file parse: {}", e)))
     }
 
+    /// Large-file upload converged on the shared concurrent part-upload
+    /// engine (PD-UP-1). The B2-specific part is just "fetch a fresh
+    /// upload-part-URL, PUT this part, return its `contentSha1`": B2 requires
+    /// an independent upload URL per concurrent part (a URL is
+    /// single-threaded), so the closure acquires one per slot. Part planning,
+    /// the 10000-part protocol cap, the only-last-part-may-be-smaller
+    /// invariant, bounded concurrency, progress, and abort-on-first-error all
+    /// live once in the shared engine. Session lifecycle stays here: an
+    /// engine `Err` leaves the started large file unfinished, exactly as the
+    /// previous sequential path did (B2 auto-expires an unfinished large file
+    /// and the `upload()` caller still retries once on a master-token
+    /// failure: no behaviour change on the failure path).
     async fn upload_large_file(
         &self,
         local_path: &str,
         key: &str,
         size: u64,
         progress: Option<Box<dyn Fn(u64, u64) + Send>>,
     ) -> Result<(), ProviderError> {
+        use crate::providers::multi_thread::{
+            run_concurrent_part_upload, ConcurrentPartUploadConfig,
+        };
+
         let start = self.start_large_file(key).await?;
-        let part_urls = self.get_upload_part_url(&start.file_id).await?;
-        let mut file = tokio::fs::File::open(local_path)
-            .await
-            .map_err(|e| ProviderError::Other(format!("open local: {}", e)))?;
-        use tokio::io::AsyncReadExt;
-        let mut part_sha1s: Vec<String> = Vec::new();
-        let mut part_number: u32 = 1;
-        let mut transferred: u64 = 0;
-        loop {
-            let remaining = size.saturating_sub(transferred);
-            if remaining == 0 {
-                break;
-            }
-            let this_part = remaining.min(LARGE_FILE_PART_SIZE);
-            let mut buf = vec![0u8; this_part as usize];
-            file.read_exact(&mut buf)
-                .await
-                .map_err(|e| ProviderError::Other(format!("read part {}: {}", part_number, e)))?;
-            // Validate part size: only the LAST part may be smaller than 5 MB
-            let is_last = remaining == this_part;
-            if !is_last && this_part < LARGE_FILE_MIN_PART_SIZE {
-                return Err(ProviderError::InvalidConfig(format!(
-                    "part {} too small ({} < {})",
-                    part_number, this_part, LARGE_FILE_MIN_PART_SIZE
-                )));
-            }
-            let part_resp = self
-                .upload_part(
-                    &part_urls.upload_url,
-                    &part_urls.authorization_token,
-                    part_number,
-                    buf,
-                )
-                .await?;
-            part_sha1s.push(part_resp.content_sha1);
-            transferred += this_part;
-            part_number += 1;
-            if let Some(ref p) = progress {
-                p(transferred, size);
-            }
-            if part_number > 10_000 {
-                return Err(ProviderError::InvalidConfig(
-                    "exceeded B2 limit of 10000 parts".into(),
-                ));
+        let file_id = start.file_id.clone();
+
+        let provider = self.clone();
+        let part_file_id = file_id.clone();
+        let upload_one_part = move |part_number: u32, data: Vec<u8>| {
+            let provider = provider.clone();
+            let file_id = part_file_id.clone();
+            async move {
+                let part_urls = provider.get_upload_part_url(&file_id).await?;
+                let resp = provider
+                    .upload_part(
+                        &part_urls.upload_url,
+                        &part_urls.authorization_token,
+                        part_number,
+                        data,
+                    )
+                    .await?;
+                Ok::<String, ProviderError>(resp.content_sha1)
             }
-        }
-        self.finish_large_file(&start.file_id, part_sha1s).await?;
+        };
+
+        let parts = run_concurrent_part_upload(
+            ConcurrentPartUploadConfig {
+                local_path: std::path::PathBuf::from(local_path),
+                total_size: size,
+                part_size: LARGE_FILE_PART_SIZE,
+                max_parts: 10_000,
+                max_parallel: LARGE_FILE_MAX_PARALLEL,
+            },
+            upload_one_part,
+            tokio_util::sync::CancellationToken::new(),
+            progress,
+        )
+        .await?;
+
+        let part_sha1s: Vec<String> = parts.into_iter().map(|(_, sha1)| sha1).collect();
+        self.finish_large_file(&file_id, part_sha1s).await?;
         Ok(())
     }
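
The engine that this hunk calls lives in providers/multi_thread.rs, which is not shown here. To illustrate the sliding-window behaviour the commit message describes (a freed slot immediately claims the next part, with no per-batch barrier, and results come back sorted), here is a dependency-free sketch that swaps the project's async engine for plain `std::thread` workers. The function name `run_sliding_window`, the `String` error type, and the closure shape are all hypothetical; the real engine additionally reads part data from disk, reports progress, and honours a `CancellationToken`.

```rust
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Mutex;
use std::thread;

/// Sketch only: upload parts 1..=part_count with at most `max_parallel`
/// uploads in flight, returning `(part_number, part_id)` sorted by number.
pub fn run_sliding_window<F>(
    part_count: u32,
    max_parallel: usize,
    upload: F,
) -> Result<Vec<(u32, String)>, String>
where
    F: Fn(u32) -> Result<String, String> + Sync,
{
    let next = AtomicU32::new(1); // next unclaimed part number
    let cancelled = AtomicBool::new(false);
    let results: Mutex<Vec<(u32, String)>> = Mutex::new(Vec::new());
    let first_error: Mutex<Option<String>> = Mutex::new(None);

    thread::scope(|s| {
        for _ in 0..max_parallel.max(1) {
            s.spawn(|| {
                // Each worker is one slot: as soon as its upload finishes it
                // claims the next part, independently of its siblings.
                while !cancelled.load(Ordering::SeqCst) {
                    let part_number = next.fetch_add(1, Ordering::SeqCst);
                    if part_number > part_count {
                        break;
                    }
                    match upload(part_number) {
                        Ok(id) => results.lock().unwrap().push((part_number, id)),
                        Err(e) => {
                            // Stop siblings from claiming further parts. In
                            // this sketch, uploads already in flight still
                            // run to completion.
                            cancelled.store(true, Ordering::SeqCst);
                            let mut slot = first_error.lock().unwrap();
                            if slot.is_none() {
                                *slot = Some(e);
                            }
                            break;
                        }
                    }
                }
            });
        }
    });

    if let Some(e) = first_error.into_inner().unwrap() {
        return Err(e);
    }
    let mut parts = results.into_inner().unwrap();
    parts.sort_by_key(|(n, _)| *n);
    Ok(parts)
}
```

Because each worker holds at most one part at a time, a variant that reads the part buffer inside the worker would keep peak memory near `max_parallel * part_size`, which is the bound the commit message states for the real engine.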

@@ -2403,6 +2422,21 @@ impl StorageProvider for B2Provider {
             expires_at,
         })
     }
+
+    /// Advertise the parallel large-file capability honestly now that
+    /// `upload_large_file` genuinely uploads parts concurrently (PD-UP-1).
+    /// Only the multipart fields are set: B2's download path stays
+    /// single-stream, so `supports_range_download` is left at its default
+    /// `false` (no overclaim of a range capability B2 does not honour here).
+    fn transfer_optimization_hints(&self) -> super::TransferOptimizationHints {
+        super::TransferOptimizationHints {
+            supports_multipart: true,
+            multipart_threshold: SINGLE_UPLOAD_RECOMMENDED_MAX,
+            multipart_part_size: LARGE_FILE_PART_SIZE,
+            multipart_max_parallel: LARGE_FILE_MAX_PARALLEL as u8,
+            ..Default::default()
+        }
+    }
 }
 
 // ─── Helpers ───
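
How callers consume these hints is not part of this diff. As a rough illustration of why leaving `supports_range_download` at its default matters, the sketch below mirrors the field names visible in the hunk in a stand-alone struct and shows one plausible way a transfer layer could choose between a single-request and a multipart upload. The struct definition, the `UploadStrategy` enum, and the `choose_upload_strategy` and `b2_like_hints` functions are hypothetical, and the strict "greater than threshold" comparison is an assumption based on the "above this use large-file workflow" comment.

```rust
/// Stand-in mirroring the field names used in the diff (assumed layout).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TransferOptimizationHints {
    pub supports_multipart: bool,
    pub multipart_threshold: u64,
    pub multipart_part_size: u64,
    pub multipart_max_parallel: u8,
    pub supports_range_download: bool,
}

/// Hypothetical decision a caller might derive from the hints.
#[derive(Debug, PartialEq, Eq)]
pub enum UploadStrategy {
    SingleRequest,
    Multipart { part_size: u64, max_parallel: u8 },
}

/// Hints with the same values the B2 provider advertises in the diff.
pub fn b2_like_hints() -> TransferOptimizationHints {
    TransferOptimizationHints {
        supports_multipart: true,
        multipart_threshold: 200 * 1024 * 1024,
        multipart_part_size: 100 * 1024 * 1024,
        multipart_max_parallel: 4,
        // Struct-update syntax leaves `supports_range_download` at `false`.
        ..Default::default()
    }
}

/// Pick multipart only when it is advertised and the file is above the
/// threshold (assumed semantics, not confirmed by the source).
pub fn choose_upload_strategy(hints: &TransferOptimizationHints, file_size: u64) -> UploadStrategy {
    if hints.supports_multipart && file_size > hints.multipart_threshold {
        UploadStrategy::Multipart {
            part_size: hints.multipart_part_size,
            max_parallel: hints.multipart_max_parallel.max(1),
        }
    } else {
        UploadStrategy::SingleRequest
    }
}
```

With these values, a 250 MiB file would take the multipart route with 100 MiB parts and four slots, while a provider that returns the default hints would always fall back to a single request.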
