
Storage - Content Validation Decoder Implementation #47016

Open
gunjansingh-msft wants to merge 16 commits into feature/storage/content-validation from feature/storage/pipelinepolicy

Conversation

@gunjansingh-msft (Member) commented Oct 15, 2025

No description provided.

@github-actions bot added the Storage label (Storage Service: Queues, Blobs, Files) Oct 15, 2025
@github-actions bot (Contributor) commented Oct 15, 2025

API Change Check

APIView identified API level changes in this PR and created the following API reviews

com.azure:azure-storage-blob
com.azure:azure-storage-common

@kyleknap (Member) left a comment

It's looking good! It's great to see us getting a working end-to-end implementation of downloads. I just left some higher-level comments as we keep hashing out the download implementation.

@gunjansingh-msft requested a review from a team as a code owner December 3, 2025 15:54
@ibrandes (Member) left a comment

i will be 100% honest - i was really struggling to walk through the logic in here, there's so much to look at x) my comments should highlight some of the more confusing areas. don't worry about polishing anything too much, but adding more subfunctions, centralizing util logic, and reducing the nesting should make this a lot easier to follow.

overall, i think you're going down the right path with how you utilize and set the decoder state. also, definitely look into downloadToFileImpl as mentioned in one of my comments - ProgressListener and ProgressReporter might be able to help us reduce some of the boilerplate code you've had to write.

@github-actions bot (Contributor) commented

Hi @gunjansingh-msft. Thank you for your interest in helping to improve the Azure SDK experience and for your contribution. We've noticed that there hasn't been recent engagement on this pull request. If this is still an active work stream, please let us know by pushing some changes or leaving a comment. Otherwise, we'll close this out in 7 days.

@github-actions bot added and then removed the no-recent-activity label Mar 13, 2026
@ibrandes (Member) left a comment

left some comments about things i immediately noticed while reviewing - going to continue to review logic over the next day or so :)

Comment on lines +105 to +127
/**
* Gets the size of the first range requested when downloading.
* @return The initial transfer size.
*/
public Long getInitialTransferSizeLong() {
return this.initialTransferSize;
}

/**
* Sets the size of the first range requested when downloading.
* This value may be larger than the block size used for subsequent ranges.
*
* @param initialTransferSize The initial transfer size.
* @return The ParallelTransferOptions object itself.
*/
public ParallelTransferOptions setInitialTransferSizeLong(Long initialTransferSize) {
if (initialTransferSize != null) {
StorageImplUtils.assertInBounds("initialTransferSize", initialTransferSize, 1, Long.MAX_VALUE);
}
this.initialTransferSize = initialTransferSize;
return this;
}

Member

ParallelTransferOptions is publicly exposed to customers - is initialTransferSize a value we want them to be able to configure? if not, we should move this somewhere internal.

Member Author

The .NET SDK explicitly exposes InitialTransferSize as a public, customer-configurable property on their equivalent of ParallelTransferOptions. The Java implementation mirrors this design exactly. Let me know if you feel otherwise.


Member

this feels like a separate feature to me - i don't think it is necessary for content validation specifically. we should remove it and keep our changes minimal.

Copilot AI (Contributor) left a comment

Pull request overview

Implements structured-message decoding and CRC64-based content validation for Storage Blob downloads, including smarter retry behavior that can resume from validated segment boundaries.

Changes:

  • Adds StorageContentValidationDecoderPolicy + supporting decoder state/helpers to decode/validate structured message responses and surface machine-readable retry offsets.
  • Extends download plumbing (range retries, download-to-file) to support structured-message decoding and aggregate CRC validation across retries/partitions.
  • Adds/updates unit and integration tests for decoder behavior, CRC helpers, and blob download scenarios.

Reviewed changes

Copilot reviewed 29 out of 29 changed files in this pull request and generated 13 comments.

File | Description
sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/StorageContentValidationDecoderPolicyTest.java | Adds unit tests for retry-offset/decoder-offset parsing helpers.
sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/implementation/contentvalidation/StructuredMessageDecoderTests.java | Adds chunk-splitting and retry-boundary tests for the structured message decoder.
sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/implementation/contentvalidation/StorageCrc64CalculatorTests.java | Adds coverage for new CRC helper overloads (computeTwo, ByteBuffer, slice).
sdk/storage/azure-storage-common/src/test-shared/java/com/azure/storage/common/test/shared/StorageCommonTestUtils.java | Makes JDK HTTP client selection fail fast when unavailable (test infra robustness).
sdk/storage/azure-storage-common/src/test-shared/java/com/azure/storage/common/test/shared/policy/MockPartialResponsePolicy.java | Improves partial-response fault injection (range tracking, configurable truncation).
sdk/storage/azure-storage-common/src/test-shared/java/com/azure/storage/common/test/shared/policy/MockDownloadHttpResponse.java | Enhances mock response to support body collection and proper resource closing.
sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/StorageContentValidationDecoderPolicy.java | New pipeline policy to decode structured messages, validate CRC, and emit retry offsets.
sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/DecoderState.java | New state carrier for per-response decoder progress and retry offset computation.
sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/DecodedResponse.java | Wraps an HttpResponse with a decoded body Flux while managing response lifetime.
sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/ContentValidationDecoderUtils.java | Adds parsing helpers and download eligibility checks for the decoding policy.
sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/AggregateCrcState.java | Aggregates CRC/segment info across retries for end-to-end validation.
sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/ParallelTransferOptions.java | Adds initialTransferSize option (common) for tuning the first download range size.
sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/contentvalidation/StructuredMessageDecodingStream.java | Adds a structured-message decoding wrapper utility for Flux streams.
sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/contentvalidation/StructuredMessageDecoder.java | New incremental decoder with segment CRC validation and smart-retry boundary tracking.
sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/contentvalidation/StorageCrc64Calculator.java | Adds CRC overloads for array slices, ByteBuffers, and dual CRC computation.
sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/Constants.java | Introduces context keys and config toggles used by structured message decoding/retries.
sdk/storage/azure-storage-common/checkstyle-suppressions.xml | Updates suppressions to new checkstyle check names and adds one suppression entry.
sdk/storage/azure-storage-blob/src/test/resources/logback-test.xml | Adds module test logging configuration.
sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobMessageDecoderDownloadTests.java | Adds sync integration tests for CRC64 structured-message decoding download APIs.
sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobMessageAsyncDecoderDownloadTests.java | Adds async integration tests including interruption/retry scenarios.
sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobClientBase.java | Adjusts sync download-to-file option handling and concurrency behavior.
sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java | Adds structured-message retry logic and reworks download-to-file flow for validation.
sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java | Adds initialTransferSize option (blob models) and propagates it through defaults/wrapping.
sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/BlobDownloadAsyncResponse.java | Plumbs decoder state through response construction and sets ContentCrc64 post-consumption.
sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java | Propagates initialTransferSize through defaulting and wrapping helpers.
sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ChunkedDownloadUtils.java | Enhances first-chunk logic (contentLength fallback, optional empty-blob downloader).
sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BuilderHelper.java | Adds the decoding policy into the blob pipeline.
sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BlobConstants.java | Introduces download range sizing and concurrency-related constants.
sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/accesshelpers/BlobDownloadAsyncResponseConstructorProxy.java | Extends access helper to carry decoder state and expose source response/decoder state accessors.

Comment on lines +32 to +34
byte[] data = new byte[copy.remaining()];
copy.get(data);
runningCrc = StorageCrc64Calculator.compute(data, runningCrc);
Copilot AI Mar 30, 2026

AggregateCrcState.appendPayload copies the payload into a new byte[] on every call. Since StorageCrc64Calculator now has compute(ByteBuffer, long), this can be updated to compute directly from the ByteBuffer (using a duplicate/slice) to avoid per-buffer allocations and extra memory copies during large downloads.

Suggested change
-    byte[] data = new byte[copy.remaining()];
-    copy.get(data);
-    runningCrc = StorageCrc64Calculator.compute(data, runningCrc);
+    runningCrc = StorageCrc64Calculator.compute(copy, runningCrc);

Comment on lines +2105 to +2107
byte[] bytes = new byte[length];
slice.get(bytes);
crc.updateAndGet(previous -> StorageCrc64Calculator.compute(bytes, previous));
Copilot AI Mar 30, 2026

Crc64TrackingAsynchronousByteChannel.updateCrc allocates a new byte[] and copies data for every write completion. This can significantly increase GC pressure for large downloads; consider leveraging StorageCrc64Calculator.compute(ByteBuffer, long) over a duplicate/slice of the written region to avoid the extra allocation/copy.

Suggested change
-    byte[] bytes = new byte[length];
-    slice.get(bytes);
-    crc.updateAndGet(previous -> StorageCrc64Calculator.compute(bytes, previous));
+    crc.updateAndGet(previous -> StorageCrc64Calculator.compute(slice, previous));

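Both allocation comments point at the same pattern. Here is a runnable sketch of computing a checksum straight from a ByteBuffer via duplicate(), with java.util.zip.CRC32 standing in for the SDK's StorageCrc64Calculator so it compiles standalone:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class BufferCrcExample {
    // Update a running checksum directly from a ByteBuffer, with no
    // intermediate byte[] allocation or copy.
    static void update(CRC32 crc, ByteBuffer src) {
        // duplicate() shares the backing data but has an independent
        // position/limit, so consuming the view leaves the caller's
        // buffer untouched for the downstream writer.
        crc.update(src.duplicate());
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8));
        CRC32 crc = new CRC32();
        update(crc, buf);
        System.out.println(buf.remaining()); // original position unchanged
    }
}
```

The same duplicate/slice trick applies to the suggested compute(ByteBuffer, long) overload; the win is avoiding one heap allocation and one memcpy per buffer on the download hot path.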
Comment on lines +151 to +166
LOGGER.atInfo()
.addKeyValue("newBytes", buffer.remaining())
.addKeyValue("decoderOffset", decoder.getMessageOffset())
.addKeyValue("lastCompleteSegment", decoder.getLastCompleteSegmentStart())
.addKeyValue("totalDecodedPayload", decoder.getTotalDecodedPayloadBytes())
.log("Received buffer in decodeStream");

try {
StructuredMessageDecoder.DecodeResult result = decoder.decodeChunk(buffer);

LOGGER.atInfo()
.addKeyValue("status", result.getStatus())
.addKeyValue("bytesConsumed", result.getBytesConsumed())
.addKeyValue("decoderOffset", decoder.getMessageOffset())
.addKeyValue("lastCompleteSegment", decoder.getLastCompleteSegmentStart())
.log("Decode chunk result");
Copilot AI Mar 30, 2026

StorageContentValidationDecoderPolicy logs per-buffer and per-chunk decode details at INFO. For downloads this can produce extremely high-volume logs and impact performance/operational noise; consider moving these hot-path logs to verbose/debug level (keeping INFO for one-time lifecycle events or failures).

Comment on lines +653 to +658
LOGGER.atInfo()
.addKeyValue("newBytes", buffer.remaining())
.addKeyValue("pendingBytes", pendingBytes.size())
.addKeyValue("decoderOffset", messageOffset)
.addKeyValue("lastCompleteSegment", lastCompleteSegmentStart)
.log("Received buffer in decode");
Copilot AI Mar 30, 2026

StructuredMessageDecoder emits very frequent INFO logs on the decode hot path (every chunk/header/footer). This will be noisy and can materially slow down high-throughput downloads; consider switching these to verbose/debug and reserving INFO for summary or error conditions.

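The gist of both logging comments, sketched with java.util.logging standing in for azure-core's ClientLogger: guard the verbose branch so the hot path pays almost nothing when verbose output is disabled.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class HotPathLogging {
    private static final Logger LOGGER = Logger.getLogger(HotPathLogging.class.getName());

    static void onChunk(int newBytes, long decoderOffset) {
        // Per-chunk details belong at FINE (verbose); the isLoggable guard
        // also skips the string concatenation when verbose is off.
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine("decode chunk: newBytes=" + newBytes + " offset=" + decoderOffset);
        }
        // INFO stays reserved for one-time lifecycle events or failures.
    }

    public static void main(String[] args) {
        onChunk(4096, 0L);
        // JUL's default level is INFO, so the verbose branch above is skipped.
        System.out.println(LOGGER.isLoggable(Level.FINE));
    }
}
```

With ClientLogger the equivalent change is swapping the atInfo() calls on the decode path for atVerbose().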
Comment on lines +231 to +232
byte[] data = "Hello World!".getBytes();
long expected = StorageCrc64Calculator.compute(data, 0);
Copilot AI Mar 30, 2026

Tests use String.getBytes() without specifying a charset, which depends on the platform default. Use a fixed charset (e.g., StandardCharsets.UTF_8) to keep the test deterministic across environments.

Comment on lines +239 to +240
byte[] data = "Hello World!".getBytes();
long expected = StorageCrc64Calculator.compute(data, 0);
Copilot AI Mar 30, 2026

Tests use String.getBytes() without specifying a charset, which depends on the platform default. Use a fixed charset (e.g., StandardCharsets.UTF_8) to keep the test deterministic across environments.

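The fix for both charset comments is a one-liner; pinning the charset makes the encoded bytes (and therefore the CRC input) identical on every platform:

```java
import java.nio.charset.StandardCharsets;

public class CharsetDeterminism {
    public static void main(String[] args) {
        // getBytes() with no argument uses the platform default charset, which
        // can differ between a UTF-8 Linux CI agent and a legacy Windows code
        // page. Pinning UTF-8 makes the test deterministic everywhere.
        byte[] data = "Hello World!".getBytes(StandardCharsets.UTF_8);
        System.out.println(data.length);
    }
}
```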
Comment on lines +119 to +123
byte[] originalData = new byte[0];

// For zero-length data, encoder behavior varies - let's test with minimal data
byte[] minimalData = new byte[1];
ThreadLocalRandom.current().nextBytes(minimalData);
Copilot AI Mar 30, 2026

In handlesZeroLengthSegment, the local variable originalData is unused (and the test ends up encoding/decoding minimalData instead). Removing the unused variable and/or adjusting the test name/comment to match what is actually being validated will improve clarity and avoid style-check failures.


com.azure.storage.common.ParallelTransferOptions parallelTransferOptions = options.getParallelTransferOptions();
Integer maxConcurrency = parallelTransferOptions == null ? null : parallelTransferOptions.getMaxConcurrency();
if (maxConcurrency != null && maxConcurrency <= 1) {
Copilot AI Mar 30, 2026

adjustOptionsForSyncDownload forces maxConcurrency to 1 whenever it’s null or > 1. This changes the behavior/performance characteristics of sync downloadToFile* calls even when the caller explicitly requested parallelism; if this is required for a specific scenario (e.g., structured decoding limitations), consider gating it to that scenario and/or documenting the behavior clearly.

Suggested change
-        if (maxConcurrency != null && maxConcurrency <= 1) {
+        /*
+         * For synchronous downloads, we default to a concurrency of 1 when the caller has not explicitly set a
+         * maxConcurrency value. If the caller has provided a non-null maxConcurrency (including values greater
+         * than 1), we preserve that configuration rather than overriding it here.
+         */
+        if (maxConcurrency != null) {

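One way to gate the override as the comment suggests. This is a hypothetical helper (names are illustrative, not from the PR), under the assumption that structured-message decoding requires in-order consumption of the stream:

```java
public class SyncDownloadConcurrency {
    // Only force a sequential download when structured-message decoding
    // demands it; otherwise honor whatever the caller configured.
    static int resolveMaxConcurrency(Integer requested, boolean structuredDecoding) {
        if (structuredDecoding) {
            // Assumption: the decoder consumes the stream in order, so
            // parallel ranges would interleave segments mid-validation.
            return 1;
        }
        // Preserve explicit caller configuration; default stays sequential.
        return requested != null ? requested : 1;
    }

    public static void main(String[] args) {
        System.out.println(resolveMaxConcurrency(8, false)); // caller wins
        System.out.println(resolveMaxConcurrency(8, true));  // gated to 1
        System.out.println(resolveMaxConcurrency(null, false)); // default
    }
}
```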
</encoder>
</appender>

<root level="INFO">
Copilot AI Mar 30, 2026

Setting the test logging root level to INFO will emit a lot of logs across the entire azure-storage-blob test suite, which can slow tests and clutter CI output. Consider raising this to WARN/ERROR (or scoping INFO to specific loggers) and enabling INFO only when troubleshooting.

Suggested change
-    <root level="INFO">
+    <!-- Use WARN by default to keep test and CI output concise. Temporarily raise to INFO when troubleshooting. -->
+    <root level="WARN">

@ibrandes (Member) left a comment

couple more things i noticed:

Comment on lines +135 to +139
if (JDK_HTTP_HTTP_CLIENT == null) {
throw new IllegalStateException(
"JDK HTTP client is not available (e.g. requires Java 19+ for virtual threads). "
+ "Use NETTY, OK_HTTP, or VERTX instead.");
}
Member

why was this added?

/**
* Structured message header value for CRC64 validation.
*/
public static final String STRUCTURED_MESSAGE_CRC64_BODY_TYPE = "XSM/1.0; properties=crc64";
Member

this is already in StructuredMessageConstants.java

Comment on lines +1282 to +1287
final StorageChecksumAlgorithm algorithm
= responseChecksumAlgorithm != null ? responseChecksumAlgorithm : StorageChecksumAlgorithm.NONE;
final boolean isStructuredMessageEnabled = isStructuredMessageAlgorithm(algorithm);
final boolean isMd5Enabled = algorithm == StorageChecksumAlgorithm.MD5;

final Boolean finalGetMD5 = (!isStructuredMessageEnabled && (getRangeContentMd5 || isMd5Enabled)) ? true : null;
Member

the logic for resolving the validation type and adding flags to the context can be consolidated into util methods. by consolidating it, instead of re-writing the logic for every client method, we ideally just have to call the util method. i have an example of this in my encoder PR here: link. ideally, after a couple of review cycles, we can combine our util methods into one file, but for now, it would be good to create your own to establish the pattern.
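A minimal sketch of the kind of util method being requested here. The enum and class names are illustrative, not the SDK's (the real types are StorageChecksumAlgorithm and friends), but the resolution logic mirrors the quoted snippet:

```java
public class ValidationResolverSketch {
    enum ChecksumAlgorithm { NONE, MD5, STORAGE_CRC64 }

    static final class ResolvedValidation {
        final boolean structuredMessage;
        final Boolean requestMd5; // null means the MD5 flag is omitted from the request

        ResolvedValidation(boolean structuredMessage, Boolean requestMd5) {
            this.structuredMessage = structuredMessage;
            this.requestMd5 = requestMd5;
        }
    }

    // One home for the resolution logic, instead of re-deriving the flags
    // inline in every client method.
    static ResolvedValidation resolve(ChecksumAlgorithm requested, boolean rangeGetContentMd5) {
        ChecksumAlgorithm algorithm = requested != null ? requested : ChecksumAlgorithm.NONE;
        boolean structured = algorithm == ChecksumAlgorithm.STORAGE_CRC64;
        boolean md5 = algorithm == ChecksumAlgorithm.MD5;
        Boolean getMd5 = (!structured && (rangeGetContentMd5 || md5)) ? Boolean.TRUE : null;
        return new ResolvedValidation(structured, getMd5);
    }

    public static void main(String[] args) {
        System.out.println(resolve(ChecksumAlgorithm.STORAGE_CRC64, true).requestMd5);
        System.out.println(resolve(ChecksumAlgorithm.MD5, false).requestMd5);
    }
}
```

Each client method would then make a single resolve(...) call and read the flags off the result.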

Comment on lines +2112 to +2204
private static final class Md5TrackingAsynchronousByteChannel implements AsynchronousByteChannel {
private final AsynchronousByteChannel channel;
private final MessageDigest digest;

private Md5TrackingAsynchronousByteChannel(AsynchronousByteChannel channel) {
this.channel = channel;
try {
this.digest = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw LOGGER.logExceptionAsError(new IllegalStateException("MD5 MessageDigest unavailable.", e));
}
}

private byte[] getMd5() {
synchronized (digest) {
return digest.digest();
}
}

@Override
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
channel.read(dst, attachment, handler);
}

@Override
public Future<Integer> read(ByteBuffer dst) {
return channel.read(dst);
}

@Override
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
int startPos = src.position();
ByteBuffer duplicate = src.duplicate();
channel.write(src, attachment, new CompletionHandler<Integer, A>() {
@Override
public void completed(Integer result, A att) {
if (result != null && result > 0) {
updateMd5(duplicate, startPos, result);
}
handler.completed(result, att);
}

@Override
public void failed(Throwable exc, A att) {
handler.failed(exc, att);
}
});
}

@Override
public Future<Integer> write(ByteBuffer src) {
CompletableFuture<Integer> future = new CompletableFuture<>();
int startPos = src.position();
ByteBuffer duplicate = src.duplicate();
channel.write(src, src, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (result != null && result > 0) {
updateMd5(duplicate, startPos, result);
}
future.complete(result);
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
future.completeExceptionally(exc);
}
});
return future;
}

@Override
public boolean isOpen() {
return channel.isOpen();
}

@Override
public void close() throws IOException {
channel.close();
}

private void updateMd5(ByteBuffer buffer, int startPos, int length) {
if (length <= 0) {
return;
}

ByteBuffer slice = buffer.duplicate();
slice.position(startPos);
slice.limit(startPos + length);
synchronized (digest) {
digest.update(slice);
}
}
Member

adding support for md5 isn't in the scope of content validation - this can be removed.

Comment on lines +1702 to +1765
= (range, conditions) -> isStructuredMessageEnabled
? this.downloadStreamWithResponseInternal(range, downloadRetryOptions, conditions, rangeGetContentMd5,
responseChecksumAlgorithm, downloadContext)
: this.downloadStreamWithResponse(range, downloadRetryOptions, conditions, rangeGetContentMd5,
downloadContext);
BiFunction<BlobRange, BlobRequestConditions, Mono<BlobDownloadAsyncResponse>> emptyBlobDownloadFunc = (range,
conditions) -> this.downloadStreamWithResponse(range, downloadRetryOptions, conditions, false, context);

boolean checksumValidationEnabled = isStructuredMessageEnabled || rangeGetContentMd5 || isMd5Enabled;
boolean md5ValidationEnabled = !isStructuredMessageEnabled && (rangeGetContentMd5 || isMd5Enabled);

long rangeSize = blockSizeProvided
? Math.min(finalParallelTransferOptions.getBlockSizeLong(), BlobConstants.BLOB_MAX_DOWNLOAD_BYTES)
: (checksumValidationEnabled
? BlobConstants.BLOB_MAX_HASH_REQUEST_DOWNLOAD_RANGE
: BlobConstants.BLOB_DEFAULT_DOWNLOAD_RANGE_SIZE);

Long requestedInitialTransferSize = finalParallelTransferOptions.getInitialTransferSizeLong();
long initialRangeSize = requestedInitialTransferSize != null && requestedInitialTransferSize > 0
? requestedInitialTransferSize
: (checksumValidationEnabled
? BlobConstants.BLOB_MAX_HASH_REQUEST_DOWNLOAD_RANGE
: BlobConstants.BLOB_DEFAULT_INITIAL_DOWNLOAD_RANGE_SIZE);

int maxConcurrency = maxConcurrencyProvided
? finalParallelTransferOptions.getMaxConcurrency()
: getDefaultDownloadConcurrency();

com.azure.storage.common.ParallelTransferOptions initialParallelTransferOptions
= new com.azure.storage.common.ParallelTransferOptions().setBlockSizeLong(initialRangeSize);

boolean useMasterCrc = isStructuredMessageEnabled;

LOGGER.atVerbose()
.addKeyValue("thread", Thread.currentThread().getName())
.log("BlobAsyncClientBase.downloadToFileImpl calling downloadFirstChunk");
return ChunkedDownloadUtils
.downloadFirstChunk(finalRange, finalParallelTransferOptions, requestConditions, downloadFunc, true,
context)
.downloadFirstChunk(finalRange, initialParallelTransferOptions, requestConditions, downloadFunc,
emptyBlobDownloadFunc, true, downloadContext)
.doOnSuccess(t -> LOGGER.atVerbose()
.addKeyValue("newCount", t != null ? t.getT1() : null)
.addKeyValue("thread", Thread.currentThread().getName())
.log("BlobAsyncClientBase downloadFirstChunk returned"))
.flatMap(setupTuple3 -> {
long newCount = setupTuple3.getT1();
BlobRequestConditions finalConditions = setupTuple3.getT2();
BlobDownloadAsyncResponse initialResponse = setupTuple3.getT3();
LOGGER.atVerbose()
.addKeyValue("newCount", newCount)
.addKeyValue("thread", Thread.currentThread().getName())
.log("BlobAsyncClientBase flatMap after first chunk");

if (initialResponse.getStatusCode() == 304 || newCount == 0) {
return Mono.fromCallable(() -> {
Response<BlobProperties> propertiesResponse
= ModelHelper.buildBlobPropertiesResponse(initialResponse);
try {
initialResponse.close();
} catch (IOException e) {
throw LOGGER.logExceptionAsError(new UncheckedIOException(e));
}
return propertiesResponse;
});
}
Member

we should figure out how to consolidate this logic more. i don't think it's realistic to apply all of this logic to a method every time we need to add content validation to a new API. i think it would be beneficial to look into how our existing decryption policy works and determine what patterns we can re-use from there.

Comment on lines +1829 to +1866
private static Mono<Void> drainQueuedResponses(ArrayDeque<CompletableFuture<BlobDownloadAsyncResponse>> running,
Iterator<BlobRange> remainingIterator,
BiFunction<BlobRange, BlobRequestConditions, Mono<BlobDownloadAsyncResponse>> downloadFunc,
BlobRequestConditions finalConditions, AsynchronousByteChannel targetChannel, ProgressReporter progressReporter,
boolean useMasterCrc, ComposedCrcState composedCrcState, boolean md5ValidationEnabled) {
return Mono.defer(() -> {
CompletableFuture<BlobDownloadAsyncResponse> nextFuture = running.poll();
if (nextFuture == null) {
return Mono.empty();
}

if (remainingIterator.hasNext()) {
BlobRange nextRange = remainingIterator.next();
running.add(downloadFunc.apply(nextRange, finalConditions).toFuture());
}

return Mono.fromFuture(nextFuture)
.flatMap(response -> writeBodyToFile(response, targetChannel,
progressReporter == null ? null : progressReporter.createChild(), useMasterCrc, composedCrcState,
md5ValidationEnabled).doFinally(signalType -> closeQuietly(response)))
.then(Mono.defer(() -> drainQueuedResponses(running, remainingIterator, downloadFunc, finalConditions,
targetChannel, progressReporter, useMasterCrc, composedCrcState, md5ValidationEnabled)));
});
}

private static void closePendingResponses(ArrayDeque<CompletableFuture<BlobDownloadAsyncResponse>> running) {
CompletableFuture<BlobDownloadAsyncResponse> future;
while ((future = running.poll()) != null) {
future.whenComplete((response, throwable) -> {
if (response != null) {
closeQuietly(response);
}
});
if (!future.isDone()) {
future.cancel(true);
}
}
}
Member

i'm not sure i understand why it is necessary to utilize AsynchronousByteChannel in the client when we hadn't done so before.

Comment on lines +1944 to +1962
private static void validateResponseMd5(BlobDownloadAsyncResponse response,
Md5TrackingAsynchronousByteChannel md5Channel) {
if (md5Channel == null) {
return;
}

byte[] expectedMd5
= response.getDeserializedHeaders() == null ? null : response.getDeserializedHeaders().getContentMd5();
if (expectedMd5 == null || expectedMd5.length == 0) {
throw LOGGER.logExceptionAsError(
new IllegalArgumentException("Content-MD5 header missing from download response."));
}

byte[] actualMd5 = md5Channel.getMd5();
if (!Arrays.equals(expectedMd5, actualMd5)) {
throw LOGGER
.logExceptionAsError(new IllegalArgumentException("MD5 mismatch detected in download response."));
}
}
Member

can be removed - md5 is not in the scope of this project.

Comment on lines +1990 to +2016
private static final class ComposedCrcState {
private long crc;
private long length;

private void append(long nextCrc, long nextLength) {
if (nextLength <= 0) {
return;
}

if (length == 0) {
crc = nextCrc;
length = nextLength;
return;
}

crc = StorageCrc64Calculator.concat(0, 0, crc, length, 0, nextCrc, nextLength);
length += nextLength;
}

private long getCrc() {
return crc;
}

private long getLength() {
return length;
}
}
Member

is this necessary? i thought cumulative crc64 calculations were something that was going to be added later down the line?
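For context on this question: when chunks are written strictly in order, a single running checksum produces the same result as composing independently computed per-chunk CRCs, so a concat/compose step only buys something for out-of-order or parallel hashing. Demonstrated with java.util.zip.CRC32 as a runnable stand-in for the CRC64 calculator:

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class RunningCrcExample {
    public static void main(String[] args) {
        byte[] whole = "abcdef".getBytes(StandardCharsets.UTF_8);

        // Checksum of the full payload in one shot.
        CRC32 oneShot = new CRC32();
        oneShot.update(whole);

        // Same bytes fed as two sequential chunks: a running checksum makes
        // any concat step unnecessary for strictly in-order downloads.
        CRC32 running = new CRC32();
        running.update(whole, 0, 3);
        running.update(whole, 3, 3);

        System.out.println(oneShot.getValue() == running.getValue());
    }
}
```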

Comment on lines +34 to +53
/**
* The legacy default number of concurrent transfers for download operations.
*/
public static final int BLOB_LEGACY_DEFAULT_CONCURRENT_TRANSFERS_COUNT = 5;
/**
* The default range size used for download operations when no checksum validation is requested.
*/
public static final long BLOB_DEFAULT_DOWNLOAD_RANGE_SIZE = 4L * Constants.MB;
/**
* The default initial range size used for download operations when no checksum validation is requested.
*/
public static final long BLOB_DEFAULT_INITIAL_DOWNLOAD_RANGE_SIZE = 256L * Constants.MB;
/**
* The maximum range size used for download operations.
*/
public static final long BLOB_MAX_DOWNLOAD_BYTES = 256L * Constants.MB;
/**
* The maximum range size used when requesting a transactional checksum during download.
*/
public static final long BLOB_MAX_HASH_REQUEST_DOWNLOAD_RANGE = 4L * Constants.MB;
Member

yeah, i think we should absolutely keep our changes as minimal as possible and revert them. changing these default values can have a huge impact on the entirety of the SDK, and it would require a significant amount of java-specific research to determine if these numbers are right for our language. also, it may be necessary to re-run perf tests, as these concurrency values can have a big impact on the previously generated numbers.

Comment on lines +105 to +127
/**
* Gets the size of the first range requested when downloading.
* @return The initial transfer size.
*/
public Long getInitialTransferSizeLong() {
return this.initialTransferSize;
}

/**
* Sets the size of the first range requested when downloading.
* This value may be larger than the block size used for subsequent ranges.
*
* @param initialTransferSize The initial transfer size.
* @return The ParallelTransferOptions object itself.
*/
public ParallelTransferOptions setInitialTransferSizeLong(Long initialTransferSize) {
if (initialTransferSize != null) {
StorageImplUtils.assertInBounds("initialTransferSize", initialTransferSize, 1, Long.MAX_VALUE);
}
this.initialTransferSize = initialTransferSize;
return this;
}

this feels like a separate feature to me - i don't think it is necessary for content validation specifically. we should remove it and keep our changes minimal.

Comment on lines +1817 to +1827
return drainQueuedResponses(running, remainingIterator, downloadFunc, finalConditions, targetChannel,
progressReporter, useMasterCrc, composedCrcState, md5ValidationEnabled)
.doFinally(signalType -> closePendingResponses(running))
.then(Mono.fromCallable(() -> {
if (useMasterCrc) {
validateComposedCrc(composedCrcState, crcChannel);
}
return ModelHelper.buildBlobPropertiesResponse(initialResponse);
}));
});
}
also, by removing the composedCrc logic, we can cut a lot of this additional code.

try {
    final com.azure.storage.common.ParallelTransferOptions finalParallelTransferOptions
-       = ModelHelper.wrapBlobOptions(ModelHelper.populateAndApplyDefaults(parallelTransferOptions));
+       = parallelTransferOptions == null ? null : ModelHelper.wrapBlobOptions(parallelTransferOptions);
we need to apply defaults here - finalParallelTransferOptions = ModelHelper.wrapBlobOptions(ModelHelper.populateAndApplyDefaults(parallelTransferOptions));
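to make the point concrete, here is a toy, self-contained sketch (hypothetical names and assumed default values, not the real SDK types) of why defaults must be applied before wrapping: with the null-check version, a null options object stays null and every downstream caller has to re-handle it, whereas applying defaults first guarantees a fully populated object.

```java
// Hypothetical stand-in for the SDK's ParallelTransferOptions, for illustration only.
class TransferOptionsSketch {
    Long blockSize;
    Integer maxConcurrency;

    // Mirrors the populateAndApplyDefaults idea: never returns null,
    // and every unset field gets a usable value.
    static TransferOptionsSketch populateAndApplyDefaults(TransferOptionsSketch o) {
        TransferOptionsSketch out = (o == null) ? new TransferOptionsSketch() : o;
        if (out.blockSize == null) {
            out.blockSize = 4L * 1024 * 1024; // assumed default, not the SDK's actual value
        }
        if (out.maxConcurrency == null) {
            out.maxConcurrency = 5; // assumed default, not the SDK's actual value
        }
        return out;
    }

    public static void main(String[] args) {
        // Even a null input yields a fully populated options object.
        TransferOptionsSketch opts = populateAndApplyDefaults(null);
        System.out.println(opts.blockSize + " " + opts.maxConcurrency);
    }
}
```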

BlobRequestConditions finalConditions
= options.getRequestConditions() == null ? new BlobRequestConditions() : options.getRequestConditions();

// Default behavior is not to overwrite
let's add this back

int numChunks = ChunkedDownloadUtils.calculateNumBlocks(newCount,
finalParallelTransferOptions.getBlockSizeLong());

// In case it is an empty blob, this ensures we still actually perform a download operation.
also add back

progressReporter == null ? null : progressReporter.createChild()).flux()),
finalParallelTransferOptions.getMaxConcurrency())

// Only the first download call returns a value.
also add back


BlobRange finalRange = options.getRange() == null ? new BlobRange(0) : options.getRange();
-   final com.azure.storage.common.ParallelTransferOptions finalParallelTransferOptions
+   com.azure.storage.common.ParallelTransferOptions finalParallelTransferOptions
final com.azure.storage.common.ParallelTransferOptions finalParallelTransferOptions

}
policies.add(new MetadataValidationPolicy());

policies.add(new StorageContentValidationDecoderPolicy());
a thought came up in my head, and i think it might be worth looking into - if we insert the decoder policy at the start of the pipeline (i.e. right after List<HttpPipelinePolicy> policies = new ArrayList<>();), i think it could simplify the retry logic a lot. for standard http retries, because the decryption policy is placed before the retry policy, each retry re-enters the policy with a fresh response body, so we won't need to change anything there. and for smart retries, i think we should be able to rely on the existing logic in BlobAsyncClientBase.downloadStreamWithResponseInternal. i verified this in our crypto package - the decoder there also doesn't do anything special to handle retries; it calculates everything based on the values passed in through the context in EncryptedBlobClient.populateRequestConditionsAndContext. will follow up over teams with more info about the crypto client.
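a toy, self-contained sketch of that ordering (hypothetical names and structure, not the real azure-core HttpPipelinePolicy types): when the decoder wraps the retry policy from the outside, the retry loop consumes failed attempts internally, and the decoder's stateful work runs exactly once, on the body that finally succeeds.

```java
import java.util.function.Supplier;

// Hypothetical pipeline sketch: each "policy" is a function wrapping the next one.
public class PolicyOrderSketch {
    static int decoderInvocations = 0;

    // Decoder placed before (outside) retry: its state is touched once per operation.
    static String decoderPolicy(Supplier<String> next) {
        decoderInvocations++; // stateful work (e.g. checksum accumulation) happens here
        return "decoded(" + next.get() + ")";
    }

    // Retry placed after (inside) decoder: failed attempts never reach the decoder.
    static String retryPolicy(Supplier<String> attempt, int maxAttempts) {
        RuntimeException last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return attempt.get(); // each retry gets a fresh attempt and body
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        Supplier<String> network = () -> {
            if (calls[0]++ < 2) {
                throw new RuntimeException("transient"); // first two attempts fail
            }
            return "body";
        };
        String result = decoderPolicy(() -> retryPolicy(network, 3));
        System.out.println(result + " decoderInvocations=" + decoderInvocations);
    }
}
```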

uCrc0 = uCrc;

-   ByteBuffer buffer = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+   ByteBuffer buffer = ByteBuffer.wrap(src, offset, length).order(ByteOrder.LITTLE_ENDIAN);
@ibrandes ibrandes Apr 5, 2026
you don't need to change anything else in compute besides the first two lines:

int pData = offset;
long uSize = length;
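for reference, the difference matters because ByteBuffer.wrap(src) always covers the whole array, while ByteBuffer.wrap(src, offset, length) positions the buffer at offset with exactly length bytes remaining - a quick self-contained check:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class WrapCheck {
    public static void main(String[] args) {
        byte[] src = new byte[8];

        // wrap(src) spans the entire array, silently ignoring any offset/length
        // the caller intended to honor.
        ByteBuffer whole = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);

        // wrap(src, offset, length) sets position = offset and
        // limit = offset + length, so remaining() == length.
        ByteBuffer slice = ByteBuffer.wrap(src, 2, 4).order(ByteOrder.LITTLE_ENDIAN);

        System.out.println(whole.remaining());                          // 8
        System.out.println(slice.position() + " " + slice.remaining()); // 2 4
    }
}
```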

Duration timeout, Context context) {
    final com.azure.storage.common.ParallelTransferOptions finalParallelTransferOptions
-       = ModelHelper.wrapBlobOptions(ModelHelper.populateAndApplyDefaults(parallelTransferOptions));
+       = parallelTransferOptions == null ? null : ModelHelper.wrapBlobOptions(parallelTransferOptions);
also need to apply defaults here

Comment on lines -2403 to -2404
* This method processes the input data in chunks of 32 bytes for efficiency and uses lookup tables
* to update the CRC values.
i believe this still applies to how we process the data; we should add it back
