Skip to content

Commit 1a61282

Browse files
BearMinimum98Kevin Zhou
andauthored
Support PutMultipartOptions for MultipartStore::create_multipart for AWS/GCP. (#754)
* Add create_multipart_opts * Add is_empty for TagSet * fix test --------- Co-authored-by: Kevin Zhou <kevinzhou@roblox.com>
1 parent 996e084 commit 1a61282

9 files changed

Lines changed: 220 additions & 5 deletions

File tree

src/aws/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,14 @@ impl MultipartStore for AmazonS3 {
490490
.await
491491
}
492492

493+
async fn create_multipart_opts(
494+
&self,
495+
path: &Path,
496+
opts: PutMultipartOptions,
497+
) -> Result<MultipartId> {
498+
self.client.create_multipart(path, opts).await
499+
}
500+
493501
async fn put_part(
494502
&self,
495503
path: &Path,
@@ -703,6 +711,7 @@ mod tests {
703711
rename_and_copy(&integration).await;
704712
stream_get(&integration).await;
705713
multipart(&integration, &integration).await;
714+
multipart_with_opts(&integration, &integration).await;
706715
multipart_put_part_out_of_order(&integration, &integration).await;
707716
multipart_race_condition(&integration, true).await;
708717
multipart_out_of_order(&integration).await;

src/azure/mod.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
//! Unused blocks will automatically be dropped after 7 days.
2828
//!
2929
use crate::{
30-
CopyMode, CopyOptions, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload,
30+
CopyMode, CopyOptions, Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload,
3131
ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
3232
UploadPart,
3333
multipart::{MultipartStore, PartId},
@@ -308,6 +308,26 @@ impl MultipartStore for MicrosoftAzure {
308308
Ok(String::new())
309309
}
310310

311+
async fn create_multipart_opts(
312+
&self,
313+
path: &Path,
314+
opts: PutMultipartOptions,
315+
) -> Result<MultipartId> {
316+
let PutMultipartOptions {
317+
tags,
318+
attributes,
319+
extensions: _,
320+
} = opts;
321+
322+
if !tags.is_empty() || !attributes.is_empty() {
323+
return Err(Error::NotSupported {
324+
source: "`create_multipart_opts` with non-default options is not supported by MicrosoftAzure".into(),
325+
});
326+
}
327+
328+
self.create_multipart(path).await
329+
}
330+
311331
async fn put_part(
312332
&self,
313333
path: &Path,
@@ -353,12 +373,33 @@ mod tests {
353373
use crate::ObjectStoreExt;
354374
use crate::integration::*;
355375
use crate::tests::*;
376+
use crate::{Attribute, Attributes};
356377
use base64::Engine;
357378
use base64::prelude::BASE64_STANDARD;
358379
use bytes::Bytes;
359380
use std::time::{SystemTime, UNIX_EPOCH};
360381

361382
#[cfg(feature = "reqwest")]
383+
#[tokio::test]
384+
async fn azure_create_multipart_opts_rejects_attributes() {
385+
let integration = MicrosoftAzureBuilder::new()
386+
.with_container_name("test")
387+
.with_use_emulator(true)
388+
.build()
389+
.unwrap();
390+
let opts = PutMultipartOptions {
391+
attributes: Attributes::from_iter([(Attribute::Metadata("key".into()), "value")]),
392+
..Default::default()
393+
};
394+
395+
let err = integration
396+
.create_multipart_opts(&Path::from("test"), opts)
397+
.await
398+
.unwrap_err();
399+
400+
assert!(matches!(err, Error::NotSupported { .. }));
401+
}
402+
362403
#[tokio::test]
363404
async fn azure_blob_test() {
364405
maybe_skip_integration!();

src/gcp/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,14 @@ impl MultipartStore for GoogleCloudStorage {
236236
.await
237237
}
238238

239+
async fn create_multipart_opts(
240+
&self,
241+
path: &Path,
242+
opts: PutMultipartOptions,
243+
) -> Result<MultipartId> {
244+
self.client.multipart_initiate(path, opts).await
245+
}
246+
239247
async fn put_part(
240248
&self,
241249
path: &Path,
@@ -335,6 +343,7 @@ mod test {
335343
// https://github.com/fsouza/fake-gcs-server/issues/852
336344
stream_get(&integration).await;
337345
multipart(&integration, &integration).await;
346+
multipart_with_opts(&integration, &integration).await;
338347
multipart_put_part_out_of_order(&integration, &integration).await;
339348
multipart_race_condition(&integration, true).await;
340349
multipart_out_of_order(&integration).await;

src/integration.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ use crate::multipart::MultipartStore;
2929
use crate::path::Path;
3030
use crate::{
3131
Attribute, Attributes, DynObjectStore, Error, GetOptions, GetRange, MultipartUpload,
32-
ObjectStore, ObjectStoreExt, PutMode, PutPayload, UpdateVersion, WriteMultipart,
32+
ObjectStore, ObjectStoreExt, PutMode, PutMultipartOptions, PutPayload, UpdateVersion,
33+
WriteMultipart,
3334
};
3435
use bytes::Bytes;
3536
use futures_util::stream::FuturesUnordered;
@@ -1053,6 +1054,41 @@ pub async fn multipart(storage: &dyn ObjectStore, multipart: &dyn MultipartStore
10531054
assert_eq!(meta.size, 0);
10541055
}
10551056

1057+
/// Tests [`MultipartStore::create_multipart_opts`]
1058+
pub async fn multipart_with_opts(storage: &dyn ObjectStore, multipart: &dyn MultipartStore) {
1059+
let path = Path::from("test_multipart_with_opts");
1060+
let chunk_size = 5 * 1024 * 1024;
1061+
let chunks = get_chunks(chunk_size, 2);
1062+
let attributes =
1063+
Attributes::from_iter([(Attribute::Metadata("test_key".into()), "test_value")]);
1064+
let opts = PutMultipartOptions {
1065+
attributes: attributes.clone(),
1066+
..Default::default()
1067+
};
1068+
1069+
let id = multipart.create_multipart_opts(&path, opts).await.unwrap();
1070+
1071+
let parts: Vec<_> = futures_util::stream::iter(chunks)
1072+
.enumerate()
1073+
.map(|(idx, b)| multipart.put_part(&path, &id, idx, b.into()))
1074+
.buffered(2)
1075+
.try_collect()
1076+
.await
1077+
.unwrap();
1078+
1079+
multipart
1080+
.complete_multipart(&path, &id, parts)
1081+
.await
1082+
.unwrap();
1083+
1084+
let result = storage.get(&path).await.unwrap();
1085+
assert_eq!(result.meta.size, chunk_size as u64 * 2);
1086+
// Some providers add default attributes, e.g. S3 may report a default Content-Type.
1087+
for (attribute, value) in &attributes {
1088+
assert_eq!(result.attributes.get(attribute), Some(value));
1089+
}
1090+
}
1091+
10561092
/// Tests that [`MultipartStore::put_part`] may be invoked with non-sequential part indices.
10571093
pub async fn multipart_put_part_out_of_order(
10581094
storage: &dyn ObjectStore,

src/memory.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ struct Storage {
116116

117117
#[derive(Debug, Default, Clone)]
118118
struct PartStorage {
119+
attributes: Attributes,
119120
parts: Vec<Option<Bytes>>,
120121
}
121122

@@ -420,10 +421,25 @@ impl ObjectStore for InMemory {
420421
#[async_trait]
421422
impl MultipartStore for InMemory {
422423
async fn create_multipart(&self, _path: &Path) -> Result<MultipartId> {
424+
self.create_multipart_opts(_path, PutMultipartOptions::default())
425+
.await
426+
}
427+
428+
async fn create_multipart_opts(
429+
&self,
430+
_path: &Path,
431+
opts: PutMultipartOptions,
432+
) -> Result<MultipartId> {
423433
let mut storage = self.storage.write();
424434
let etag = storage.next_etag;
425435
storage.next_etag += 1;
426-
storage.uploads.insert(etag, Default::default());
436+
storage.uploads.insert(
437+
etag,
438+
PartStorage {
439+
attributes: opts.attributes,
440+
parts: Default::default(),
441+
},
442+
);
427443
Ok(etag.to_string())
428444
}
429445

@@ -462,7 +478,7 @@ impl MultipartStore for InMemory {
462478
for x in &upload.parts {
463479
buf.extend_from_slice(x.as_ref().unwrap())
464480
}
465-
let etag = storage.insert(path, buf.into(), Default::default());
481+
let etag = storage.insert(path, buf.into(), upload.attributes);
466482
Ok(PutResult {
467483
e_tag: Some(etag.to_string()),
468484
version: None,
@@ -562,6 +578,7 @@ mod tests {
562578
stream_get(&integration).await;
563579
put_opts(&integration, true).await;
564580
multipart(&integration, &integration).await;
581+
multipart_with_opts(&integration, &integration).await;
565582
put_get_attributes(&integration).await;
566583
multipart_put_part_out_of_order(&integration, &integration).await;
567584
}

src/multipart.rs

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
use async_trait::async_trait;
2525

2626
use crate::path::Path;
27-
use crate::{MultipartId, PutPayload, PutResult, Result};
27+
use crate::{Error, MultipartId, PutMultipartOptions, PutPayload, PutResult, Result};
2828

2929
/// Represents a part of a file that has been successfully uploaded in a multipart upload process.
3030
#[derive(Debug, Clone)]
@@ -46,6 +46,30 @@ pub trait MultipartStore: Send + Sync + 'static {
4646
/// Creates a new multipart upload, returning the [`MultipartId`]
4747
async fn create_multipart(&self, path: &Path) -> Result<MultipartId>;
4848

49+
/// Creates a new multipart upload with the given options, returning the [`MultipartId`]
50+
///
51+
/// This allows callers using the low-level multipart API to provide object attributes,
52+
/// tags, or implementation-specific extensions when initiating the upload.
53+
async fn create_multipart_opts(
54+
&self,
55+
path: &Path,
56+
opts: PutMultipartOptions,
57+
) -> Result<MultipartId> {
58+
let PutMultipartOptions {
59+
tags,
60+
attributes,
61+
extensions: _,
62+
} = opts;
63+
64+
if !tags.is_empty() || !attributes.is_empty() {
65+
return Err(Error::NotSupported {
66+
source: "create_multipart_opts with non-default options".into(),
67+
});
68+
}
69+
70+
self.create_multipart(path).await
71+
}
72+
4973
/// Uploads a new part with index `part_idx`
5074
///
5175
/// `part_idx` should be an integer in the range `0..N` where `N` is the number of
@@ -82,3 +106,58 @@ pub trait MultipartStore: Send + Sync + 'static {
82106
/// Aborts a multipart upload
83107
async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()>;
84108
}
109+
110+
#[cfg(test)]
111+
mod tests {
112+
use super::*;
113+
use crate::Extensions;
114+
115+
struct TestMultipartStore;
116+
117+
#[async_trait]
118+
impl MultipartStore for TestMultipartStore {
119+
async fn create_multipart(&self, _path: &Path) -> Result<MultipartId> {
120+
Ok("test".into())
121+
}
122+
123+
async fn put_part(
124+
&self,
125+
_path: &Path,
126+
_id: &MultipartId,
127+
_part_idx: usize,
128+
_data: PutPayload,
129+
) -> Result<PartId> {
130+
unreachable!()
131+
}
132+
133+
async fn complete_multipart(
134+
&self,
135+
_path: &Path,
136+
_id: &MultipartId,
137+
_parts: Vec<PartId>,
138+
) -> Result<PutResult> {
139+
unreachable!()
140+
}
141+
142+
async fn abort_multipart(&self, _path: &Path, _id: &MultipartId) -> Result<()> {
143+
unreachable!()
144+
}
145+
}
146+
147+
#[tokio::test]
148+
async fn default_create_multipart_opts_ignores_extensions() {
149+
let mut extensions = Extensions::new();
150+
extensions.insert("extension");
151+
let opts = PutMultipartOptions {
152+
extensions,
153+
..Default::default()
154+
};
155+
156+
let id = TestMultipartStore
157+
.create_multipart_opts(&Path::from("test"), opts)
158+
.await
159+
.unwrap();
160+
161+
assert_eq!(id, "test");
162+
}
163+
}

src/prefix.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,15 @@ impl<T: MultipartStore> MultipartStore for PrefixStore<T> {
210210
self.inner.create_multipart(&full_path).await
211211
}
212212

213+
async fn create_multipart_opts(
214+
&self,
215+
path: &Path,
216+
opts: PutMultipartOptions,
217+
) -> Result<MultipartId> {
218+
let full_path = self.full_path(path);
219+
self.inner.create_multipart_opts(&full_path, opts).await
220+
}
221+
213222
async fn put_part(
214223
&self,
215224
path: &Path,
@@ -392,6 +401,7 @@ mod tests {
392401
let store = PrefixStore::new(InMemory::new(), "prefix");
393402

394403
multipart(&store, &store).await;
404+
multipart_with_opts(&store, &store).await;
395405
multipart_put_part_out_of_order(&store, &store).await;
396406
multipart_out_of_order(&store).await;
397407
multipart_race_condition(&store, true).await;

src/tags.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ impl TagSet {
4141
pub fn encoded(&self) -> &str {
4242
&self.0
4343
}
44+
45+
/// Return whether this [`TagSet`] contains any tags
46+
pub fn is_empty(&self) -> bool {
47+
self.0.is_empty()
48+
}
4449
}
4550

4651
#[cfg(test)]

src/throttle.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,14 @@ impl<T: MultipartStore> MultipartStore for ThrottledStore<T> {
310310
self.inner.create_multipart(path).await
311311
}
312312

313+
async fn create_multipart_opts(
314+
&self,
315+
path: &Path,
316+
opts: PutMultipartOptions,
317+
) -> Result<MultipartId> {
318+
self.inner.create_multipart_opts(path, opts).await
319+
}
320+
313321
async fn put_part(
314322
&self,
315323
path: &Path,
@@ -400,6 +408,7 @@ mod tests {
400408
copy_if_not_exists(&store).await;
401409
stream_get(&store).await;
402410
multipart(&store, &store).await;
411+
multipart_with_opts(&store, &store).await;
403412
multipart_put_part_out_of_order(&store, &store).await;
404413
}
405414

0 commit comments

Comments
 (0)