Skip to content

Commit 3895c79

Browse files
committed
Add completed part log.
Make Sonar happy - remove unused fields - remove unused test imports and comments - uncomment test - add missing asserts
1 parent 6a7f9ae commit 3895c79

6 files changed

Lines changed: 116 additions & 15 deletions

File tree

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformerPublisher.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,9 @@
3939
@SdkInternalApi
4040
public class FileAsyncResponseTransformerPublisher<T extends SdkResponse>
4141
implements SdkPublisher<AsyncResponseTransformer<T, T>> {
42-
private static final Logger log = Logger.loggerFor(FileAsyncResponseTransformerPublisher.class);
4342

4443
private final Path path;
4544
private final FileTransformerConfiguration initialConfig;
46-
private final long initialPosition;
4745
private Subscriber<?> subscriber;
4846
private final AtomicLong transformerCount;
4947

@@ -54,7 +52,6 @@ public FileAsyncResponseTransformerPublisher(FileAsyncResponseTransformer<?> res
5452
!= FileTransformerConfiguration.FileWriteOption.CREATE_OR_APPEND_TO_EXISTING,
5553
"CREATE_OR_APPEND_TO_EXISTING is not supported for non-serial operations");
5654
this.initialConfig = Validate.paramNotNull(responseTransformer.config(), "fileTransformerConfiguration");
57-
this.initialPosition = responseTransformer.position();
5855
this.transformerCount = new AtomicLong(0);
5956
}
6057

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformerPublisherTest.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import static org.assertj.core.api.Assertions.assertThat;
1919
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2020
import static org.assertj.core.api.Assertions.fail;
21-
import static org.assertj.core.api.Assertions.in;
2221
import static org.mockito.Mockito.mock;
2322
import static org.mockito.Mockito.when;
2423

