Skip to content

Commit 4af2f48

Browse files
authored
Chunk upload (#742)
Signed-off-by: Valentin Delaye <jonesbusy@users.noreply.github.com>
1 parent 599c23c commit 4af2f48

4 files changed

Lines changed: 454 additions & 11 deletions

File tree

src/main/java/land/oras/Registry.java

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -761,6 +761,158 @@ public Layer pushBlob(ContainerRef containerRef, byte[] data) {
761761
return Layer.fromData(ref, data);
762762
}
763763

764+
/**
765+
* Push a blob using chunked upload
766+
*
767+
* @param containerRef The container reference
768+
* @param blob The blob file to upload
769+
* @param chunkSize Maximum number of bytes per chunk. Must be &gt; 0.
770+
* @return The {@link Layer} descriptor for the uploaded blob.
771+
* @throws OrasException if the upload fails at any stage.
772+
*/
773+
public Layer pushBlobChunked(ContainerRef containerRef, Path blob, long chunkSize) {
774+
if (chunkSize <= 0) {
775+
throw new OrasException("chunkSize must be greater than 0");
776+
}
777+
String digest = containerRef.getAlgorithm().digest(blob);
778+
ContainerRef ref = containerRef.forRegistry(this).checkBlocked(this);
779+
if (ref.isInsecure(this) && !this.isInsecure()) {
780+
return asInsecure().pushBlobChunked(containerRef, blob, chunkSize);
781+
}
782+
if (hasBlob(ref.withDigest(digest))) {
783+
LOG.info("Blob already exists: {}", digest);
784+
return Layer.fromFile(blob, ref.getAlgorithm());
785+
}
786+
String location = initiateChunkedUpload(ref);
787+
try (InputStream is = Files.newInputStream(blob)) {
788+
long totalSize = Files.size(blob);
789+
location = uploadChunks(ref, is, totalSize, chunkSize, location);
790+
} catch (IOException e) {
791+
throw new OrasException("Failed to read blob for chunked upload: %s".formatted(blob), e);
792+
}
793+
finalizeChunkedUpload(ref, location, digest);
794+
return Layer.fromFile(blob, ref.getAlgorithm());
795+
}
796+
797+
/**
798+
* Push a blob using chunked upload
799+
*
800+
* @param containerRef The container reference
801+
* @param stream Input stream
802+
* @param totalSize Total size of the stream
803+
* @param chunkSize Maximum number of bytes per chunk. Must be &gt; 0.
804+
* @return The {@link Layer} descriptor for the uploaded blob.
805+
* @throws OrasException if the upload fails at any stage.
806+
*/
807+
public Layer pushBlobChunked(ContainerRef containerRef, InputStream stream, long totalSize, long chunkSize) {
808+
String digest = containerRef.getDigest();
809+
if (digest == null) {
810+
throw new OrasException("Digest is required to push blob with chunked stream upload");
811+
}
812+
if (chunkSize <= 0) {
813+
throw new OrasException("chunkSize must be greater than 0");
814+
}
815+
ContainerRef ref = containerRef.forRegistry(this).checkBlocked(this);
816+
if (ref.isInsecure(this) && !this.isInsecure()) {
817+
return asInsecure().pushBlobChunked(containerRef, stream, totalSize, chunkSize);
818+
}
819+
if (hasBlob(ref)) {
820+
LOG.info("Blob already exists: {}", digest);
821+
return Layer.fromDigest(digest, totalSize);
822+
}
823+
String location = initiateChunkedUpload(ref);
824+
location = uploadChunks(ref, stream, totalSize, chunkSize, location);
825+
finalizeChunkedUpload(ref, location, digest);
826+
return Layer.fromDigest(digest, totalSize);
827+
}
828+
829+
private String initiateChunkedUpload(ContainerRef ref) {
830+
URI uri = URI.create("%s://%s".formatted(getScheme(), ref.getBlobsUploadPath(this)));
831+
HttpClient.ResponseWrapper<String> response = client.post(
832+
uri,
833+
new byte[0],
834+
Map.of(Const.CONTENT_TYPE_HEADER, Const.APPLICATION_OCTET_STREAM_HEADER_VALUE),
835+
Scopes.of(ref),
836+
authProvider);
837+
logResponse(response);
838+
if (response.statusCode() != 202) {
839+
throw new OrasException(
840+
"Failed to initiate chunked blob upload: status %d".formatted(response.statusCode()));
841+
}
842+
String location = response.headers().get(Const.LOCATION_HEADER.toLowerCase());
843+
if (!location.startsWith("http://") && !location.startsWith("https://")) {
844+
location = "%s://%s/%s".formatted(getScheme(), ref.getApiRegistry(this), location.replaceFirst("^/", ""));
845+
}
846+
LOG.debug("Chunked upload session location: {}", location);
847+
return location;
848+
}
849+
850+
private String uploadChunks(ContainerRef ref, InputStream stream, long totalSize, long chunkSize, String location) {
851+
long offset = 0;
852+
byte[] buffer = new byte[(int) Math.min(chunkSize, Integer.MAX_VALUE)];
853+
try {
854+
while (offset < totalSize) {
855+
long remaining = totalSize - offset;
856+
int toRead = (int) Math.min(chunkSize, remaining);
857+
int read = stream.readNBytes(buffer, 0, toRead);
858+
if (read == 0) {
859+
break;
860+
}
861+
long rangeEnd = offset + read - 1;
862+
String contentRange = "%d-%d".formatted(offset, rangeEnd);
863+
final byte[] chunk = java.util.Arrays.copyOf(buffer, read);
864+
URI patchUri = URI.create(location);
865+
HttpClient.ResponseWrapper<String> patchResponse = client.patch(
866+
patchUri,
867+
read,
868+
Map.of(
869+
Const.CONTENT_TYPE_HEADER,
870+
Const.APPLICATION_OCTET_STREAM_HEADER_VALUE,
871+
Const.CONTENT_RANGE_HEADER,
872+
contentRange),
873+
() -> new java.io.ByteArrayInputStream(chunk),
874+
Scopes.of(ref),
875+
authProvider);
876+
logResponse(patchResponse);
877+
if (patchResponse.statusCode() != 202) {
878+
throw new OrasException("Chunked upload PATCH failed for range %s: status %d"
879+
.formatted(contentRange, patchResponse.statusCode()));
880+
}
881+
// The registry MAY return a new location after each PATCH
882+
String newLocation = patchResponse.headers().get(Const.LOCATION_HEADER.toLowerCase());
883+
if (newLocation != null && !newLocation.isBlank()) {
884+
if (!newLocation.startsWith("http://") && !newLocation.startsWith("https://")) {
885+
newLocation = "%s://%s/%s"
886+
.formatted(getScheme(), ref.getApiRegistry(this), newLocation.replaceFirst("^/", ""));
887+
}
888+
location = newLocation;
889+
LOG.debug("Chunked upload location updated: {}", location);
890+
}
891+
offset += read;
892+
LOG.debug("Uploaded chunk {}-{} ({} bytes)", offset - read, rangeEnd, read);
893+
}
894+
} catch (IOException e) {
895+
throw new OrasException("Failed during chunked blob upload", e);
896+
}
897+
return location;
898+
}
899+
900+
private void finalizeChunkedUpload(ContainerRef ref, String location, String digest) {
901+
URI putUri = createLocationWithDigest(location, digest);
902+
HttpClient.ResponseWrapper<String> putResponse = client.put(
903+
putUri,
904+
new byte[0],
905+
Map.of(Const.CONTENT_TYPE_HEADER, Const.APPLICATION_OCTET_STREAM_HEADER_VALUE),
906+
Scopes.of(ref),
907+
authProvider);
908+
logResponse(putResponse);
909+
if (putResponse.statusCode() != 201) {
910+
throw new OrasException(
911+
"Failed to finalize chunked blob upload: status %d".formatted(putResponse.statusCode()));
912+
}
913+
LOG.debug("Chunked upload finalized successfully for digest: {}", digest);
914+
}
915+
764916
/**
765917
* Return if the registry contains already the blob
766918
* @param containerRef The container

src/main/java/land/oras/auth/HttpClient.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,35 @@ public ResponseWrapper<String> patch(
475475
authProvider);
476476
}
477477

478+
/**
479+
* Upload a chunk of data from an input stream using PATCH.
480+
* @param uri The URI
481+
* @param chunkSize The size of the chunk in bytes
482+
* @param headers The headers (should include Content-Range)
483+
* @param stream A supplier providing the input stream for this chunk
484+
* @param scopes The scopes
485+
* @param authProvider The authentication provider
486+
* @return The response
487+
*/
488+
public ResponseWrapper<String> patch(
489+
URI uri,
490+
long chunkSize,
491+
Map<String, String> headers,
492+
Supplier<InputStream> stream,
493+
Scopes scopes,
494+
AuthProvider authProvider) {
495+
return executeRequest(
496+
"PATCH",
497+
uri,
498+
true,
499+
headers,
500+
new byte[0],
501+
HttpResponse.BodyHandlers.ofString(),
502+
HttpRequest.BodyPublishers.fromPublisher(HttpRequest.BodyPublishers.ofInputStream(stream), chunkSize),
503+
scopes,
504+
authProvider);
505+
}
506+
478507
/**
479508
* Perform a PUT request
480509
* @param uri The URI

0 commit comments

Comments
 (0)