Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
f21b0ba
adding the StructuredMessageDecoder
gunjansingh-msft Oct 8, 2025
e1c23c5
adding the pipeline policy changes
gunjansingh-msft Oct 15, 2025
1d0cc92
smart retry changes
gunjansingh-msft Oct 29, 2025
b7ba234
fixing smart retry impl
gunjansingh-msft Dec 3, 2025
657f987
smart retry changes
gunjansingh-msft Dec 15, 2025
f7939e7
smart retry changes
gunjansingh-msft Dec 17, 2025
89f9292
smart retry changes
gunjansingh-msft Dec 17, 2025
62f72cc
smart retry changes
gunjansingh-msft Mar 16, 2026
09f49f9
adding content validation tests
gunjansingh-msft Mar 27, 2026
973cdf6
code refactoring
gunjansingh-msft Mar 28, 2026
a58dc63
fixing errors i introduced :(
ibrandes Mar 30, 2026
e12aa82
addressing review comments
gunjansingh-msft Apr 2, 2026
2b2e86d
code refactoring based on latest review comments
gunjansingh-msft Apr 3, 2026
11a2da2
code refactoring based on latest review comments
gunjansingh-msft Apr 8, 2026
c907649
simplifying retry mechanism
gunjansingh-msft Apr 8, 2026
5b153a4
removing dead code
gunjansingh-msft Apr 8, 2026
f72cb8b
addressing Kyle's review comments
gunjansingh-msft Apr 17, 2026
20e3303
addressing latest review comments
gunjansingh-msft Apr 17, 2026
b33a785
Merge origin/feature/storage/content-validation into feature/storage/…
gunjansingh-msft Apr 17, 2026
f9befff
refactoring based on latest review comments
gunjansingh-msft Apr 22, 2026
b3298c2
refactoring based on latest review comments
gunjansingh-msft Apr 22, 2026
ed8c3eb
refactoring based on latest review comments
gunjansingh-msft Apr 23, 2026
d7b0c51
refactoring based on latest review comments from kyle
gunjansingh-msft Apr 28, 2026
73f22e9
expanding test coverage
ibrandes Apr 28, 2026
c5f3a8c
removing unused imports
ibrandes Apr 28, 2026
2d5c9b8
recordings
ibrandes Apr 28, 2026
c1a3f1f
adding documentation to decoder classes
ibrandes Apr 29, 2026
3c77a76
small fixes and failure path tests
ibrandes Apr 29, 2026
aaa6414
addressing context comment
ibrandes Apr 29, 2026
e08eaba
analyze error
ibrandes Apr 29, 2026
a4393f5
removing close override from decodedresponse
ibrandes Apr 30, 2026
563c7dc
Merge branch 'feature/storage/content-validation' of https://github.c…
ibrandes Apr 30, 2026
73c0d25
line removal
ibrandes Apr 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import com.azure.core.http.rest.StreamResponse;
import com.azure.storage.blob.models.BlobDownloadAsyncResponse;
import com.azure.storage.blob.models.DownloadRetryOptions;
import com.azure.storage.common.policy.DecoderState;
import reactor.core.publisher.Mono;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

/**
Expand All @@ -35,7 +37,24 @@ public interface BlobDownloadAsyncResponseConstructorAccessor {
* @param retryOptions Retry options.
*/
BlobDownloadAsyncResponse create(StreamResponse sourceResponse,
BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume, DownloadRetryOptions retryOptions);
BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume, DownloadRetryOptions retryOptions,
AtomicReference<DecoderState> decoderStateRef);

/**
* Gets the source {@link StreamResponse} from a {@link BlobDownloadAsyncResponse}.
*
* @param response The {@link BlobDownloadAsyncResponse}.
* @return The source {@link StreamResponse}, or null if not available.
*/
StreamResponse getSourceResponse(BlobDownloadAsyncResponse response);
Comment thread
gunjansingh-msft marked this conversation as resolved.
Outdated

