Storage - Content Validation Decoder Implementation #47016
gunjansingh-msft wants to merge 16 commits into feature/storage/content-validation from
Conversation
API Change Check: APIView identified API-level changes in this PR and created the following API reviews.
kyleknap
left a comment
It's looking good! It's great to see us getting a working end-to-end implementation of downloads. I just left some higher-level comments as we keep hashing out the download implementation.
...azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java
...m/azure/storage/common/implementation/structuredmessage/StructuredMessageDecodingStream.java
...azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java
ibrandes
left a comment
i will be 100% honest - i was really struggling to walk through the logic in here, there's so much to look at x) my comments should highlight some of the more confusing areas. don't worry about polishing anything too much, but adding more subfunctions, centralizing util logic, and reducing the nesting should make this a lot easier to follow.
overall, i think you're going down the right path with how you utilize and set the decoder state. also, definitely look into downloadToFileImpl as mentioned in one of my comments - ProgressListener and ProgressReporter might be able to help us reduce some of the boilerplate code you've had to write.
...ure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BuilderHelper.java
...azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java
...azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java
...azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java
...azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java
...mon/src/main/java/com/azure/storage/common/policy/StorageContentValidationDecoderPolicy.java
...mon/src/main/java/com/azure/storage/common/policy/StorageContentValidationDecoderPolicy.java
...mon/src/main/java/com/azure/storage/common/policy/StorageContentValidationDecoderPolicy.java
...mon/src/main/java/com/azure/storage/common/policy/StorageContentValidationDecoderPolicy.java
...mon/src/main/java/com/azure/storage/common/policy/StorageContentValidationDecoderPolicy.java
Hi @gunjansingh-msft. Thank you for your interest in helping to improve the Azure SDK experience and for your contribution. We've noticed that there hasn't been recent engagement on this pull request. If this is still an active work stream, please let us know by pushing some changes or leaving a comment. Otherwise, we'll close this out in 7 days.
ibrandes
left a comment
left some comments about things i immediately noticed while reviewing - going to continue to review logic over the next day or so :)
sdk/storage/azure-storage-blob/src/test/resources/logback-test.xml
...rage-blob/src/main/java/com/azure/storage/blob/implementation/util/ChunkedDownloadUtils.java
...ure/storage/blob/implementation/accesshelpers/BlobDownloadAsyncResponseConstructorProxy.java
...ure/storage/blob/implementation/accesshelpers/BlobDownloadAsyncResponseConstructorProxy.java
```java
/**
 * Gets the size of the first range requested when downloading.
 * @return The initial transfer size.
 */
public Long getInitialTransferSizeLong() {
    return this.initialTransferSize;
}

/**
 * Sets the size of the first range requested when downloading.
 * This value may be larger than the block size used for subsequent ranges.
 *
 * @param initialTransferSize The initial transfer size.
 * @return The ParallelTransferOptions object itself.
 */
public ParallelTransferOptions setInitialTransferSizeLong(Long initialTransferSize) {
    if (initialTransferSize != null) {
        StorageImplUtils.assertInBounds("initialTransferSize", initialTransferSize, 1, Long.MAX_VALUE);
    }
    this.initialTransferSize = initialTransferSize;
    return this;
}
```
ParallelTransferOptions is publicly exposed to customers - is initialTransferSize a value we want them to be able to configure? if not, we should move this somewhere internal.

The .NET SDK explicitly exposes InitialTransferSize as a public, customer-configurable property on their equivalent of ParallelTransferOptions. the Java implementation mirrors this design exactly. let me know if you feel otherwise.

this feels like a separate feature to me - i don't think it is necessary for content validation specifically. we should remove it and keep our changes minimal.
...n/java/com/azure/storage/common/implementation/contentvalidation/StorageCrc64Calculator.java
...n/java/com/azure/storage/common/implementation/contentvalidation/StorageCrc64Calculator.java
...n/java/com/azure/storage/common/implementation/contentvalidation/StorageCrc64Calculator.java
...m/azure/storage/common/implementation/contentvalidation/StructuredMessageDecodingStream.java
Pull request overview
Implements structured-message decoding and CRC64-based content validation for Storage Blob downloads, including smarter retry behavior that can resume from validated segment boundaries.
Changes:
- Adds StorageContentValidationDecoderPolicy plus supporting decoder state/helpers to decode/validate structured message responses and surface machine-readable retry offsets.
- Extends download plumbing (range retries, download-to-file) to support structured-message decoding and aggregate CRC validation across retries/partitions.
- Adds/updates unit and integration tests for decoder behavior, CRC helpers, and blob download scenarios.
Reviewed changes
Copilot reviewed 29 out of 29 changed files in this pull request and generated 13 comments.
| File | Description |
|---|---|
| sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/policy/StorageContentValidationDecoderPolicyTest.java | Adds unit tests for retry-offset/decoder-offset parsing helpers. |
| sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/implementation/contentvalidation/StructuredMessageDecoderTests.java | Adds chunk-splitting and retry-boundary tests for the structured message decoder. |
| sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/implementation/contentvalidation/StorageCrc64CalculatorTests.java | Adds coverage for new CRC helper overloads (computeTwo, ByteBuffer, slice). |
| sdk/storage/azure-storage-common/src/test-shared/java/com/azure/storage/common/test/shared/StorageCommonTestUtils.java | Makes JDK HTTP client selection fail fast when unavailable (test infra robustness). |
| sdk/storage/azure-storage-common/src/test-shared/java/com/azure/storage/common/test/shared/policy/MockPartialResponsePolicy.java | Improves partial-response fault injection (range tracking, configurable truncation). |
| sdk/storage/azure-storage-common/src/test-shared/java/com/azure/storage/common/test/shared/policy/MockDownloadHttpResponse.java | Enhances mock response to support body collection and proper resource closing. |
| sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/StorageContentValidationDecoderPolicy.java | New pipeline policy to decode structured messages, validate CRC, and emit retry offsets. |
| sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/DecoderState.java | New state carrier for per-response decoder progress and retry offset computation. |
| sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/DecodedResponse.java | Wraps an HttpResponse with a decoded body Flux while managing response lifetime. |
| sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/ContentValidationDecoderUtils.java | Adds parsing helpers and download eligibility checks for the decoding policy. |
| sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/AggregateCrcState.java | Aggregates CRC/segment info across retries for end-to-end validation. |
| sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/ParallelTransferOptions.java | Adds initialTransferSize option (common) for tuning the first download range size. |
| sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/contentvalidation/StructuredMessageDecodingStream.java | Adds a structured-message decoding wrapper utility for Flux streams. |
| sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/contentvalidation/StructuredMessageDecoder.java | New incremental decoder with segment CRC validation and smart-retry boundary tracking. |
| sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/contentvalidation/StorageCrc64Calculator.java | Adds CRC overloads for array slices, ByteBuffers, and dual CRC computation. |
| sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/Constants.java | Introduces context keys and config toggles used by structured message decoding/retries. |
| sdk/storage/azure-storage-common/checkstyle-suppressions.xml | Updates suppressions to new checkstyle check names and adds one suppression entry. |
| sdk/storage/azure-storage-blob/src/test/resources/logback-test.xml | Adds module test logging configuration. |
| sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobMessageDecoderDownloadTests.java | Adds sync integration tests for CRC64 structured-message decoding download APIs. |
| sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobMessageAsyncDecoderDownloadTests.java | Adds async integration tests including interruption/retry scenarios. |
| sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobClientBase.java | Adjusts sync download-to-file option handling and concurrency behavior. |
| sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java | Adds structured-message retry logic and reworks download-to-file flow for validation. |
| sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java | Adds initialTransferSize option (blob models) and propagates it through defaults/wrapping. |
| sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/BlobDownloadAsyncResponse.java | Plumbs decoder state through response construction and sets ContentCrc64 post-consumption. |
| sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java | Propagates initialTransferSize through defaulting and wrapping helpers. |
| sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ChunkedDownloadUtils.java | Enhances first-chunk logic (contentLength fallback, optional empty-blob downloader). |
| sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BuilderHelper.java | Adds the decoding policy into the blob pipeline. |
| sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BlobConstants.java | Introduces download range sizing and concurrency-related constants. |
| sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/accesshelpers/BlobDownloadAsyncResponseConstructorProxy.java | Extends access helper to carry decoder state and expose source response/decoder state accessors. |
...m/azure/storage/common/implementation/contentvalidation/StructuredMessageDecodingStream.java
```java
byte[] data = new byte[copy.remaining()];
copy.get(data);
runningCrc = StorageCrc64Calculator.compute(data, runningCrc);
```

AggregateCrcState.appendPayload copies the payload into a new byte[] on every call. Since StorageCrc64Calculator now has compute(ByteBuffer, long), this can be updated to compute directly from the ByteBuffer (using a duplicate/slice) to avoid per-buffer allocations and extra memory copies during large downloads. Suggested change:

```java
runningCrc = StorageCrc64Calculator.compute(copy, runningCrc);
```
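The no-copy pattern the suggestion relies on can be shown with stock JDK types. This is a minimal sketch using `java.util.zip.CRC32` (which accepts a `ByteBuffer` since Java 9) as a stand-in for the SDK's CRC64 calculator; `CrcNoCopy` and `crcOf` are illustrative names, not SDK API:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class CrcNoCopy {
    // Checksums the buffer's remaining bytes without consuming the
    // caller's position, by updating over a duplicate. The duplicate
    // shares the backing data but has its own position/limit, so no
    // byte[] allocation or copy is needed.
    static long crcOf(ByteBuffer buffer) {
        CRC32 crc = new CRC32();
        crc.update(buffer.duplicate());
        return crc.getValue();
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap("Hello World!".getBytes(StandardCharsets.UTF_8));
        long direct = crcOf(buf);

        // The copy-based approach (what appendPayload currently does) yields
        // the same value, but allocates and copies on every call.
        byte[] copy = new byte[buf.remaining()];
        buf.duplicate().get(copy);
        CRC32 reference = new CRC32();
        reference.update(copy);

        System.out.println(direct == reference.getValue()); // prints true
        System.out.println(buf.position() == 0);            // prints true: caller's buffer untouched
    }
}
```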
```java
byte[] bytes = new byte[length];
slice.get(bytes);
crc.updateAndGet(previous -> StorageCrc64Calculator.compute(bytes, previous));
```

Crc64TrackingAsynchronousByteChannel.updateCrc allocates a new byte[] and copies data for every write completion. This can significantly increase GC pressure for large downloads; consider leveraging StorageCrc64Calculator.compute(ByteBuffer, long) over a duplicate/slice of the written region to avoid the extra allocation/copy. Suggested change:

```java
crc.updateAndGet(previous -> StorageCrc64Calculator.compute(slice, previous));
```
```java
LOGGER.atInfo()
    .addKeyValue("newBytes", buffer.remaining())
    .addKeyValue("decoderOffset", decoder.getMessageOffset())
    .addKeyValue("lastCompleteSegment", decoder.getLastCompleteSegmentStart())
    .addKeyValue("totalDecodedPayload", decoder.getTotalDecodedPayloadBytes())
    .log("Received buffer in decodeStream");

try {
    StructuredMessageDecoder.DecodeResult result = decoder.decodeChunk(buffer);

    LOGGER.atInfo()
        .addKeyValue("status", result.getStatus())
        .addKeyValue("bytesConsumed", result.getBytesConsumed())
        .addKeyValue("decoderOffset", decoder.getMessageOffset())
        .addKeyValue("lastCompleteSegment", decoder.getLastCompleteSegmentStart())
        .log("Decode chunk result");
```

StorageContentValidationDecoderPolicy logs per-buffer and per-chunk decode details at INFO. For downloads this can produce extremely high-volume logs and impact performance/operational noise; consider moving these hot-path logs to verbose/debug level (keeping INFO for one-time lifecycle events or failures).
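The level split the comment asks for can be sketched with plain `java.util.logging` standing in for azure-core's ClientLogger (`atVerbose()` is the SDK's equivalent of FINE here); `HotPathLogging`, `onBuffer`, and `onStreamComplete` are illustrative names:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class HotPathLogging {
    private static final Logger LOGGER = Logger.getLogger(HotPathLogging.class.getName());

    // Per-buffer details go to FINE (verbose). The isLoggable guard also
    // skips message construction entirely when the level is disabled, so
    // the hot path pays almost nothing in the common case.
    static void onBuffer(int newBytes, long decoderOffset) {
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine("decode buffer: newBytes=" + newBytes + " decoderOffset=" + decoderOffset);
        }
    }

    // One-time lifecycle events stay at INFO, where they are cheap and useful.
    static void onStreamComplete(long totalBytes) {
        LOGGER.info("decode complete: totalBytes=" + totalBytes);
    }
}
```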
```java
LOGGER.atInfo()
    .addKeyValue("newBytes", buffer.remaining())
    .addKeyValue("pendingBytes", pendingBytes.size())
    .addKeyValue("decoderOffset", messageOffset)
    .addKeyValue("lastCompleteSegment", lastCompleteSegmentStart)
    .log("Received buffer in decode");
```

StructuredMessageDecoder emits very frequent INFO logs on the decode hot path (every chunk/header/footer). This will be noisy and can materially slow down high-throughput downloads; consider switching these to verbose/debug and reserving INFO for summary or error conditions.
```java
byte[] data = "Hello World!".getBytes();
long expected = StorageCrc64Calculator.compute(data, 0);
```

Tests use String.getBytes() without specifying a charset, which depends on the platform default. Use a fixed charset (e.g., StandardCharsets.UTF_8) to keep the test deterministic across environments.
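The determinism point is easy to demonstrate: `String.getBytes()` with no argument consults the JVM default charset, while the overload taking a `StandardCharsets` constant produces identical bytes on every platform and locale. A minimal sketch:

```java
import java.nio.charset.StandardCharsets;

public class CharsetDeterminism {
    public static void main(String[] args) {
        // Platform-dependent: encoding picked from the JVM default charset,
        // which varies with OS and locale settings.
        byte[] platformBytes = "Hello World!".getBytes();

        // Deterministic: always UTF-8, regardless of environment, so a CRC
        // computed over these bytes is stable across test machines.
        byte[] utf8Bytes = "Hello World!".getBytes(StandardCharsets.UTF_8);

        System.out.println(utf8Bytes.length); // prints 12 (all ASCII characters)
    }
}
```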
```java
byte[] originalData = new byte[0];

// For zero-length data, encoder behavior varies - let's test with minimal data
byte[] minimalData = new byte[1];
ThreadLocalRandom.current().nextBytes(minimalData);
```

In handlesZeroLengthSegment, the local variable originalData is unused (and the test ends up encoding/decoding minimalData instead). Removing the unused variable and/or adjusting the test name/comment to match what is actually being validated will improve clarity and avoid style-check failures.
```java
com.azure.storage.common.ParallelTransferOptions parallelTransferOptions = options.getParallelTransferOptions();
Integer maxConcurrency = parallelTransferOptions == null ? null : parallelTransferOptions.getMaxConcurrency();
if (maxConcurrency != null && maxConcurrency <= 1) {
```

adjustOptionsForSyncDownload forces maxConcurrency to 1 whenever it's null or > 1. This changes the behavior/performance characteristics of sync downloadToFile* calls even when the caller explicitly requested parallelism; if this is required for a specific scenario (e.g., structured decoding limitations), consider gating it to that scenario and/or documenting the behavior clearly. Suggested change:

```java
/*
 * For synchronous downloads, we default to a concurrency of 1 when the caller has not explicitly set a
 * maxConcurrency value. If the caller has provided a non-null maxConcurrency (including values greater
 * than 1), we preserve that configuration rather than overriding it here.
 */
if (maxConcurrency != null) {
```
```xml
    </encoder>
</appender>

<root level="INFO">
```

Setting the test logging root level to INFO will emit a lot of logs across the entire azure-storage-blob test suite, which can slow tests and clutter CI output. Consider raising this to WARN/ERROR (or scoping INFO to specific loggers) and enabling INFO only when troubleshooting. Suggested change:

```xml
<!-- Use WARN by default to keep test and CI output concise. Temporarily raise to INFO when troubleshooting. -->
<root level="WARN">
```
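Scoping INFO to specific loggers, as the comment suggests, keeps the decoder's logs visible while the rest of the suite stays quiet. A hypothetical logback fragment (the package name follows the paths in this PR; the appender name is assumed):

```xml
<!-- Keep the root quiet across the whole test suite. -->
<root level="WARN">
    <appender-ref ref="STDOUT"/>
</root>

<!-- Enable INFO only for the content-validation decoder package under review. -->
<logger name="com.azure.storage.common.implementation.contentvalidation" level="INFO"/>
```

Logback resolves the most specific logger name first, so the package-level `<logger>` overrides the root level only for that subtree.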
...azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java
...azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java
...azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java
ibrandes
left a comment
couple more things i noticed:
...rage-blob/src/main/java/com/azure/storage/blob/implementation/util/ChunkedDownloadUtils.java
```java
if (JDK_HTTP_HTTP_CLIENT == null) {
    throw new IllegalStateException(
        "JDK HTTP client is not available (e.g. requires Java 19+ for virtual threads). "
            + "Use NETTY, OK_HTTP, or VERTX instead.");
}
```
```java
/**
 * Structured message header value for CRC64 validation.
 */
public static final String STRUCTURED_MESSAGE_CRC64_BODY_TYPE = "XSM/1.0; properties=crc64";
```

this is already in StructuredMessageConstants.java
```java
final StorageChecksumAlgorithm algorithm
    = responseChecksumAlgorithm != null ? responseChecksumAlgorithm : StorageChecksumAlgorithm.NONE;
final boolean isStructuredMessageEnabled = isStructuredMessageAlgorithm(algorithm);
final boolean isMd5Enabled = algorithm == StorageChecksumAlgorithm.MD5;

final Boolean finalGetMD5 = (!isStructuredMessageEnabled && (getRangeContentMd5 || isMd5Enabled)) ? true : null;
```

the logic for resolving the validation type and adding flags to the context can be consolidated into util methods. by consolidating it, instead of re-writing the logic for every client method, we ideally just have to call the util method. i have an example of this in my encoder PR here: link. ideally, after a couple of review cycles, we can combine our util methods into one file, but for now, it would be good to create your own to establish the pattern.
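The consolidation the reviewer describes might look like the following sketch. All names here (`ContentValidationUtils`, `resolveDownloadValidation`, the nested types) are hypothetical stand-ins, not the SDK's actual API; the point is that the flag-resolution rules from the snippet above live in one place instead of being re-derived in every client method:

```java
// Hypothetical helper - names and types are illustrative, not SDK API.
public final class ContentValidationUtils {
    public enum Algorithm { NONE, MD5, STORAGE_CRC64 }

    public static final class ResolvedValidation {
        public final boolean structuredMessage;
        public final Boolean requestContentMd5; // null means "do not send the x-ms-range-get-content-md5 header"

        ResolvedValidation(boolean structuredMessage, Boolean requestContentMd5) {
            this.structuredMessage = structuredMessage;
            this.requestContentMd5 = requestContentMd5;
        }
    }

    // Mirrors the logic in the snippet above: a structured-message algorithm
    // wins outright; otherwise MD5 is requested when either the algorithm or
    // the legacy rangeGetContentMd5 flag asks for it.
    public static ResolvedValidation resolveDownloadValidation(Algorithm algorithm, boolean rangeGetContentMd5) {
        Algorithm effective = algorithm == null ? Algorithm.NONE : algorithm;
        boolean structured = effective == Algorithm.STORAGE_CRC64;
        boolean md5 = effective == Algorithm.MD5;
        Boolean getMd5 = (!structured && (rangeGetContentMd5 || md5)) ? Boolean.TRUE : null;
        return new ResolvedValidation(structured, getMd5);
    }
}
```

Each client method would then call the helper once and read the two resolved fields, rather than repeating the ternary logic inline.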
```java
private static final class Md5TrackingAsynchronousByteChannel implements AsynchronousByteChannel {
    private final AsynchronousByteChannel channel;
    private final MessageDigest digest;

    private Md5TrackingAsynchronousByteChannel(AsynchronousByteChannel channel) {
        this.channel = channel;
        try {
            this.digest = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw LOGGER.logExceptionAsError(new IllegalStateException("MD5 MessageDigest unavailable.", e));
        }
    }

    private byte[] getMd5() {
        synchronized (digest) {
            return digest.digest();
        }
    }

    @Override
    public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
        channel.read(dst, attachment, handler);
    }

    @Override
    public Future<Integer> read(ByteBuffer dst) {
        return channel.read(dst);
    }

    @Override
    public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
        int startPos = src.position();
        ByteBuffer duplicate = src.duplicate();
        channel.write(src, attachment, new CompletionHandler<Integer, A>() {
            @Override
            public void completed(Integer result, A att) {
                if (result != null && result > 0) {
                    updateMd5(duplicate, startPos, result);
                }
                handler.completed(result, att);
            }

            @Override
            public void failed(Throwable exc, A att) {
                handler.failed(exc, att);
            }
        });
    }

    @Override
    public Future<Integer> write(ByteBuffer src) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        int startPos = src.position();
        ByteBuffer duplicate = src.duplicate();
        channel.write(src, src, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (result != null && result > 0) {
                    updateMd5(duplicate, startPos, result);
                }
                future.complete(result);
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                future.completeExceptionally(exc);
            }
        });
        return future;
    }

    @Override
    public boolean isOpen() {
        return channel.isOpen();
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }

    private void updateMd5(ByteBuffer buffer, int startPos, int length) {
        if (length <= 0) {
            return;
        }

        ByteBuffer slice = buffer.duplicate();
        slice.position(startPos);
        slice.limit(startPos + length);
        synchronized (digest) {
            digest.update(slice);
        }
    }
}
```

adding support for md5 isn't in the scope of content validation - this can be removed.
```java
    = (range, conditions) -> isStructuredMessageEnabled
        ? this.downloadStreamWithResponseInternal(range, downloadRetryOptions, conditions, rangeGetContentMd5,
            responseChecksumAlgorithm, downloadContext)
        : this.downloadStreamWithResponse(range, downloadRetryOptions, conditions, rangeGetContentMd5,
            downloadContext);
BiFunction<BlobRange, BlobRequestConditions, Mono<BlobDownloadAsyncResponse>> emptyBlobDownloadFunc = (range,
    conditions) -> this.downloadStreamWithResponse(range, downloadRetryOptions, conditions, false, context);

boolean checksumValidationEnabled = isStructuredMessageEnabled || rangeGetContentMd5 || isMd5Enabled;
boolean md5ValidationEnabled = !isStructuredMessageEnabled && (rangeGetContentMd5 || isMd5Enabled);

long rangeSize = blockSizeProvided
    ? Math.min(finalParallelTransferOptions.getBlockSizeLong(), BlobConstants.BLOB_MAX_DOWNLOAD_BYTES)
    : (checksumValidationEnabled
        ? BlobConstants.BLOB_MAX_HASH_REQUEST_DOWNLOAD_RANGE
        : BlobConstants.BLOB_DEFAULT_DOWNLOAD_RANGE_SIZE);

Long requestedInitialTransferSize = finalParallelTransferOptions.getInitialTransferSizeLong();
long initialRangeSize = requestedInitialTransferSize != null && requestedInitialTransferSize > 0
    ? requestedInitialTransferSize
    : (checksumValidationEnabled
        ? BlobConstants.BLOB_MAX_HASH_REQUEST_DOWNLOAD_RANGE
        : BlobConstants.BLOB_DEFAULT_INITIAL_DOWNLOAD_RANGE_SIZE);

int maxConcurrency = maxConcurrencyProvided
    ? finalParallelTransferOptions.getMaxConcurrency()
    : getDefaultDownloadConcurrency();

com.azure.storage.common.ParallelTransferOptions initialParallelTransferOptions
    = new com.azure.storage.common.ParallelTransferOptions().setBlockSizeLong(initialRangeSize);

boolean useMasterCrc = isStructuredMessageEnabled;

LOGGER.atVerbose()
    .addKeyValue("thread", Thread.currentThread().getName())
    .log("BlobAsyncClientBase.downloadToFileImpl calling downloadFirstChunk");
return ChunkedDownloadUtils
    .downloadFirstChunk(finalRange, initialParallelTransferOptions, requestConditions, downloadFunc,
        emptyBlobDownloadFunc, true, downloadContext)
    .doOnSuccess(t -> LOGGER.atVerbose()
        .addKeyValue("newCount", t != null ? t.getT1() : null)
        .addKeyValue("thread", Thread.currentThread().getName())
        .log("BlobAsyncClientBase downloadFirstChunk returned"))
    .flatMap(setupTuple3 -> {
        long newCount = setupTuple3.getT1();
        BlobRequestConditions finalConditions = setupTuple3.getT2();
        BlobDownloadAsyncResponse initialResponse = setupTuple3.getT3();
        LOGGER.atVerbose()
            .addKeyValue("newCount", newCount)
            .addKeyValue("thread", Thread.currentThread().getName())
            .log("BlobAsyncClientBase flatMap after first chunk");

        if (initialResponse.getStatusCode() == 304 || newCount == 0) {
            return Mono.fromCallable(() -> {
                Response<BlobProperties> propertiesResponse
                    = ModelHelper.buildBlobPropertiesResponse(initialResponse);
                try {
                    initialResponse.close();
                } catch (IOException e) {
                    throw LOGGER.logExceptionAsError(new UncheckedIOException(e));
                }
                return propertiesResponse;
            });
        }
```

we should figure out how to consolidate this logic more. i don't think its realistic to apply all of this logic to a method every time we need to add content validation to a new API. i think it would be beneficial to look into how our existing decryption policy works and determine what patterns we can re-use from there.
```java
private static Mono<Void> drainQueuedResponses(ArrayDeque<CompletableFuture<BlobDownloadAsyncResponse>> running,
    Iterator<BlobRange> remainingIterator,
    BiFunction<BlobRange, BlobRequestConditions, Mono<BlobDownloadAsyncResponse>> downloadFunc,
    BlobRequestConditions finalConditions, AsynchronousByteChannel targetChannel, ProgressReporter progressReporter,
    boolean useMasterCrc, ComposedCrcState composedCrcState, boolean md5ValidationEnabled) {
    return Mono.defer(() -> {
        CompletableFuture<BlobDownloadAsyncResponse> nextFuture = running.poll();
        if (nextFuture == null) {
            return Mono.empty();
        }

        if (remainingIterator.hasNext()) {
            BlobRange nextRange = remainingIterator.next();
            running.add(downloadFunc.apply(nextRange, finalConditions).toFuture());
        }

        return Mono.fromFuture(nextFuture)
            .flatMap(response -> writeBodyToFile(response, targetChannel,
                progressReporter == null ? null : progressReporter.createChild(), useMasterCrc, composedCrcState,
                md5ValidationEnabled).doFinally(signalType -> closeQuietly(response)))
            .then(Mono.defer(() -> drainQueuedResponses(running, remainingIterator, downloadFunc, finalConditions,
                targetChannel, progressReporter, useMasterCrc, composedCrcState, md5ValidationEnabled)));
    });
}

private static void closePendingResponses(ArrayDeque<CompletableFuture<BlobDownloadAsyncResponse>> running) {
    CompletableFuture<BlobDownloadAsyncResponse> future;
    while ((future = running.poll()) != null) {
        future.whenComplete((response, throwable) -> {
            if (response != null) {
                closeQuietly(response);
            }
        });
        if (!future.isDone()) {
            future.cancel(true);
        }
    }
}
```

i'm not sure i understand why it is necessary to utilize AsynchronousByteChannel in the client when we hadn't done so before.
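Stripped of the SDK types, drainQueuedResponses is a sliding-window pattern: keep a bounded deque of in-flight futures, refill it as each completes, and consume results strictly in submission order so ranges land in the file sequentially. A self-contained sketch of that shape (names here are illustrative, and `join()` stands in for the reactive `Mono.fromFuture` chaining):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class SlidingWindow {
    // Processes items with at most `window` futures in flight, yielding
    // results in submission order - the same shape as drainQueuedResponses.
    static <T, R> List<R> run(List<T> items, int window, Function<T, CompletableFuture<R>> task) {
        List<R> results = new ArrayList<>();
        ArrayDeque<CompletableFuture<R>> running = new ArrayDeque<>();
        Iterator<T> it = items.iterator();

        // Prefill the window.
        while (running.size() < window && it.hasNext()) {
            running.add(task.apply(it.next()));
        }

        CompletableFuture<R> next;
        while ((next = running.poll()) != null) {
            // Refill before blocking on the head, so the window stays full
            // while the oldest in-flight future is being consumed.
            if (it.hasNext()) {
                running.add(task.apply(it.next()));
            }
            results.add(next.join()); // consume strictly in order
        }
        return results;
    }
}
```

The in-order consumption is what lets the real code write each range to the target channel sequentially while later ranges are already downloading.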
```java
private static void validateResponseMd5(BlobDownloadAsyncResponse response,
    Md5TrackingAsynchronousByteChannel md5Channel) {
    if (md5Channel == null) {
        return;
    }

    byte[] expectedMd5
        = response.getDeserializedHeaders() == null ? null : response.getDeserializedHeaders().getContentMd5();
    if (expectedMd5 == null || expectedMd5.length == 0) {
        throw LOGGER.logExceptionAsError(
            new IllegalArgumentException("Content-MD5 header missing from download response."));
    }

    byte[] actualMd5 = md5Channel.getMd5();
    if (!Arrays.equals(expectedMd5, actualMd5)) {
        throw LOGGER
            .logExceptionAsError(new IllegalArgumentException("MD5 mismatch detected in download response."));
    }
}
```

can be removed - md5 is not in the scope of this project.
```java
private static final class ComposedCrcState {
    private long crc;
    private long length;

    private void append(long nextCrc, long nextLength) {
        if (nextLength <= 0) {
            return;
        }

        if (length == 0) {
            crc = nextCrc;
            length = nextLength;
            return;
        }

        crc = StorageCrc64Calculator.concat(0, 0, crc, length, 0, nextCrc, nextLength);
        length += nextLength;
    }

    private long getCrc() {
        return crc;
    }

    private long getLength() {
        return length;
    }
}
```

is this necessary? i thought cumulative crc64 calculations were something that was going to be added on later down the line?
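For context on what ComposedCrcState buys: a CRC is streamable, so the checksum of a concatenation equals the checksum obtained by feeding the segments into one running state. The `concat` call above goes further and combines two independently computed CRCs (which needs a GF(2) carry-less step the JDK doesn't expose), but the underlying streaming property can be demonstrated with `java.util.zip.CRC32` as a stand-in for the SDK's CRC64:

```java
import java.util.zip.CRC32;

public class CrcStreaming {
    // Feeding segments into one running checksum yields the same value as
    // checksumming the concatenated bytes - the property that lets per-range
    // downloads validate an end-to-end checksum without re-reading the file.
    static long crcOfSegments(byte[]... segments) {
        CRC32 crc = new CRC32();
        for (byte[] segment : segments) {
            crc.update(segment);
        }
        return crc.getValue();
    }
}
```

`crcOfSegments("Hello", " World")` equals `crcOfSegments("Hello World")`; combining two already-finalized CRCs, as `StorageCrc64Calculator.concat` does, is the non-trivial extra step.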
```java
/**
 * The legacy default number of concurrent transfers for download operations.
 */
public static final int BLOB_LEGACY_DEFAULT_CONCURRENT_TRANSFERS_COUNT = 5;

/**
 * The default range size used for download operations when no checksum validation is requested.
 */
public static final long BLOB_DEFAULT_DOWNLOAD_RANGE_SIZE = 4L * Constants.MB;

/**
 * The default initial range size used for download operations when no checksum validation is requested.
 */
public static final long BLOB_DEFAULT_INITIAL_DOWNLOAD_RANGE_SIZE = 256L * Constants.MB;

/**
 * The maximum range size used for download operations.
 */
public static final long BLOB_MAX_DOWNLOAD_BYTES = 256L * Constants.MB;

/**
 * The maximum range size used when requesting a transactional checksum during download.
 */
public static final long BLOB_MAX_HASH_REQUEST_DOWNLOAD_RANGE = 4L * Constants.MB;
```
yeah, i think we should absolutely keep our changes as minimal as possible and revert it. changing these default values can have a huge impact on the entirety of the SDK, and it would require a significant amount of java specific research to determine if these numbers are right for our language. also, it may be necessary to re-run perf tests as these concurrency values can have a big impact on the previously generated numbers.
```java
/**
 * Gets the size of the first range requested when downloading.
 * @return The initial transfer size.
 */
public Long getInitialTransferSizeLong() {
    return this.initialTransferSize;
}

/**
 * Sets the size of the first range requested when downloading.
 * This value may be larger than the block size used for subsequent ranges.
 *
 * @param initialTransferSize The initial transfer size.
 * @return The ParallelTransferOptions object itself.
 */
public ParallelTransferOptions setInitialTransferSizeLong(Long initialTransferSize) {
    if (initialTransferSize != null) {
        StorageImplUtils.assertInBounds("initialTransferSize", initialTransferSize, 1, Long.MAX_VALUE);
    }
    this.initialTransferSize = initialTransferSize;
    return this;
}
```
this feels like a separate feature to me - i don't think it is necessary for content validation specifically. we should remove it and keep our changes minimal.
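For what it's worth, the reason an initial transfer size exists separately from the block size is range planning: the first request can be sized differently (e.g. larger, to probe small blobs in one round trip) while subsequent ranges use the steady-state block size. A hypothetical planner sketch, purely illustrative and not the SDK's actual chunking logic (all names and the empty-blob behavior here are assumptions):

```java
import java.util.ArrayList;
import java.util.List;

public final class RangePlanSketch {
    // Hypothetical: split [0, total) into a first range of up to
    // `initial` bytes followed by ranges of up to `block` bytes,
    // mirroring how an initialTransferSize larger than blockSize
    // might be used. Each element is { offset, length }.
    static List<long[]> plan(long total, long initial, long block) {
        List<long[]> ranges = new ArrayList<>();
        if (total <= 0) {
            // Empty blob: still issue one zero-length download so the
            // operation (and its response headers) actually happens.
            ranges.add(new long[] { 0, 0 });
            return ranges;
        }
        long first = Math.min(initial, total);
        ranges.add(new long[] { 0, first });
        long pos = first;
        while (pos < total) {
            long len = Math.min(block, total - pos);
            ranges.add(new long[] { pos, len });
            pos += len;
        }
        return ranges;
    }
}
```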
```java
    return drainQueuedResponses(running, remainingIterator, downloadFunc, finalConditions, targetChannel,
        progressReporter, useMasterCrc, composedCrcState, md5ValidationEnabled)
            .doFinally(signalType -> closePendingResponses(running))
            .then(Mono.fromCallable(() -> {
                if (useMasterCrc) {
                    validateComposedCrc(composedCrcState, crcChannel);
                }
                return ModelHelper.buildBlobPropertiesResponse(initialResponse);
            }));
});
}
```
also, by removing the composedCrc logic, we can cut down on a lot of this additional logic.
```diff
 try {
     final com.azure.storage.common.ParallelTransferOptions finalParallelTransferOptions
-        = ModelHelper.wrapBlobOptions(ModelHelper.populateAndApplyDefaults(parallelTransferOptions));
+        = parallelTransferOptions == null ? null : ModelHelper.wrapBlobOptions(parallelTransferOptions);
```
we need to apply defaults here - `finalParallelTransferOptions = ModelHelper.wrapBlobOptions(ModelHelper.populateAndApplyDefaults(parallelTransferOptions));`
```java
BlobRequestConditions finalConditions
    = options.getRequestConditions() == null ? new BlobRequestConditions() : options.getRequestConditions();

// Default behavior is not to overwrite
```

```java
int numChunks = ChunkedDownloadUtils.calculateNumBlocks(newCount,
    finalParallelTransferOptions.getBlockSizeLong());

// In case it is an empty blob, this ensures we still actually perform a download operation.
```

```java
    progressReporter == null ? null : progressReporter.createChild()).flux()),
    finalParallelTransferOptions.getMaxConcurrency())

// Only the first download call returns a value.
```

```diff
 BlobRange finalRange = options.getRange() == null ? new BlobRange(0) : options.getRange();
-final com.azure.storage.common.ParallelTransferOptions finalParallelTransferOptions
+com.azure.storage.common.ParallelTransferOptions finalParallelTransferOptions
```
`final com.azure.storage.common.ParallelTransferOptions finalParallelTransferOptions`
```diff
 }
 policies.add(new MetadataValidationPolicy());

+policies.add(new StorageContentValidationDecoderPolicy());
```
a thought came up in my head, and I think it might be worth looking into - if we insert the decoder policy at the start of the pipeline (i.e. right after `List<HttpPipelinePolicy> policies = new ArrayList<>();`), i think it could simplify the retry logic a lot. for standard http retries, because the decryption policy is placed before the retry policy, each retry re-enters the policy with a fresh response body, so we won't need to change anything there. and for smart retries, i think we should be able to rely on the existing logic in `BlobAsyncClientBase.downloadStreamWithResponseInternal`. i verified this in our crypto package - the decoder there also doesn't do anything special to handle retries. it calculates everything based on the values passed in through the context in `EncryptedBlobClient.populateRequestConditionsAndContext`. will follow up over teams with more info about the crypto client.
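The positioning argument can be sanity-checked with a toy pipeline (this is a sketch, not the azure-core `HttpPipelinePolicy` API): a policy composed outside the retry policy observes only the final outcome, once per operation, while a policy composed inside it runs on every attempt.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public final class PolicyOrderSketch {
    interface Policy {
        String process(Supplier<String> next);
    }

    // Compose policies around a transport call; the first policy in
    // the list is outermost, mirroring pipeline insertion order.
    static String send(List<Policy> policies, Supplier<String> transport) {
        Supplier<String> next = transport;
        for (int i = policies.size() - 1; i >= 0; i--) {
            Policy p = policies.get(i);
            Supplier<String> downstream = next;
            next = () -> p.process(downstream);
        }
        return next.get();
    }

    // Simple retry: re-invoke downstream on failure, up to maxAttempts.
    static Policy retry(int maxAttempts) {
        return next -> {
            RuntimeException last = null;
            for (int i = 0; i < maxAttempts; i++) {
                try {
                    return next.get();
                } catch (RuntimeException e) {
                    last = e;
                }
            }
            throw last;
        };
    }

    // Stand-in decoder; counts how many attempts flow through it.
    static Policy decoder(AtomicInteger entries) {
        return next -> {
            entries.incrementAndGet();
            return "decoded(" + next.get() + ")";
        };
    }

    // Transport that fails `failures` times before succeeding.
    static Supplier<String> flakyTransport(int failures) {
        AtomicInteger calls = new AtomicInteger();
        return () -> {
            if (calls.getAndIncrement() < failures) {
                throw new RuntimeException("transient");
            }
            return "body";
        };
    }
}
```

With the decoder listed before the retry policy it is entered once for the whole operation; listed after, it is entered once per attempt.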
```diff
 uCrc0 = uCrc;

-ByteBuffer buffer = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+ByteBuffer buffer = ByteBuffer.wrap(src, offset, length).order(ByteOrder.LITTLE_ENDIAN);
```
you don't need to change anything else in compute besides the first two lines:

```java
int pData = offset;
long uSize = length;
```
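As a side note on why this fix works: `ByteBuffer.wrap(src, offset, length)` does not create a zero-based view of the sub-array. It sets `position = offset` and `limit = offset + length` over the same backing array, so relative reads start at `offset`, but absolute indices still count from the start of the array. A small sketch of the semantics:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public final class WrapOffsetSketch {
    // Read a little-endian long starting at `offset` within `src`.
    static long readLongAt(byte[] src, int offset) {
        // wrap(array, offset, length) leaves the buffer positioned at
        // `offset` with limit `offset + length`; the relative getLong()
        // below therefore reads src[offset .. offset + 7]. Note that an
        // absolute buffer.getLong(0) would still read src[0 .. 7].
        ByteBuffer buffer = ByteBuffer.wrap(src, offset, src.length - offset)
            .order(ByteOrder.LITTLE_ENDIAN);
        return buffer.getLong();
    }
}
```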
```diff
 Duration timeout, Context context) {
     final com.azure.storage.common.ParallelTransferOptions finalParallelTransferOptions
-        = ModelHelper.wrapBlobOptions(ModelHelper.populateAndApplyDefaults(parallelTransferOptions));
+        = parallelTransferOptions == null ? null : ModelHelper.wrapBlobOptions(parallelTransferOptions);
```
also need to apply defaults here
```java
 * This method processes the input data in chunks of 32 bytes for efficiency and uses lookup tables
 * to update the CRC values.
```
i believe this still applies to how we process the data, we should add it back
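Agreed that it's worth restoring; the chunking the javadoc describes is a throughput optimization that leaves the checksum unchanged. A simplified sketch (using the stdlib `java.util.zip.CRC32` rather than the CRC-64 in question) demonstrates the equivalence:

```java
import java.util.zip.CRC32;

public final class ChunkedSketch {
    // Feed `data` to the checksum in 32-byte chunks plus a tail,
    // mirroring the chunked main loop described in the javadoc.
    // The result is identical to processing the whole array at once
    // (or one byte at a time): chunk size affects speed only.
    static long chunkedCrc32(byte[] data) {
        CRC32 crc = new CRC32();
        int i = 0;
        while (data.length - i >= 32) {
            crc.update(data, i, 32);
            i += 32;
        }
        if (i < data.length) {
            crc.update(data, i, data.length - i);
        }
        return crc.getValue();
    }
}
```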