Skip to content

Commit 4256b60

Browse files
authored
Concurrency for layer pull and push (#629)
Signed-off-by: Valentin Delaye <jonesbusy@users.noreply.github.com>
1 parent 79f879c commit 4256b60

5 files changed

Lines changed: 202 additions & 121 deletions

File tree

src/main/java/land/oras/CopyUtils.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
*/
2222

2323
import java.util.Objects;
24+
import java.util.concurrent.CompletableFuture;
2425
import land.oras.exception.OrasException;
2526
import org.jspecify.annotations.NonNull;
2627
import org.slf4j.Logger;
@@ -107,17 +108,20 @@ void copyLayers(
107108
OCI<TargetRefType> target,
108109
TargetRefType targetRef,
109110
String contentType) {
110-
for (Layer layer : source.collectLayers(sourceRef, contentType, true)) {
111-
Objects.requireNonNull(layer.getDigest(), "Layer digest is required for streaming copy");
112-
Objects.requireNonNull(layer.getSize(), "Layer size is required for streaming copy");
113-
LOG.debug("Copying layer {}", layer.getDigest());
114-
target.pushBlob(
115-
targetRef.withDigest(layer.getDigest()),
116-
layer.getSize(),
117-
() -> source.fetchBlob(sourceRef.withDigest(layer.getDigest())),
118-
layer.getAnnotations());
119-
LOG.debug("Copied layer {}", layer.getDigest());
120-
}
111+
CompletableFuture.allOf(source.collectLayers(sourceRef, contentType, true).stream()
112+
.map(layer -> {
113+
Objects.requireNonNull(layer.getDigest(), "Layer digest is required for streaming copy");
114+
Objects.requireNonNull(layer.getSize(), "Layer size is required for streaming copy");
115+
return CompletableFuture.runAsync(
116+
() -> target.pushBlob(
117+
targetRef.withDigest(layer.getDigest()),
118+
layer.getSize(),
119+
() -> source.fetchBlob(sourceRef.withDigest(layer.getDigest())),
120+
layer.getAnnotations()),
121+
source.getExecutorService());
122+
})
123+
.toArray(CompletableFuture[]::new))
124+
.join();
121125
}
122126

123127
/**

src/main/java/land/oras/OCI.java

Lines changed: 86 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,14 @@
2727
import java.nio.file.Files;
2828
import java.nio.file.Path;
2929
import java.nio.file.StandardCopyOption;
30-
import java.util.ArrayList;
30+
import java.util.Arrays;
3131
import java.util.LinkedHashMap;
3232
import java.util.LinkedList;
3333
import java.util.List;
3434
import java.util.Map;
35+
import java.util.concurrent.CompletableFuture;
36+
import java.util.concurrent.CompletionException;
37+
import java.util.concurrent.ExecutorService;
3538
import java.util.function.Supplier;
3639
import land.oras.exception.OrasException;
3740
import land.oras.utils.ArchiveUtils;
@@ -170,77 +173,15 @@ protected List<Layer> collectLayers(T ref, String contentType, boolean includeAl
170173
* @return The layers
171174
*/
172175
protected final List<Layer> pushLayers(T ref, Annotations annotations, boolean withDigest, LocalPath... paths) {
173-
List<Layer> layers = new ArrayList<>();
174-
for (LocalPath path : paths) {
175-
try {
176-
// Create tar.gz archive for directory
177-
if (Files.isDirectory(path.getPath())) {
178-
SupportedCompression compression = SupportedCompression.fromMediaType(path.getMediaType());
179-
180-
// If source need to be packed first
181-
boolean autoUnpack = compression.isAutoUnpack();
182-
LocalPath tempSource = autoUnpack ? ArchiveUtils.tar(path) : path;
183-
LocalPath tempArchive = ArchiveUtils.compress(tempSource, path.getMediaType());
184-
185-
if (withDigest) {
186-
ref = ref.withDigest(ref.getAlgorithm().digest(tempArchive.getPath()));
187-
}
188-
try (InputStream is = Files.newInputStream(tempArchive.getPath())) {
189-
String title = path.getPath().isAbsolute()
190-
? path.getPath().getFileName().toString()
191-
: path.getPath().toString();
192-
193-
// We store the filename, based on directory name if we don't auto unpack
194-
if (!autoUnpack) {
195-
title = "%s.%s".formatted(title, compression.getFileExtension());
196-
}
197-
LOG.debug("Uploading directory as archive with title: {}", title);
198-
199-
Map<String, String> layerAnnotations = annotations.hasFileAnnotations(title)
200-
? annotations.getFileAnnotations(title)
201-
: new LinkedHashMap<>(Map.of(Const.ANNOTATION_TITLE, title));
202-
203-
// Add oras digest/unpack
204-
// For example zip can be packed application/zip but never unpacked by the runtime
205-
// This is convenience method to pack zip layer as directories
206-
if (compression.isAutoUnpack()) {
207-
layerAnnotations.put(
208-
Const.ANNOTATION_ORAS_CONTENT_DIGEST,
209-
ref.getAlgorithm().digest(tempSource.getPath()));
210-
layerAnnotations.put(Const.ANNOTATION_ORAS_UNPACK, "true");
211-
} else {
212-
layerAnnotations.put(Const.ANNOTATION_ORAS_UNPACK, "false");
213-
}
214-
215-
Layer layer = pushBlob(ref, is)
216-
.withMediaType(path.getMediaType())
217-
.withAnnotations(layerAnnotations);
218-
layers.add(layer);
219-
LOG.info("Uploaded directory: {}", layer.getDigest());
220-
}
221-
Files.delete(tempArchive.getPath());
222-
} else {
223-
try (InputStream is = Files.newInputStream(path.getPath())) {
224-
if (withDigest) {
225-
ref = ref.withDigest(ref.getAlgorithm().digest(path.getPath()));
226-
}
227-
String title = path.getPath().getFileName().toString();
228-
Map<String, String> layerAnnotations = annotations.hasFileAnnotations(title)
229-
? annotations.getFileAnnotations(title)
230-
: Map.of(Const.ANNOTATION_TITLE, title);
231-
232-
Layer layer = pushBlob(ref, is)
233-
.withMediaType(path.getMediaType())
234-
.withAnnotations(layerAnnotations);
235-
layers.add(layer);
236-
LOG.info("Uploaded: {}", layer.getDigest());
237-
}
238-
}
239-
} catch (IOException e) {
240-
throw new OrasException("Failed to push artifact", e);
241-
}
176+
try {
177+
return Arrays.stream(paths)
178+
.map(p -> CompletableFuture.supplyAsync(
179+
() -> pushLayer(ref, annotations, withDigest, p), getExecutorService()))
180+
.map(CompletableFuture::join)
181+
.toList();
182+
} catch (CompletionException e) {
183+
throw new OrasException("Failed to push layers", e.getCause());
242184
}
243-
return layers;
244185
}
245186

246187
/**
@@ -307,6 +248,12 @@ public final Manifest attachArtifact(T ref, ArtifactType artifactType, LocalPath
307248
*/
308249
public abstract Tags getTags(T ref);
309250

251+
/**
252+
* Get the executor service for concurrent operations. This is used for concurrent pushing and pulling of layers.
253+
* @return The executor service
254+
*/
255+
public abstract ExecutorService getExecutorService();
256+
310257
/**
311258
* Get the tags for a ref
312259
* @param ref The ref
@@ -483,4 +430,72 @@ public Manifest attachArtifact(T ref, ArtifactType artifactType, Annotations ann
483430
SupportedAlgorithm.getDefault().digest(manifest.toJson().getBytes(StandardCharsets.UTF_8))),
484431
manifest);
485432
}
433+
434+
protected Layer pushLayer(T ref, Annotations annotations, boolean withDigest, LocalPath path) {
435+
try {
436+
// Create tar.gz archive for directory
437+
if (Files.isDirectory(path.getPath())) {
438+
SupportedCompression compression = SupportedCompression.fromMediaType(path.getMediaType());
439+
440+
// If source need to be packed first
441+
boolean autoUnpack = compression.isAutoUnpack();
442+
LocalPath tempSource = autoUnpack ? ArchiveUtils.tar(path) : path;
443+
LocalPath tempArchive = ArchiveUtils.compress(tempSource, path.getMediaType());
444+
445+
if (withDigest) {
446+
ref = ref.withDigest(ref.getAlgorithm().digest(tempArchive.getPath()));
447+
}
448+
try (InputStream is = Files.newInputStream(tempArchive.getPath())) {
449+
String title = path.getPath().isAbsolute()
450+
? path.getPath().getFileName().toString()
451+
: path.getPath().toString();
452+
453+
// We store the filename, based on directory name if we don't auto unpack
454+
if (!autoUnpack) {
455+
title = "%s.%s".formatted(title, compression.getFileExtension());
456+
}
457+
LOG.debug("Uploading directory as archive with title: {}", title);
458+
459+
Map<String, String> layerAnnotations = annotations.hasFileAnnotations(title)
460+
? annotations.getFileAnnotations(title)
461+
: new LinkedHashMap<>(Map.of(Const.ANNOTATION_TITLE, title));
462+
463+
// Add oras digest/unpack
464+
// For example zip can be packed application/zip but never unpacked by the runtime
465+
// This is convenience method to pack zip layer as directories
466+
if (compression.isAutoUnpack()) {
467+
layerAnnotations.put(
468+
Const.ANNOTATION_ORAS_CONTENT_DIGEST,
469+
ref.getAlgorithm().digest(tempSource.getPath()));
470+
layerAnnotations.put(Const.ANNOTATION_ORAS_UNPACK, "true");
471+
} else {
472+
layerAnnotations.put(Const.ANNOTATION_ORAS_UNPACK, "false");
473+
}
474+
475+
Layer layer =
476+
pushBlob(ref, is).withMediaType(path.getMediaType()).withAnnotations(layerAnnotations);
477+
LOG.info("Uploaded directory: {}", layer.getDigest());
478+
Files.delete(tempArchive.getPath());
479+
return layer;
480+
}
481+
} else {
482+
try (InputStream is = Files.newInputStream(path.getPath())) {
483+
if (withDigest) {
484+
ref = ref.withDigest(ref.getAlgorithm().digest(path.getPath()));
485+
}
486+
String title = path.getPath().getFileName().toString();
487+
Map<String, String> layerAnnotations = annotations.hasFileAnnotations(title)
488+
? annotations.getFileAnnotations(title)
489+
: Map.of(Const.ANNOTATION_TITLE, title);
490+
491+
Layer layer =
492+
pushBlob(ref, is).withMediaType(path.getMediaType()).withAnnotations(layerAnnotations);
493+
LOG.info("Uploaded: {}", layer.getDigest());
494+
return layer;
495+
}
496+
}
497+
} catch (IOException e) {
498+
throw new OrasException("Failed to push artifact", e);
499+
}
500+
}
486501
}

src/main/java/land/oras/OCILayout.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import java.util.LinkedList;
3030
import java.util.List;
3131
import java.util.Map;
32+
import java.util.concurrent.ExecutorService;
33+
import java.util.concurrent.Executors;
3234
import java.util.function.Supplier;
3335
import land.oras.exception.OrasException;
3436
import land.oras.utils.Const;
@@ -45,6 +47,8 @@ public final class OCILayout extends OCI<LayoutRef> {
4547
@SuppressWarnings("all")
4648
private final String imageLayoutVersion = "1.0.0";
4749

50+
private final ExecutorService executors = Executors.newSingleThreadExecutor();
51+
4852
/**
4953
* Path on the file system of the OCI Layout
5054
*/
@@ -63,6 +67,11 @@ public static OCILayout.Builder builder() {
6367
return OCILayout.Builder.builder();
6468
}
6569

70+
@Override
71+
public ExecutorService getExecutorService() {
72+
return executors;
73+
}
74+
6675
@Override
6776
public Manifest pushArtifact(
6877
LayoutRef ref,

0 commit comments

Comments
 (0)