Skip to content

Commit c907649

Browse files
simplifying retry mechanism
1 parent 11a2da2 commit c907649

11 files changed

Lines changed: 54 additions & 761 deletions

File tree

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/accesshelpers/BlobDownloadAsyncResponseConstructorProxy.java

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,8 @@
99
import com.azure.core.http.rest.StreamResponse;
1010
import com.azure.storage.blob.models.BlobDownloadAsyncResponse;
1111
import com.azure.storage.blob.models.DownloadRetryOptions;
12-
import com.azure.storage.common.policy.DecoderState;
1312
import reactor.core.publisher.Mono;
1413

15-
import java.util.concurrent.atomic.AtomicReference;
1614
import java.util.function.BiFunction;
1715

1816
/**
@@ -37,16 +35,7 @@ public interface BlobDownloadAsyncResponseConstructorAccessor {
3735
* @param retryOptions Retry options.
3836
*/
3937
BlobDownloadAsyncResponse create(StreamResponse sourceResponse,
40-
BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume, DownloadRetryOptions retryOptions,
41-
AtomicReference<DecoderState> decoderStateRef);
42-
43-
/**
44-
* Gets the current decoder state from a {@link BlobDownloadAsyncResponse}.
45-
*
46-
* @param response The {@link BlobDownloadAsyncResponse}.
47-
* @return The current decoder state, or null if not available.
48-
*/
49-
DecoderState getDecoderState(BlobDownloadAsyncResponse response);
38+
BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume, DownloadRetryOptions retryOptions);
5039
}
5140

5241
/**
@@ -67,8 +56,7 @@ public static void setAccessor(
6756
* @param retryOptions Retry options.
6857
*/
6958
public static BlobDownloadAsyncResponse create(StreamResponse sourceResponse,
70-
BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume, DownloadRetryOptions retryOptions,
71-
AtomicReference<DecoderState> decoderStateRef) {
59+
BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume, DownloadRetryOptions retryOptions) {
7260
// This looks odd but is necessary, it is possible to engage the access helper before anywhere else in the
7361
// application accesses BlobDownloadAsyncResponse which triggers the accessor to be configured. So, if the accessor
7462
// is null this effectively pokes the class to set up the accessor.
@@ -78,16 +66,6 @@ public static BlobDownloadAsyncResponse create(StreamResponse sourceResponse,
7866
}
7967

8068
assert accessor != null;
81-
return accessor.create(sourceResponse, onErrorResume, retryOptions, decoderStateRef);
82-
}
83-
84-
/**
85-
* Gets the current decoder state from a {@link BlobDownloadAsyncResponse}.
86-
*
87-
* @param response The {@link BlobDownloadAsyncResponse}.
88-
* @return The decoder state, or null if not available.
89-
*/
90-
public static DecoderState getDecoderState(BlobDownloadAsyncResponse response) {
91-
return accessor.getDecoderState(response);
69+
return accessor.create(sourceResponse, onErrorResume, retryOptions);
9270
}
9371
}

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/DownloadValidationUtils.java

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,6 @@
66
import com.azure.core.util.Context;
77
import com.azure.storage.common.StorageChecksumAlgorithm;
88
import com.azure.storage.common.implementation.Constants;
9-
import com.azure.storage.common.policy.AggregateCrcState;
10-
11-
import java.util.concurrent.atomic.AtomicReference;
12-
13-
import com.azure.storage.common.policy.DecoderState;
149

1510
/**
1611
* Centralizes download content validation decisions based on {@link StorageChecksumAlgorithm}.
@@ -39,23 +34,17 @@ public static StorageChecksumAlgorithm resolveAlgorithm(StorageChecksumAlgorithm
3934
}
4035

4136
/**
42-
* Adds structured message decoding context keys when CRC64/AUTO validation is active.
37+
* Adds structured message decoding context key when CRC64/AUTO validation is active.
4338
*
4439
* @param context The base context to augment. Null is treated as {@link Context#NONE}.
4540
* @param algorithm The resolved checksum algorithm.
46-
* @param decoderStateRef Holder for decoder state, populated by the policy.
47-
* @param aggregateCrcState CRC aggregation state across retries.
4841
* @return The augmented context.
4942
*/
50-
public static Context applyStructuredMessageContext(Context context, StorageChecksumAlgorithm algorithm,
51-
AtomicReference<DecoderState> decoderStateRef, AggregateCrcState aggregateCrcState) {
43+
public static Context applyStructuredMessageContext(Context context, StorageChecksumAlgorithm algorithm) {
5244
Context base = context == null ? Context.NONE : context;
5345
if (!isStructuredMessageAlgorithm(algorithm)) {
5446
return base;
5547
}
56-
return base.addData(Constants.STRUCTURED_MESSAGE_RESPONSE_SCOPED_CONTEXT_KEY, true)
57-
.addData(Constants.STRUCTURED_MESSAGE_DECODING_CONTEXT_KEY, true)
58-
.addData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_REF_CONTEXT_KEY, decoderStateRef)
59-
.addData(Constants.STRUCTURED_MESSAGE_AGGREGATE_CRC_CONTEXT_KEY, aggregateCrcState);
48+
return base.addData(Constants.STRUCTURED_MESSAGE_DECODING_CONTEXT_KEY, true);
6049
}
6150
}

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/BlobDownloadAsyncResponse.java

Lines changed: 4 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,14 @@
1414
import com.azure.storage.blob.implementation.models.BlobsDownloadHeaders;
1515
import com.azure.storage.blob.implementation.util.ModelHelper;
1616
import com.azure.core.util.logging.ClientLogger;
17-
import com.azure.storage.common.policy.DecoderState;
1817
import reactor.core.publisher.Flux;
1918
import reactor.core.publisher.Mono;
2019

