Skip to content

Commit 289554e

Browse files
committed
PR comments - keep transformerCount in FileAsyncResponseTransformerPublisher
1 parent 1716e46 commit 289554e

2 files changed

Lines changed: 8 additions & 4 deletions

File tree

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformerPublisher.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.nio.file.Path;
2020
import java.util.Optional;
2121
import java.util.concurrent.CompletableFuture;
22-
import java.util.concurrent.atomic.AtomicBoolean;
2322
import java.util.concurrent.atomic.AtomicLong;
2423
import org.reactivestreams.Subscriber;
2524
import software.amazon.awssdk.annotations.SdkInternalApi;
@@ -46,6 +45,8 @@ public class FileAsyncResponseTransformerPublisher<T extends SdkResponse>
4645
private final FileTransformerConfiguration initialConfig;
4746
private final long initialPosition;
4847
private Subscriber<?> subscriber;
48+
private final AtomicLong transformerCount;
49+
4950

5051
public FileAsyncResponseTransformerPublisher(FileAsyncResponseTransformer<?> responseTransformer) {
5152
this.path = Validate.paramNotNull(responseTransformer.path(), "path");
@@ -54,6 +55,7 @@ public FileAsyncResponseTransformerPublisher(FileAsyncResponseTransformer<?> res
5455
"CREATE_OR_APPEND_TO_EXISTING is not supported for non-serial operations");
5556
this.initialConfig = Validate.paramNotNull(responseTransformer.config(), "fileTransformerConfiguration");
5657
this.initialPosition = responseTransformer.position();
58+
this.transformerCount = new AtomicLong(0);
5759
}
5860

5961
@Override
@@ -129,10 +131,14 @@ public void onResponse(T response) {
129131
CompletableFuture<T> delegateFuture = delegate.prepare();
130132
CompletableFutureUtils.forwardResultTo(delegateFuture, future);
131133
CompletableFutureUtils.forwardExceptionTo(future, delegateFuture);
134+
transformerCount.incrementAndGet();
132135
delegate.onResponse(response);
133136
}
134137

135138
private AsyncResponseTransformer<T, T> getDelegateTransformer(Long startAt) {
139+
if (transformerCount.get() == 0) {
140+
return AsyncResponseTransformer.toFile(path, initialConfig);
141+
}
136142
switch (initialConfig.fileWriteOption()) {
137143
case CREATE_NEW:
138144
case CREATE_OR_REPLACE_EXISTING: {

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformerPublisherTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,13 @@
2020
import static org.mockito.Mockito.mock;
2121
import static org.mockito.Mockito.when;
2222

23-
import com.google.common.jimfs.Configuration;
2423
import com.google.common.jimfs.Jimfs;
2524
import java.nio.ByteBuffer;
2625
import java.nio.file.FileSystem;
2726
import java.nio.file.Files;
2827
import java.nio.file.Path;
2928
import java.util.ArrayList;
3029
import java.util.Arrays;
31-
import java.util.Collections;
3230
import java.util.List;
3331
import java.util.Optional;
3432
import java.util.concurrent.CompletableFuture;
@@ -54,7 +52,7 @@ class FileAsyncResponseTransformerPublisherTest {
5452

5553
@BeforeEach
5654
void setUp() throws Exception {
57-
fileSystem = Jimfs.newFileSystem(Configuration.unix());
55+
fileSystem = Jimfs.newFileSystem();
5856
testFile = fileSystem.getPath("/test-file.txt");
5957
}
6058

0 commit comments

Comments
 (0)