|
14 | 14 | import com.azure.storage.blob.options.BlobDownloadContentOptions; |
15 | 15 | import com.azure.storage.blob.options.BlobDownloadStreamOptions; |
16 | 16 | import com.azure.storage.blob.options.BlobDownloadToFileOptions; |
| 17 | +import com.azure.storage.blob.options.BlobUploadFromFileOptions; |
17 | 18 | import com.azure.storage.common.ParallelTransferOptions; |
18 | 19 | import com.azure.storage.common.ContentValidationAlgorithm; |
19 | 20 | import com.azure.storage.common.implementation.Constants; |
|
23 | 24 | import org.junit.jupiter.api.Test; |
24 | 25 | import org.junit.jupiter.api.io.TempDir; |
25 | 26 | import org.junit.jupiter.params.ParameterizedTest; |
| 27 | +import org.junit.jupiter.params.provider.EnumSource; |
| 28 | +import org.junit.jupiter.params.provider.MethodSource; |
26 | 29 | import org.junit.jupiter.params.provider.ValueSource; |
27 | 30 | import reactor.core.publisher.Flux; |
28 | 31 | import reactor.test.StepVerifier; |
|
34 | 37 | import java.nio.ByteBuffer; |
35 | 38 | import java.nio.file.Files; |
36 | 39 | import java.nio.file.Path; |
| 40 | +import java.util.ArrayList; |
37 | 41 | import java.util.List; |
38 | 42 | import java.util.concurrent.CopyOnWriteArrayList; |
39 | 43 | import java.util.concurrent.atomic.AtomicLong; |
40 | 44 |
|
| 45 | +import static org.junit.jupiter.api.Assertions.assertArrayEquals; |
41 | 46 | import static org.junit.jupiter.api.Assertions.assertEquals; |
42 | 47 | import static org.junit.jupiter.api.Assertions.assertFalse; |
43 | 48 | import static org.junit.jupiter.api.Assertions.assertNotNull; |
|
50 | 55 | public class BlobContentValidationAsyncDownloadTests extends BlobTestBase { |
51 | 56 | private static final int TEN_MB = 10 * Constants.MB; |
52 | 57 | private static final int BLOCK_SIZE = 4 * Constants.MB; |
| 58 | + /** |
| 59 | + * {@link BlobTestBase#fuzzyParallelDownloadLargeMultiPartCases()} starts at ~96 MiB; above this threshold fuzzy |
| 60 | + * parallel download helpers use temp files + {@link BlobTestBase#compareFiles(File, File, long, long)} so the full |
| 61 | + * payload never lives twice in heap. |
| 62 | + */ |
| 63 | + private static final int FUZZY_PARALLEL_DOWNLOAD_FILE_ROUND_TRIP_THRESHOLD_BYTES = 96 * Constants.MB; |
| 64 | + |
| 65 | + /** |
| 66 | + * Live-only random payload band for the dedicated random-size parallel-download fuzzy test |
| 67 | + * ({@link #fuzzyParallelDownloadLiveRandomRoundTrip(ContentValidationAlgorithm)}): each run draws a per-run |
| 68 | + * payload size in {@code (256 MiB, 500 MiB]} (matches the encoder fuzzy upload range) so the structured-message |
| 69 | + * decoder is exercised against payloads whose size varies per run in addition to the random byte contents. |
| 70 | + */ |
| 71 | + private static final long LIVE_RANDOM_PARALLEL_DOWNLOAD_PAYLOAD_MIN_BYTES_EXCLUSIVE = 256L * Constants.MB; |
| 72 | + private static final long LIVE_RANDOM_PARALLEL_DOWNLOAD_PAYLOAD_MAX_BYTES_INCLUSIVE = 500L * Constants.MB; |
| 73 | + |
| 74 | + private final List<File> createdFiles = new ArrayList<>(); |
53 | 75 |
|
54 | 76 | private File createRandomFile(Path tempDir, int size) throws IOException { |
55 | 77 | File file = Files.createTempFile(tempDir, "blob-cv-source", ".bin").toFile(); |
@@ -499,4 +521,161 @@ long getReportedByteCount() { |
499 | 521 | return this.reportedByteCount.get(); |
500 | 522 | } |
501 | 523 | } |
| 524 | + |
| 525 | + // ---------- Fuzzy parallel download (deterministic grids) ---------- |
| 526 | + |
| 527 | + @ParameterizedTest |
| 528 | + @MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadReplayableCases") |
| 529 | + public void fuzzyParallelDownloadReplayableRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency) |
| 530 | + throws IOException { |
| 531 | + assertParallelDownloadFuzzyRoundTripAsync("replayable", payloadBytes, blockSizeBytes, maxConcurrency); |
| 532 | + } |
| 533 | + |
| 534 | + @LiveOnly // payload > blockSize with tiny totals; many small range GETs not replayable under the proxy. |
| 535 | + @ParameterizedTest |
| 536 | + @MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadSmallMultiPartCases") |
| 537 | + public void fuzzyParallelDownloadSmallMultiPartRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency) |
| 538 | + throws IOException { |
| 539 | + assertParallelDownloadFuzzyRoundTripAsync("smallMultiPart", payloadBytes, blockSizeBytes, maxConcurrency); |
| 540 | + } |
| 541 | + |
| 542 | + @LiveOnly // sub-4 MiB chunked range GETs not replayable under the proxy. |
| 543 | + @ParameterizedTest |
| 544 | + @MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadSubFourMiBCases") |
| 545 | + public void fuzzyParallelDownloadSubFourMiBRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency) |
| 546 | + throws IOException { |
| 547 | + assertParallelDownloadFuzzyRoundTripAsync("subFourMiB", payloadBytes, blockSizeBytes, maxConcurrency); |
| 548 | + } |
| 549 | + |
| 550 | + @LiveOnly // 4 MiB boundary tuples that fan out into chunked range GETs. |
| 551 | + @ParameterizedTest |
| 552 | + @MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadFourMiBBoundaryCases") |
| 553 | + public void fuzzyParallelDownloadFourMiBBoundaryRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency) |
| 554 | + throws IOException { |
| 555 | + assertParallelDownloadFuzzyRoundTripAsync("fourMiBBoundary", payloadBytes, blockSizeBytes, maxConcurrency); |
| 556 | + } |
| 557 | + |
| 558 | + @LiveOnly // payload > blockSize for every tuple; chunked range GETs across many requests. |
| 559 | + @ParameterizedTest |
| 560 | + @MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadMediumMultiPartCases") |
| 561 | + public void fuzzyParallelDownloadMediumMultiPartRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency) |
| 562 | + throws IOException { |
| 563 | + assertParallelDownloadFuzzyRoundTripAsync("mediumMultiPart", payloadBytes, blockSizeBytes, maxConcurrency); |
| 564 | + } |
| 565 | + |
| 566 | + @LiveOnly // payload >> blockSize; ~96-320 MiB downloads. |
| 567 | + @ParameterizedTest |
| 568 | + @MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadLargeMultiPartCases") |
| 569 | + public void fuzzyParallelDownloadLargeMultiPartRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency) |
| 570 | + throws IOException { |
| 571 | + assertParallelDownloadFuzzyRoundTripAsync("largeMultiPart", payloadBytes, blockSizeBytes, maxConcurrency); |
| 572 | + } |
| 573 | + |
| 574 | + @LiveOnly // ~1 GiB single case; far too large for the test proxy. |
| 575 | + @ParameterizedTest |
| 576 | + @MethodSource("com.azure.storage.blob.BlobTestBase#fuzzyParallelDownloadOneGiBCases") |
| 577 | + public void fuzzyParallelDownloadOneGiBRoundTrip(int payloadBytes, long blockSizeBytes, int maxConcurrency) |
| 578 | + throws IOException { |
| 579 | + assertParallelDownloadFuzzyRoundTripAsync("oneGiB", payloadBytes, blockSizeBytes, maxConcurrency); |
| 580 | + } |
| 581 | + |
| 582 | + /** |
| 583 | + * Live-only random-size parallel download fuzzy round-trip. Each run draws a per-run payload size in |
| 584 | + * {@code (256 MiB, 500 MiB]} (matches the encoder fuzzy upload range) and exercises both CRC64 and AUTO |
| 585 | + * content-validation algorithms so the structured-message decoder is tested against payloads whose total size |
| 586 | + * varies per run in addition to the random byte contents that the deterministic grids already exercise. Kept |
| 587 | + * separate from the parameterized {@link #fuzzyParallelDownloadLargeMultiPartRoundTrip(int, long, int)} so the |
| 588 | + * deterministic per-grid round-trips and the randomized round-trip don't share work or cost. |
| 589 | + */ |
| 590 | + @LiveOnly |
| 591 | + @ParameterizedTest |
| 592 | + @EnumSource(value = ContentValidationAlgorithm.class, names = { "CRC64", "AUTO" }) |
| 593 | + public void fuzzyParallelDownloadLiveRandomRoundTrip(ContentValidationAlgorithm algorithm) throws IOException { |
| 594 | + int sizeBytes = (int) randomLongFromNamer(LIVE_RANDOM_PARALLEL_DOWNLOAD_PAYLOAD_MIN_BYTES_EXCLUSIVE + 1, |
| 595 | + LIVE_RANDOM_PARALLEL_DOWNLOAD_PAYLOAD_MAX_BYTES_INCLUSIVE + 1); |
| 596 | + assertParallelDownloadFuzzyRoundTripAsync("liveRandom", sizeBytes, 8L * Constants.MB, 8, algorithm); |
| 597 | + } |
| 598 | + |
| 599 | + private void assertParallelDownloadFuzzyRoundTripAsync(String caseKind, int payloadBytes, long blockSizeBytes, |
| 600 | + int maxConcurrency) throws IOException { |
| 601 | + assertParallelDownloadFuzzyRoundTripAsync(caseKind, payloadBytes, blockSizeBytes, maxConcurrency, |
| 602 | + ContentValidationAlgorithm.CRC64); |
| 603 | + } |
| 604 | + |
| 605 | + private void assertParallelDownloadFuzzyRoundTripAsync(String caseKind, int payloadBytes, long blockSizeBytes, |
| 606 | + int maxConcurrency, ContentValidationAlgorithm algorithm) throws IOException { |
| 607 | + List<HttpHeaders> recorded = new CopyOnWriteArrayList<>(); |
| 608 | + BlobAsyncClient client = createBlobAsyncClientWithRequestSniffer(recorded); |
| 609 | + |
| 610 | + ParallelTransferOptions parallelOptions |
| 611 | + = new ParallelTransferOptions().setBlockSizeLong(blockSizeBytes).setMaxConcurrency(maxConcurrency); |
| 612 | + |
| 613 | + String assertionMessage = "Fuzzy parallel download [" + caseKind + "] payloadBytes=" + payloadBytes |
| 614 | + + ", blockSize=" + blockSizeBytes + ", maxConcurrency=" + maxConcurrency + ", algorithm=" + algorithm; |
| 615 | + |
| 616 | + if (payloadBytes >= FUZZY_PARALLEL_DOWNLOAD_FILE_ROUND_TRIP_THRESHOLD_BYTES) { |
| 617 | + File sourceFile = getRandomFile(payloadBytes); |
| 618 | + sourceFile.deleteOnExit(); |
| 619 | + createdFiles.add(sourceFile); |
| 620 | + File outFile = Files.createTempFile("blob-cv-fuzzy-parallel-dl-async", ".bin").toFile(); |
| 621 | + outFile.deleteOnExit(); |
| 622 | + createdFiles.add(outFile); |
| 623 | + Files.deleteIfExists(outFile.toPath()); |
| 624 | + |
| 625 | + BlobUploadFromFileOptions uploadOptions |
| 626 | + = new BlobUploadFromFileOptions(sourceFile.getAbsolutePath()).setParallelTransferOptions( |
| 627 | + new com.azure.storage.blob.models.ParallelTransferOptions().setBlockSizeLong(blockSizeBytes) |
| 628 | + .setMaxConcurrency(maxConcurrency)); |
| 629 | + assertNotNull(client.uploadFromFileWithResponse(uploadOptions).block().getValue().getETag(), |
| 630 | + assertionMessage); |
| 631 | + |
| 632 | + BlobDownloadToFileOptions downloadOptions |
| 633 | + = new BlobDownloadToFileOptions(outFile.toPath().toString()).setParallelTransferOptions(parallelOptions) |
| 634 | + .setContentValidationAlgorithm(algorithm); |
| 635 | + |
| 636 | + StepVerifier.create(client.downloadToFileWithResponse(downloadOptions)) |
| 637 | + .assertNext(r -> assertNotNull(r.getValue(), assertionMessage)) |
| 638 | + .verifyComplete(); |
| 639 | + |
| 640 | + assertTrue(compareFiles(sourceFile, outFile, 0, payloadBytes), assertionMessage); |
| 641 | + } else { |
| 642 | + byte[] randomData = getRandomByteArray(payloadBytes); |
| 643 | + client.upload(BinaryData.fromBytes(randomData), true).block(); |
| 644 | + |
| 645 | + if (payloadBytes > blockSizeBytes) { |
| 646 | + File outFile = Files.createTempFile("blob-cv-fuzzy-parallel-dl-async-mp", ".bin").toFile(); |
| 647 | + outFile.deleteOnExit(); |
| 648 | + createdFiles.add(outFile); |
| 649 | + Files.deleteIfExists(outFile.toPath()); |
| 650 | + |
| 651 | + BlobDownloadToFileOptions downloadOptions = new BlobDownloadToFileOptions(outFile.toPath().toString()) |
| 652 | + .setParallelTransferOptions(parallelOptions) |
| 653 | + .setContentValidationAlgorithm(algorithm); |
| 654 | + |
| 655 | + StepVerifier.create(client.downloadToFileWithResponse(downloadOptions)) |
| 656 | + .assertNext(r -> assertNotNull(r.getValue(), assertionMessage)) |
| 657 | + .verifyComplete(); |
| 658 | + |
| 659 | + byte[] downloaded = Files.readAllBytes(outFile.toPath()); |
| 660 | + assertArrayEquals(randomData, downloaded, assertionMessage); |
| 661 | + } else { |
| 662 | + BlobDownloadContentOptions downloadOptions |
| 663 | + = new BlobDownloadContentOptions().setContentValidationAlgorithm(algorithm); |
| 664 | + |
| 665 | + StepVerifier.create(client.downloadContentWithResponse(downloadOptions)) |
| 666 | + .assertNext(r -> assertArrayEquals(randomData, r.getValue().toBytes(), assertionMessage)) |
| 667 | + .verifyComplete(); |
| 668 | + |
| 669 | + BlobDownloadStreamOptions streamOptions |
| 670 | + = new BlobDownloadStreamOptions().setContentValidationAlgorithm(algorithm); |
| 671 | + StepVerifier |
| 672 | + .create(client.downloadStreamWithResponse(streamOptions) |
| 673 | + .flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue()))) |
| 674 | + .assertNext(bytes -> assertArrayEquals(randomData, bytes, assertionMessage)) |
| 675 | + .verifyComplete(); |
| 676 | + } |
| 677 | + } |
| 678 | + assertTrue(hasStructuredMessageDownloadRequestHeaders(recorded, false), assertionMessage); |
| 679 | + } |
| 680 | + |
502 | 681 | } |
0 commit comments