/**
* Gets the current decoder state from a {@link BlobDownloadAsyncResponse}.
*
* @param response The {@link BlobDownloadAsyncResponse}.
* @return The current decoder state, or null if not available.
*/
DecoderState getDecoderState(BlobDownloadAsyncResponse response);
}

/**
Expand All @@ -56,7 +75,8 @@ public static void setAccessor(
* @param retryOptions Retry options.
*/
public static BlobDownloadAsyncResponse create(StreamResponse sourceResponse,
BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume, DownloadRetryOptions retryOptions) {
BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume, DownloadRetryOptions retryOptions,
AtomicReference<DecoderState> decoderStateRef) {
// This looks odd but is necessary, it is possible to engage the access helper before anywhere else in the
// application accesses BlobDownloadAsyncResponse which triggers the accessor to be configured. So, if the accessor
// is null this effectively pokes the class to set up the accessor.
Expand All @@ -66,6 +86,38 @@ public static BlobDownloadAsyncResponse create(StreamResponse sourceResponse,
}

assert accessor != null;
return accessor.create(sourceResponse, onErrorResume, retryOptions);
return accessor.create(sourceResponse, onErrorResume, retryOptions, decoderStateRef);
}

/**
* Gets the source {@link StreamResponse} from a {@link BlobDownloadAsyncResponse}.
*
* @param response The {@link BlobDownloadAsyncResponse}.
* @return The source {@link StreamResponse}, or null if not available.
*/
public static StreamResponse getSourceResponse(BlobDownloadAsyncResponse response) {
if (accessor == null) {
new BlobDownloadAsyncResponse(new HttpRequest(HttpMethod.GET, "http://microsoft.com"), 200,
new HttpHeaders(), null, null);
}

assert accessor != null;
return accessor.getSourceResponse(response);
}