@@ -42,7 +41,6 @@
4241
import org.junit.jupiter.api.AfterEach;
4342
import org.junit.jupiter.api.BeforeEach;
4443
import org.junit.jupiter.api.Test;
45-
import org.junit.jupiter.api.TestInstance;
4644
import org.junit.jupiter.params.ParameterizedTest;
4745
import org.junit.jupiter.params.provider.MethodSource;
4846
import org.reactivestreams.Subscriber;
@@ -60,7 +58,7 @@ class FileAsyncResponseTransformerPublisherTest {
6058
private Path testFile;
6159

6260
@BeforeEach
63-
void setUp() throws Exception {
61+
void setUp() {
6462
fileSystem = Jimfs.newFileSystem();
6563
testFile = fileSystem.getPath(String.format("/test-file-%s.txt", UUID.randomUUID()));
6664
}
@@ -75,8 +73,6 @@ void tearDown() throws Exception {
7573
void singleDemand_shouldEmitOneTransformer(
7674
Function<Path, AsyncResponseTransformer<SdkResponse, SdkResponse>> transformerFunction) throws Exception {
7775
// Given
78-
// FileAsyncResponseTransformer<SdkResponse> initialTransformer =
79-
// (FileAsyncResponseTransformer<SdkResponse>) AsyncResponseTransformer.<SdkResponse>toFile(testFile);
8076

8177
AsyncResponseTransformer<SdkResponse, SdkResponse> initialTransformer = transformerFunction.apply(testFile);
8278
createFileIfNeeded(initialTransformer);
@@ -151,6 +147,7 @@ public void request(long n) {
151147

152148
@Override
153149
public void cancel() {
150+
// unused for tests
154151
}
155152
});
156153
}
@@ -212,6 +209,7 @@ public void onError(Throwable t) {
212209

213210
@Override
214211
public void onComplete() {
212+
// unused for test
215213
}
216214
});
217215

@@ -228,7 +226,7 @@ public void onComplete() {
228226
initialTransformer.config().fileWriteOption() == FileTransformerConfiguration.FileWriteOption.WRITE_TO_POSITION
229227
? (int) initialTransformer.position()
230228
: 0;
231-
assertThat(fileContent.length).isEqualTo(80 + offset);
229+
assertThat(fileContent).hasSize(80 + offset);
232230
for (int i = 0; i < numTransformers; i++) {
233231
int startPos = i * 10;
234232
byte[] expectedData = new byte[10];
@@ -290,7 +288,7 @@ private static Stream<Function<Path, AsyncResponseTransformer<SdkResponse, SdkRe
290288
}
291289

292290
@Test
293-
void createOrAppendToExisting_shouldThrowException() throws Exception {
291+
void createOrAppendToExisting_shouldThrowException() {
294292
AsyncResponseTransformer<Object, Object> initialTransformer = AsyncResponseTransformer.toFile(
295293
testFile,
296294
FileTransformerConfiguration.builder()

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/ParallelMultipartDownloaderSubscriber.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,8 @@ private int nextPart() {
443443
synchronized (initialCompletedParts) {
444444
int part = partNumber.incrementAndGet();
445445
while (initialCompletedParts.contains(part)) {
446+
final int finalPart = part;
447+
log.debug(() -> "skipping part " + finalPart + " because already completed");
446448
part = partNumber.incrementAndGet();
447449
}
448450
return part;

services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/ParallelMultipartDownloaderSubscriberWiremockTest.java

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@
1515

1616
package software.amazon.awssdk.services.s3.internal.multipart;
1717

18+
import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
19+
import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
20+
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
1821
import static org.assertj.core.api.Assertions.assertThat;
22+
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.MULTIPART_DOWNLOAD_RESUME_CONTEXT;
1923

2024
import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
2125
import com.github.tomakehurst.wiremock.junit5.WireMockTest;
@@ -25,13 +29,15 @@
2529
import java.nio.file.FileSystem;
2630
import java.nio.file.Files;
2731
import java.nio.file.Path;
32+
import java.util.Set;
2833
import java.util.concurrent.CompletableFuture;
2934
import org.junit.jupiter.api.AfterEach;
3035
import org.junit.jupiter.api.BeforeEach;
3136
import org.junit.jupiter.api.Test;
3237
import org.junit.jupiter.params.ParameterizedTest;
3338
import org.junit.jupiter.params.provider.ValueSource;
3439
import org.reactivestreams.Subscriber;
40+
import org.testng.collections.Sets;
3541
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
3642
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
3743
import software.amazon.awssdk.core.FileTransformerConfiguration;
@@ -56,7 +62,7 @@ class ParallelMultipartDownloaderSubscriberWiremockTest {
5662
private Path testFile;
5763

5864
@BeforeEach
59-
public void init(WireMockRuntimeInfo wiremock) throws Exception {
65+
void init(WireMockRuntimeInfo wiremock) throws Exception {
6066
s3AsyncClient = S3AsyncClient.builder()
6167
.credentialsProvider(StaticCredentialsProvider.create(
6268
AwsBasicCredentials.create("key", "secret")))
@@ -146,4 +152,85 @@ void singlePartObject_shouldCompleteWithoutMultipart() throws Exception {
146152
utils.verifyCorrectAmountOfRequestsMade(1);
147153
}
148154

155+
@Test
156+
void whenPartsAlreadyCompleted_shouldDownloadOnlyMissingParts() throws Exception {
157+
int partSize = 1024;
158+
int numParts = 90;
159+
160+
// make sure the file contains all zero
161+
Files.write(testFile, new byte[partSize * numParts]);
162+
163+
Set<Integer> completedParts = Sets.newHashSet(5, 12, 23, 34, 45, 56, 67, 78, 81, 89);
164+
byte[] expectedBody = stubAllPartsWithCompleted(testBucket, testKey, numParts, partSize, completedParts);
165+
166+
FileTransformerConfiguration fileTransformerConfiguration = FileTransformerConfiguration
167+
.builder()
168+
.fileWriteOption(FileTransformerConfiguration.FileWriteOption.WRITE_TO_POSITION)
169+
.failureBehavior(FileTransformerConfiguration.FailureBehavior.DELETE)
170+
.position(0L)
171+
.build();
172+
173+
AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> transformer =
174+
AsyncResponseTransformer.toFile(testFile, fileTransformerConfiguration);
175+
176+
AsyncResponseTransformer.SplitResult<GetObjectResponse, GetObjectResponse> split = transformer.split(
177+
SplittingTransformerConfiguration.builder()
178+
.bufferSizeInBytes(1024 * 32L)
179+
.build());
180+
181+
MultipartDownloadResumeContext context = new MultipartDownloadResumeContext(completedParts, 0L);
182+
CompletableFuture<GetObjectResponse> resultFuture = new CompletableFuture<>();
183+
Subscriber<AsyncResponseTransformer<GetObjectResponse, GetObjectResponse>> subscriber =
184+
new ParallelMultipartDownloaderSubscriber(
185+
s3AsyncClient,
186+
GetObjectRequest.builder()
187+
.overrideConfiguration(c -> c.putExecutionAttribute(MULTIPART_DOWNLOAD_RESUME_CONTEXT, context))
188+
.bucket(testBucket)
189+
.key(testKey)
190+
.build(),
191+
resultFuture,
192+
50);
193+
194+
split.publisher().subscribe(subscriber);
195+
GetObjectResponse getObjectResponse = resultFuture.join();
196+
197+
assertThat(Files.exists(testFile)).isTrue();
198+
byte[] actualBody = Files.readAllBytes(testFile);
199+
assertThat(actualBody).isEqualTo(expectedBody);
200+
assertThat(getObjectResponse).isNotNull();
201+
202+
verifyCorrectRequestsMade(numParts, completedParts);
203+
}
204+
205+
private void verifyCorrectRequestsMade(int numParts, Set<Integer> completedParts) {
206+
String urlTemplate = ".*partNumber=%d\\b.*";
207+
for (int i = 1; i <= numParts; i++) {
208+
if (completedParts.contains(i)) {
209+
verify(0, getRequestedFor(urlMatching(String.format(urlTemplate, i))));
210+
} else {
211+
verify(getRequestedFor(urlMatching(String.format(urlTemplate, i))));
212+
}
213+
}
214+
verify(0, getRequestedFor(urlMatching(String.format(urlTemplate, numParts + 1))));
215+
216+
}
217+
218+
private byte[] stubAllPartsWithCompleted(String testBucket, String testKey, int amountOfPartToTest, int partSize,
219+
Set<Integer> completedParts) {
220+
byte[] expectedBody = new byte[amountOfPartToTest * partSize];
221+
for (int i = 0; i < amountOfPartToTest; i++) {
222+
if (completedParts.contains(i + 1)) {
223+
// fill with zero
224+
byte[] individualBody = new byte[partSize];
225+
System.arraycopy(individualBody, 0, expectedBody, i * partSize, individualBody.length);
226+
} else {
227+
byte[] individualBody = utils.stubForPart(testBucket, testKey, i + 1, amountOfPartToTest, partSize);
228+
System.arraycopy(individualBody, 0, expectedBody, i * partSize, individualBody.length);
229+
}
230+
}
231+
return expectedBody;
232+
233+
}
234+
235+
149236
}

services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartClientGetObjectWiremockTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,8 @@ public <T> void errorOnThirdPart_shouldCompleteExceptionallyOnlyPartsGreaterThan
147147
}
148148
}
149149

150-
// todo temporary, remove when support for resume is added to multipart file download
151-
// @ParameterizedTest
152-
// @MethodSource("partSizeAndTransformerParams")
150+
@ParameterizedTest
151+
@MethodSource("partSizeAndTransformerParams")
153152
public <T> void partCountValidationFailure_shouldThrowException(
154153
AsyncResponseTransformerTestSupplier<T> supplier,
155154
int partSize) {

services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/multipart/S3MultipartFileDownloadWiremockTest.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class S3MultipartFileDownloadWiremockTest {
6363
private Path testFile;
6464

6565
@BeforeEach
66-
public void init(WireMockRuntimeInfo wiremock) throws Exception {
66+
void init(WireMockRuntimeInfo wiremock) {
6767
s3AsyncClient = S3AsyncClient.builder()
6868
.credentialsProvider(StaticCredentialsProvider.create(
6969
AwsBasicCredentials.create("key", "secret")))
@@ -316,7 +316,25 @@ void errorOnFirstPart_retryable_thenSucceeds() throws Exception {
316316
.withHeader("ETag", "test-etag")
317317
.withBody(part3Data)));
318318

319+
CompletableFuture<GetObjectResponse> resp = s3AsyncClient.getObject(b -> b
320+
.bucket(testBucket)
321+
.key(testKey)
322+
.build(),
323+
AsyncResponseTransformer.toFile(testFile));
319324

325+
assertThat(resp).succeedsWithin(Duration.of(10, ChronoUnit.SECONDS));
326+
327+
verify(exactly(3), getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=1", testBucket, testKey))));
328+
verify(exactly(1), getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=2", testBucket, testKey))));
329+
verify(exactly(1), getRequestedFor(urlEqualTo(String.format("/%s/%s?partNumber=3", testBucket, testKey))));
330+
331+
assertThat(Files.exists(testFile)).isTrue();
332+
byte[] actualBody = Files.readAllBytes(testFile);
333+
byte[] expectedBody = new byte[partSize * totalPart];
334+
System.arraycopy(part1Data, 0, expectedBody, 0, partSize);
335+
System.arraycopy(part2Data, 0, expectedBody, partSize, partSize);
336+
System.arraycopy(part3Data, 0, expectedBody, partSize * 2, partSize);
337+
assertThat(actualBody).isEqualTo(expectedBody);
320338
}
321339

322340
@Test

0 commit comments

Comments
 (0)