Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
f21b0ba
adding the StructuredMessageDecoder
gunjansingh-msft Oct 8, 2025
e1c23c5
adding the pipeline policy changes
gunjansingh-msft Oct 15, 2025
1d0cc92
smart retry changes
gunjansingh-msft Oct 29, 2025
b7ba234
fixing smart retry impl
gunjansingh-msft Dec 3, 2025
657f987
smart retry changes
gunjansingh-msft Dec 15, 2025
f7939e7
smart retry changes
gunjansingh-msft Dec 17, 2025
89f9292
smart retry changes
gunjansingh-msft Dec 17, 2025
62f72cc
smart retry changes
gunjansingh-msft Mar 16, 2026
09f49f9
adding content validation tests
gunjansingh-msft Mar 27, 2026
973cdf6
code refactoring
gunjansingh-msft Mar 28, 2026
a58dc63
fixing errors i introduced :(
ibrandes Mar 30, 2026
e12aa82
addressing review comments
gunjansingh-msft Apr 2, 2026
2b2e86d
code refactoring based on latest review comments
gunjansingh-msft Apr 3, 2026
11a2da2
code refactoring based on latest review comments
gunjansingh-msft Apr 8, 2026
c907649
simplifying retry mechanism
gunjansingh-msft Apr 8, 2026
5b153a4
removing dead code
gunjansingh-msft Apr 8, 2026
f72cb8b
addressing Kyle's review comments
gunjansingh-msft Apr 17, 2026
20e3303
addressing latest review comments
gunjansingh-msft Apr 17, 2026
b33a785
Merge origin/feature/storage/content-validation into feature/storage/…
gunjansingh-msft Apr 17, 2026
f9befff
refactoring based on latest review comments
gunjansingh-msft Apr 22, 2026
b3298c2
refactoring based on latest review comments
gunjansingh-msft Apr 22, 2026
ed8c3eb
refactoring based on latest review comments
gunjansingh-msft Apr 23, 2026
d7b0c51
refactoring based on latest review comments from kyle
gunjansingh-msft Apr 28, 2026
73f22e9
expanding test coverage
ibrandes Apr 28, 2026
c5f3a8c
removing unused imports
ibrandes Apr 28, 2026
2d5c9b8
recordings
ibrandes Apr 28, 2026
c1a3f1f
adding documentation to decoder classes
ibrandes Apr 29, 2026
3c77a76
small fixes and failure path tests
ibrandes Apr 29, 2026
aaa6414
addressing context comment
ibrandes Apr 29, 2026
e08eaba
analyze error
ibrandes Apr 29, 2026
a4393f5
removing close override from decodedresponse
ibrandes Apr 30, 2026
563c7dc
Merge branch 'feature/storage/content-validation' of https://github.c…
ibrandes Apr 30, 2026
73c0d25
line removal
ibrandes Apr 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.azure.storage.common.policy.ResponseValidationPolicyBuilder;
import com.azure.storage.common.policy.ScrubEtagPolicy;
import com.azure.storage.common.policy.StorageBearerTokenChallengeAuthorizationPolicy;
import com.azure.storage.common.policy.StorageContentValidationDecoderPolicy;
import com.azure.storage.common.policy.StorageSharedKeyCredentialPolicy;

import java.net.MalformedURLException;
Expand Down Expand Up @@ -96,6 +97,8 @@ public static HttpPipeline buildPipeline(StorageSharedKeyCredential storageShare
// Closest to API goes first, closest to wire goes last.
List<HttpPipelinePolicy> policies = new ArrayList<>();

policies.add(new StorageContentValidationDecoderPolicy());
Comment thread
ibrandes marked this conversation as resolved.
Outdated

policies.add(getUserAgentPolicy(configuration, logOptions, clientOptions));
policies.add(new RequestIdPolicy());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class ChunkedDownloadUtils {
public static Mono<Tuple3<Long, BlobRequestConditions, BlobDownloadAsyncResponse>> downloadFirstChunk(
BlobRange range, ParallelTransferOptions parallelTransferOptions, BlobRequestConditions requestConditions,
BiFunction<BlobRange, BlobRequestConditions, Mono<BlobDownloadAsyncResponse>> downloader, boolean eTagLock) {
return downloadFirstChunk(range, parallelTransferOptions, requestConditions, downloader, eTagLock, null);
return downloadFirstChunk(range, parallelTransferOptions, requestConditions, downloader, null, eTagLock, null);
}

/*
Expand All @@ -48,6 +48,22 @@ public static Mono<Tuple3<Long, BlobRequestConditions, BlobDownloadAsyncResponse
BlobRange range, ParallelTransferOptions parallelTransferOptions, BlobRequestConditions requestConditions,
BiFunction<BlobRange, BlobRequestConditions, Mono<BlobDownloadAsyncResponse>> downloader, boolean eTagLock,
Context context) {
return downloadFirstChunk(range, parallelTransferOptions, requestConditions, downloader, null, eTagLock,
context);
}

/*
Has a context value for additional download adjustments and an optional fallback downloader for empty blobs.

Download the first chunk. Construct a Mono which will emit the total count for calculating the number of chunks,
access conditions containing the etag to lock on, and the response from downloading the first chunk.
*/
@SuppressWarnings("unchecked")
public static Mono<Tuple3<Long, BlobRequestConditions, BlobDownloadAsyncResponse>> downloadFirstChunk(
Comment thread
ibrandes marked this conversation as resolved.
Outdated
BlobRange range, ParallelTransferOptions parallelTransferOptions, BlobRequestConditions requestConditions,
BiFunction<BlobRange, BlobRequestConditions, Mono<BlobDownloadAsyncResponse>> downloader,
BiFunction<BlobRange, BlobRequestConditions, Mono<BlobDownloadAsyncResponse>> emptyBlobDownloader,
boolean eTagLock, Context context) {
// We will scope our initial download to either be one chunk or the total size.
long initialChunkSize
= range.getCount() != null && range.getCount() < parallelTransferOptions.getBlockSizeLong()
Expand All @@ -69,7 +85,12 @@ public static Mono<Tuple3<Long, BlobRequestConditions, BlobDownloadAsyncResponse
: requestConditions;

// Extract the total length of the blob from the contentRange header. e.g. "bytes 1-6/7"
long totalLength = extractTotalBlobLength(response.getDeserializedHeaders().getContentRange());
// Fall back to contentLength when contentRange isn't present (e.g., 304 responses).
String contentRange = response.getDeserializedHeaders().getContentRange();
Long contentLength = response.getDeserializedHeaders().getContentLength();
long totalLength = contentRange != null
? extractTotalBlobLength(contentRange)
: (contentLength == null ? 0L : contentLength);

if (context != null) {
Optional<Object> contextAdjustment = context.getData(Constants.ADJUSTED_BLOB_LENGTH_KEY);
Expand Down Expand Up @@ -99,7 +120,9 @@ public static Mono<Tuple3<Long, BlobRequestConditions, BlobDownloadAsyncResponse
&& extractTotalBlobLength(
blobStorageException.getResponse().getHeaders().getValue(HttpHeaderName.CONTENT_RANGE)) == 0) {

return downloader.apply(new BlobRange(0, 0L), requestConditions)
BiFunction<BlobRange, BlobRequestConditions, Mono<BlobDownloadAsyncResponse>> fallbackDownloader
= emptyBlobDownloader != null ? emptyBlobDownloader : downloader;
return fallbackDownloader.apply(new BlobRange(0, 0L), requestConditions)
// Subscribe on boundElastic instead of elastic as elastic is deprecated and boundElastic
// provided the same functionality with the added benefit that it won't infinitely create
// threads if needed and will instead queue.
Expand Down Expand Up @@ -143,7 +166,7 @@ public static <T> Flux<T> downloadChunk(Integer chunkNum, BlobDownloadAsyncRespo
private static BlobRequestConditions setEtag(BlobRequestConditions requestConditions, String etag) {
// We don't want to modify the user's object, so we'll create a duplicate and set the retrieved etag.
return new BlobRequestConditions().setIfModifiedSince(requestConditions.getIfModifiedSince())
.setIfUnmodifiedSince(requestConditions.getIfModifiedSince())
.setIfUnmodifiedSince(requestConditions.getIfUnmodifiedSince())
Comment thread
gunjansingh-msft marked this conversation as resolved.
Outdated
.setIfMatch(etag)
.setIfNoneMatch(requestConditions.getIfNoneMatch())
.setLeaseId(requestConditions.getLeaseId());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.storage.blob.implementation.util;

import com.azure.core.util.Context;
import com.azure.storage.common.StorageChecksumAlgorithm;
import com.azure.storage.common.implementation.Constants;

/**
* Centralizes download content validation decisions based on {@link StorageChecksumAlgorithm}.
* <p>
* Mirrors the pattern established by {@code ContentValidationModeResolver} for uploads.
* <p>
* RESERVED FOR INTERNAL USE.
*/
public final class DownloadValidationUtils {

private DownloadValidationUtils() {
}

/**
* Whether the algorithm requires structured message decoding (CRC64 / AUTO).
*/
public static boolean isStructuredMessageAlgorithm(StorageChecksumAlgorithm algorithm) {
return algorithm == StorageChecksumAlgorithm.CRC64 || algorithm == StorageChecksumAlgorithm.AUTO;
}

/**
* Resolves the effective algorithm, defaulting null to NONE.
*/
public static StorageChecksumAlgorithm resolveAlgorithm(StorageChecksumAlgorithm algorithm) {
return algorithm != null ? algorithm : StorageChecksumAlgorithm.NONE;
}

/**
* Adds structured message decoding context key when CRC64/AUTO validation is active.
*
* @param context The base context to augment. Null is treated as {@link Context#NONE}.
* @param algorithm The resolved checksum algorithm.
* @return The augmented context.
*/
public static Context applyStructuredMessageContext(Context context, StorageChecksumAlgorithm algorithm) {
Context base = context == null ? Context.NONE : context;
if (!isStructuredMessageAlgorithm(algorithm)) {
return base;
}
return base.addData(Constants.STRUCTURED_MESSAGE_DECODING_CONTEXT_KEY, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.azure.storage.blob.implementation.accesshelpers.BlobDownloadAsyncResponseConstructorProxy;
import com.azure.storage.blob.implementation.models.BlobsDownloadHeaders;
import com.azure.storage.blob.implementation.util.ModelHelper;
import com.azure.core.util.logging.ClientLogger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -29,8 +30,18 @@
public final class BlobDownloadAsyncResponse extends ResponseBase<BlobDownloadHeaders, Flux<ByteBuffer>>
implements Closeable {

private static final ClientLogger LOGGER = new ClientLogger(BlobDownloadAsyncResponse.class);

static {
BlobDownloadAsyncResponseConstructorProxy.setAccessor(BlobDownloadAsyncResponse::new);
BlobDownloadAsyncResponseConstructorProxy
.setAccessor(new BlobDownloadAsyncResponseConstructorProxy.BlobDownloadAsyncResponseConstructorAccessor() {
@Override
public BlobDownloadAsyncResponse create(StreamResponse sourceResponse,
BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume,
DownloadRetryOptions retryOptions) {
return new BlobDownloadAsyncResponse(sourceResponse, onErrorResume, retryOptions);
}
});
Comment thread
gunjansingh-msft marked this conversation as resolved.
Outdated
}

private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
Expand All @@ -56,13 +67,6 @@ public BlobDownloadAsyncResponse(HttpRequest request, int statusCode, HttpHeader
this.retryOptions = null;
}

/**
* Constructs a {@link BlobDownloadAsyncResponse}.
*
* @param sourceResponse The initial Stream Response
* @param onErrorResume Function used to resume.
* @param retryOptions Retry options.
*/
Comment thread
gunjansingh-msft marked this conversation as resolved.
BlobDownloadAsyncResponse(StreamResponse sourceResponse,
BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume, DownloadRetryOptions retryOptions) {
super(sourceResponse.getRequest(), sourceResponse.getStatusCode(), sourceResponse.getHeaders(),
Expand Down Expand Up @@ -95,7 +99,12 @@ private static Flux<ByteBuffer> createResponseFlux(StreamResponse sourceResponse
*/
public Mono<Void> writeValueToAsync(AsynchronousByteChannel channel, ProgressReporter progressReporter) {
Objects.requireNonNull(channel, "'channel' must not be null");
LOGGER.atVerbose()
.addKeyValue("thread", Thread.currentThread().getName())
.log("BlobDownloadAsyncResponse.writeValueToAsync entry");
if (sourceResponse != null) {
LOGGER.atVerbose()
.log("BlobDownloadAsyncResponse.writeValueToAsync using sourceResponse (IOUtils.transfer)");
return IOUtils.transferStreamResponseToAsynchronousByteChannel(channel, sourceResponse, onErrorResume,
progressReporter, retryOptions.getMaxRetryRequests());
} else if (super.getValue() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.azure.storage.blob.implementation.util.BlobRequestConditionProperty;
import com.azure.storage.blob.implementation.util.BlobSasImplUtil;
import com.azure.storage.blob.implementation.util.ChunkedDownloadUtils;
import com.azure.storage.blob.implementation.util.DownloadValidationUtils;
import com.azure.storage.blob.implementation.util.ModelHelper;
import com.azure.storage.blob.models.AccessTier;
import com.azure.storage.blob.models.BlobBeginCopySourceRequestConditions;
Expand Down Expand Up @@ -1259,27 +1260,29 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponseInternal(BlobRange ran
BlobRequestConditions requestConditions, boolean getRangeContentMd5,
StorageChecksumAlgorithm responseChecksumAlgorithm, Context context) {
BlobRange finalRange = range == null ? new BlobRange(0) : range;
Boolean getMD5 = getRangeContentMd5 ? getRangeContentMd5 : null;

final StorageChecksumAlgorithm algorithm = DownloadValidationUtils.resolveAlgorithm(responseChecksumAlgorithm);
final boolean isStructuredMessageEnabled = DownloadValidationUtils.isStructuredMessageAlgorithm(algorithm);

final Boolean finalGetMD5 = (!isStructuredMessageEnabled && getRangeContentMd5) ? true : null;

BlobRequestConditions finalRequestConditions
= requestConditions == null ? new BlobRequestConditions() : requestConditions;
DownloadRetryOptions finalOptions = (options == null) ? new DownloadRetryOptions() : options;

// The first range should eagerly convert headers as they'll be used to create response types.
Comment thread
gunjansingh-msft marked this conversation as resolved.
Context firstRangeContext = context == null
final Context baseContext = context == null
? new Context("azure-eagerly-convert-headers", true)
: context.addData("azure-eagerly-convert-headers", true);

return downloadRange(finalRange, finalRequestConditions, finalRequestConditions.getIfMatch(), getMD5,
firstRangeContext).map(response -> {
final Context downloadContext = DownloadValidationUtils.applyStructuredMessageContext(baseContext, algorithm);
Comment thread
gunjansingh-msft marked this conversation as resolved.
Outdated

return downloadRange(finalRange, finalRequestConditions, finalRequestConditions.getIfMatch(), finalGetMD5,
downloadContext).map(response -> {
BlobsDownloadHeaders blobsDownloadHeaders = new BlobsDownloadHeaders(response.getHeaders());
String eTag = blobsDownloadHeaders.getETag();
BlobDownloadHeaders blobDownloadHeaders = ModelHelper.populateBlobDownloadHeaders(blobsDownloadHeaders,
ModelHelper.getErrorCode(response.getHeaders()));

/*
* If the customer did not specify a count, they are reading to the end of the blob. Extract this value
* from the response for better book-keeping towards the end.
*/
Comment thread
gunjansingh-msft marked this conversation as resolved.
long finalCount;
long initialOffset = finalRange.getOffset();
if (finalRange.getCount() == null) {
Expand All @@ -1289,38 +1292,38 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponseInternal(BlobRange ran
finalCount = finalRange.getCount();
}

// The resume function takes throwable and offset at the destination.
// I.e. offset is relative to the starting point.
Comment thread
gunjansingh-msft marked this conversation as resolved.
BiFunction<Throwable, Long, Mono<StreamResponse>> onDownloadErrorResume = (throwable, offset) -> {
if (!(throwable instanceof IOException || throwable instanceof TimeoutException)) {
return Mono.error(throwable);
}

long newCount = finalCount - offset;

/*
* It's possible that the network stream will throw an error after emitting all data but before
* completing. Issuing a retry at this stage would leave the download in a bad state with
* incorrect count and offset values. Because we have read the intended amount of data, we can
* ignore the error at the end of the stream.
*/
if (newCount == 0) {
LOGGER.warning("Exception encountered in ReliableDownload after all data read from the network "
+ "but before stream signaled completion. Returning success as all data was downloaded. "
+ "Exception message: " + throwable.getMessage());
return Mono.empty();
Comment thread
ibrandes marked this conversation as resolved.
Comment thread
gunjansingh-msft marked this conversation as resolved.
}

try {
return downloadRange(new BlobRange(initialOffset + offset, newCount), finalRequestConditions,
eTag, getMD5, context);
long newCount = finalCount - offset;

/*
* It's possible that the network stream will throw an error after emitting all data but
* before completing. Issuing a retry at this stage would leave the download in a bad
* state with incorrect count and offset values. Because we have read the intended amount
* of data, we can ignore the error at the end of the stream.
*/
if (newCount == 0) {
LOGGER.warning(
"Exception encountered in ReliableDownload after all data read from the network "
+ "but before stream signaled completion. Returning success as all data was "
+ "downloaded. Exception message: " + throwable.getMessage());
return Mono.empty();
}

BlobRange retryRange = new BlobRange(initialOffset + offset, newCount);
return downloadRange(retryRange, finalRequestConditions, eTag, finalGetMD5, downloadContext);
Comment thread
ibrandes marked this conversation as resolved.
Outdated
} catch (Exception e) {
return Mono.error(e);
}
};

return BlobDownloadAsyncResponseConstructorProxy.create(response, onDownloadErrorResume, finalOptions);
});

Comment thread
gunjansingh-msft marked this conversation as resolved.
Outdated
}

private Mono<StreamResponse> downloadRange(BlobRange range, BlobRequestConditions requestConditions, String eTag,
Expand Down Expand Up @@ -1536,7 +1539,6 @@ Mono<Response<BlobProperties>> downloadToFileWithResponse(BlobDownloadToFileOpti
BlobRequestConditions finalConditions
= options.getRequestConditions() == null ? new BlobRequestConditions() : options.getRequestConditions();

// Default behavior is not to overwrite
Comment thread
gunjansingh-msft marked this conversation as resolved.
Set<OpenOption> openOptions = options.getOpenOptions();
if (openOptions == null) {
openOptions = DEFAULT_OPEN_OPTIONS_SET;
Expand Down
Loading
Loading