Skip to content

Commit 68b24de

Browse files
committed
aws: support copies >5GB
1 parent 9dc8d7d commit 68b24de

4 files changed

Lines changed: 187 additions & 43 deletions

File tree

src/aws/builder.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ use url::Url;
4242
/// Default metadata endpoint
4343
static DEFAULT_METADATA_ENDPOINT: &str = "http://169.254.169.254";
4444

45+
/// AWS S3 does not support copy operations larger than 5 GiB in a single request. See [Copy
46+
/// Object](https://docs.aws.amazon.com/AmazonS3/latest/userguide/copy-object.html) for reference.
47+
const MAX_SINGLE_REQUEST_COPY_SIZE: u64 = 5 * 1024 * 1024 * 1024;
48+
4549
/// A specialized `Error` for object store-related errors
4650
#[derive(Debug, thiserror::Error)]
4751
enum Error {
@@ -186,6 +190,10 @@ pub struct AmazonS3Builder {
186190
request_payer: ConfigValue<bool>,
187191
/// The [`HttpConnector`] to use
188192
http_connector: Option<Arc<dyn HttpConnector>>,
193+
/// Threshold (bytes) above which copy uses multipart copy. If not set, defaults to 5 GiB.
194+
multipart_copy_threshold: Option<ConfigValue<u64>>,
195+
/// Preferred multipart copy part size (bytes). If not set, defaults to 5 GiB.
196+
multipart_copy_part_size: Option<ConfigValue<u64>>,
189197
}
190198

191199
/// Configuration keys for [`AmazonS3Builder`]
@@ -396,6 +404,10 @@ pub enum AmazonS3ConfigKey {
396404

397405
/// Encryption options
398406
Encryption(S3EncryptionConfigKey),
407+
/// Threshold (bytes) to switch to multipart copy
408+
MultipartCopyThreshold,
409+
/// Preferred multipart copy part size (bytes)
410+
MultipartCopyPartSize,
399411
}
400412

401413
impl AsRef<str> for AmazonS3ConfigKey {
@@ -428,6 +440,8 @@ impl AsRef<str> for AmazonS3ConfigKey {
428440
Self::RequestPayer => "aws_request_payer",
429441
Self::Client(opt) => opt.as_ref(),
430442
Self::Encryption(opt) => opt.as_ref(),
443+
Self::MultipartCopyThreshold => "aws_multipart_copy_threshold",
444+
Self::MultipartCopyPartSize => "aws_multipart_copy_part_size",
431445
}
432446
}
433447
}
@@ -466,6 +480,12 @@ impl FromStr for AmazonS3ConfigKey {
466480
"aws_conditional_put" | "conditional_put" => Ok(Self::ConditionalPut),
467481
"aws_disable_tagging" | "disable_tagging" => Ok(Self::DisableTagging),
468482
"aws_request_payer" | "request_payer" => Ok(Self::RequestPayer),
483+
"aws_multipart_copy_threshold" | "multipart_copy_threshold" => {
484+
Ok(Self::MultipartCopyThreshold)
485+
}
486+
"aws_multipart_copy_part_size" | "multipart_copy_part_size" => {
487+
Ok(Self::MultipartCopyPartSize)
488+
}
469489
// Backwards compatibility
470490
"aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
471491
"aws_server_side_encryption" => Ok(Self::Encryption(
@@ -631,6 +651,12 @@ impl AmazonS3Builder {
631651
self.encryption_customer_key_base64 = Some(value.into())
632652
}
633653
},
654+
AmazonS3ConfigKey::MultipartCopyThreshold => {
655+
self.multipart_copy_threshold = Some(ConfigValue::Deferred(value.into()))
656+
}
657+
AmazonS3ConfigKey::MultipartCopyPartSize => {
658+
self.multipart_copy_part_size = Some(ConfigValue::Deferred(value.into()))
659+
}
634660
};
635661
self
636662
}
@@ -698,6 +724,14 @@ impl AmazonS3Builder {
698724
self.encryption_customer_key_base64.clone()
699725
}
700726
},
727+
AmazonS3ConfigKey::MultipartCopyThreshold => self
728+
.multipart_copy_threshold
729+
.as_ref()
730+
.map(|x| x.to_string()),
731+
AmazonS3ConfigKey::MultipartCopyPartSize => self
732+
.multipart_copy_part_size
733+
.as_ref()
734+
.map(|x| x.to_string()),
701735
}
702736
}
703737

@@ -990,6 +1024,18 @@ impl AmazonS3Builder {
9901024
self
9911025
}
9921026

