Skip to content

Commit 5a41af5

Browse files
authored
Add retry support for removeObjects()/completeMultipartUpload() APIs (#1704)
Signed-off-by: Bala.FA <bala@minio.io>
1 parent abbb6e9 commit 5a41af5

9 files changed

Lines changed: 301 additions & 79 deletions

File tree

adminapi/src/main/java/io/minio/admin/MinioAdminClient.java

Lines changed: 71 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -46,19 +46,20 @@
4646
import java.util.ArrayList;
4747
import java.util.HashMap;
4848
import java.util.List;
49-
import java.util.Locale;
5049
import java.util.Map;
50+
import java.util.Set;
5151
import java.util.concurrent.TimeUnit;
5252
import java.util.regex.Pattern;
53+
import java.util.stream.IntStream;
5354
import javax.annotation.Nonnull;
5455
import javax.annotation.Nullable;
5556
import okhttp3.HttpUrl;
57+
import okhttp3.Interceptor;
5658
import okhttp3.MediaType;
5759
import okhttp3.OkHttpClient;
5860
import okhttp3.Request;
5961
import okhttp3.RequestBody;
6062
import okhttp3.Response;
61-
import okhttp3.ResponseBody;
6263

6364
/** Client to perform MinIO administration operations. */
6465
public class MinioAdminClient {
@@ -111,12 +112,12 @@ public String toString() {
111112
}
112113

113114
private String userAgent = Utils.getDefaultUserAgent();
114-
private PrintWriter traceStream;
115+
private volatile PrintWriter traceStream;
115116

116117
private HttpUrl baseUrl;
117118
private String region;
118119
private Provider provider;
119-
private OkHttpClient httpClient;
120+
private volatile OkHttpClient httpClient;
120121

121122
private MinioAdminClient(
122123
HttpUrl baseUrl, String region, Provider provider, OkHttpClient httpClient) {
@@ -132,6 +133,39 @@ private Credentials getCredentials() throws MinioException {
132133
return creds;
133134
}
134135

136+
private static int getStatusRetryInterceptorIndex(List<Interceptor> interceptors) {
137+
return IntStream.range(0, interceptors.size())
138+
.filter(i -> interceptors.get(i) instanceof Http.StatusRetryInterceptor)
139+
.findFirst()
140+
.orElse(-1);
141+
}
142+
143+
private OkHttpClient getHttpClient(PrintWriter traceStream) {
144+
if (traceStream == null) return this.httpClient;
145+
146+
OkHttpClient httpClient = this.httpClient;
147+
List<Interceptor> interceptors = httpClient.interceptors();
148+
int i = getStatusRetryInterceptorIndex(interceptors);
149+
Http.StatusRetryInterceptor interceptor =
150+
i < 0 ? null : (Http.StatusRetryInterceptor) interceptors.get(i);
151+
152+
OkHttpClient.Builder builder = httpClient.newBuilder();
153+
if (interceptor == null) {
154+
builder.addInterceptor(new Http.StatusRetryInterceptor(interceptor, traceStream, false));
155+
} else {
156+
builder.interceptors().clear();
157+
for (int j = 0; j < interceptors.size(); j++) {
158+
if (i == j) {
159+
builder.addInterceptor(new Http.StatusRetryInterceptor(interceptor, traceStream, false));
160+
} else {
161+
builder.addInterceptor(interceptors.get(j));
162+
}
163+
}
164+
}
165+
166+
return builder.build();
167+
}
168+
135169
private Response httpExecute(
136170
Http.Method method, Command command, Multimap<String, String> queryParamMap, byte[] body)
137171
throws IOException, MinioException {
@@ -179,40 +213,8 @@ private Response httpExecute(
179213
creds.secretKey(),
180214
request.header("x-amz-content-sha256"));
181215

182-
PrintWriter traceStream = this.traceStream;
183-
if (traceStream != null) {
184-
StringBuilder traceBuilder = new StringBuilder();
185-
traceBuilder.append("---------START-HTTP---------\n");
186-
String encodedPath = request.url().encodedPath();
187-
String encodedQuery = request.url().encodedQuery();
188-
if (encodedQuery != null) encodedPath += "?" + encodedQuery;
189-
traceBuilder.append(request.method()).append(" ").append(encodedPath).append(" HTTP/1.1\n");
190-
traceBuilder.append(
191-
request
192-
.headers()
193-
.toString()
194-
.replaceAll("Signature=([0-9a-f]+)", "Signature=*REDACTED*")
195-
.replaceAll("Credential=([^/]+)", "Credential=*REDACTED*"));
196-
if (body != null) traceBuilder.append("\n").append(new String(body, StandardCharsets.UTF_8));
197-
traceStream.println(traceBuilder.toString());
198-
}
199-
200-
OkHttpClient httpClient = this.httpClient;
216+
OkHttpClient httpClient = getHttpClient(this.traceStream);
201217
Response response = httpClient.newCall(request).execute();
202-
203-
if (traceStream != null) {
204-
String trace =
205-
response.protocol().toString().toUpperCase(Locale.US)
206-
+ " "
207-
+ response.code()
208-
+ "\n"
209-
+ response.headers();
210-
traceStream.println(trace);
211-
ResponseBody responseBody = response.peekBody(1024 * 1024);
212-
traceStream.println(responseBody.string());
213-
traceStream.println("----------END-HTTP----------");
214-
}
215-
216218
if (response.isSuccessful()) return response;
217219

218220
throw new MinioException("Request failed with response: " + response.body().string());
@@ -926,6 +928,38 @@ public void traceOff() {
926928
this.traceStream = null;
927929
}
928930

931+
/**
932+
* Sets request retry parameters. Any null/invalid values disable retry.
933+
*
934+
* <pre>Example:{@code
935+
* minioClient.setRetry(ImmutableSet.of(408, 504), 250, 3);
936+
* }</pre>
937+
*
938+
* @param retryStatusCodes HTTP status codes to be retried.
939+
* @param delayMs Delay between retries.
940+
* @param maxRetries Maximum number of retry attempts.
941+
*/
942+
public synchronized void setRetry(
943+
Set<Integer> retryStatusCodes, Long delayMs, Integer maxRetries) {
944+
Interceptor interceptor =
945+
new Http.StatusRetryInterceptor(
946+
retryStatusCodes, delayMs == null ? 0 : delayMs, maxRetries == null ? 0 : maxRetries);
947+
948+
List<Interceptor> interceptors = this.httpClient.interceptors();
949+
int i = getStatusRetryInterceptorIndex(interceptors);
950+
OkHttpClient.Builder builder = this.httpClient.newBuilder();
951+
if (i >= 0) {
952+
builder.interceptors().clear();
953+
for (int j = 0; j < interceptors.size(); j++) {
954+
builder.addInterceptor(i == j ? interceptor : interceptors.get(j));
955+
}
956+
} else {
957+
builder.addInterceptor(interceptor);
958+
}
959+
960+
this.httpClient = builder.build();
961+
}
962+
929963
public static Builder builder() {
930964
return new Builder();
931965
}

api/src/main/java/io/minio/BaseS3Client.java

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.fasterxml.jackson.databind.MapperFeature;
2222
import com.fasterxml.jackson.databind.ObjectMapper;
2323
import com.fasterxml.jackson.databind.json.JsonMapper;
24+
import com.google.common.collect.ImmutableSet;
2425
import io.minio.credentials.Credentials;
2526
import io.minio.credentials.Provider;
2627
import io.minio.errors.ErrorResponseException;
@@ -60,6 +61,7 @@
6061
import java.util.concurrent.CompletableFuture;
6162
import java.util.concurrent.CompletionException;
6263
import java.util.concurrent.ConcurrentHashMap;
64+
import java.util.concurrent.ThreadLocalRandom;
6365
import java.util.function.Supplier;
6466
import java.util.logging.Logger;
6567
import java.util.stream.IntStream;
@@ -84,6 +86,8 @@ public abstract class BaseS3Client implements AutoCloseable {
8486
}
8587
}
8688

89+
protected static final Set<String> RETRYABLE_ERRORS =
90+
ImmutableSet.of("InternalError", "RequestTimeout", "ServiceUnavailable", "SlowDown");
8791
protected static final String NO_SUCH_BUCKET_MESSAGE = "Bucket does not exist";
8892
protected static final String NO_SUCH_BUCKET = "NoSuchBucket";
8993
protected static final String NO_SUCH_BUCKET_POLICY = "NoSuchBucketPolicy";
@@ -675,24 +679,8 @@ public CompletableFuture<AbortMultipartUploadResponse> abortMultipartUpload(
675679
});
676680
}
677681

678-
/**
679-
* Do <a
680-
* href="https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html">CompleteMultipartUpload
681-
* S3 API</a> asynchronously.
682-
*
683-
* @param args {@link CompleteMultipartUploadArgs} object.
684-
* @return {@link CompletableFuture}&lt;{@link ObjectWriteResponse}&gt; object.
685-
*/
686-
public CompletableFuture<ObjectWriteResponse> completeMultipartUpload(
687-
CompleteMultipartUploadArgs args) {
688-
checkArgs(args);
689-
args.validateSsec(baseUrl.isHttps());
690-
Http.Body body = null;
691-
try {
692-
body = new Http.Body(new CompleteMultipartUpload(args.parts()), null, null, null);
693-
} catch (MinioException e) {
694-
return Utils.failedFuture(e);
695-
}
682+
private CompletableFuture<ObjectWriteResponse> completeMultipartUpload(
683+
CompleteMultipartUploadArgs args, Http.Body body) {
696684
return executePostAsync(
697685
args,
698686
args.ssec() == null ? null : args.ssec().headers(),
@@ -750,6 +738,59 @@ public CompletableFuture<ObjectWriteResponse> completeMultipartUpload(
750738
});
751739
}
752740

741+
private CompletableFuture<ObjectWriteResponse> completeMultipartUpload(
742+
CompleteMultipartUploadArgs args, Http.Body body, int attempt) {
743+
return completeMultipartUpload(args, body)
744+
.handle(
745+
(response, ex) -> {
746+
if (ex == null) return CompletableFuture.completedFuture(response);
747+
748+
ex = ex.getCause();
749+
if (attempt == Math.max(1, args.maxRetries()) - 1
750+
|| !(ex instanceof ErrorResponseException)
751+
|| !RETRYABLE_ERRORS.contains(
752+
((ErrorResponseException) ex).errorResponse().code())) {
753+
return Utils.<ObjectWriteResponse>failedFuture(ex);
754+
}
755+
756+
if (args.delayMs() > 0) {
757+
long maxBackoffLimit = args.delayMs() * (1L << (attempt + 1));
758+
long jitteredDelay = ThreadLocalRandom.current().nextLong(0, maxBackoffLimit);
759+
try {
760+
Thread.sleep(jitteredDelay);
761+
} catch (InterruptedException e) {
762+
Thread.currentThread().interrupt();
763+
return Utils.<ObjectWriteResponse>failedFuture(
764+
new IllegalStateException("Retry timeout interrupted", e));
765+
}
766+
}
767+
768+
return completeMultipartUpload(args, body, attempt + 1);
769+
})
770+
.thenCompose(future -> future);
771+
}
772+
773+
/**
774+
* Do <a
775+
* href="https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html">CompleteMultipartUpload
776+
* S3 API</a> asynchronously.
777+
*
778+
* @param args {@link CompleteMultipartUploadArgs} object.
779+
* @return {@link CompletableFuture}&lt;{@link ObjectWriteResponse}&gt; object.
780+
*/
781+
public CompletableFuture<ObjectWriteResponse> completeMultipartUpload(
782+
CompleteMultipartUploadArgs args) {
783+
checkArgs(args);
784+
args.validateSsec(baseUrl.isHttps());
785+
Http.Body body = null;
786+
try {
787+
body = new Http.Body(new CompleteMultipartUpload(args.parts()), null, null, null);
788+
} catch (MinioException e) {
789+
return Utils.failedFuture(e);
790+
}
791+
return completeMultipartUpload(args, body, 0);
792+
}
793+
753794
/**
754795
* Do <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html">CopyObject S3
755796
* API</a> asynchronously.

api/src/main/java/io/minio/CompleteMultipartUploadArgs.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,17 @@ public class CompleteMultipartUploadArgs extends ObjectArgs {
2525
private String uploadId;
2626
private Part[] parts;
2727
private ServerSideEncryption.CustomerKey ssec;
28+
private long delayMs = 100L;
29+
private int maxRetries = 5;
2830

2931
protected CompleteMultipartUploadArgs() {}
3032

3133
public CompleteMultipartUploadArgs(ComposeObjectArgs args, String uploadId, Part[] parts) {
3234
super(args);
3335
this.uploadId = uploadId;
3436
this.parts = parts;
37+
this.delayMs = args.delayMs();
38+
this.maxRetries = args.maxRetries();
3539
}
3640

3741
public CompleteMultipartUploadArgs(PutObjectBaseArgs args, String uploadId, Part[] parts) {
@@ -41,6 +45,8 @@ public CompleteMultipartUploadArgs(PutObjectBaseArgs args, String uploadId, Part
4145
if (args.sse() != null && args.sse() instanceof ServerSideEncryption.CustomerKey) {
4246
this.ssec = (ServerSideEncryption.CustomerKey) args.sse();
4347
}
48+
this.delayMs = args.delayMs();
49+
this.maxRetries = args.maxRetries();
4450
}
4551

4652
public String uploadId() {
@@ -55,6 +61,14 @@ public ServerSideEncryption.CustomerKey ssec() {
5561
return ssec;
5662
}
5763

64+
public long delayMs() {
65+
return delayMs;
66+
}
67+
68+
public int maxRetries() {
69+
return maxRetries;
70+
}
71+
5872
public void validateSsec(boolean isHttps) {
5973
checkSse(ssec, isHttps);
6074
}
@@ -89,6 +103,18 @@ public Builder ssec(ServerSideEncryption.CustomerKey ssec) {
89103
operations.add(args -> args.ssec = ssec);
90104
return this;
91105
}
106+
107+
/** Set delay between retries. Value &lt;= 0 makes no delay (default 100ms). */
108+
public Builder delayMs(long delayMs) {
109+
operations.add(args -> args.delayMs = delayMs);
110+
return this;
111+
}
112+
113+
/** Set maximum retry between failure. Value &lt;= 0 disables retry (default 5). */
114+
public Builder maxRetries(int maxRetries) {
115+
operations.add(args -> args.maxRetries = maxRetries);
116+
return this;
117+
}
92118
}
93119

94120
@Override
@@ -99,11 +125,13 @@ public boolean equals(Object o) {
99125
CompleteMultipartUploadArgs that = (CompleteMultipartUploadArgs) o;
100126
return Objects.equals(uploadId, that.uploadId)
101127
&& Arrays.equals(parts, that.parts)
102-
&& Objects.equals(ssec, that.ssec);
128+
&& Objects.equals(ssec, that.ssec)
129+
&& delayMs == that.delayMs
130+
&& maxRetries == that.maxRetries;
103131
}
104132

105133
@Override
106134
public int hashCode() {
107-
return Objects.hash(super.hashCode(), uploadId, parts, ssec);
135+
return Objects.hash(super.hashCode(), uploadId, parts, ssec, delayMs, maxRetries);
108136
}
109137
}

0 commit comments

Comments
 (0)