@@ -761,6 +761,224 @@ public Layer pushBlob(ContainerRef containerRef, byte[] data) {
761761 return Layer .fromData (ref , data );
762762 }
763763
764+ /**
765+ * Push a blob using chunked upload (OCI Distribution Spec § Pushing a blob in chunks).
766+ *
767+ * <p>The blob is read from the given {@link Path} and split into chunks of at most
768+ * {@code chunkSize} bytes. Each chunk is uploaded via a {@code PATCH} request; the
769+ * session is then closed with a final {@code PUT} request that carries the overall blob
770+ * digest.
771+ *
772+ * @param containerRef The container reference (registry + repository + optional tag/digest).
773+ * A digest on the ref is ignored; the digest is always computed from
774+ * the file content.
775+ * @param blob Path to the local file to upload.
776+ * @param chunkSize Maximum number of bytes per chunk. Must be > 0.
777+ * @return The {@link Layer} descriptor for the uploaded blob.
778+ * @throws OrasException if the upload fails at any stage.
779+ */
780+ public Layer pushBlobChunked (ContainerRef containerRef , Path blob , long chunkSize ) {
781+ if (chunkSize <= 0 ) {
782+ throw new OrasException ("chunkSize must be greater than 0" );
783+ }
784+ String digest = containerRef .getAlgorithm ().digest (blob );
785+ ContainerRef ref = containerRef .forRegistry (this ).checkBlocked (this );
786+ if (ref .isInsecure (this ) && !this .isInsecure ()) {
787+ return asInsecure ().pushBlobChunked (containerRef , blob , chunkSize );
788+ }
789+ if (hasBlob (ref .withDigest (digest ))) {
790+ LOG .info ("Blob already exists: {}" , digest );
791+ return Layer .fromFile (blob , ref .getAlgorithm ());
792+ }
793+ long totalSize ;
794+ try {
795+ totalSize = Files .size (blob );
796+ } catch (IOException e ) {
797+ throw new OrasException ("Failed to read blob file size: %s" .formatted (blob ), e );
798+ }
799+ String location = initiateChunkedUpload (ref );
800+ try (InputStream is = Files .newInputStream (blob )) {
801+ location = uploadChunks (ref , is , totalSize , chunkSize , location );
802+ } catch (IOException e ) {
803+ throw new OrasException ("Failed to read blob for chunked upload: %s" .formatted (blob ), e );
804+ }
805+ finalizeChunkedUpload (ref , location , digest );
806+ return Layer .fromFile (blob , ref .getAlgorithm ());
807+ }
808+
809+ /**
810+ * Push a blob using chunked upload from an {@link InputStream} (OCI Distribution Spec §
811+ * Pushing a blob in chunks).
812+ *
813+ * <p>The stream is consumed in chunks of at most {@code chunkSize} bytes. The caller
814+ * <em>must</em> supply the total blob size and digest via {@code containerRef} (the ref
815+ * must carry a digest). The stream is read exactly once; it is the caller's responsibility
816+ * to provide a stream whose content matches the declared digest and size.
817+ *
818+ * @param containerRef The container reference. <strong>Must</strong> carry the blob digest
819+ * (e.g. obtained from {@link ContainerRef#withDigest(String)}).
820+ * @param stream Input stream supplying the blob data. The stream is fully consumed
821+ * and <em>not</em> closed by this method.
822+ * @param totalSize Total size of the blob in bytes.
823+ * @param chunkSize Maximum number of bytes per chunk. Must be > 0.
824+ * @return The {@link Layer} descriptor for the uploaded blob.
825+ * @throws OrasException if the upload fails at any stage.
826+ */
827+ public Layer pushBlobChunked (ContainerRef containerRef , InputStream stream , long totalSize , long chunkSize ) {
828+ String digest = containerRef .getDigest ();
829+ if (digest == null ) {
830+ throw new OrasException ("Digest is required to push blob with chunked stream upload" );
831+ }
832+ if (chunkSize <= 0 ) {
833+ throw new OrasException ("chunkSize must be greater than 0" );
834+ }
835+ ContainerRef ref = containerRef .forRegistry (this ).checkBlocked (this );
836+ if (ref .isInsecure (this ) && !this .isInsecure ()) {
837+ return asInsecure ().pushBlobChunked (containerRef , stream , totalSize , chunkSize );
838+ }
839+ if (hasBlob (ref )) {
840+ LOG .info ("Blob already exists: {}" , digest );
841+ return Layer .fromDigest (digest , totalSize );
842+ }
843+ String location = initiateChunkedUpload (ref );
844+ location = uploadChunks (ref , stream , totalSize , chunkSize , location );
845+ finalizeChunkedUpload (ref , location , digest );
846+ return Layer .fromDigest (digest , totalSize );
847+ }
848+
849+ /**
850+ * Phase 1 of chunked upload: POST to obtain a session upload location.
851+ * @param ref The resolved container reference.
852+ * @return The absolute upload location URL string.
853+ */
854+ private String initiateChunkedUpload (ContainerRef ref ) {
855+ URI uri = URI .create ("%s://%s" .formatted (getScheme (), ref .getBlobsUploadPath (this )));
856+ HttpClient .ResponseWrapper <String > response = client .post (
857+ uri ,
858+ new byte [0 ],
859+ Map .of (Const .CONTENT_TYPE_HEADER , Const .APPLICATION_OCTET_STREAM_HEADER_VALUE ),
860+ Scopes .of (ref ),
861+ authProvider );
862+ logResponse (response );
863+ if (response .statusCode () != 202 ) {
864+ throw new OrasException (
865+ "Failed to initiate chunked blob upload: status %d" .formatted (response .statusCode ()));
866+ }
867+ String location = response .headers ().get (Const .LOCATION_HEADER .toLowerCase ());
868+ if (!location .startsWith ("http://" ) && !location .startsWith ("https://" )) {
869+ location = "%s://%s/%s" .formatted (getScheme (), ref .getApiRegistry (this ), location .replaceFirst ("^/" , "" ));
870+ }
871+ LOG .debug ("Chunked upload session location: {}" , location );
872+ return location ;
873+ }
874+
875+ /**
876+ * Phase 2 of chunked upload: send all chunks via PATCH.
877+ * @param ref The resolved container reference (used for auth scopes).
878+ * @param stream The data stream to upload.
879+ * @param totalSize Total number of bytes to upload.
880+ * @param chunkSize Maximum bytes per PATCH request.
881+ * @param location The current upload location URL (updated after each PATCH).
882+ * @return The final location URL to use for the closing PUT.
883+ */
884+ private String uploadChunks (ContainerRef ref , InputStream stream , long totalSize , long chunkSize , String location ) {
885+ long offset = 0 ;
886+ byte [] buffer = new byte [(int ) Math .min (chunkSize , Integer .MAX_VALUE )];
887+ try {
888+ while (offset < totalSize ) {
889+ long remaining = totalSize - offset ;
890+ int toRead = (int ) Math .min (chunkSize , remaining );
891+ int read = readFully (stream , buffer , toRead );
892+ if (read <= 0 ) {
893+ break ;
894+ }
895+ long rangeEnd = offset + read - 1 ;
896+ String contentRange = "%d-%d" .formatted (offset , rangeEnd );
897+ final byte [] chunk = java .util .Arrays .copyOf (buffer , read );
898+ URI patchUri ;
899+ try {
900+ patchUri = new URI (location );
901+ } catch (java .net .URISyntaxException e ) {
902+ throw new OrasException ("Invalid upload location URI: %s" .formatted (location ), e );
903+ }
904+ HttpClient .ResponseWrapper <String > patchResponse = client .patch (
905+ patchUri ,
906+ read ,
907+ Map .of (
908+ Const .CONTENT_TYPE_HEADER ,
909+ Const .APPLICATION_OCTET_STREAM_HEADER_VALUE ,
910+ Const .CONTENT_RANGE_HEADER ,
911+ contentRange ),
912+ () -> new java .io .ByteArrayInputStream (chunk ),
913+ Scopes .of (ref ),
914+ authProvider );
915+ logResponse (patchResponse );
916+ if (patchResponse .statusCode () != 202 ) {
917+ throw new OrasException ("Chunked upload PATCH failed for range %s: status %d"
918+ .formatted (contentRange , patchResponse .statusCode ()));
919+ }
920+ // The registry MAY return a new location after each PATCH
921+ String newLocation = patchResponse .headers ().get (Const .LOCATION_HEADER .toLowerCase ());
922+ if (newLocation != null && !newLocation .isBlank ()) {
923+ if (!newLocation .startsWith ("http://" ) && !newLocation .startsWith ("https://" )) {
924+ newLocation = "%s://%s/%s"
925+ .formatted (getScheme (), ref .getApiRegistry (this ), newLocation .replaceFirst ("^/" , "" ));
926+ }
927+ location = newLocation ;
928+ LOG .debug ("Chunked upload location updated: {}" , location );
929+ }
930+ offset += read ;
931+ LOG .debug ("Uploaded chunk {}-{} ({} bytes)" , offset - read , rangeEnd , read );
932+ }
933+ } catch (IOException e ) {
934+ throw new OrasException ("Failed during chunked blob upload" , e );
935+ }
936+ return location ;
937+ }
938+
939+ /**
940+ * Phase 3 of chunked upload: close the session with a PUT carrying the full blob digest.
941+ * @param ref The resolved container reference (used for auth scopes).
942+ * @param location The upload location URL from the last PATCH response.
943+ * @param digest The digest of the whole blob.
944+ */
945+ private void finalizeChunkedUpload (ContainerRef ref , String location , String digest ) {
946+ URI putUri = createLocationWithDigest (location , digest );
947+ HttpClient .ResponseWrapper <String > putResponse = client .put (
948+ putUri ,
949+ new byte [0 ],
950+ Map .of (Const .CONTENT_TYPE_HEADER , Const .APPLICATION_OCTET_STREAM_HEADER_VALUE ),
951+ Scopes .of (ref ),
952+ authProvider );
953+ logResponse (putResponse );
954+ if (putResponse .statusCode () != 201 ) {
955+ throw new OrasException (
956+ "Failed to finalize chunked blob upload: status %d" .formatted (putResponse .statusCode ()));
957+ }
958+ LOG .debug ("Chunked upload finalized successfully for digest: {}" , digest );
959+ }
960+
961+ /**
962+ * Reads exactly {@code length} bytes from the stream into {@code buf}, blocking until
963+ * sufficient data is available or end-of-stream is reached.
964+ * @param stream The source stream.
965+ * @param buf The buffer to fill.
966+ * @param length The number of bytes to read.
967+ * @return The number of bytes actually read (may be less than {@code length} at EOF).
968+ * @throws IOException if the underlying stream throws.
969+ */
970+ private int readFully (InputStream stream , byte [] buf , int length ) throws IOException {
971+ int total = 0 ;
972+ while (total < length ) {
973+ int n = stream .read (buf , total , length - total );
974+ if (n < 0 ) {
975+ break ;
976+ }
977+ total += n ;
978+ }
979+ return total ;
980+ }
981+
764982 /**
765983 * Return if the registry contains already the blob
766984 * @param containerRef The container
0 commit comments