1027+
/// Set threshold (bytes) above which copy uses multipart copy
1028+
pub fn with_multipart_copy_threshold(mut self, threshold_bytes: u64) -> Self {
1029+
self.multipart_copy_threshold = Some(ConfigValue::Parsed(threshold_bytes));
1030+
self
1031+
}
1032+
1033+
/// Set preferred multipart copy part size (bytes)
1034+
pub fn with_multipart_copy_part_size(mut self, part_size_bytes: u64) -> Self {
1035+
self.multipart_copy_part_size = Some(ConfigValue::Parsed(part_size_bytes));
1036+
self
1037+
}
1038+
9931039
/// Create a [`AmazonS3`] instance from the provided values,
9941040
/// consuming `self`.
9951041
pub fn build(mut self) -> Result<AmazonS3> {
@@ -1147,6 +1193,17 @@ impl AmazonS3Builder {
11471193
S3EncryptionHeaders::default()
11481194
};
11491195

1196+
let multipart_copy_threshold = self
1197+
.multipart_copy_threshold
1198+
.map(|val| val.get())
1199+
.transpose()?
1200+
.unwrap_or(MAX_SINGLE_REQUEST_COPY_SIZE);
1201+
let multipart_copy_part_size = self
1202+
.multipart_copy_part_size
1203+
.map(|val| val.get())
1204+
.transpose()?
1205+
.unwrap_or(MAX_SINGLE_REQUEST_COPY_SIZE);
1206+
11501207
let config = S3Config {
11511208
region,
11521209
endpoint: self.endpoint,
@@ -1164,6 +1221,8 @@ impl AmazonS3Builder {
11641221
conditional_put: self.conditional_put.get()?,
11651222
encryption_headers,
11661223
request_payer: self.request_payer.get()?,
1224+
multipart_copy_threshold,
1225+
multipart_copy_part_size,
11671226
};
11681227

11691228
let http_client = http.connect(&config.client_options)?;

src/aws/client.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ impl From<Error> for crate::Error {
138138
pub(crate) enum PutPartPayload<'a> {
139139
Part(PutPayload),
140140
Copy(&'a Path),
141+
CopyRange(&'a Path, std::ops::Range<u64>),
141142
}
142143

143144
impl Default for PutPartPayload<'_> {
@@ -208,6 +209,10 @@ pub(crate) struct S3Config {
208209
pub conditional_put: S3ConditionalPut,
209210
pub request_payer: bool,
210211
pub(super) encryption_headers: S3EncryptionHeaders,
212+
/// Threshold in bytes above which copy will use multipart copy
213+
pub multipart_copy_threshold: u64,
214+
/// Preferred multipart copy part size in bytes (always set; the builder defaults it to 5 GiB)
215+
pub multipart_copy_part_size: u64,
211216
}
212217

213218
impl S3Config {
@@ -676,7 +681,10 @@ impl S3Client {
676681
part_idx: usize,
677682
data: PutPartPayload<'_>,
678683
) -> Result<PartId> {
679-
let is_copy = matches!(data, PutPartPayload::Copy(_));
684+
let is_copy = matches!(
685+
data,
686+
PutPartPayload::Copy(_) | PutPartPayload::CopyRange(_, _)
687+
);
680688
let part = (part_idx + 1).to_string();
681689

682690
let mut request = self
@@ -690,6 +698,18 @@ impl S3Client {
690698
"x-amz-copy-source",
691699
&format!("{}/{}", self.config.bucket, encode_path(path)),
692700
),
701+
PutPartPayload::CopyRange(path, range) => {
702+
// AWS expects inclusive end for copy range header
703+
let start = range.start;
704+
let end_inclusive = range.end.saturating_sub(1);
705+
let range_value = format!("bytes={}-{}", start, end_inclusive);
706+
request
707+
.header(
708+
"x-amz-copy-source",
709+
&format!("{}/{}", self.config.bucket, encode_path(path)),
710+
)
711+
.header("x-amz-copy-source-range", &range_value)
712+
}
693713
};
694714

695715
if self

src/aws/mod.rs

Lines changed: 98 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,63 @@ impl AmazonS3 {
103103
fn path_url(&self, path: &Path) -> String {
104104
self.client.config.path_url(path)
105105
}
106+
107+
/// Perform a multipart copy operation.
108+
///
109+
/// If the multipart upload fails, make a best effort attempt to clean it up. It's the caller's
110+
/// responsibility to add a lifecycle rule if guaranteed cleanup is required, as we cannot
111+
/// protect against an ill-timed process crash.
112+
async fn copy_multipart(
113+
&self,
114+
from: &Path,
115+
to: &Path,
116+
size: u64,
117+
mode: CompleteMultipartMode,
118+
) -> Result<()> {
119+
let upload_id = self
120+
.client
121+
.create_multipart(to, PutMultipartOptions::default())
122+
.await?;
123+
124+
// S3 requires minimum 5 MiB per part (except final) and max 10,000 parts
125+
let part_size = self.client.config.multipart_copy_part_size;
126+
127+
let mut parts = Vec::new();
128+
let mut offset: u64 = 0;
129+
let mut idx: usize = 0;
130+
let res = async {
131+
while offset < size {
132+
let end = if size - offset <= part_size {
133+
size
134+
} else {
135+
offset + part_size
136+
};
137+
let payload = if offset == 0 && end == size {
138+
PutPartPayload::Copy(from)
139+
} else {
140+
PutPartPayload::CopyRange(from, offset..end)
141+
};
142+
let part = self.client.put_part(to, &upload_id, idx, payload).await?;
143+
parts.push(part);
144+
idx += 1;
145+
offset = end;
146+
}
147+
self.client
148+
.complete_multipart(to, &upload_id, parts, mode)
149+
.await
150+
.map(|_| ())
151+
}
152+
.await;
153+
154+
// If the multipart upload failed, make a best effort attempt to
155+
// clean it up. It's the caller's responsibility to add a
156+
// lifecycle rule if guaranteed cleanup is required, as we
157+
// cannot protect against an ill-timed process crash.
158+
if res.is_err() {
159+
let _ = self.client.abort_multipart(to, &upload_id).await;
160+
}
161+
res
162+
}
106163
}
107164

108165
#[async_trait]
@@ -316,11 +373,28 @@ impl ObjectStore for AmazonS3 {
316373
}
317374

318375
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
319-
self.client
320-
.copy_request(from, to)
321-
.idempotent(true)
322-
.send()
323-
.await?;
376+
// Determine source size to decide between single CopyObject and multipart copy
377+
let head_meta = self
378+
.client
379+
.get_opts(
380+
from,
381+
GetOptions {
382+
head: true,
383+
..Default::default()
384+
},
385+
)
386+
.await?
387+
.meta;
388+
if head_meta.size <= self.client.config.multipart_copy_threshold {
389+
self.client
390+
.copy_request(from, to)
391+
.idempotent(true)
392+
.send()
393+
.await?;
394+
} else {
395+
self.copy_multipart(from, to, head_meta.size, CompleteMultipartMode::Overwrite)
396+
.await?;
397+
}
324398
Ok(())
325399
}
326400

@@ -329,45 +403,27 @@ impl ObjectStore for AmazonS3 {
329403
Some(S3CopyIfNotExists::Header(k, v)) => (k, v, StatusCode::PRECONDITION_FAILED),
330404
Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => (k, v, *status),
331405
Some(S3CopyIfNotExists::Multipart) => {
332-
let upload_id = self
406+
let head_meta = self
333407
.client
334-
.create_multipart(to, PutMultipartOptions::default())
335-
.await?;
336-
337-
let res = async {
338-
let part_id = self
339-
.client
340-
.put_part(to, &upload_id, 0, PutPartPayload::Copy(from))
341-
.await?;
342-
match self
343-
.client
344-
.complete_multipart(
345-
to,
346-
&upload_id,
347-
vec![part_id],
348-
CompleteMultipartMode::Create,
349-
)
350-
.await
351-
{
352-
Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists {
408+
.get_opts(
409+
from,
410+
GetOptions {
411+
head: true,
412+
..Default::default()
413+
},
414+
)
415+
.await?
416+
.meta;
417+
return self
418+
.copy_multipart(from, to, head_meta.size, CompleteMultipartMode::Create)
419+
.await
420+
.map_err(|err| match err {
421+
Error::Precondition { .. } => Error::AlreadyExists {
353422
path: to.to_string(),
354-
source: Box::new(e),
355-
}),
356-
Ok(_) => Ok(()),
357-
Err(e) => Err(e),
358-
}
359-
}
360-
.await;
361-
362-
// If the multipart upload failed, make a best effort attempt to
363-
// clean it up. It's the caller's responsibility to add a
364-
// lifecycle rule if guaranteed cleanup is required, as we
365-
// cannot protect against an ill-timed process crash.
366-
if res.is_err() {
367-
let _ = self.client.abort_multipart(to, &upload_id).await;
368-
}
369-
370-
return res;
423+
source: Box::new(err),
424+
},
425+
other => other,
426+
});
371427
}
372428
#[allow(deprecated)]
373429
Some(S3CopyIfNotExists::Dynamo(lock)) => {

src/config.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,15 @@ impl Parse for u32 {
112112
}
113113
}
114114

115+
impl Parse for u64 {
116+
fn parse(v: &str) -> Result<Self> {
117+
Self::from_str(v).map_err(|_| Error::Generic {
118+
store: "Config",
119+
source: format!("failed to parse \"{v}\" as u64").into(),
120+
})
121+
}
122+
}
123+
115124
impl Parse for HeaderValue {
116125
fn parse(v: &str) -> Result<Self> {
117126
Self::from_str(v).map_err(|_| Error::Generic {

0 commit comments

Comments
 (0)