/**
* Gets the current decoder state from a {@link BlobDownloadAsyncResponse}.
*
* @param response The {@link BlobDownloadAsyncResponse}.
* @return The decoder state, or null if not available.
*/
public static DecoderState getDecoderState(BlobDownloadAsyncResponse response) {
if (accessor == null) {
new BlobDownloadAsyncResponse(new HttpRequest(HttpMethod.GET, "http://microsoft.com"), 200,
new HttpHeaders(), null, null);
}

assert accessor != null;
return accessor.getDecoderState(response);
Comment thread
gunjansingh-msft marked this conversation as resolved.
Outdated
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,26 @@ public final class BlobConstants {
* The number of buffers to use if none is specified on the buffered upload method.
*/
public static final int BLOB_DEFAULT_NUMBER_OF_BUFFERS = 8;
    /**
     * The legacy default number of concurrent transfers for download operations, preserved for the
     * pre-content-validation download path.
     */
    public static final int BLOB_LEGACY_DEFAULT_CONCURRENT_TRANSFERS_COUNT = 5;
    /**
     * The default per-request range size (4 MB) used for download operations when no checksum validation is
     * requested.
     */
    public static final long BLOB_DEFAULT_DOWNLOAD_RANGE_SIZE = 4L * Constants.MB;
    /**
     * The default initial range size (256 MB) used for the first download request when no checksum validation
     * is requested.
     */
    public static final long BLOB_DEFAULT_INITIAL_DOWNLOAD_RANGE_SIZE = 256L * Constants.MB;
    /**
     * The maximum range size (256 MB) used for any single download request.
     */
    public static final long BLOB_MAX_DOWNLOAD_BYTES = 256L * Constants.MB;
    /**
     * The maximum range size (4 MB) used when requesting a transactional checksum during download
     * (presumably matching the service's limit for transactional hash computation — confirm).
     */
    public static final long BLOB_MAX_HASH_REQUEST_DOWNLOAD_RANGE = 4L * Constants.MB;
Comment thread
ibrandes marked this conversation as resolved.
Outdated
/**
* If a blob is known to be greater than 100MB, using a larger block size will trigger some server-side
* optimizations. If the block size is not set and the size of the blob is known to be greater than 100MB, this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.azure.storage.common.policy.ResponseValidationPolicyBuilder;
import com.azure.storage.common.policy.ScrubEtagPolicy;
import com.azure.storage.common.policy.StorageBearerTokenChallengeAuthorizationPolicy;
import com.azure.storage.common.policy.StorageContentValidationDecoderPolicy;
import com.azure.storage.common.policy.StorageSharedKeyCredentialPolicy;

import java.net.MalformedURLException;
Expand Down Expand Up @@ -133,6 +134,8 @@ public static HttpPipeline buildPipeline(StorageSharedKeyCredential storageShare
policies.add(new AzureSasCredentialPolicy(new AzureSasCredential(sasToken), false));
}

policies.add(new StorageContentValidationDecoderPolicy());

policies.addAll(perRetryPolicies);

HttpPolicyProviders.addAfterRetryPolicies(policies);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class ChunkedDownloadUtils {
public static Mono<Tuple3<Long, BlobRequestConditions, BlobDownloadAsyncResponse>> downloadFirstChunk(
BlobRange range, ParallelTransferOptions parallelTransferOptions, BlobRequestConditions requestConditions,
BiFunction<BlobRange, BlobRequestConditions, Mono<BlobDownloadAsyncResponse>> downloader, boolean eTagLock) {
return downloadFirstChunk(range, parallelTransferOptions, requestConditions, downloader, eTagLock, null);
return downloadFirstChunk(range, parallelTransferOptions, requestConditions, downloader, null, eTagLock, null);
}

/*
Expand All @@ -48,6 +48,22 @@ public static Mono<Tuple3<Long, BlobRequestConditions, BlobDownloadAsyncResponse
BlobRange range, ParallelTransferOptions parallelTransferOptions, BlobRequestConditions requestConditions,
BiFunction<BlobRange, BlobRequestConditions, Mono<BlobDownloadAsyncResponse>> downloader, boolean eTagLock,
Context context) {
return downloadFirstChunk(range, parallelTransferOptions, requestConditions, downloader, null, eTagLock,
context);
}

/*
Has a context value for additional download adjustments and an optional fallback downloader for empty blobs.

Download the first chunk. Construct a Mono which will emit the total count for calculating the number of chunks,
access conditions containing the etag to lock on, and the response from downloading the first chunk.
*/
@SuppressWarnings("unchecked")
public static Mono<Tuple3<Long, BlobRequestConditions, BlobDownloadAsyncResponse>> downloadFirstChunk(
Comment thread
ibrandes marked this conversation as resolved.
Outdated
BlobRange range, ParallelTransferOptions parallelTransferOptions, BlobRequestConditions requestConditions,
BiFunction<BlobRange, BlobRequestConditions, Mono<BlobDownloadAsyncResponse>> downloader,
BiFunction<BlobRange, BlobRequestConditions, Mono<BlobDownloadAsyncResponse>> emptyBlobDownloader,
boolean eTagLock, Context context) {
// We will scope our initial download to either be one chunk or the total size.
long initialChunkSize
= range.getCount() != null && range.getCount() < parallelTransferOptions.getBlockSizeLong()
Expand All @@ -69,7 +85,12 @@ public static Mono<Tuple3<Long, BlobRequestConditions, BlobDownloadAsyncResponse
: requestConditions;

// Extract the total length of the blob from the contentRange header. e.g. "bytes 1-6/7"
long totalLength = extractTotalBlobLength(response.getDeserializedHeaders().getContentRange());
// Fall back to contentLength when contentRange isn't present (e.g., 304 responses).
String contentRange = response.getDeserializedHeaders().getContentRange();
Long contentLength = response.getDeserializedHeaders().getContentLength();
long totalLength = contentRange != null
? extractTotalBlobLength(contentRange)
: (contentLength == null ? 0L : contentLength);

if (context != null) {
Optional<Object> contextAdjustment = context.getData(Constants.ADJUSTED_BLOB_LENGTH_KEY);
Expand Down Expand Up @@ -99,7 +120,9 @@ public static Mono<Tuple3<Long, BlobRequestConditions, BlobDownloadAsyncResponse
&& extractTotalBlobLength(
blobStorageException.getResponse().getHeaders().getValue(HttpHeaderName.CONTENT_RANGE)) == 0) {

return downloader.apply(new BlobRange(0, 0L), requestConditions)
BiFunction<BlobRange, BlobRequestConditions, Mono<BlobDownloadAsyncResponse>> fallbackDownloader
= emptyBlobDownloader != null ? emptyBlobDownloader : downloader;
return fallbackDownloader.apply(new BlobRange(0), requestConditions)
Comment thread
ibrandes marked this conversation as resolved.
Outdated
// Subscribe on boundElastic instead of elastic as elastic is deprecated and boundElastic
// provided the same functionality with the added benefit that it won't infinitely create
// threads if needed and will instead queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ public static ParallelTransferOptions populateAndApplyDefaults(ParallelTransferO
blockSize = (long) BlobAsyncClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE;
}

Long initialTransferSize = other.getInitialTransferSizeLong();

Integer maxConcurrency = other.getMaxConcurrency();
if (maxConcurrency == null) {
maxConcurrency = BlobAsyncClient.BLOB_DEFAULT_NUMBER_OF_BUFFERS;
Expand All @@ -124,7 +126,8 @@ public static ParallelTransferOptions populateAndApplyDefaults(ParallelTransferO
return new ParallelTransferOptions().setBlockSizeLong(blockSize)
.setMaxConcurrency(maxConcurrency)
.setProgressListener(other.getProgressListener())
.setMaxSingleUploadSizeLong(maxSingleUploadSize);
.setMaxSingleUploadSizeLong(maxSingleUploadSize)
.setInitialTransferSizeLong(initialTransferSize);
}

/**
Expand All @@ -143,6 +146,8 @@ public static ParallelTransferOptions populateAndApplyDefaults(ParallelTransferO
blockSize = (long) BlobAsyncClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE;
}

Long initialTransferSize = other.getInitialTransferSizeLong();

Integer maxConcurrency = other.getMaxConcurrency();
if (maxConcurrency == null) {
maxConcurrency = BlobAsyncClient.BLOB_DEFAULT_NUMBER_OF_BUFFERS;
Expand All @@ -156,7 +161,8 @@ public static ParallelTransferOptions populateAndApplyDefaults(ParallelTransferO
return new com.azure.storage.common.ParallelTransferOptions().setBlockSizeLong(blockSize)
.setMaxConcurrency(maxConcurrency)
.setProgressListener(other.getProgressListener())
.setMaxSingleUploadSizeLong(maxSingleUploadSize);
.setMaxSingleUploadSizeLong(maxSingleUploadSize)
.setInitialTransferSizeLong(initialTransferSize);
}

/**
Expand All @@ -169,7 +175,8 @@ public static ParallelTransferOptions populateAndApplyDefaults(ParallelTransferO
return new com.azure.storage.common.ParallelTransferOptions().setBlockSizeLong(blobOptions.getBlockSizeLong())
.setMaxConcurrency(blobOptions.getMaxConcurrency())
.setProgressListener(blobOptions.getProgressListener())
.setMaxSingleUploadSizeLong(blobOptions.getMaxSingleUploadSizeLong());
.setMaxSingleUploadSizeLong(blobOptions.getMaxSingleUploadSizeLong())
.setInitialTransferSizeLong(blobOptions.getInitialTransferSizeLong());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@
import com.azure.storage.blob.implementation.accesshelpers.BlobDownloadAsyncResponseConstructorProxy;
import com.azure.storage.blob.implementation.models.BlobsDownloadHeaders;
import com.azure.storage.blob.implementation.util.ModelHelper;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.common.policy.DecoderState;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.AsynchronousByteChannel;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

/**
Expand All @@ -29,15 +33,37 @@
public final class BlobDownloadAsyncResponse extends ResponseBase<BlobDownloadHeaders, Flux<ByteBuffer>>
implements Closeable {

private static final ClientLogger LOGGER = new ClientLogger(BlobDownloadAsyncResponse.class);

static {
BlobDownloadAsyncResponseConstructorProxy.setAccessor(BlobDownloadAsyncResponse::new);
BlobDownloadAsyncResponseConstructorProxy
.setAccessor(new BlobDownloadAsyncResponseConstructorProxy.BlobDownloadAsyncResponseConstructorAccessor() {
@Override
public BlobDownloadAsyncResponse create(StreamResponse sourceResponse,
BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume, DownloadRetryOptions retryOptions,
AtomicReference<DecoderState> decoderStateRef) {
return new BlobDownloadAsyncResponse(sourceResponse, onErrorResume, retryOptions, decoderStateRef);
}

@Override
public StreamResponse getSourceResponse(BlobDownloadAsyncResponse response) {
return response.sourceResponse;
}

@Override
public DecoderState getDecoderState(BlobDownloadAsyncResponse response) {
AtomicReference<DecoderState> ref = response.decoderStateRef;
return ref == null ? null : ref.get();
}
});
}

private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);

private final StreamResponse sourceResponse;
private final BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume;
private final DownloadRetryOptions retryOptions;
private final AtomicReference<DecoderState> decoderStateRef;

/**
* Constructs a {@link BlobDownloadAsyncResponse}.
Expand All @@ -54,6 +80,7 @@ public BlobDownloadAsyncResponse(HttpRequest request, int statusCode, HttpHeader
this.sourceResponse = null;
this.onErrorResume = null;
this.retryOptions = null;
this.decoderStateRef = null;
}

/**
Expand All @@ -65,11 +92,26 @@ public BlobDownloadAsyncResponse(HttpRequest request, int statusCode, HttpHeader
*/
    /**
     * Constructs a {@link BlobDownloadAsyncResponse} from a streaming response without structured-message
     * decoder state (content validation tracking disabled).
     *
     * @param sourceResponse The initial {@link StreamResponse} for the download.
     * @param onErrorResume Function used to resume the download on error; receives the error and a long
     * offset (presumably the number of bytes already emitted — confirm against callers).
     * @param retryOptions Options controlling download retries.
     */
    BlobDownloadAsyncResponse(StreamResponse sourceResponse,
        BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume, DownloadRetryOptions retryOptions) {
        this(sourceResponse, onErrorResume, retryOptions, null);
    }

    /**
     * Constructs a {@link BlobDownloadAsyncResponse} from a streaming response with optional
     * structured-message decoder state.
     *
     * @param sourceResponse The initial {@link StreamResponse} for the download.
     * @param onErrorResume Function used to resume the download on error; receives the error and a long
     * offset (presumably the number of bytes already emitted — confirm against callers).
     * @param retryOptions Options controlling download retries.
     * @param decoderStateRef Holder for the decoder state; may be null when content validation is not in use.
     */
    BlobDownloadAsyncResponse(StreamResponse sourceResponse,
        BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume, DownloadRetryOptions retryOptions,
        AtomicReference<DecoderState> decoderStateRef) {
        this(sourceResponse, onErrorResume, retryOptions, decoderStateRef, extractHeaders(sourceResponse));
    }

    /**
     * Canonical constructor. Headers are extracted once by the caller and passed in so that the same
     * {@code BlobDownloadHeaders} instance handed to {@code super()} is also the one mutated by the
     * CRC-completion hook installed in {@code createResponseFluxWithContentCrc}.
     *
     * @param sourceResponse The initial {@link StreamResponse}; must not be null.
     * @param onErrorResume Resume function invoked on download errors; must not be null.
     * @param retryOptions Download retry options; must not be null.
     * @param decoderStateRef Decoder state holder; null when content validation is not in use.
     * @param deserializedHeaders Pre-extracted headers shared with the response flux.
     */
    private BlobDownloadAsyncResponse(StreamResponse sourceResponse,
        BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume, DownloadRetryOptions retryOptions,
        AtomicReference<DecoderState> decoderStateRef, BlobDownloadHeaders deserializedHeaders) {
        super(sourceResponse.getRequest(), sourceResponse.getStatusCode(), sourceResponse.getHeaders(),
            createResponseFluxWithContentCrc(sourceResponse, onErrorResume, retryOptions, decoderStateRef,
                deserializedHeaders),
            deserializedHeaders);
        // Null-checks run after super(); super's argument expressions would NPE first on a null
        // sourceResponse, but these give clearer messages for the remaining parameters.
        this.sourceResponse = Objects.requireNonNull(sourceResponse, "'sourceResponse' must not be null");
        this.onErrorResume = Objects.requireNonNull(onErrorResume, "'onErrorResume' must not be null");
        this.retryOptions = Objects.requireNonNull(retryOptions, "'retryOptions' must not be null");
        // Deliberately nullable: absent when content validation is disabled.
        this.decoderStateRef = decoderStateRef;
    }

private static BlobDownloadHeaders extractHeaders(StreamResponse response) {
Expand All @@ -87,6 +129,28 @@ private static Flux<ByteBuffer> createResponseFlux(StreamResponse sourceResponse
.defaultIfEmpty(EMPTY_BUFFER);
}

/**
* Builds the response flux and populates ContentCrc64 on the deserialized headers when structured message
* decoding completes successfully.
*/
private static Flux<ByteBuffer> createResponseFluxWithContentCrc(StreamResponse sourceResponse,
BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume, DownloadRetryOptions retryOptions,
AtomicReference<DecoderState> decoderStateRef, BlobDownloadHeaders deserializedHeaders) {
Flux<ByteBuffer> flux = createResponseFlux(sourceResponse, onErrorResume, retryOptions);
if (decoderStateRef != null && deserializedHeaders != null) {
flux = flux.doOnComplete(() -> {
DecoderState state = decoderStateRef.get();
if (state != null && state.isFinalized()) {
long crc = state.getComposedCrc64();
byte[] crcBytes = new byte[8];
ByteBuffer.wrap(crcBytes).order(ByteOrder.LITTLE_ENDIAN).putLong(crc);
deserializedHeaders.setContentCrc64(crcBytes);
}
});
}
return flux;
}

/**
* Transfers content bytes to the {@link AsynchronousByteChannel}.
* @param channel The destination {@link AsynchronousByteChannel}.
Expand All @@ -95,7 +159,12 @@ private static Flux<ByteBuffer> createResponseFlux(StreamResponse sourceResponse
*/
public Mono<Void> writeValueToAsync(AsynchronousByteChannel channel, ProgressReporter progressReporter) {
Objects.requireNonNull(channel, "'channel' must not be null");
LOGGER.atVerbose()
.addKeyValue("thread", Thread.currentThread().getName())
.log("BlobDownloadAsyncResponse.writeValueToAsync entry");
if (sourceResponse != null) {
LOGGER.atVerbose()
.log("BlobDownloadAsyncResponse.writeValueToAsync using sourceResponse (IOUtils.transfer)");
return IOUtils.transferStreamResponseToAsynchronousByteChannel(channel, sourceResponse, onErrorResume,
progressReporter, retryOptions.getMaxRetryRequests());
} else if (super.getValue() != null) {
Expand Down
Loading
Loading