Skip to content

Commit 20f9ad9

Browse files
committed
Chunk upload
1 parent 299fe89 commit 20f9ad9

3 files changed

Lines changed: 409 additions & 0 deletions

File tree

src/main/java/land/oras/Registry.java

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -761,6 +761,224 @@ public Layer pushBlob(ContainerRef containerRef, byte[] data) {
761761
return Layer.fromData(ref, data);
762762
}
763763

764+
/**
765+
* Push a blob using chunked upload (OCI Distribution Spec § Pushing a blob in chunks).
766+
*
767+
* <p>The blob is read from the given {@link Path} and split into chunks of at most
768+
* {@code chunkSize} bytes. Each chunk is uploaded via a {@code PATCH} request; the
769+
* session is then closed with a final {@code PUT} request that carries the overall blob
770+
* digest.
771+
*
772+
* @param containerRef The container reference (registry + repository + optional tag/digest).
773+
* A digest on the ref is ignored; the digest is always computed from
774+
* the file content.
775+
* @param blob Path to the local file to upload.
776+
* @param chunkSize Maximum number of bytes per chunk. Must be &gt; 0.
777+
* @return The {@link Layer} descriptor for the uploaded blob.
778+
* @throws OrasException if the upload fails at any stage.
779+
*/
780+
public Layer pushBlobChunked(ContainerRef containerRef, Path blob, long chunkSize) {
781+
if (chunkSize <= 0) {
782+
throw new OrasException("chunkSize must be greater than 0");
783+
}
784+
String digest = containerRef.getAlgorithm().digest(blob);
785+
ContainerRef ref = containerRef.forRegistry(this).checkBlocked(this);
786+
if (ref.isInsecure(this) && !this.isInsecure()) {
787+
return asInsecure().pushBlobChunked(containerRef, blob, chunkSize);
788+
}
789+
if (hasBlob(ref.withDigest(digest))) {
790+
LOG.info("Blob already exists: {}", digest);
791+
return Layer.fromFile(blob, ref.getAlgorithm());
792+
}
793+
long totalSize;
794+
try {
795+
totalSize = Files.size(blob);
796+
} catch (IOException e) {
797+
throw new OrasException("Failed to read blob file size: %s".formatted(blob), e);
798+
}
799+
String location = initiateChunkedUpload(ref);
800+
try (InputStream is = Files.newInputStream(blob)) {
801+
location = uploadChunks(ref, is, totalSize, chunkSize, location);
802+
} catch (IOException e) {
803+
throw new OrasException("Failed to read blob for chunked upload: %s".formatted(blob), e);
804+
}
805+
finalizeChunkedUpload(ref, location, digest);
806+
return Layer.fromFile(blob, ref.getAlgorithm());
807+
}
808+
809+
/**
810+
* Push a blob using chunked upload from an {@link InputStream} (OCI Distribution Spec §
811+
* Pushing a blob in chunks).
812+
*
813+
* <p>The stream is consumed in chunks of at most {@code chunkSize} bytes. The caller
814+
* <em>must</em> supply the total blob size and digest via {@code containerRef} (the ref
815+
* must carry a digest). The stream is read exactly once; it is the caller's responsibility
816+
* to provide a stream whose content matches the declared digest and size.
817+
*
818+
* @param containerRef The container reference. <strong>Must</strong> carry the blob digest
819+
* (e.g. obtained from {@link ContainerRef#withDigest(String)}).
820+
* @param stream Input stream supplying the blob data. The stream is fully consumed
821+
* and <em>not</em> closed by this method.
822+
* @param totalSize Total size of the blob in bytes.
823+
* @param chunkSize Maximum number of bytes per chunk. Must be &gt; 0.
824+
* @return The {@link Layer} descriptor for the uploaded blob.
825+
* @throws OrasException if the upload fails at any stage.
826+
*/
827+
public Layer pushBlobChunked(ContainerRef containerRef, InputStream stream, long totalSize, long chunkSize) {
828+
String digest = containerRef.getDigest();
829+
if (digest == null) {
830+
throw new OrasException("Digest is required to push blob with chunked stream upload");
831+
}
832+
if (chunkSize <= 0) {
833+
throw new OrasException("chunkSize must be greater than 0");
834+
}
835+
ContainerRef ref = containerRef.forRegistry(this).checkBlocked(this);
836+
if (ref.isInsecure(this) && !this.isInsecure()) {
837+
return asInsecure().pushBlobChunked(containerRef, stream, totalSize, chunkSize);
838+
}
839+
if (hasBlob(ref)) {
840+
LOG.info("Blob already exists: {}", digest);
841+
return Layer.fromDigest(digest, totalSize);
842+
}
843+
String location = initiateChunkedUpload(ref);
844+
location = uploadChunks(ref, stream, totalSize, chunkSize, location);
845+
finalizeChunkedUpload(ref, location, digest);
846+
return Layer.fromDigest(digest, totalSize);
847+
}
848+
849+
/**
850+
* Phase 1 of chunked upload: POST to obtain a session upload location.
851+
* @param ref The resolved container reference.
852+
* @return The absolute upload location URL string.
853+
*/
854+
private String initiateChunkedUpload(ContainerRef ref) {
855+
URI uri = URI.create("%s://%s".formatted(getScheme(), ref.getBlobsUploadPath(this)));
856+
HttpClient.ResponseWrapper<String> response = client.post(
857+
uri,
858+
new byte[0],
859+
Map.of(Const.CONTENT_TYPE_HEADER, Const.APPLICATION_OCTET_STREAM_HEADER_VALUE),
860+
Scopes.of(ref),
861+
authProvider);
862+
logResponse(response);
863+
if (response.statusCode() != 202) {
864+
throw new OrasException(
865+
"Failed to initiate chunked blob upload: status %d".formatted(response.statusCode()));
866+
}
867+
String location = response.headers().get(Const.LOCATION_HEADER.toLowerCase());
868+
if (!location.startsWith("http://") && !location.startsWith("https://")) {
869+
location = "%s://%s/%s".formatted(getScheme(), ref.getApiRegistry(this), location.replaceFirst("^/", ""));
870+
}
871+
LOG.debug("Chunked upload session location: {}", location);
872+
return location;
873+
}
874+
875+
/**
876+
* Phase 2 of chunked upload: send all chunks via PATCH.
877+
* @param ref The resolved container reference (used for auth scopes).
878+
* @param stream The data stream to upload.
879+
* @param totalSize Total number of bytes to upload.
880+
* @param chunkSize Maximum bytes per PATCH request.
881+
* @param location The current upload location URL (updated after each PATCH).
882+
* @return The final location URL to use for the closing PUT.
883+
*/
884+
private String uploadChunks(ContainerRef ref, InputStream stream, long totalSize, long chunkSize, String location) {
885+
long offset = 0;
886+
byte[] buffer = new byte[(int) Math.min(chunkSize, Integer.MAX_VALUE)];
887+
try {
888+
while (offset < totalSize) {
889+
long remaining = totalSize - offset;
890+
int toRead = (int) Math.min(chunkSize, remaining);
891+
int read = readFully(stream, buffer, toRead);
892+
if (read <= 0) {
893+
break;
894+
}
895+
long rangeEnd = offset + read - 1;
896+
String contentRange = "%d-%d".formatted(offset, rangeEnd);
897+
final byte[] chunk = java.util.Arrays.copyOf(buffer, read);
898+
URI patchUri;
899+
try {
900+
patchUri = new URI(location);
901+
} catch (java.net.URISyntaxException e) {
902+
throw new OrasException("Invalid upload location URI: %s".formatted(location), e);
903+
}
904+
HttpClient.ResponseWrapper<String> patchResponse = client.patch(
905+
patchUri,
906+
read,
907+
Map.of(
908+
Const.CONTENT_TYPE_HEADER,
909+
Const.APPLICATION_OCTET_STREAM_HEADER_VALUE,
910+
Const.CONTENT_RANGE_HEADER,
911+
contentRange),
912+
() -> new java.io.ByteArrayInputStream(chunk),
913+
Scopes.of(ref),
914+
authProvider);
915+
logResponse(patchResponse);
916+
if (patchResponse.statusCode() != 202) {
917+
throw new OrasException("Chunked upload PATCH failed for range %s: status %d"
918+
.formatted(contentRange, patchResponse.statusCode()));
919+
}
920+
// The registry MAY return a new location after each PATCH
921+
String newLocation = patchResponse.headers().get(Const.LOCATION_HEADER.toLowerCase());
922+
if (newLocation != null && !newLocation.isBlank()) {
923+
if (!newLocation.startsWith("http://") && !newLocation.startsWith("https://")) {
924+
newLocation = "%s://%s/%s"
925+
.formatted(getScheme(), ref.getApiRegistry(this), newLocation.replaceFirst("^/", ""));
926+
}
927+
location = newLocation;
928+
LOG.debug("Chunked upload location updated: {}", location);
929+
}
930+
offset += read;
931+
LOG.debug("Uploaded chunk {}-{} ({} bytes)", offset - read, rangeEnd, read);
932+
}
933+
} catch (IOException e) {
934+
throw new OrasException("Failed during chunked blob upload", e);
935+
}
936+
return location;
937+
}
938+
939+
/**
940+
* Phase 3 of chunked upload: close the session with a PUT carrying the full blob digest.
941+
* @param ref The resolved container reference (used for auth scopes).
942+
* @param location The upload location URL from the last PATCH response.
943+
* @param digest The digest of the whole blob.
944+
*/
945+
private void finalizeChunkedUpload(ContainerRef ref, String location, String digest) {
946+
URI putUri = createLocationWithDigest(location, digest);
947+
HttpClient.ResponseWrapper<String> putResponse = client.put(
948+
putUri,
949+
new byte[0],
950+
Map.of(Const.CONTENT_TYPE_HEADER, Const.APPLICATION_OCTET_STREAM_HEADER_VALUE),
951+
Scopes.of(ref),
952+
authProvider);
953+
logResponse(putResponse);
954+
if (putResponse.statusCode() != 201) {
955+
throw new OrasException(
956+
"Failed to finalize chunked blob upload: status %d".formatted(putResponse.statusCode()));
957+
}
958+
LOG.debug("Chunked upload finalized successfully for digest: {}", digest);
959+
}
960+
961+
/**
962+
* Reads exactly {@code length} bytes from the stream into {@code buf}, blocking until
963+
* sufficient data is available or end-of-stream is reached.
964+
* @param stream The source stream.
965+
* @param buf The buffer to fill.
966+
* @param length The number of bytes to read.
967+
* @return The number of bytes actually read (may be less than {@code length} at EOF).
968+
* @throws IOException if the underlying stream throws.
969+
*/
970+
private int readFully(InputStream stream, byte[] buf, int length) throws IOException {
971+
int total = 0;
972+
while (total < length) {
973+
int n = stream.read(buf, total, length - total);
974+
if (n < 0) {
975+
break;
976+
}
977+
total += n;
978+
}
979+
return total;
980+
}
981+
764982
/**
765983
* Return if the registry contains already the blob
766984
* @param containerRef The container

src/main/java/land/oras/auth/HttpClient.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,35 @@ public ResponseWrapper<String> patch(
475475
authProvider);
476476
}
477477

478+
/**
479+
* Upload a chunk of data from an input stream using PATCH.
480+
* @param uri The URI
481+
* @param chunkSize The size of the chunk in bytes
482+
* @param headers The headers (should include Content-Range)
483+
* @param stream A supplier providing the input stream for this chunk
484+
* @param scopes The scopes
485+
* @param authProvider The authentication provider
486+
* @return The response
487+
*/
488+
public ResponseWrapper<String> patch(
489+
URI uri,
490+
long chunkSize,
491+
Map<String, String> headers,
492+
Supplier<InputStream> stream,
493+
Scopes scopes,
494+
AuthProvider authProvider) {
495+
return executeRequest(
496+
"PATCH",
497+
uri,
498+
true,
499+
headers,
500+
new byte[0],
501+
HttpResponse.BodyHandlers.ofString(),
502+
HttpRequest.BodyPublishers.fromPublisher(HttpRequest.BodyPublishers.ofInputStream(stream), chunkSize),
503+
scopes,
504+
authProvider);
505+
}
506+
478507
/**
479508
* Perform a PUT request
480509
* @param uri The URI

0 commit comments

Comments
 (0)