2120
import java.io.Closeable;
2221
import java.io.IOException;
2322
import java.nio.ByteBuffer;
24-
import java.nio.ByteOrder;
2523
import java.nio.channels.AsynchronousByteChannel;
2624
import java.util.Objects;
27-
import java.util.concurrent.atomic.AtomicReference;
2825
import java.util.function.BiFunction;
2926

3027
/**
@@ -40,15 +37,9 @@ public final class BlobDownloadAsyncResponse extends ResponseBase<BlobDownloadHe
4037
.setAccessor(new BlobDownloadAsyncResponseConstructorProxy.BlobDownloadAsyncResponseConstructorAccessor() {
4138
@Override
4239
public BlobDownloadAsyncResponse create(StreamResponse sourceResponse,
43-
BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume, DownloadRetryOptions retryOptions,
44-
AtomicReference<DecoderState> decoderStateRef) {
45-
return new BlobDownloadAsyncResponse(sourceResponse, onErrorResume, retryOptions, decoderStateRef);
46-
}
47-
48-
@Override
49-
public DecoderState getDecoderState(BlobDownloadAsyncResponse response) {
50-
AtomicReference<DecoderState> ref = response.decoderStateRef;
51-
return ref == null ? null : ref.get();
40+
BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume,
41+
DownloadRetryOptions retryOptions) {
42+
return new BlobDownloadAsyncResponse(sourceResponse, onErrorResume, retryOptions);
5243
}
5344
});
5445
}
@@ -58,7 +49,6 @@ public DecoderState getDecoderState(BlobDownloadAsyncResponse response) {
5849
private final StreamResponse sourceResponse;
5950
private final BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume;
6051
private final DownloadRetryOptions retryOptions;
61-
private final AtomicReference<DecoderState> decoderStateRef;
6252

6353
/**
6454
* Constructs a {@link BlobDownloadAsyncResponse}.
@@ -75,38 +65,15 @@ public BlobDownloadAsyncResponse(HttpRequest request, int statusCode, HttpHeader
7565
this.sourceResponse = null;
7666
this.onErrorResume = null;
7767
this.retryOptions = null;
78-
this.decoderStateRef = null;
7968
}
8069

81-
/**
82-
* Constructs a {@link BlobDownloadAsyncResponse}.
83-
*
84-
* @param sourceResponse The initial Stream Response
85-
* @param onErrorResume Function used to resume.
86-
* @param retryOptions Retry options.
87-
*/
8870
BlobDownloadAsyncResponse(StreamResponse sourceResponse,
8971
BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume, DownloadRetryOptions retryOptions) {
90-
this(sourceResponse, onErrorResume, retryOptions, null);
91-
}
92-
93-
BlobDownloadAsyncResponse(StreamResponse sourceResponse,
94-
BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume, DownloadRetryOptions retryOptions,
95-
AtomicReference<DecoderState> decoderStateRef) {
96-
this(sourceResponse, onErrorResume, retryOptions, decoderStateRef, extractHeaders(sourceResponse));
97-
}
98-
99-
private BlobDownloadAsyncResponse(StreamResponse sourceResponse,
100-
BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume, DownloadRetryOptions retryOptions,
101-
AtomicReference<DecoderState> decoderStateRef, BlobDownloadHeaders deserializedHeaders) {
10272
super(sourceResponse.getRequest(), sourceResponse.getStatusCode(), sourceResponse.getHeaders(),
103-
createResponseFluxWithContentCrc(sourceResponse, onErrorResume, retryOptions, decoderStateRef,
104-
deserializedHeaders),
105-
deserializedHeaders);
73+
createResponseFlux(sourceResponse, onErrorResume, retryOptions), extractHeaders(sourceResponse));
10674
this.sourceResponse = Objects.requireNonNull(sourceResponse, "'sourceResponse' must not be null");
10775
this.onErrorResume = Objects.requireNonNull(onErrorResume, "'onErrorResume' must not be null");
10876
this.retryOptions = Objects.requireNonNull(retryOptions, "'retryOptions' must not be null");
109-
this.decoderStateRef = decoderStateRef;
11077
}
11178

11279
private static BlobDownloadHeaders extractHeaders(StreamResponse response) {
@@ -124,28 +91,6 @@ private static Flux<ByteBuffer> createResponseFlux(StreamResponse sourceResponse
12491
.defaultIfEmpty(EMPTY_BUFFER);
12592
}
12693

127-
/**
128-
* Builds the response flux and populates ContentCrc64 on the deserialized headers when structured message
129-
* decoding completes successfully.
130-
*/
131-
private static Flux<ByteBuffer> createResponseFluxWithContentCrc(StreamResponse sourceResponse,
132-
BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume, DownloadRetryOptions retryOptions,
133-
AtomicReference<DecoderState> decoderStateRef, BlobDownloadHeaders deserializedHeaders) {
134-
Flux<ByteBuffer> flux = createResponseFlux(sourceResponse, onErrorResume, retryOptions);
135-
if (decoderStateRef != null && deserializedHeaders != null) {
136-
flux = flux.doOnComplete(() -> {
137-
DecoderState state = decoderStateRef.get();
138-
if (state != null && state.isFinalized()) {
139-
long crc = state.getComposedCrc64();
140-
byte[] crcBytes = new byte[8];
141-
ByteBuffer.wrap(crcBytes).order(ByteOrder.LITTLE_ENDIAN).putLong(crc);
142-
deserializedHeaders.setContentCrc64(crcBytes);
143-
}
144-
});
145-
}
146-
return flux;
147-
}
148-
14994
/**
15095
* Transfers content bytes to the {@link AsynchronousByteChannel}.
15196
* @param channel The destination {@link AsynchronousByteChannel}.

0 commit comments

Comments
 (0)