Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions src/main/java/land/oras/Registry.java
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,158 @@ public Layer pushBlob(ContainerRef containerRef, byte[] data) {
return Layer.fromData(ref, data);
}

/**
* Push a blob using chunked upload
*
* @param containerRef The container reference
* @param blob The blob file to upload
* @param chunkSize Maximum number of bytes per chunk. Must be > 0.
* @return The {@link Layer} descriptor for the uploaded blob.
* @throws OrasException if the upload fails at any stage.
*/
public Layer pushBlobChunked(ContainerRef containerRef, Path blob, long chunkSize) {
if (chunkSize <= 0) {
throw new OrasException("chunkSize must be greater than 0");
}
String digest = containerRef.getAlgorithm().digest(blob);
ContainerRef ref = containerRef.forRegistry(this).checkBlocked(this);
if (ref.isInsecure(this) && !this.isInsecure()) {
return asInsecure().pushBlobChunked(containerRef, blob, chunkSize);
}
if (hasBlob(ref.withDigest(digest))) {
LOG.info("Blob already exists: {}", digest);
return Layer.fromFile(blob, ref.getAlgorithm());
}
String location = initiateChunkedUpload(ref);
try (InputStream is = Files.newInputStream(blob)) {
long totalSize = Files.size(blob);
location = uploadChunks(ref, is, totalSize, chunkSize, location);
} catch (IOException e) {
throw new OrasException("Failed to read blob for chunked upload: %s".formatted(blob), e);
}
finalizeChunkedUpload(ref, location, digest);
return Layer.fromFile(blob, ref.getAlgorithm());
}

/**
* Push a blob using chunked upload
*
* @param containerRef The container reference
* @param stream Input stream
* @param totalSize Total size of the stream
* @param chunkSize Maximum number of bytes per chunk. Must be &gt; 0.
* @return The {@link Layer} descriptor for the uploaded blob.
* @throws OrasException if the upload fails at any stage.
*/
public Layer pushBlobChunked(ContainerRef containerRef, InputStream stream, long totalSize, long chunkSize) {
String digest = containerRef.getDigest();
if (digest == null) {
throw new OrasException("Digest is required to push blob with chunked stream upload");
}
if (chunkSize <= 0) {
throw new OrasException("chunkSize must be greater than 0");
}
ContainerRef ref = containerRef.forRegistry(this).checkBlocked(this);
if (ref.isInsecure(this) && !this.isInsecure()) {
return asInsecure().pushBlobChunked(containerRef, stream, totalSize, chunkSize);
}
if (hasBlob(ref)) {
LOG.info("Blob already exists: {}", digest);
return Layer.fromDigest(digest, totalSize);
}
String location = initiateChunkedUpload(ref);
location = uploadChunks(ref, stream, totalSize, chunkSize, location);
finalizeChunkedUpload(ref, location, digest);
return Layer.fromDigest(digest, totalSize);
}

private String initiateChunkedUpload(ContainerRef ref) {
URI uri = URI.create("%s://%s".formatted(getScheme(), ref.getBlobsUploadPath(this)));
HttpClient.ResponseWrapper<String> response = client.post(
uri,
new byte[0],
Map.of(Const.CONTENT_TYPE_HEADER, Const.APPLICATION_OCTET_STREAM_HEADER_VALUE),
Scopes.of(ref),
authProvider);
logResponse(response);
if (response.statusCode() != 202) {
throw new OrasException(
"Failed to initiate chunked blob upload: status %d".formatted(response.statusCode()));
}
String location = response.headers().get(Const.LOCATION_HEADER.toLowerCase());
if (!location.startsWith("http://") && !location.startsWith("https://")) {
location = "%s://%s/%s".formatted(getScheme(), ref.getApiRegistry(this), location.replaceFirst("^/", ""));
}
LOG.debug("Chunked upload session location: {}", location);
return location;
}

private String uploadChunks(ContainerRef ref, InputStream stream, long totalSize, long chunkSize, String location) {
long offset = 0;
byte[] buffer = new byte[(int) Math.min(chunkSize, Integer.MAX_VALUE)];
try {
while (offset < totalSize) {
long remaining = totalSize - offset;
int toRead = (int) Math.min(chunkSize, remaining);
int read = stream.readNBytes(buffer, 0, toRead);
if (read == 0) {
break;
}
long rangeEnd = offset + read - 1;
String contentRange = "%d-%d".formatted(offset, rangeEnd);
final byte[] chunk = java.util.Arrays.copyOf(buffer, read);
URI patchUri = URI.create(location);
HttpClient.ResponseWrapper<String> patchResponse = client.patch(
patchUri,
read,
Map.of(
Const.CONTENT_TYPE_HEADER,
Const.APPLICATION_OCTET_STREAM_HEADER_VALUE,
Const.CONTENT_RANGE_HEADER,
contentRange),
() -> new java.io.ByteArrayInputStream(chunk),
Scopes.of(ref),
authProvider);
logResponse(patchResponse);
if (patchResponse.statusCode() != 202) {
throw new OrasException("Chunked upload PATCH failed for range %s: status %d"
.formatted(contentRange, patchResponse.statusCode()));
}
// The registry MAY return a new location after each PATCH
String newLocation = patchResponse.headers().get(Const.LOCATION_HEADER.toLowerCase());
if (newLocation != null && !newLocation.isBlank()) {
if (!newLocation.startsWith("http://") && !newLocation.startsWith("https://")) {
newLocation = "%s://%s/%s"
.formatted(getScheme(), ref.getApiRegistry(this), newLocation.replaceFirst("^/", ""));
}
location = newLocation;
LOG.debug("Chunked upload location updated: {}", location);
}
offset += read;
LOG.debug("Uploaded chunk {}-{} ({} bytes)", offset - read, rangeEnd, read);
}
} catch (IOException e) {
throw new OrasException("Failed during chunked blob upload", e);
}
return location;
}

private void finalizeChunkedUpload(ContainerRef ref, String location, String digest) {
URI putUri = createLocationWithDigest(location, digest);
HttpClient.ResponseWrapper<String> putResponse = client.put(
putUri,
new byte[0],
Map.of(Const.CONTENT_TYPE_HEADER, Const.APPLICATION_OCTET_STREAM_HEADER_VALUE),
Scopes.of(ref),
authProvider);
logResponse(putResponse);
if (putResponse.statusCode() != 201) {
throw new OrasException(
"Failed to finalize chunked blob upload: status %d".formatted(putResponse.statusCode()));
}
LOG.debug("Chunked upload finalized successfully for digest: {}", digest);
}

/**
* Return if the registry contains already the blob
* @param containerRef The container
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/land/oras/auth/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,35 @@ public ResponseWrapper<String> patch(
authProvider);
}

/**
* Upload a chunk of data from an input stream using PATCH.
* @param uri The URI
* @param chunkSize The size of the chunk in bytes
* @param headers The headers (should include Content-Range)
* @param stream A supplier providing the input stream for this chunk
* @param scopes The scopes
* @param authProvider The authentication provider
* @return The response
*/
public ResponseWrapper<String> patch(
URI uri,
long chunkSize,
Map<String, String> headers,
Supplier<InputStream> stream,
Scopes scopes,
AuthProvider authProvider) {
return executeRequest(
"PATCH",
uri,
true,
headers,
new byte[0],
HttpResponse.BodyHandlers.ofString(),
HttpRequest.BodyPublishers.fromPublisher(HttpRequest.BodyPublishers.ofInputStream(stream), chunkSize),
scopes,
authProvider);
}

/**
* Perform a PUT request
* @param uri The URI
Expand Down
Loading
Loading