2121import com .fasterxml .jackson .databind .MapperFeature ;
2222import com .fasterxml .jackson .databind .ObjectMapper ;
2323import com .fasterxml .jackson .databind .json .JsonMapper ;
24- import com .google .common .collect .ImmutableSet ;
2524import io .minio .credentials .Credentials ;
2625import io .minio .credentials .Provider ;
2726import io .minio .errors .ErrorResponseException ;
5352import java .nio .charset .StandardCharsets ;
5453import java .security .SecureRandom ;
5554import java .util .Arrays ;
56- import java .util .Collections ;
5755import java .util .HashMap ;
58- import java .util .Locale ;
56+ import java .util .List ;
5957import java .util .Map ;
6058import java .util .Random ;
6159import java .util .Set ;
6462import java .util .concurrent .ConcurrentHashMap ;
6563import java .util .function .Supplier ;
6664import java .util .logging .Logger ;
65+ import java .util .stream .IntStream ;
6766import javax .annotation .Nonnull ;
6867import okhttp3 .Call ;
6968import okhttp3 .Callback ;
69+ import okhttp3 .Interceptor ;
7070import okhttp3 .MediaType ;
7171import okhttp3 .OkHttpClient ;
7272import okhttp3 .RequestBody ;
@@ -100,17 +100,13 @@ public abstract class BaseS3Client implements AutoCloseable {
100100 .build ();
101101
102102 private static final String RETRY_HEAD = "RetryHead" ;
103- private static final String END_HTTP = "----------END-HTTP----------" ;
104- private static final String UPLOAD_ID = "uploadId" ;
105- private static final Set <String > TRACE_QUERY_PARAMS =
106- ImmutableSet .of ("retention" , "legal-hold" , "tagging" , UPLOAD_ID , "acl" , "attributes" );
107- private PrintWriter traceStream ;
103+ private volatile PrintWriter traceStream ;
108104 protected final Map <String , String > regionCache = new ConcurrentHashMap <>();
109105 protected String userAgent = Utils .getDefaultUserAgent ();
110106
111107 protected Http .BaseUrl baseUrl ;
112108 protected Provider provider ;
113- protected OkHttpClient httpClient ;
109+ protected volatile OkHttpClient httpClient ;
114110 protected boolean closeHttpClient ;
115111
116112 protected BaseS3Client (
@@ -137,6 +133,45 @@ public void close() {
137133 }
138134 }
139135
136+ private static int getStatusRetryInterceptorIndex (List <Interceptor > interceptors ) {
137+ return IntStream .range (0 , interceptors .size ())
138+ .filter (i -> interceptors .get (i ) instanceof Http .StatusRetryInterceptor )
139+ .findFirst ()
140+ .orElse (-1 );
141+ }
142+
143+ /**
144+ * Sets request retry parameters. Any null/invalid values disable retry.
145+ *
146+ * <pre>Example:{@code
147+ * minioClient.setRetry(ImmutableSet.of(408, 504), 250, 3);
148+ * }</pre>
149+ *
150+ * @param retryStatusCodes HTTP status codes to be retried.
151+ * @param delayMs Delay between retries.
152+ * @param maxRetries Maximum number of retry attempts.
153+ */
154+ public synchronized void setRetry (
155+ Set <Integer > retryStatusCodes , Long delayMs , Integer maxRetries ) {
156+ Interceptor interceptor =
157+ new Http .StatusRetryInterceptor (
158+ retryStatusCodes , delayMs == null ? 0 : delayMs , maxRetries == null ? 0 : maxRetries );
159+
160+ List <Interceptor > interceptors = this .httpClient .interceptors ();
161+ int i = getStatusRetryInterceptorIndex (interceptors );
162+ OkHttpClient .Builder builder = this .httpClient .newBuilder ();
163+ if (i >= 0 ) {
164+ builder .interceptors ().clear ();
165+ for (int j = 0 ; j < interceptors .size (); j ++) {
166+ builder .addInterceptor (i == j ? interceptor : interceptors .get (j ));
167+ }
168+ } else {
169+ builder .addInterceptor (interceptor );
170+ }
171+
172+ this .httpClient = builder .build ();
173+ }
174+
140175 /**
141176 * Sets HTTP connect, write and read timeouts. A value of 0 means no timeout, otherwise values
142177 * must be between 1 and Integer.MAX_VALUE when converted to milliseconds.
@@ -268,25 +303,48 @@ private String[] handleRedirectResponse(
268303 return new String [] {code , message };
269304 }
270305
306+ private OkHttpClient getHttpClient (PrintWriter traceStream , Http .S3Request s3request ) {
307+ if (traceStream == null ) return this .httpClient ;
308+
309+ OkHttpClient httpClient = this .httpClient ;
310+ List <Interceptor > interceptors = httpClient .interceptors ();
311+ int i = getStatusRetryInterceptorIndex (interceptors );
312+ Http .StatusRetryInterceptor interceptor =
313+ i < 0 ? null : (Http .StatusRetryInterceptor ) interceptors .get (i );
314+
315+ OkHttpClient .Builder builder = httpClient .newBuilder ();
316+ if (interceptor == null ) {
317+ builder .addInterceptor (
318+ new Http .StatusRetryInterceptor (interceptor , traceStream , s3request .object () == null ));
319+ } else {
320+ builder .interceptors ().clear ();
321+ for (int j = 0 ; j < interceptors .size (); j ++) {
322+ if (i == j ) {
323+ builder .addInterceptor (
324+ new Http .StatusRetryInterceptor (
325+ interceptor , traceStream , s3request .object () == null ));
326+ } else {
327+ builder .addInterceptor (interceptors .get (j ));
328+ }
329+ }
330+ }
331+
332+ return builder .build ();
333+ }
334+
271335 /** Execute HTTP request asynchronously for given parameters. */
272336 protected CompletableFuture <Response > executeAsync (Http .S3Request s3request , String region ) {
273337 Credentials credentials = (provider == null ) ? null : provider .fetch ();
274338 Http .Request request = null ;
339+ PrintWriter traceStream = this .traceStream ;
275340 try {
276341 request = s3request .toRequest (baseUrl , region , credentials );
277342 } catch (MinioException e ) {
278343 return Utils .failedFuture (e );
279344 }
280345
346+ OkHttpClient httpClient = getHttpClient (traceStream , s3request );
281347 StringBuilder traceBuilder = new StringBuilder (request .httpTraces ());
282- PrintWriter traceStream = this .traceStream ;
283- if (traceStream != null ) traceStream .print (request .httpTraces ());
284-
285- OkHttpClient httpClient = this .httpClient ;
286- // FIXME: enable retry for all request.
287- // if (!s3request.retryFailure()) {
288- // httpClient = httpClient.newBuilder().retryOnConnectionFailure(false).build();
289- // }
290348
291349 okhttp3 .Request httpRequest = request .httpRequest ();
292350 CompletableFuture <Response > completableFuture = newCompleteableFuture ();
@@ -309,57 +367,23 @@ public void onResponse(Call call, final Response response) throws IOException {
309367 }
310368
311369 private void onResponse (final Response response ) throws IOException {
312- String trace =
313- String .format (
314- "%s %d %s%n%s" ,
315- response .protocol ().toString ().toUpperCase (Locale .US ),
316- response .code (),
317- response .message (),
318- response .headers ().toString ());
319- if (!trace .endsWith ("\n \n " )) {
320- trace += trace .endsWith ("\n " ) ? "\n " : "\n \n " ;
321- }
322- traceBuilder .append (trace );
323- if (traceStream != null ) traceStream .print (trace );
324-
370+ String traces =
371+ Http .getResponseTraces (
372+ response ,
373+ s3request .method (),
374+ s3request .queryParams (),
375+ s3request .object () == null );
325376 if (response .isSuccessful ()) {
326- if (traceStream != null ) {
327- // Trace response body only if the request is not
328- // GetObject/ListenBucketNotification
329- // S3 API.
330- Set <String > keys = s3request .queryParams ().keySet ();
331- if ((s3request .method () != Http .Method .GET
332- || s3request .object () == null
333- || !Collections .disjoint (keys , TRACE_QUERY_PARAMS ))
334- && !(keys .contains ("events" )
335- && (keys .contains ("prefix" ) || keys .contains ("suffix" )))) {
336- String responseBody = response .peekBody (1024 * 1024 ).string ();
337- traceStream .print (responseBody );
338- if (!responseBody .endsWith ("\n " )) traceStream .println ();
339- }
340- traceStream .println (END_HTTP );
341- }
342-
343377 completableFuture .complete (response );
344378 return ;
345379 }
346380
381+ traceBuilder .append (traces );
347382 String errorXml = null ;
348383 try (ResponseBody responseBody = response .body ()) {
349384 errorXml = responseBody .string ();
350385 }
351386
352- if (!("" .equals (errorXml ) && s3request .method ().equals (Http .Method .HEAD ))) {
353- traceBuilder .append (errorXml );
354- if (traceStream != null ) traceStream .print (errorXml );
355- if (!errorXml .endsWith ("\n " )) {
356- traceBuilder .append ("\n " );
357- if (traceStream != null ) traceStream .println ();
358- }
359- }
360- traceBuilder .append (END_HTTP ).append ("\n " );
361- if (traceStream != null ) traceStream .println (END_HTTP );
362-
363387 // Error out for Non-XML response from server for non-HEAD requests.
364388 String contentType = response .headers ().get (Http .Headers .CONTENT_TYPE );
365389 if (!s3request .method ().equals (Http .Method .HEAD )
@@ -635,7 +659,7 @@ protected void checkArgs(BaseArgs args) {
635659 public CompletableFuture <AbortMultipartUploadResponse > abortMultipartUpload (
636660 AbortMultipartUploadArgs args ) {
637661 checkArgs (args );
638- return executeDeleteAsync (args , null , new Http .QueryParameters (UPLOAD_ID , args .uploadId ()))
662+ return executeDeleteAsync (args , null , new Http .QueryParameters (Http . UPLOAD_ID , args .uploadId ()))
639663 .thenApply (
640664 response -> {
641665 try {
@@ -672,7 +696,7 @@ public CompletableFuture<ObjectWriteResponse> completeMultipartUpload(
672696 return executePostAsync (
673697 args ,
674698 args .ssec () == null ? null : args .ssec ().headers (),
675- new Http .QueryParameters (UPLOAD_ID , args .uploadId ()),
699+ new Http .QueryParameters (Http . UPLOAD_ID , args .uploadId ()),
676700 body )
677701 .thenApply (
678702 response -> {
@@ -1191,7 +1215,7 @@ public CompletableFuture<ListObjectVersionsResponse> listObjectVersions(
11911215 public CompletableFuture <ListPartsResponse > listParts (ListPartsArgs args ) {
11921216 Http .QueryParameters queryParams =
11931217 new Http .QueryParameters (
1194- UPLOAD_ID ,
1218+ Http . UPLOAD_ID ,
11951219 args .uploadId (),
11961220 "max-parts" ,
11971221 (args .maxParts () != null ) ? args .maxParts ().toString () : "1000" );
0 commit comments