Skip to content

Commit cac9172

Browse files
authored
feat: DH-18145: Write timeout configuration with S3 (deephaven#6650)
1 parent e6397c7 commit cac9172

11 files changed

Lines changed: 373 additions & 60 deletions

File tree

extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ClientFactory.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,13 @@
1717
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
1818
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
1919
import software.amazon.awssdk.core.exception.SdkClientException;
20-
import software.amazon.awssdk.core.retry.RetryMode;
2120
import software.amazon.awssdk.http.SdkHttpClient;
2221
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
2322
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
2423
import software.amazon.awssdk.http.crt.AwsCrtHttpClient;
2524
import software.amazon.awssdk.regions.Region;
26-
import software.amazon.awssdk.services.s3.S3AsyncClient;
27-
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
28-
import software.amazon.awssdk.services.s3.S3BaseClientBuilder;
29-
import software.amazon.awssdk.services.s3.S3Client;
30-
import software.amazon.awssdk.services.s3.S3ClientBuilder;
25+
import software.amazon.awssdk.retries.StandardRetryStrategy;
26+
import software.amazon.awssdk.services.s3.*;
3127
import software.amazon.awssdk.utils.ThreadFactoryBuilder;
3228

3329
import java.time.Duration;
@@ -46,6 +42,10 @@ final class S3ClientFactory {
4642
getOrComputeThreadCountProperty("S3.numFutureCompletionThreads", -1);
4743
private static final int NUM_SCHEDULED_EXECUTOR_THREADS =
4844
getOrComputeThreadCountProperty("S3.numScheduledExecutorThreads", 5);
45+
// The default max-attempts value of 3 matches the behavior of the deprecated
46+
// software.amazon.awssdk.core.retry.RetryMode#STANDARD
47+
// which allowed 2 retries (3 attempts).
48+
static final int RETRY_STRATEGY_MAX_ATTEMPTS = 3;
4949

5050
private static final Logger log = LoggerFactory.getLogger(S3ClientFactory.class);
5151
private static final Map<HttpClientConfig, SdkAsyncHttpClient> httpAsyncClientCache = new ConcurrentHashMap<>();
@@ -150,9 +150,9 @@ private static <Builder extends SdkClientBuilder<Builder, Client>, Client> void
150150
// If we find that the STANDARD retry policy does not work well in all situations, we might
151151
// try experimenting with ADAPTIVE retry policy, potentially with fast fail.
152152
// .retryPolicy(RetryPolicy.builder(RetryMode.ADAPTIVE).fastFailRateLimiting(true).build())
153-
.retryPolicy(RetryMode.STANDARD)
154-
.apiCallAttemptTimeout(instructions.readTimeout().dividedBy(3))
155-
.apiCallTimeout(instructions.readTimeout())
153+
.retryStrategy(StandardRetryStrategy.builder()
154+
.maxAttempts(RETRY_STRATEGY_MAX_ATTEMPTS)
155+
.build())
156156
// Adding a metrics publisher may be useful for debugging, but it's very verbose.
157157
// .addMetricPublisher(LoggingMetricPublisher.create(Level.INFO, Format.PRETTY))
158158
.scheduledExecutorService(ensureScheduledExecutor());

extensions/s3/src/main/java/io/deephaven/extensions/s3/S3CompletableOutputStream.java

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -7,35 +7,22 @@
77
import io.deephaven.util.channel.CompletableOutputStream;
88
import org.jetbrains.annotations.NotNull;
99
import org.jetbrains.annotations.Nullable;
10+
import software.amazon.awssdk.awscore.AwsRequest;
1011
import software.amazon.awssdk.core.async.AsyncRequestBody;
1112
import software.amazon.awssdk.services.s3.S3AsyncClient;
1213
import software.amazon.awssdk.services.s3.S3Uri;
1314
import software.amazon.awssdk.services.s3.internal.multipart.SdkPojoConversionUtils;
14-
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
15-
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse;
16-
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
17-
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
18-
import software.amazon.awssdk.services.s3.model.CompletedPart;
19-
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
20-
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
21-
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
22-
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
15+
import software.amazon.awssdk.services.s3.model.*;
2316

2417
import java.io.IOException;
2518
import java.net.URI;
2619
import java.nio.ByteBuffer;
27-
import java.util.ArrayList;
28-
import java.util.Collections;
29-
import java.util.Comparator;
30-
import java.util.List;
31-
import java.util.Objects;
32-
import java.util.concurrent.CancellationException;
33-
import java.util.concurrent.CompletableFuture;
34-
import java.util.concurrent.CompletionException;
35-
import java.util.concurrent.ExecutionException;
36-
import java.util.concurrent.Semaphore;
20+
import java.time.Duration;
21+
import java.util.*;
22+
import java.util.concurrent.*;
3723

3824
import static io.deephaven.extensions.s3.S3ReadContext.handleS3Exception;
25+
import static io.deephaven.extensions.s3.S3Utils.addTimeout;
3926

4027
class S3CompletableOutputStream extends CompletableOutputStream {
4128

@@ -290,10 +277,10 @@ public void close() throws IOException {
290277
////////// Helper methods and classes //////////
291278

292279
private String initiateMultipartUpload() throws IOException {
293-
final CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder()
280+
final CreateMultipartUploadRequest.Builder builder = CreateMultipartUploadRequest.builder()
294281
.bucket(uri.bucket().orElseThrow())
295-
.key(uri.key().orElseThrow())
296-
.build();
282+
.key(uri.key().orElseThrow());
283+
final CreateMultipartUploadRequest createMultipartUploadRequest = applyOverrideConfiguration(builder).build();
297284
// Note: We can add support for other parameters like tagging, storage class, encryption, permissions, etc. in
298285
// future
299286
final CompletableFuture<CreateMultipartUploadResponse> initiateUploadFuture =
@@ -306,6 +293,7 @@ private String initiateMultipartUpload() throws IOException {
306293
try {
307294
response = initiateUploadFuture.get();
308295
} catch (final InterruptedException | ExecutionException | CancellationException e) {
296+
initiateUploadFuture.cancel(true);
309297
throw handleS3Exception(e, String.format("initiating multipart upload for uri %s", uri), s3Instructions);
310298
}
311299
return response.uploadId();
@@ -334,12 +322,12 @@ private void sendPartRequest(boolean onDone) throws IOException {
334322
return;
335323
}
336324

337-
final UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
325+
final UploadPartRequest.Builder builder = UploadPartRequest.builder()
338326
.bucket(uri.bucket().orElseThrow())
339327
.key(uri.key().orElseThrow())
340328
.uploadId(uploadId)
341-
.partNumber(nextPartNumber)
342-
.build();
329+
.partNumber(nextPartNumber);
330+
final UploadPartRequest uploadPartRequest = applyOverrideConfiguration(builder).build();
343331

344332
final int partNumber = nextPartNumber;
345333
final CompletableFuture<UploadPartResponse> uploadPartFuture = s3AsyncClient.uploadPart(uploadPartRequest,
@@ -449,17 +437,20 @@ private void completeMultipartUpload() throws IOException {
449437
// Sort the completed parts by part number, as required by S3
450438
completedParts.sort(Comparator.comparingInt(CompletedPart::partNumber));
451439

452-
final CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder()
440+
final CompleteMultipartUploadRequest.Builder builder = CompleteMultipartUploadRequest.builder()
453441
.bucket(uri.bucket().orElseThrow())
454442
.key(uri.key().orElseThrow())
455443
.uploadId(uploadId)
456444
.multipartUpload(CompletedMultipartUpload.builder()
457445
.parts(completedParts)
458-
.build())
459-
.build();
446+
.build());
447+
final CompleteMultipartUploadRequest completeRequest = applyOverrideConfiguration(builder).build();
448+
final CompletableFuture<CompleteMultipartUploadResponse> uploadFuture =
449+
s3AsyncClient.completeMultipartUpload(completeRequest);
460450
try {
461-
s3AsyncClient.completeMultipartUpload(completeRequest).get();
451+
uploadFuture.get();
462452
} catch (final InterruptedException | ExecutionException | CancellationException e) {
453+
uploadFuture.cancel(true);
463454
final IOException ex = handleS3Exception(e,
464455
String.format("completing multipart upload for uri %s", uri), s3Instructions);
465456
failAll(ex);
@@ -503,19 +494,32 @@ private void abortMultipartUpload() throws IOException {
503494
}
504495

505496
// Initiate the abort request
506-
final AbortMultipartUploadRequest abortRequest = AbortMultipartUploadRequest.builder()
497+
final AbortMultipartUploadRequest.Builder builder = AbortMultipartUploadRequest.builder()
507498
.bucket(uri.bucket().orElseThrow())
508499
.key(uri.key().orElseThrow())
509-
.uploadId(uploadId)
510-
.build();
500+
.uploadId(uploadId);
501+
final AbortMultipartUploadRequest abortRequest = applyOverrideConfiguration(builder).build();
511502
final CompletableFuture<AbortMultipartUploadResponse> future = s3AsyncClient.abortMultipartUpload(abortRequest);
512503

513504
// Wait for the abort to complete
514505
try {
515506
future.get();
516507
} catch (final InterruptedException | ExecutionException | CancellationException e) {
508+
future.cancel(true);
517509
throw handleS3Exception(e,
518510
String.format("aborting multipart upload for uri %s", uri), s3Instructions);
519511
}
520512
}
513+
514+
/**
515+
* Applies the write timeout to the given request builder as a per-request override configuration; the caller builds the request.
516+
*
517+
* @param builder an instance of an {@link AwsRequest.Builder} to apply the write timeout to
518+
* @return the builder
519+
*/
520+
private <B extends AwsRequest.Builder> B applyOverrideConfiguration(@NotNull B builder) {
521+
final Duration writeTimeout = s3Instructions.writeTimeout();
522+
builder.overrideConfiguration(b -> addTimeout(b, writeTimeout));
523+
return builder;
524+
}
521525
}

extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Instructions.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,13 @@ public abstract class S3Instructions implements LogOutputAppendable {
3333
private static final int DEFAULT_FRAGMENT_SIZE = 1 << 16; // 64 KiB
3434
private static final int MIN_FRAGMENT_SIZE = 8 << 10; // 8 KiB
3535
private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(2);
36-
private static final Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(2);
36+
@VisibleForTesting
37+
static final Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(2);
38+
@VisibleForTesting
39+
static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofSeconds(2);
3740
private static final int DEFAULT_NUM_CONCURRENT_WRITE_PARTS = 64;
3841
private static final int MIN_CONCURRENT_WRITE_PARTS = 1;
42+
private static final Duration MIN_READ_WRITE_TIMEOUT = Duration.ofMillis(1);
3943

4044
/**
4145
* We set default part size to 10 MiB. The maximum number of parts allowed is 10,000. This means maximum size of a
@@ -115,6 +119,16 @@ public Duration readTimeout() {
115119
return DEFAULT_READ_TIMEOUT;
116120
}
117121

122+
/**
123+
* The amount of time to wait for a write request to complete before giving up and timing out; defaults to 2 seconds. The
124+
* implementation may choose to internally retry the request multiple times, so long as the total time does not
125+
* exceed this timeout.
126+
*/
127+
@Default
128+
public Duration writeTimeout() {
129+
return DEFAULT_WRITE_TIMEOUT;
130+
}
131+
118132
/**
119133
* The credentials to use when reading or writing to S3. By default, uses {@link Credentials#resolving()}.
120134
*/
@@ -220,6 +234,8 @@ public interface Builder {
220234

221235
/** Sets the read timeout; see {@link S3Instructions#readTimeout()}. */
Builder readTimeout(Duration readTimeout);
222236

237+
/** Sets the write timeout; see {@link S3Instructions#writeTimeout()}. */
Builder writeTimeout(Duration writeTimeout);
238+
223239
Builder credentials(Credentials credentials);
224240

225241
Builder endpointOverride(URI endpointOverride);
@@ -307,6 +323,24 @@ final void boundsCheckMinNumConcurrentWriteParts() {
307323
}
308324
}
309325

326+
@Check
327+
final void boundsCheckReadTimeout() {
328+
if (MIN_READ_WRITE_TIMEOUT.compareTo(readTimeout()) > 0) {
329+
throw new IllegalArgumentException(
330+
"readTimeout(=" + readTimeout() + ") must be >= " +
331+
MIN_READ_WRITE_TIMEOUT);
332+
}
333+
}
334+
335+
@Check
336+
final void boundsCheckWriteTimeout() {
337+
if (MIN_READ_WRITE_TIMEOUT.compareTo(writeTimeout()) > 0) {
338+
throw new IllegalArgumentException(
339+
"writeTimeout(=" + writeTimeout() + ") must be >= " +
340+
MIN_READ_WRITE_TIMEOUT);
341+
}
342+
}
343+
310344
@Check
311345
final void boundsCheckMaxNumConcurrentWriteParts() {
312346
if (numConcurrentWriteParts() > maxConcurrentRequests()) {

extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ReadRequest.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import java.util.concurrent.TimeoutException;
3232
import java.util.function.BiConsumer;
3333

34+
import static io.deephaven.extensions.s3.S3Utils.addTimeout;
35+
3436
/**
3537
* A request for a single fragment of an S3 object, which can be used concurrently.
3638
*
@@ -312,11 +314,13 @@ private int requestLength() {
312314
}
313315

314316
private GetObjectRequest getObjectRequest() {
315-
return GetObjectRequest.builder()
317+
final GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder()
316318
.bucket(s3Uri.bucket().orElseThrow())
317319
.key(s3Uri.key().orElseThrow())
318-
.range("bytes=" + from + "-" + to)
319-
.build();
320+
.range("bytes=" + from + "-" + to);
321+
final Duration readTimeout = instructions.readTimeout();
322+
requestBuilder.overrideConfiguration(b -> addTimeout(b, readTimeout));
323+
return requestBuilder.build();
320324
}
321325

322326
String requestStr() {

extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import java.util.concurrent.ExecutionException;
4040
import java.util.concurrent.TimeUnit;
4141
import java.util.concurrent.TimeoutException;
42+
import java.time.Duration;
43+
import java.util.concurrent.*;
4244
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
4345
import java.util.stream.Stream;
4446
import java.util.stream.StreamSupport;
@@ -47,6 +49,7 @@
4749
import static io.deephaven.base.FileUtils.REPEATED_URI_SEPARATOR_PATTERN;
4850
import static io.deephaven.base.FileUtils.URI_SEPARATOR;
4951
import static io.deephaven.extensions.s3.S3ReadContext.handleS3Exception;
52+
import static io.deephaven.extensions.s3.S3Utils.addTimeout;
5053

5154
/**
5255
* {@link SeekableChannelsProvider} implementation that is used to fetch objects from an S3-compatible API.
@@ -225,12 +228,17 @@ private void fetchNextBatch() throws IOException {
225228
// Add a delimiter to the request if we don't want to fetch all files recursively
226229
requestBuilder.delimiter("/");
227230
}
228-
final long readTimeoutNanos = s3Instructions.readTimeout().toNanos();
231+
final Duration readTimeout = s3Instructions.readTimeout();
232+
final long readTimeoutNanos = readTimeout.toNanos();
233+
requestBuilder.overrideConfiguration(b -> addTimeout(b, readTimeout));
234+
229235
final ListObjectsV2Request request = requestBuilder.continuationToken(continuationToken).build();
236+
final CompletableFuture<ListObjectsV2Response> responseFuture = s3AsyncClient.listObjectsV2(request);
230237
final ListObjectsV2Response response;
231238
try {
232-
response = s3AsyncClient.listObjectsV2(request).get(readTimeoutNanos, TimeUnit.NANOSECONDS);
239+
response = responseFuture.get(readTimeoutNanos, TimeUnit.NANOSECONDS);
233240
} catch (final InterruptedException | ExecutionException | TimeoutException | CancellationException e) {
241+
responseFuture.cancel(true);
234242
throw handleS3Exception(e, String.format("fetching list of files in directory %s", directory),
235243
s3Instructions);
236244
}
@@ -292,14 +300,16 @@ long fetchFileSize(@NotNull final S3Uri s3Uri) throws IOException {
292300
log.debug().append("Head: ").append(s3Uri.toString()).endl();
293301
}
294302
final HeadObjectResponse headObjectResponse;
303+
final HeadObjectRequest.Builder requestBuilder = HeadObjectRequest.builder()
304+
.bucket(s3Uri.bucket().orElseThrow())
305+
.key(s3Uri.key().orElseThrow());
306+
final Duration readTimeout = s3Instructions.readTimeout();
307+
requestBuilder.overrideConfiguration(b -> addTimeout(b, readTimeout));
308+
final CompletableFuture<HeadObjectResponse> responseFuture = s3AsyncClient.headObject(requestBuilder.build());
295309
try {
296-
headObjectResponse = s3AsyncClient
297-
.headObject(HeadObjectRequest.builder()
298-
.bucket(s3Uri.bucket().orElseThrow())
299-
.key(s3Uri.key().orElseThrow())
300-
.build())
301-
.get(s3Instructions.readTimeout().toNanos(), TimeUnit.NANOSECONDS);
310+
headObjectResponse = responseFuture.get(readTimeout.toNanos(), TimeUnit.NANOSECONDS);
302311
} catch (final InterruptedException | ExecutionException | TimeoutException | CancellationException e) {
312+
responseFuture.cancel(true);
303313
throw handleS3Exception(e, String.format("fetching HEAD for file %s", s3Uri), s3Instructions);
304314
}
305315
final long fileSize = headObjectResponse.contentLength();

extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Utils.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,16 @@
44
package io.deephaven.extensions.s3;
55

66
import org.jetbrains.annotations.NotNull;
7+
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
78
import software.amazon.awssdk.profiles.ProfileFile;
89
import software.amazon.awssdk.profiles.ProfileFileLocation;
910

1011
import java.nio.file.Path;
12+
import java.time.Duration;
1113
import java.util.Optional;
1214

15+
import static io.deephaven.extensions.s3.S3ClientFactory.RETRY_STRATEGY_MAX_ATTEMPTS;
16+
1317
class S3Utils {
1418

1519
/**
@@ -48,4 +52,15 @@ private static void addProfileFile(
4852
.content(path)
4953
.build());
5054
}
55+
56+
/**
57+
* Sets the overall API call timeout on the builder, plus a per-attempt timeout of {@code timeout / RETRY_STRATEGY_MAX_ATTEMPTS}.
58+
*
59+
* @param builder the {@link AwsRequestOverrideConfiguration.Builder} to add the timeout to
60+
* @param timeout the overall timeout for the API call, spanning all attempts
61+
*/
62+
static void addTimeout(AwsRequestOverrideConfiguration.Builder builder, final Duration timeout) {
63+
builder.apiCallAttemptTimeout(timeout.dividedBy(RETRY_STRATEGY_MAX_ATTEMPTS))
64+
.apiCallTimeout(timeout);
65+
}
5166
}

0 commit comments

Comments
 (0)