From 94d8622743d35c0895da6485adb69e78810d29c6 Mon Sep 17 00:00:00 2001 From: Valentin Delaye Date: Sun, 14 Jun 2026 16:13:11 +0200 Subject: [PATCH] Chunk upload Signed-off-by: Valentin Delaye --- src/main/java/land/oras/Registry.java | 152 +++++++++++ src/main/java/land/oras/auth/HttpClient.java | 29 +++ src/test/java/land/oras/RegistryTest.java | 238 +++++++++++++++++- .../java/land/oras/RegistryWireMockTest.java | 46 ++++ 4 files changed, 454 insertions(+), 11 deletions(-) diff --git a/src/main/java/land/oras/Registry.java b/src/main/java/land/oras/Registry.java index d43f68ed..bd4c93de 100644 --- a/src/main/java/land/oras/Registry.java +++ b/src/main/java/land/oras/Registry.java @@ -761,6 +761,158 @@ public Layer pushBlob(ContainerRef containerRef, byte[] data) { return Layer.fromData(ref, data); } + /** + * Push a blob using chunked upload + * + * @param containerRef The container reference + * @param blob The blob file to upload + * @param chunkSize Maximum number of bytes per chunk. Must be > 0. + * @return The {@link Layer} descriptor for the uploaded blob. + * @throws OrasException if the upload fails at any stage. + */ + public Layer pushBlobChunked(ContainerRef containerRef, Path blob, long chunkSize) { + if (chunkSize <= 0) { + throw new OrasException("chunkSize must be greater than 0"); + } + String digest = containerRef.getAlgorithm().digest(blob); + ContainerRef ref = containerRef.forRegistry(this).checkBlocked(this); + if (ref.isInsecure(this) && !this.isInsecure()) { + return asInsecure().pushBlobChunked(containerRef, blob, chunkSize); + } + if (hasBlob(ref.withDigest(digest))) { + LOG.info("Blob already exists: {}", digest); + return Layer.fromFile(blob, ref.getAlgorithm()); + } + String location = initiateChunkedUpload(ref); + try (InputStream is = Files.newInputStream(blob)) { + long totalSize = Files.size(blob); + location = uploadChunks(ref, is, totalSize, chunkSize, location); + } catch (IOException e) { + throw new OrasException("Failed to read blob for chunked upload: %s".formatted(blob), e); + } + finalizeChunkedUpload(ref, location, digest); + return Layer.fromFile(blob, ref.getAlgorithm()); + } + + /** + * Push a blob using chunked upload + * + * @param containerRef The container reference + * @param stream Input stream + * @param totalSize Total size of the stream + * @param chunkSize Maximum number of bytes per chunk. Must be > 0. + * @return The {@link Layer} descriptor for the uploaded blob. + * @throws OrasException if the upload fails at any stage. + */ + public Layer pushBlobChunked(ContainerRef containerRef, InputStream stream, long totalSize, long chunkSize) { + String digest = containerRef.getDigest(); + if (digest == null) { + throw new OrasException("Digest is required to push blob with chunked stream upload"); + } + if (chunkSize <= 0) { + throw new OrasException("chunkSize must be greater than 0"); + } + ContainerRef ref = containerRef.forRegistry(this).checkBlocked(this); + if (ref.isInsecure(this) && !this.isInsecure()) { + return asInsecure().pushBlobChunked(containerRef, stream, totalSize, chunkSize); + } + if (hasBlob(ref)) { + LOG.info("Blob already exists: {}", digest); + return Layer.fromDigest(digest, totalSize); + } + String location = initiateChunkedUpload(ref); + location = uploadChunks(ref, stream, totalSize, chunkSize, location); + finalizeChunkedUpload(ref, location, digest); + return Layer.fromDigest(digest, totalSize); + } + + private String initiateChunkedUpload(ContainerRef ref) { + URI uri = URI.create("%s://%s".formatted(getScheme(), ref.getBlobsUploadPath(this))); + HttpClient.ResponseWrapper response = client.post( + uri, + new byte[0], + Map.of(Const.CONTENT_TYPE_HEADER, Const.APPLICATION_OCTET_STREAM_HEADER_VALUE), + Scopes.of(ref), + authProvider); + logResponse(response); + if (response.statusCode() != 202) { + throw new OrasException( + "Failed to initiate chunked blob upload: status %d".formatted(response.statusCode())); + } + String location = response.headers().get(Const.LOCATION_HEADER.toLowerCase()); + if (!location.startsWith("http://") && !location.startsWith("https://")) { + location = "%s://%s/%s".formatted(getScheme(), ref.getApiRegistry(this), location.replaceFirst("^/", "")); + } + LOG.debug("Chunked upload session location: {}", location); + return location; + } + + private String uploadChunks(ContainerRef ref, InputStream stream, long totalSize, long chunkSize, String location) { + long offset = 0; + byte[] buffer = new byte[(int) Math.min(chunkSize, Integer.MAX_VALUE)]; + try { + while (offset < totalSize) { + long remaining = totalSize - offset; + int toRead = (int) Math.min(chunkSize, remaining); + int read = stream.readNBytes(buffer, 0, toRead); + if (read == 0) { + break; + } + long rangeEnd = offset + read - 1; + String contentRange = "%d-%d".formatted(offset, rangeEnd); + final byte[] chunk = java.util.Arrays.copyOf(buffer, read); + URI patchUri = URI.create(location); + HttpClient.ResponseWrapper patchResponse = client.patch( + patchUri, + read, + Map.of( + Const.CONTENT_TYPE_HEADER, + Const.APPLICATION_OCTET_STREAM_HEADER_VALUE, + Const.CONTENT_RANGE_HEADER, + contentRange), + () -> new java.io.ByteArrayInputStream(chunk), + Scopes.of(ref), + authProvider); + logResponse(patchResponse); + if (patchResponse.statusCode() != 202) { + throw new OrasException("Chunked upload PATCH failed for range %s: status %d" + .formatted(contentRange, patchResponse.statusCode())); + } + // The registry MAY return a new location after each PATCH + String newLocation = patchResponse.headers().get(Const.LOCATION_HEADER.toLowerCase()); + if (newLocation != null && !newLocation.isBlank()) { + if (!newLocation.startsWith("http://") && !newLocation.startsWith("https://")) { + newLocation = "%s://%s/%s" + .formatted(getScheme(), ref.getApiRegistry(this), newLocation.replaceFirst("^/", "")); + } + location = newLocation; + LOG.debug("Chunked upload location updated: {}", location); + } + offset += read; + LOG.debug("Uploaded chunk {}-{} ({} bytes)", offset - read, rangeEnd, read); + } + } catch (IOException e) { + throw new OrasException("Failed during chunked blob upload", e); + } + return location; + } + + private void finalizeChunkedUpload(ContainerRef ref, String location, String digest) { + URI putUri = createLocationWithDigest(location, digest); + HttpClient.ResponseWrapper putResponse = client.put( + putUri, + new byte[0], + Map.of(Const.CONTENT_TYPE_HEADER, Const.APPLICATION_OCTET_STREAM_HEADER_VALUE), + Scopes.of(ref), + authProvider); + logResponse(putResponse); + if (putResponse.statusCode() != 201) { + throw new OrasException( + "Failed to finalize chunked blob upload: status %d".formatted(putResponse.statusCode())); + } + LOG.debug("Chunked upload finalized successfully for digest: {}", digest); + } + /** * Return if the registry contains already the blob * @param containerRef The container diff --git a/src/main/java/land/oras/auth/HttpClient.java b/src/main/java/land/oras/auth/HttpClient.java index 56d9ffb0..d35a3416 100644 --- a/src/main/java/land/oras/auth/HttpClient.java +++ b/src/main/java/land/oras/auth/HttpClient.java @@ -475,6 +475,35 @@ public ResponseWrapper patch( authProvider); } + /** + * Upload a chunk of data from an input stream using PATCH. + * @param uri The URI + * @param chunkSize The size of the chunk in bytes + * @param headers The headers (should include Content-Range) + * @param stream A supplier providing the input stream for this chunk + * @param scopes The scopes + * @param authProvider The authentication provider + * @return The response + */ + public ResponseWrapper patch( + URI uri, + long chunkSize, + Map headers, + Supplier stream, + Scopes scopes, + AuthProvider authProvider) { + return executeRequest( + "PATCH", + uri, + true, + headers, + new byte[0], + HttpResponse.BodyHandlers.ofString(), + HttpRequest.BodyPublishers.fromPublisher(HttpRequest.BodyPublishers.ofInputStream(stream), chunkSize), + scopes, + authProvider); + } + /** * Perform a PUT request * @param uri The URI diff --git a/src/test/java/land/oras/RegistryTest.java b/src/test/java/land/oras/RegistryTest.java index 769019e2..0e009792 100644 --- a/src/test/java/land/oras/RegistryTest.java +++ b/src/test/java/land/oras/RegistryTest.java @@ -41,7 +41,6 @@ import land.oras.utils.SupportedCompression; import land.oras.utils.ZotContainer; import land.oras.utils.ZotUnsecureContainer; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -1946,7 +1945,7 @@ void shouldHandleExistingBlobInStreamPush() throws IOException { // Create test file Path testFile = Files.createTempFile("test-data-", ".tmp"); Files.writeString(testFile, "Test Content"); - long fileSize = Files.size(testFile); + Files.size(testFile); String expectedDigest = containerRef.getAlgorithm().digest(testFile); // First push @@ -1965,10 +1964,6 @@ void shouldHandleExistingBlobInStreamPush() throws IOException { assertEquals(expectedDigest, firstLayer.getDigest()); assertEquals(expectedDigest, secondLayer.getDigest()); assertEquals(firstLayer.getSize(), secondLayer.getSize()); - - // Clean up - Files.delete(testFile); - registry.deleteBlob(containerRef.withDigest(firstLayer.getDigest())); } @Test @@ -1988,7 +1983,6 @@ public int read() throws IOException { } }; - // Verify exception is wrapped in OrasException OrasException exception = assertThrows(OrasException.class, () -> registry.pushBlob(containerRef, failingStream)); assertEquals("Failed to push blob", exception.getMessage()); @@ -2052,11 +2046,233 @@ void shouldHandleLargeStreamContent() throws IOException { // Verify content with stream try (InputStream resultStream = registry.getBlobStream(containerRef.withDigest(layer.getDigest()))) { byte[] result = resultStream.readAllBytes(); - Assertions.assertArrayEquals(largeData, result); + assertArrayEquals(largeData, result); } + } - // Clean up - Files.delete(largeFile); - registry.deleteBlob(containerRef.withDigest(layer.getDigest())); + @Test + void shouldPushBlobChunkedFromPath() throws IOException { + Registry registry = Registry.Builder.builder() + .defaults("myuser", "mypass") + .withInsecure(true) + .build(); + ContainerRef containerRef = + ContainerRef.parse("%s/library/artifact-chunked-path".formatted(this.registry.getRegistry())); + + // Create a file with known content + byte[] content = "hello chunked world".getBytes(StandardCharsets.UTF_8); + Path blobFile = blobDir.resolve("chunked.txt"); + Files.write(blobFile, content); + + String expectedDigest = SupportedAlgorithm.SHA256.digest(content); + + // Upload in 5-byte chunks + Layer layer = registry.pushBlobChunked(containerRef, blobFile, 5L); + + assertEquals(expectedDigest, layer.getDigest()); + assertEquals(content.length, layer.getSize()); + + // Verify the blob can be retrieved + byte[] blob = registry.getBlob(containerRef.withDigest(expectedDigest)); + assertEquals(new String(content), new String(blob)); + + // Idempotent: second call should short-circuit (blob already exists) + Layer layerAgain = registry.pushBlobChunked(containerRef, blobFile, 5L); + assertEquals(expectedDigest, layerAgain.getDigest()); + } + + @Test + void shouldPushBlobChunkedFromPathWithSingleChunk() throws IOException { + Registry registry = Registry.Builder.builder() + .defaults("myuser", "mypass") + .withInsecure(true) + .build(); + ContainerRef containerRef = + ContainerRef.parse("%s/library/artifact-chunked-path-single".formatted(this.registry.getRegistry())); + + // Content that fits in one chunk + byte[] content = "hi".getBytes(StandardCharsets.UTF_8); + Path blobFile = blobDir.resolve("single-chunk.txt"); + Files.write(blobFile, content); + + String expectedDigest = SupportedAlgorithm.SHA256.digest(content); + + // chunk size larger than the content — only one PATCH is issued + Layer layer = registry.pushBlobChunked(containerRef, blobFile, 1024L); + + assertEquals(expectedDigest, layer.getDigest()); + } + + @Test + void shouldPushBlobChunkedFromInputStream() { + Registry registry = Registry.Builder.builder() + .defaults("myuser", "mypass") + .withInsecure(true) + .build(); + ContainerRef containerRef = + ContainerRef.parse("%s/library/artifact-chunked-stream".formatted(this.registry.getRegistry())); + + byte[] content = "hello chunked stream".getBytes(StandardCharsets.UTF_8); + String expectedDigest = SupportedAlgorithm.SHA256.digest(content); + + ContainerRef refWithDigest = containerRef.withDigest(expectedDigest); + + // Upload in 4-byte chunks + Layer layer = registry.pushBlobChunked(refWithDigest, new ByteArrayInputStream(content), content.length, 4L); + + assertEquals(expectedDigest, layer.getDigest()); + assertEquals(content.length, layer.getSize()); + + // Verify the blob can be retrieved + byte[] blob = registry.getBlob(containerRef.withDigest(expectedDigest)); + assertEquals(new String(content), new String(blob)); + + // Idempotent: second call should short-circuit (blob already exists) + Layer layerAgain = + registry.pushBlobChunked(refWithDigest, new ByteArrayInputStream(content), content.length, 4L); + assertEquals(expectedDigest, layerAgain.getDigest()); + } + + @Test + void shouldPushLargeBlobChunkedFromPath() throws IOException { + Registry registry = Registry.Builder.builder() + .defaults("myuser", "mypass") + .withInsecure(true) + .build(); + ContainerRef containerRef = + ContainerRef.parse("%s/library/artifact-chunked-large".formatted(this.registry.getRegistry())); + + // 1 MB of random content + byte[] content = new byte[1024 * 1024]; + new Random().nextBytes(content); + Path blobFile = blobDir.resolve("large-chunked.bin"); + Files.write(blobFile, content); + + String expectedDigest = SupportedAlgorithm.SHA256.digest(content); + + // 256 KB chunks → 4 PATCH requests + Layer layer = registry.pushBlobChunked(containerRef, blobFile, 256 * 1024L); + + assertEquals(expectedDigest, layer.getDigest()); + assertEquals(content.length, layer.getSize()); + + // Verify the blob can be retrieved + try (InputStream resultStream = registry.getBlobStream(containerRef.withDigest(expectedDigest))) { + byte[] result = resultStream.readAllBytes(); + assertArrayEquals(content, result); + } + } + + @Test + void shouldFailChunkedUploadWithMissingDigestOnInputStream() { + Registry registry = Registry.Builder.builder() + .defaults("myuser", "mypass") + .withInsecure(true) + .build(); + ContainerRef containerRef = + ContainerRef.parse("%s/library/artifact-chunked-err".formatted(this.registry.getRegistry())); + + OrasException e = assertThrows( + OrasException.class, + () -> registry.pushBlobChunked( + containerRef, new ByteArrayInputStream("data".getBytes(StandardCharsets.UTF_8)), 4L, 4L)); + assertEquals("Digest is required to push blob with chunked stream upload", e.getMessage()); + } + + @Test + void shouldFailChunkedUploadWithInvalidChunkSize() throws IOException { + Registry registry = Registry.Builder.builder() + .defaults("myuser", "mypass") + .withInsecure(true) + .build(); + ContainerRef containerRef = + ContainerRef.parse("%s/library/artifact-chunked-err".formatted(this.registry.getRegistry())); + + byte[] content = "data".getBytes(StandardCharsets.UTF_8); + Path blobFile = blobDir.resolve("err.txt"); + Files.write(blobFile, content); + + OrasException e1 = + assertThrows(OrasException.class, () -> registry.pushBlobChunked(containerRef, blobFile, 0L)); + assertEquals("chunkSize must be greater than 0", e1.getMessage()); + + String digest = SupportedAlgorithm.SHA256.digest(content); + OrasException e2 = assertThrows( + OrasException.class, + () -> registry.pushBlobChunked( + containerRef.withDigest(digest), new ByteArrayInputStream(content), content.length, 0L)); + assertEquals("chunkSize must be greater than 0", e2.getMessage()); + } + + @Test + @Execution(ExecutionMode.SAME_THREAD) + void shouldPushBlobChunkedFromPathViaInsecureRegistryConfig(@TempDir Path homeDir) throws Exception { + + // Insecure config + String config = + """ + [[registry]] + location = "%s" + insecure = true + """ + .formatted(this.unsecureRegistry.getRegistry()); + TestUtils.createRegistriesConfFile(homeDir, config); + + TestUtils.withHome(homeDir, () -> { + Registry registry = Registry.builder() + .defaults(this.unsecureRegistry.getRegistry()) + .build(); + + byte[] content = "hello chunked insecure path".getBytes(StandardCharsets.UTF_8); + Path blobFile = blobDir.resolve("chunked-insecure-path.txt"); + try { + Files.write(blobFile, content); + } catch (IOException e) { + throw new RuntimeException(e); + } + + String expectedDigest = SupportedAlgorithm.SHA256.digest(content); + ContainerRef containerRef = ContainerRef.parse("library/artifact-chunked-insecure-path"); + + Layer layer = registry.pushBlobChunked(containerRef, blobFile, 8L); + + assertEquals(expectedDigest, layer.getDigest()); + assertEquals(content.length, layer.getSize()); + + registry.deleteBlob(containerRef.withDigest(expectedDigest)); + }); + } + + @Test + @Execution(ExecutionMode.SAME_THREAD) + void shouldPushBlobChunkedFromStreamViaInsecureRegistryConfig(@TempDir Path homeDir) throws Exception { + + // Insecure config + String config = + """ + [[registry]] + location = "%s" + insecure = true + """ + .formatted(this.unsecureRegistry.getRegistry()); + TestUtils.createRegistriesConfFile(homeDir, config); + + TestUtils.withHome(homeDir, () -> { + Registry registry = Registry.builder() + .defaults(this.unsecureRegistry.getRegistry()) + .build(); + + byte[] content = "hello chunked insecure stream".getBytes(StandardCharsets.UTF_8); + String expectedDigest = SupportedAlgorithm.SHA256.digest(content); + ContainerRef containerRef = ContainerRef.parse("library/artifact-chunked-insecure-stream") + .withDigest(expectedDigest); + + Layer layer = registry.pushBlobChunked(containerRef, new ByteArrayInputStream(content), content.length, 8L); + + assertEquals(expectedDigest, layer.getDigest()); + assertEquals(content.length, layer.getSize()); + + registry.deleteBlob(containerRef.withDigest(expectedDigest)); + }); } } diff --git a/src/test/java/land/oras/RegistryWireMockTest.java b/src/test/java/land/oras/RegistryWireMockTest.java index 23a244e8..0dbea057 100644 --- a/src/test/java/land/oras/RegistryWireMockTest.java +++ b/src/test/java/land/oras/RegistryWireMockTest.java @@ -1146,4 +1146,50 @@ void pullArtifactShouldRejectInvalidTitleAnnotation(WireMockRuntimeInfo wmRuntim Files.exists(outputDir.getParent().resolve("traversed-file.txt")), "Blob must not be written outside the output directory"); } + + @Test + void shouldFailChunkedUploadWhenInitiationReturnsNon202(WireMockRuntimeInfo wmRuntimeInfo) throws IOException { + WireMock wireMock = wmRuntimeInfo.getWireMock(); + String registryUrl = wmRuntimeInfo.getHttpBaseUrl().replace("http://", ""); + + // The POST that opens a chunked-upload session returns 500 instead of 202. + wireMock.register(post(urlPathMatching("/v2/library/chunked-init-error/blobs/uploads/")) + .willReturn(aResponse().withStatus(500).withBody("Internal Server Error"))); + + // HEAD for the prior-existence check returns 404 (blob does not exist yet). + wireMock.register(head(urlPathMatching("/v2/library/chunked-init-error/blobs/.*")) + .willReturn(aResponse().withStatus(404))); + + Registry registry = Registry.Builder.builder() + .withAuthProvider(authProvider) + .withInsecure(true) + .build(); + + byte[] content = "hello".getBytes(StandardCharsets.UTF_8); + String digest = SupportedAlgorithm.SHA256.digest(content); + + // Path overload + Path blobFile = configDir.resolve("chunked-init-error.txt"); + Files.write(blobFile, content); + ContainerRef refPath = ContainerRef.parse("%s/library/chunked-init-error".formatted(registryUrl)); + + OrasException exPath = assertThrows(OrasException.class, () -> registry.pushBlobChunked(refPath, blobFile, 4L)); + assertEquals( + "Failed to initiate chunked blob upload: status 500", + exPath.getMessage(), + "Exception message should include the unexpected status code"); + + // InputStream overload + ContainerRef refStream = ContainerRef.parse("%s/library/chunked-init-error".formatted(registryUrl)) + .withDigest(digest); + + OrasException exStream = assertThrows( + OrasException.class, + () -> registry.pushBlobChunked( + refStream, new java.io.ByteArrayInputStream(content), content.length, 4L)); + assertEquals( + "Failed to initiate chunked blob upload: status 500", + exStream.getMessage(), + "Exception message should include the unexpected status code"); + } }