Skip to content

Commit 9e8f51f

Browse files
committed
Add retry support for HTTP execution failure
Signed-off-by: Bala.FA <bala@minio.io>
1 parent f3c353c commit 9e8f51f

3 files changed

Lines changed: 347 additions & 95 deletions

File tree

api/src/main/java/io/minio/BaseS3Client.java

Lines changed: 98 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.fasterxml.jackson.databind.MapperFeature;
2222
import com.fasterxml.jackson.databind.ObjectMapper;
2323
import com.fasterxml.jackson.databind.json.JsonMapper;
24-
import com.google.common.collect.ImmutableSet;
2524
import io.minio.credentials.Credentials;
2625
import io.minio.credentials.Provider;
2726
import io.minio.errors.ErrorResponseException;
@@ -53,9 +52,8 @@
5352
import java.nio.charset.StandardCharsets;
5453
import java.security.SecureRandom;
5554
import java.util.Arrays;
56-
import java.util.Collections;
5755
import java.util.HashMap;
58-
import java.util.Locale;
56+
import java.util.List;
5957
import java.util.Map;
6058
import java.util.Random;
6159
import java.util.Set;
@@ -67,6 +65,7 @@
6765
import javax.annotation.Nonnull;
6866
import okhttp3.Call;
6967
import okhttp3.Callback;
68+
import okhttp3.Interceptor;
7069
import okhttp3.MediaType;
7170
import okhttp3.OkHttpClient;
7271
import okhttp3.RequestBody;
@@ -100,17 +99,13 @@ public abstract class BaseS3Client implements AutoCloseable {
10099
.build();
101100

102101
private static final String RETRY_HEAD = "RetryHead";
103-
private static final String END_HTTP = "----------END-HTTP----------";
104-
private static final String UPLOAD_ID = "uploadId";
105-
private static final Set<String> TRACE_QUERY_PARAMS =
106-
ImmutableSet.of("retention", "legal-hold", "tagging", UPLOAD_ID, "acl", "attributes");
107-
private PrintWriter traceStream;
102+
private volatile PrintWriter traceStream;
108103
protected final Map<String, String> regionCache = new ConcurrentHashMap<>();
109104
protected String userAgent = Utils.getDefaultUserAgent();
110105

111106
protected Http.BaseUrl baseUrl;
112107
protected Provider provider;
113-
protected OkHttpClient httpClient;
108+
protected volatile OkHttpClient httpClient;
114109
protected boolean closeHttpClient;
115110

116111
protected BaseS3Client(
@@ -137,6 +132,46 @@ public void close() {
137132
}
138133
}
139134

135+
/**
136+
* Sets request retry parameters. Any null/invalid values disable retry.
137+
*
138+
* <pre>Example:{@code
139+
* minioClient.setRetryParameters(ImmutableSet.of(408, 504), 250, 3);
140+
* }</pre>
141+
*
142+
* @param retryStatusCodes HTTP status codes to be retried.
143+
* @param delayMs Delay between retries.
144+
* @param maxRetries Maximum number of retry attempts.
145+
*/
146+
public void setRetry(Set<Integer> retryStatusCodes, Long delayMs, Integer maxRetries) {
147+
Interceptor interceptor =
148+
new Http.StatusRetryInterceptor(
149+
retryStatusCodes, delayMs == null ? 0 : delayMs, maxRetries == null ? 0 : maxRetries);
150+
151+
List<Interceptor> interceptors = this.httpClient.interceptors();
152+
int i = 0;
153+
boolean found = false;
154+
for (i = 0; i < interceptors.size(); i++) {
155+
Interceptor inter = interceptors.get(i);
156+
if (inter instanceof Http.StatusRetryInterceptor) {
157+
found = true;
158+
break;
159+
}
160+
}
161+
162+
OkHttpClient.Builder builder = this.httpClient.newBuilder();
163+
if (found) {
164+
builder.interceptors().clear();
165+
for (int j = 0; j < interceptors.size(); j++) {
166+
builder.addInterceptor(i == j ? interceptor : interceptors.get(j));
167+
}
168+
} else {
169+
builder.addInterceptor(interceptor);
170+
}
171+
172+
this.httpClient = builder.build();
173+
}
174+
140175
/**
141176
* Sets HTTP connect, write and read timeouts. A value of 0 means no timeout, otherwise values
142177
* must be between 1 and Integer.MAX_VALUE when converted to milliseconds.
@@ -268,25 +303,59 @@ private String[] handleRedirectResponse(
268303
return new String[] {code, message};
269304
}
270305

306+
private OkHttpClient getHttpClient(PrintWriter traceStream, Http.S3Request s3request) {
307+
if (traceStream == null) return this.httpClient;
308+
309+
OkHttpClient httpClient = this.httpClient;
310+
List<Interceptor> interceptors = httpClient.interceptors();
311+
int i = 0;
312+
Http.StatusRetryInterceptor interceptor = null;
313+
for (i = 0; i < interceptors.size(); i++) {
314+
Interceptor inter = interceptors.get(i);
315+
if (inter instanceof Http.StatusRetryInterceptor) {
316+
interceptor = (Http.StatusRetryInterceptor) inter;
317+
break;
318+
}
319+
}
320+
321+
OkHttpClient.Builder builder = httpClient.newBuilder();
322+
if (interceptor == null) {
323+
builder.addInterceptor(
324+
new Http.StatusRetryInterceptor(interceptor, traceStream, s3request.object() == null));
325+
} else {
326+
builder.interceptors().clear();
327+
for (int j = 0; j < interceptors.size(); j++) {
328+
if (i == j) {
329+
builder.addInterceptor(
330+
new Http.StatusRetryInterceptor(
331+
interceptor, traceStream, s3request.object() == null));
332+
} else {
333+
builder.addInterceptor(interceptors.get(j));
334+
}
335+
}
336+
}
337+
338+
return builder.build();
339+
}
340+
271341
/** Execute HTTP request asynchronously for given parameters. */
272342
protected CompletableFuture<Response> executeAsync(Http.S3Request s3request, String region) {
273343
Credentials credentials = (provider == null) ? null : provider.fetch();
274344
Http.Request request = null;
345+
PrintWriter traceStream = this.traceStream;
275346
try {
276347
request = s3request.toRequest(baseUrl, region, credentials);
277348
} catch (MinioException e) {
278349
return Utils.failedFuture(e);
279350
}
280351

281-
StringBuilder traceBuilder = new StringBuilder(request.httpTraces());
282-
PrintWriter traceStream = this.traceStream;
283-
if (traceStream != null) traceStream.print(request.httpTraces());
352+
OkHttpClient httpClient = getHttpClient(traceStream, s3request);
353+
boolean statusRetryInterceptorFound = true;
284354

285-
OkHttpClient httpClient = this.httpClient;
286-
// FIXME: enable retry for all request.
287-
// if (!s3request.retryFailure()) {
288-
// httpClient = httpClient.newBuilder().retryOnConnectionFailure(false).build();
289-
// }
355+
StringBuilder traceBuilder = new StringBuilder(request.httpTraces());
356+
if (!statusRetryInterceptorFound && traceStream != null) {
357+
traceStream.print(request.httpTraces());
358+
}
290359

291360
okhttp3.Request httpRequest = request.httpRequest();
292361
CompletableFuture<Response> completableFuture = newCompleteableFuture();
@@ -309,57 +378,26 @@ public void onResponse(Call call, final Response response) throws IOException {
309378
}
310379

311380
private void onResponse(final Response response) throws IOException {
312-
String trace =
313-
String.format(
314-
"%s %d %s%n%s",
315-
response.protocol().toString().toUpperCase(Locale.US),
316-
response.code(),
317-
response.message(),
318-
response.headers().toString());
319-
if (!trace.endsWith("\n\n")) {
320-
trace += trace.endsWith("\n") ? "\n" : "\n\n";
381+
String traces =
382+
Http.getResponseTraces(
383+
response,
384+
s3request.method(),
385+
s3request.queryParams(),
386+
s3request.object() == null);
387+
if (!statusRetryInterceptorFound && traceStream != null) {
388+
traceStream.print(traces);
321389
}
322-
traceBuilder.append(trace);
323-
if (traceStream != null) traceStream.print(trace);
324-
325390
if (response.isSuccessful()) {
326-
if (traceStream != null) {
327-
// Trace response body only if the request is not
328-
// GetObject/ListenBucketNotification
329-
// S3 API.
330-
Set<String> keys = s3request.queryParams().keySet();
331-
if ((s3request.method() != Http.Method.GET
332-
|| s3request.object() == null
333-
|| !Collections.disjoint(keys, TRACE_QUERY_PARAMS))
334-
&& !(keys.contains("events")
335-
&& (keys.contains("prefix") || keys.contains("suffix")))) {
336-
String responseBody = response.peekBody(1024 * 1024).string();
337-
traceStream.print(responseBody);
338-
if (!responseBody.endsWith("\n")) traceStream.println();
339-
}
340-
traceStream.println(END_HTTP);
341-
}
342-
343391
completableFuture.complete(response);
344392
return;
345393
}
346394

395+
traceBuilder.append(traces);
347396
String errorXml = null;
348397
try (ResponseBody responseBody = response.body()) {
349398
errorXml = responseBody.string();
350399
}
351400

352-
if (!("".equals(errorXml) && s3request.method().equals(Http.Method.HEAD))) {
353-
traceBuilder.append(errorXml);
354-
if (traceStream != null) traceStream.print(errorXml);
355-
if (!errorXml.endsWith("\n")) {
356-
traceBuilder.append("\n");
357-
if (traceStream != null) traceStream.println();
358-
}
359-
}
360-
traceBuilder.append(END_HTTP).append("\n");
361-
if (traceStream != null) traceStream.println(END_HTTP);
362-
363401
// Error out for Non-XML response from server for non-HEAD requests.
364402
String contentType = response.headers().get(Http.Headers.CONTENT_TYPE);
365403
if (!s3request.method().equals(Http.Method.HEAD)
@@ -635,7 +673,7 @@ protected void checkArgs(BaseArgs args) {
635673
public CompletableFuture<AbortMultipartUploadResponse> abortMultipartUpload(
636674
AbortMultipartUploadArgs args) {
637675
checkArgs(args);
638-
return executeDeleteAsync(args, null, new Http.QueryParameters(UPLOAD_ID, args.uploadId()))
676+
return executeDeleteAsync(args, null, new Http.QueryParameters(Http.UPLOAD_ID, args.uploadId()))
639677
.thenApply(
640678
response -> {
641679
try {
@@ -672,7 +710,7 @@ public CompletableFuture<ObjectWriteResponse> completeMultipartUpload(
672710
return executePostAsync(
673711
args,
674712
args.ssec() == null ? null : args.ssec().headers(),
675-
new Http.QueryParameters(UPLOAD_ID, args.uploadId()),
713+
new Http.QueryParameters(Http.UPLOAD_ID, args.uploadId()),
676714
body)
677715
.thenApply(
678716
response -> {
@@ -1191,7 +1229,7 @@ public CompletableFuture<ListObjectVersionsResponse> listObjectVersions(
11911229
public CompletableFuture<ListPartsResponse> listParts(ListPartsArgs args) {
11921230
Http.QueryParameters queryParams =
11931231
new Http.QueryParameters(
1194-
UPLOAD_ID,
1232+
Http.UPLOAD_ID,
11951233
args.uploadId(),
11961234
"max-parts",
11971235
(args.maxParts() != null) ? args.maxParts().toString() : "1000");

0 commit comments

Comments
 (0)