1515
1616package software .amazon .awssdk .services .s3 .multipart ;
1717
18+ import static java .util .Base64 .getEncoder ;
1819import static java .util .concurrent .TimeUnit .SECONDS ;
1920import static org .assertj .core .api .Assertions .assertThat ;
21+ import static org .junit .Assert .assertEquals ;
2022import static software .amazon .awssdk .services .s3 .model .ServerSideEncryption .AES256 ;
2123import static software .amazon .awssdk .testutils .service .S3BucketUtils .temporaryBucketName ;
2224
2527import java .io .File ;
2628import java .io .FileInputStream ;
2729import java .io .IOException ;
30+ import java .io .InputStream ;
31+ import java .io .OutputStream ;
2832import java .nio .ByteBuffer ;
2933import java .nio .charset .Charset ;
3034import java .nio .file .Files ;
35+ import java .security .DigestInputStream ;
3136import java .security .MessageDigest ;
37+ import java .security .NoSuchAlgorithmException ;
3238import java .security .SecureRandom ;
3339import java .util .Base64 ;
3440import java .util .List ;
3541import java .util .Map ;
3642import java .util .Optional ;
43+ import java .util .Random ;
3744import java .util .UUID ;
45+ import java .util .concurrent .CompletableFuture ;
3846import java .util .zip .CRC32 ;
3947import java .util .concurrent .ExecutorService ;
4048import java .util .concurrent .Executors ;
4856import org .junit .jupiter .api .Timeout ;
4957import org .reactivestreams .Subscriber ;
5058import software .amazon .awssdk .core .ClientType ;
59+ import software .amazon .awssdk .core .ResponseBytes ;
5160import software .amazon .awssdk .core .ResponseInputStream ;
5261import software .amazon .awssdk .core .async .AsyncRequestBody ;
62+ import software .amazon .awssdk .core .async .AsyncResponseTransformer ;
63+ import software .amazon .awssdk .core .async .BlockingInputStreamAsyncRequestBody ;
5364import software .amazon .awssdk .core .interceptor .Context ;
5465import software .amazon .awssdk .core .interceptor .ExecutionAttributes ;
5566import software .amazon .awssdk .core .interceptor .ExecutionInterceptor ;
5667import software .amazon .awssdk .core .internal .async .FileAsyncRequestBody ;
5768import software .amazon .awssdk .core .sync .ResponseTransformer ;
5869import software .amazon .awssdk .services .s3 .S3AsyncClient ;
70+ import software .amazon .awssdk .services .s3 .S3Client ;
5971import software .amazon .awssdk .services .s3 .S3IntegrationTestBase ;
6072import software .amazon .awssdk .services .s3 .model .ChecksumAlgorithm ;
6173import software .amazon .awssdk .services .s3 .model .ChecksumMode ;
6274import software .amazon .awssdk .services .s3 .model .CompleteMultipartUploadRequest ;
6375import software .amazon .awssdk .services .s3 .model .CreateMultipartUploadRequest ;
76+ import software .amazon .awssdk .services .s3 .model .GetObjectRequest ;
6477import software .amazon .awssdk .services .s3 .model .GetObjectResponse ;
78+ import software .amazon .awssdk .services .s3 .model .PutObjectResponse ;
6579import software .amazon .awssdk .services .s3 .model .UploadPartRequest ;
6680import software .amazon .awssdk .services .s3 .utils .ChecksumUtils ;
81+ import software .amazon .awssdk .testutils .FileUtils ;
82+ import software .amazon .awssdk .testutils .RandomTempFile ;
83+ import software .amazon .awssdk .utils .BinaryUtils ;
6784import software .amazon .awssdk .utils .IoUtils ;
6885import software .amazon .awssdk .utils .Md5Utils ;
6986
@@ -72,9 +89,8 @@ public class S3MultipartClientPutObjectIntegrationTest extends S3IntegrationTest
7289
7390 private static final String TEST_BUCKET = temporaryBucketName (S3MultipartClientPutObjectIntegrationTest .class );
7491 private static final String TEST_KEY = "testfile.dat" ;
75- private static final int OBJ_SIZE = 19 * 1024 * 1024 ;
92+ private static final int OBJ_SIZE = 1024 * 1024 * 30 ;
7693 private static final CapturingInterceptor CAPTURING_INTERCEPTOR = new CapturingInterceptor ();
77- private static final byte [] CONTENT = RandomStringUtils .randomAscii (OBJ_SIZE ).getBytes (Charset .defaultCharset ());
7894 private static File testFile ;
7995 private static S3AsyncClient mpuS3Client ;
8096 private static ExecutorService executorService = Executors .newFixedThreadPool (2 );
@@ -83,8 +99,8 @@ public class S3MultipartClientPutObjectIntegrationTest extends S3IntegrationTest
8399 public static void setup () throws Exception {
84100 setUp ();
85101 createBucket (TEST_BUCKET );
86- testFile = File . createTempFile ( "SplittingPublisherTest" , UUID . randomUUID (). toString ());
87- Files . write ( testFile . toPath (), CONTENT );
102+
103+ testFile = new RandomTempFile ( OBJ_SIZE );
88104 mpuS3Client = S3AsyncClient
89105 .builder ()
90106 .region (DEFAULT_REGION )
@@ -108,6 +124,26 @@ public void reset() {
108124 CAPTURING_INTERCEPTOR .reset ();
109125 }
110126
127+ @ Test
128+ public void upload_blockingInputStream_shouldSucceed () throws IOException {
129+ String objectPath = UUID .randomUUID ().toString ();
130+ String expectedMd5 = Md5Utils .md5AsBase64 (testFile );
131+
132+ BlockingInputStreamAsyncRequestBody body = AsyncRequestBody .forBlockingInputStream (null );
133+
134+ CompletableFuture <PutObjectResponse > put =
135+ mpuS3Client .putObject (req -> req .bucket (TEST_BUCKET ).key (objectPath )
136+ .build (), body );
137+ body .writeInputStream (new FileInputStream (testFile ));
138+ put .join ();
139+
140+ ResponseInputStream <GetObjectResponse > objContent = s3 .getObject (r -> r .bucket (TEST_BUCKET ).key (objectPath ),
141+ ResponseTransformer .toInputStream ());
142+
143+ String actualMd5 = BinaryUtils .toBase64 (Md5Utils .computeMD5Hash (objContent ));
144+ assertEquals (expectedMd5 , actualMd5 );
145+ }
146+
111147 @ Test
112148 void putObject_fileRequestBody_objectSentCorrectly () throws Exception {
113149 AsyncRequestBody body = AsyncRequestBody .fromFile (testFile .toPath ());
@@ -127,7 +163,7 @@ void putObject_fileRequestBody_objectSentCorrectly() throws Exception {
127163 @ Test
128164 void putObject_inputStreamAsyncRequestBody_objectSentCorrectly () throws Exception {
129165 AsyncRequestBody body = AsyncRequestBody .fromInputStream (
130- new ByteArrayInputStream ( CONTENT ),
166+ new FileInputStream ( testFile ),
131167 Long .valueOf (OBJ_SIZE ),
132168 executorService );
133169 mpuS3Client .putObject (r -> r .bucket (TEST_BUCKET )
@@ -193,7 +229,7 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
193229 @ Test
194230 void putObject_withSSECAndChecksum_objectSentCorrectly () throws Exception {
195231 byte [] secretKey = generateSecretKey ();
196- String b64Key = Base64 . getEncoder ().encodeToString (secretKey );
232+ String b64Key = getEncoder ().encodeToString (secretKey );
197233 String b64KeyMd5 = Md5Utils .md5AsBase64 (secretKey );
198234
199235 AsyncRequestBody body = AsyncRequestBody .fromFile (testFile .toPath ());
@@ -282,17 +318,10 @@ private static String calculateCRC32AsString(String filePath) throws IOException
282318 IoUtils .drainInputStream (cis );
283319 long checksumValue = cis .getChecksum ().getValue ();
284320 byte [] checksumBytes = ByteBuffer .allocate (4 ).putInt ((int ) checksumValue ).array ();
285- return Base64 . getEncoder ().encodeToString (checksumBytes );
321+ return getEncoder ().encodeToString (checksumBytes );
286322 }
287323 }
288324
289- private static String calculateSHA1AsString () throws Exception {
290- MessageDigest md = MessageDigest .getInstance ("SHA-1" );
291- md .update (CONTENT );
292- byte [] checksum = md .digest ();
293- return Base64 .getEncoder ().encodeToString (checksum );
294- }
295-
296325 private static byte [] generateSecretKey () {
297326 KeyGenerator generator ;
298327 try {
0 commit comments