Skip to content

Commit 14556c4

Browse files
committed
Chunk upload
Signed-off-by: Valentin Delaye <jonesbusy@users.noreply.github.com>
1 parent 299fe89 commit 14556c4

4 files changed

Lines changed: 452 additions & 11 deletions

File tree

src/main/java/land/oras/Registry.java

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -761,6 +761,156 @@ public Layer pushBlob(ContainerRef containerRef, byte[] data) {
761761
return Layer.fromData(ref, data);
762762
}
763763

764+
/**
765+
* Push a blob using chunked upload
766+
*
767+
* @param containerRef The container reference
768+
* @param chunkSize Maximum number of bytes per chunk. Must be &gt; 0.
769+
* @return The {@link Layer} descriptor for the uploaded blob.
770+
* @throws OrasException if the upload fails at any stage.
771+
*/
772+
public Layer pushBlobChunked(ContainerRef containerRef, Path blob, long chunkSize) {
773+
if (chunkSize <= 0) {
774+
throw new OrasException("chunkSize must be greater than 0");
775+
}
776+
String digest = containerRef.getAlgorithm().digest(blob);
777+
ContainerRef ref = containerRef.forRegistry(this).checkBlocked(this);
778+
if (ref.isInsecure(this) && !this.isInsecure()) {
779+
return asInsecure().pushBlobChunked(containerRef, blob, chunkSize);
780+
}
781+
if (hasBlob(ref.withDigest(digest))) {
782+
LOG.info("Blob already exists: {}", digest);
783+
return Layer.fromFile(blob, ref.getAlgorithm());
784+
}
785+
String location = initiateChunkedUpload(ref);
786+
try (InputStream is = Files.newInputStream(blob)) {
787+
long totalSize = Files.size(blob);
788+
location = uploadChunks(ref, is, totalSize, chunkSize, location);
789+
} catch (IOException e) {
790+
throw new OrasException("Failed to read blob for chunked upload: %s".formatted(blob), e);
791+
}
792+
finalizeChunkedUpload(ref, location, digest);
793+
return Layer.fromFile(blob, ref.getAlgorithm());
794+
}
795+
796+
/**
797+
* Push a blob using chunked upload
798+
*
799+
* @param containerRef The container reference
800+
* @param stream Input stream
801+
* @param chunkSize Maximum number of bytes per chunk. Must be &gt; 0.
802+
* @return The {@link Layer} descriptor for the uploaded blob.
803+
* @throws OrasException if the upload fails at any stage.
804+
*/
805+
public Layer pushBlobChunked(ContainerRef containerRef, InputStream stream, long totalSize, long chunkSize) {
806+
String digest = containerRef.getDigest();
807+
if (digest == null) {
808+
throw new OrasException("Digest is required to push blob with chunked stream upload");
809+
}
810+
if (chunkSize <= 0) {
811+
throw new OrasException("chunkSize must be greater than 0");
812+
}
813+
ContainerRef ref = containerRef.forRegistry(this).checkBlocked(this);
814+
if (ref.isInsecure(this) && !this.isInsecure()) {
815+
return asInsecure().pushBlobChunked(containerRef, stream, totalSize, chunkSize);
816+
}
817+
if (hasBlob(ref)) {
818+
LOG.info("Blob already exists: {}", digest);
819+
return Layer.fromDigest(digest, totalSize);
820+
}
821+
String location = initiateChunkedUpload(ref);
822+
location = uploadChunks(ref, stream, totalSize, chunkSize, location);
823+
finalizeChunkedUpload(ref, location, digest);
824+
return Layer.fromDigest(digest, totalSize);
825+
}
826+
827+
private String initiateChunkedUpload(ContainerRef ref) {
828+
URI uri = URI.create("%s://%s".formatted(getScheme(), ref.getBlobsUploadPath(this)));
829+
HttpClient.ResponseWrapper<String> response = client.post(
830+
uri,
831+
new byte[0],
832+
Map.of(Const.CONTENT_TYPE_HEADER, Const.APPLICATION_OCTET_STREAM_HEADER_VALUE),
833+
Scopes.of(ref),
834+
authProvider);
835+
logResponse(response);
836+
if (response.statusCode() != 202) {
837+
throw new OrasException(
838+
"Failed to initiate chunked blob upload: status %d".formatted(response.statusCode()));
839+
}
840+
String location = response.headers().get(Const.LOCATION_HEADER.toLowerCase());
841+
if (!location.startsWith("http://") && !location.startsWith("https://")) {
842+
location = "%s://%s/%s".formatted(getScheme(), ref.getApiRegistry(this), location.replaceFirst("^/", ""));
843+
}
844+
LOG.debug("Chunked upload session location: {}", location);
845+
return location;
846+
}
847+
848+
private String uploadChunks(ContainerRef ref, InputStream stream, long totalSize, long chunkSize, String location) {
849+
long offset = 0;
850+
byte[] buffer = new byte[(int) Math.min(chunkSize, Integer.MAX_VALUE)];
851+
try {
852+
while (offset < totalSize) {
853+
long remaining = totalSize - offset;
854+
int toRead = (int) Math.min(chunkSize, remaining);
855+
int read = stream.readNBytes(buffer, 0, toRead);
856+
if (read == 0) {
857+
break;
858+
}
859+
long rangeEnd = offset + read - 1;
860+
String contentRange = "%d-%d".formatted(offset, rangeEnd);
861+
final byte[] chunk = java.util.Arrays.copyOf(buffer, read);
862+
URI patchUri = URI.create(location);
863+
HttpClient.ResponseWrapper<String> patchResponse = client.patch(
864+
patchUri,
865+
read,
866+
Map.of(
867+
Const.CONTENT_TYPE_HEADER,
868+
Const.APPLICATION_OCTET_STREAM_HEADER_VALUE,
869+
Const.CONTENT_RANGE_HEADER,
870+
contentRange),
871+
() -> new java.io.ByteArrayInputStream(chunk),
872+
Scopes.of(ref),
873+
authProvider);
874+
logResponse(patchResponse);
875+
if (patchResponse.statusCode() != 202) {
876+
throw new OrasException("Chunked upload PATCH failed for range %s: status %d"
877+
.formatted(contentRange, patchResponse.statusCode()));
878+
}
879+
// The registry MAY return a new location after each PATCH
880+
String newLocation = patchResponse.headers().get(Const.LOCATION_HEADER.toLowerCase());
881+
if (newLocation != null && !newLocation.isBlank()) {
882+
if (!newLocation.startsWith("http://") && !newLocation.startsWith("https://")) {
883+
newLocation = "%s://%s/%s"
884+
.formatted(getScheme(), ref.getApiRegistry(this), newLocation.replaceFirst("^/", ""));
885+
}
886+
location = newLocation;
887+
LOG.debug("Chunked upload location updated: {}", location);
888+
}
889+
offset += read;
890+
LOG.debug("Uploaded chunk {}-{} ({} bytes)", offset - read, rangeEnd, read);
891+
}
892+
} catch (IOException e) {
893+
throw new OrasException("Failed during chunked blob upload", e);
894+
}
895+
return location;
896+
}
897+
898+
private void finalizeChunkedUpload(ContainerRef ref, String location, String digest) {
899+
URI putUri = createLocationWithDigest(location, digest);
900+
HttpClient.ResponseWrapper<String> putResponse = client.put(
901+
putUri,
902+
new byte[0],
903+
Map.of(Const.CONTENT_TYPE_HEADER, Const.APPLICATION_OCTET_STREAM_HEADER_VALUE),
904+
Scopes.of(ref),
905+
authProvider);
906+
logResponse(putResponse);
907+
if (putResponse.statusCode() != 201) {
908+
throw new OrasException(
909+
"Failed to finalize chunked blob upload: status %d".formatted(putResponse.statusCode()));
910+
}
911+
LOG.debug("Chunked upload finalized successfully for digest: {}", digest);
912+
}
913+
764914
/**
765915
* Return if the registry contains already the blob
766916
* @param containerRef The container

src/main/java/land/oras/auth/HttpClient.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,35 @@ public ResponseWrapper<String> patch(
475475
authProvider);
476476
}
477477

478+
/**
479+
* Upload a chunk of data from an input stream using PATCH.
480+
* @param uri The URI
481+
* @param chunkSize The size of the chunk in bytes
482+
* @param headers The headers (should include Content-Range)
483+
* @param stream A supplier providing the input stream for this chunk
484+
* @param scopes The scopes
485+
* @param authProvider The authentication provider
486+
* @return The response
487+
*/
488+
public ResponseWrapper<String> patch(
489+
URI uri,
490+
long chunkSize,
491+
Map<String, String> headers,
492+
Supplier<InputStream> stream,
493+
Scopes scopes,
494+
AuthProvider authProvider) {
495+
return executeRequest(
496+
"PATCH",
497+
uri,
498+
true,
499+
headers,
500+
new byte[0],
501+
HttpResponse.BodyHandlers.ofString(),
502+
HttpRequest.BodyPublishers.fromPublisher(HttpRequest.BodyPublishers.ofInputStream(stream), chunkSize),
503+
scopes,
504+
authProvider);
505+
}
506+
478507
/**
479508
* Perform a PUT request
480509
* @param uri The URI

0 commit comments

Comments
 (0)