Skip to content

Commit 20e3303

Browse files
addressing latest review comments
1 parent f72cb8b commit 20e3303

5 files changed

Lines changed: 24 additions & 38 deletions

File tree

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/BlobDownloadAsyncResponse.java

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import com.azure.storage.blob.implementation.accesshelpers.BlobDownloadAsyncResponseConstructorProxy;
1414
import com.azure.storage.blob.implementation.models.BlobsDownloadHeaders;
1515
import com.azure.storage.blob.implementation.util.ModelHelper;
16-
import com.azure.core.util.logging.ClientLogger;
1716
import reactor.core.publisher.Flux;
1817
import reactor.core.publisher.Mono;
1918

@@ -30,18 +29,8 @@
3029
public final class BlobDownloadAsyncResponse extends ResponseBase<BlobDownloadHeaders, Flux<ByteBuffer>>
3130
implements Closeable {
3231

33-
private static final ClientLogger LOGGER = new ClientLogger(BlobDownloadAsyncResponse.class);
34-
3532
static {
36-
BlobDownloadAsyncResponseConstructorProxy
37-
.setAccessor(new BlobDownloadAsyncResponseConstructorProxy.BlobDownloadAsyncResponseConstructorAccessor() {
38-
@Override
39-
public BlobDownloadAsyncResponse create(StreamResponse sourceResponse,
40-
BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume,
41-
DownloadRetryOptions retryOptions) {
42-
return new BlobDownloadAsyncResponse(sourceResponse, onErrorResume, retryOptions);
43-
}
44-
});
33+
BlobDownloadAsyncResponseConstructorProxy.setAccessor(BlobDownloadAsyncResponse::new);
4534
}
4635

4736
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
@@ -67,6 +56,13 @@ public BlobDownloadAsyncResponse(HttpRequest request, int statusCode, HttpHeader
6756
this.retryOptions = null;
6857
}
6958

59+
/**
60+
* Constructs a {@link BlobDownloadAsyncResponse}.
61+
*
62+
* @param sourceResponse The initial Stream Response
63+
* @param onErrorResume Function used to resume.
64+
* @param retryOptions Retry options.
65+
*/
7066
BlobDownloadAsyncResponse(StreamResponse sourceResponse,
7167
BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume, DownloadRetryOptions retryOptions) {
7268
super(sourceResponse.getRequest(), sourceResponse.getStatusCode(), sourceResponse.getHeaders(),
@@ -99,12 +95,7 @@ private static Flux<ByteBuffer> createResponseFlux(StreamResponse sourceResponse
9995
*/
10096
public Mono<Void> writeValueToAsync(AsynchronousByteChannel channel, ProgressReporter progressReporter) {
10197
Objects.requireNonNull(channel, "'channel' must not be null");
102-
LOGGER.atVerbose()
103-
.addKeyValue("thread", Thread.currentThread().getName())
104-
.log("BlobDownloadAsyncResponse.writeValueToAsync entry");
10598
if (sourceResponse != null) {
106-
LOGGER.atVerbose()
107-
.log("BlobDownloadAsyncResponse.writeValueToAsync using sourceResponse (IOUtils.transfer)");
10899
return IOUtils.transferStreamResponseToAsynchronousByteChannel(channel, sourceResponse, onErrorResume,
109100
progressReporter, retryOptions.getMaxRetryRequests());
110101
} else if (super.getValue() != null) {

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1283,6 +1283,10 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponseInternal(BlobRange ran
12831283
BlobDownloadHeaders blobDownloadHeaders = ModelHelper.populateBlobDownloadHeaders(blobsDownloadHeaders,
12841284
ModelHelper.getErrorCode(response.getHeaders()));
12851285

1286+
/*
1287+
* If the customer did not specify a count, they are reading to the end of the blob. Extract this value
1288+
* from the response for better book-keeping towards the end.
1289+
*/
12861290
long finalCount;
12871291
long initialOffset = finalRange.getOffset();
12881292
if (finalRange.getCount() == null) {
@@ -1292,6 +1296,8 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponseInternal(BlobRange ran
12921296
finalCount = finalRange.getCount();
12931297
}
12941298

1299+
// The resume function takes throwable and offset at the destination.
1300+
// I.e. offset is relative to the starting point.
12951301
BiFunction<Throwable, Long, Mono<StreamResponse>> onDownloadErrorResume = (throwable, offset) -> {
12961302
if (!(throwable instanceof IOException || throwable instanceof TimeoutException)) {
12971303
return Mono.error(throwable);
@@ -1539,6 +1545,7 @@ Mono<Response<BlobProperties>> downloadToFileWithResponse(BlobDownloadToFileOpti
15391545
BlobRequestConditions finalConditions
15401546
= options.getRequestConditions() == null ? new BlobRequestConditions() : options.getRequestConditions();
15411547

1548+
// Default behavior is not to overwrite
15421549
Set<OpenOption> openOptions = options.getOpenOptions();
15431550
if (openOptions == null) {
15441551
openOptions = DEFAULT_OPEN_OPTIONS_SET;

sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobMessageAsyncDecoderDownloadTests.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -252,15 +252,13 @@ public void interruptAndVerifyProperRewind() throws IOException {
252252

253253
DownloadRetryOptions retryOptions = new DownloadRetryOptions().setMaxRetryRequests(5);
254254

255-
StepVerifier.create(downloadClient
256-
.downloadStreamWithResponse(new BlobDownloadStreamOptions().setDownloadRetryOptions(retryOptions)
257-
.setResponseChecksumAlgorithm(StorageChecksumAlgorithm.CRC64))
258-
.doFinally(signalType -> {
259-
System.out.println("[MockPartialResponsePolicy] hits=" + mockPolicy.getHits() + ", triesRemaining="
260-
+ mockPolicy.getTriesRemaining() + ", ranges=" + mockPolicy.getRangeHeaders());
261-
assertTrue(mockPolicy.getHits() > 0, "Mock interruption policy was not invoked");
262-
})
263-
.flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue())))
255+
StepVerifier
256+
.create(downloadClient
257+
.downloadStreamWithResponse(new BlobDownloadStreamOptions().setDownloadRetryOptions(retryOptions)
258+
.setResponseChecksumAlgorithm(StorageChecksumAlgorithm.CRC64))
259+
.doFinally(
260+
signalType -> assertTrue(mockPolicy.getHits() > 0, "Mock interruption policy was not invoked"))
261+
.flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue())))
264262
.assertNext(result -> TestUtils.assertArraysEqual(randomData, result))
265263
.verifyComplete();
266264

sdk/storage/azure-storage-common/src/test-shared/java/com/azure/storage/common/test/shared/policy/MockPartialResponsePolicy.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,6 @@ public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineN
9494
return Mono.just(response);
9595
}
9696
hits.incrementAndGet();
97-
System.out.println("[MockPartialResponsePolicy] invoked. tries=" + remainingTries
98-
+ ", maxBytesPerResponse=" + maxBytesPerResponse);
9997

10098
Flux<ByteBuffer> limitedBody = limitStreamToBytes(response.getBody(), maxBytesPerResponse);
10199
return Mono.just(

sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/implementation/contentvalidation/StructuredMessageDecoderTests.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
package com.azure.storage.common.implementation.contentvalidation;
55

6+
import com.azure.core.util.FluxUtil;
67
import org.junit.jupiter.api.Test;
78
import reactor.core.publisher.Flux;
89

@@ -25,13 +26,7 @@
2526
public class StructuredMessageDecoderTests {
2627

2728
private static ByteBuffer collectFlux(Flux<ByteBuffer> flux) {
28-
ByteArrayOutputStream out = new ByteArrayOutputStream();
29-
flux.toIterable().forEach(buf -> {
30-
byte[] bytes = new byte[buf.remaining()];
31-
buf.get(bytes);
32-
out.write(bytes, 0, bytes.length);
33-
});
34-
return ByteBuffer.wrap(out.toByteArray()).order(ByteOrder.LITTLE_ENDIAN);
29+
return ByteBuffer.wrap(FluxUtil.collectBytesInByteBufferStream(flux).block()).order(ByteOrder.LITTLE_ENDIAN);
3530
}
3631

3732
@Test
@@ -127,9 +122,6 @@ public void readsSegmentHeaderSplitAcrossChunks() throws IOException {
127122
public void handlesZeroLengthSegment() throws IOException {
128123
// Test: Zero-length segment should decode correctly
129124
// Note: Zero-length segments are valid in the format
130-
byte[] originalData = new byte[0];
131-
132-
// For zero-length data, encoder behavior varies - let's test with minimal data
133125
byte[] minimalData = new byte[1];
134126
ThreadLocalRandom.current().nextBytes(minimalData);
135127

0 commit comments

Comments
 (0)