Skip to content

Commit 8f80686

Browse files
authored
Allow consumers to provide their executor service (#637)
Signed-off-by: Valentin Delaye <jonesbusy@users.noreply.github.com>
1 parent ae503e3 commit 8f80686

3 files changed

Lines changed: 40 additions & 10 deletions

File tree

src/main/java/land/oras/Registry.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public final class Registry extends OCI<ContainerRef> {
6666
/**
6767
* The executor service for parallel operations
6868
*/
69-
private ExecutorService executors;
69+
private ExecutorService executorService;
7070

7171
/**
7272
* The registries configuration loaded from the environment
@@ -139,6 +139,14 @@ private void setParallelism(int maxConcurrentDownloads) {
139139
this.maxConcurrentDownloads = maxConcurrentDownloads;
140140
}
141141

142+
/**
143+
* Allow consumer to set custom executor service for parallel operations. If not set, a default one will be created with the given parallelism
144+
* @param executorService The executor service
145+
*/
146+
private void setExecutorService(ExecutorService executorService) {
147+
this.executorService = executorService;
148+
}
149+
142150
/**
143151
* Return if this registry is insecure
144152
* @return True if insecure
@@ -177,11 +185,13 @@ private void setSkipTlsVerify(boolean skipTlsVerify) {
177185
*/
178186
private Registry build() {
179187
client = HttpClient.Builder.builder().withSkipTlsVerify(skipTlsVerify).build();
180-
executors = Executors.newFixedThreadPool(maxConcurrentDownloads, r -> {
181-
Thread t = new Thread(r);
182-
t.setName("layer-transfer-worker-%d".formatted(t.getId()));
183-
return t;
184-
});
188+
if (executorService == null) {
189+
executorService = Executors.newFixedThreadPool(maxConcurrentDownloads, r -> {
190+
Thread t = new Thread(r);
191+
t.setName("layer-transfer-worker-%d".formatted(t.getId()));
192+
return t;
193+
});
194+
}
185195
return this;
186196
}
187197

@@ -220,7 +230,7 @@ public Registry asInsecure() {
220230

221231
@Override
222232
public ExecutorService getExecutorService() {
223-
return executors;
233+
return executorService;
224234
}
225235

226236
@Override
@@ -1108,6 +1118,7 @@ public Builder from(Registry registry) {
11081118
this.registry.setInsecure(registry.insecure);
11091119
this.registry.setRegistry(registry.registry);
11101120
this.registry.setSkipTlsVerify(registry.skipTlsVerify);
1121+
this.registry.setExecutorService(registry.executorService);
11111122
this.registry.setParallelism(registry.maxConcurrentDownloads);
11121123
return this;
11131124
}
@@ -1195,6 +1206,17 @@ public Builder withParallelism(int parallelism) {
11951206
return this;
11961207
}
11971208

1209+
/**
1210+
* Set the executor service to use for parallel uploads/downloads. By default it uses a parallelism level given by withParallelism() and a fixed thread pool.
1211+
* Only uses for layers upload/download, not for manifest or index upload/download.
1212+
* @param executorService The executor service
1213+
* @return The builder
1214+
*/
1215+
public Builder withExecutorService(ExecutorService executorService) {
1216+
registry.setExecutorService(executorService);
1217+
return this;
1218+
}
1219+
11981220
/**
11991221
* Set the insecure flag
12001222
* @param insecure Insecure

src/test/java/land/oras/DockerIoITCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,12 @@ void shouldCopyTagToInternalRegistry() {
107107

108108
// Source registry
109109
Registry sourceRegistry =
110-
Registry.Builder.builder().withParallelism(10).defaults().build();
110+
Registry.Builder.builder().withParallelism(3).defaults().build();
111111

112112
// Copy to this internal registry
113113
Registry targetRegistry = Registry.Builder.builder()
114114
.defaults("myuser", "mypass")
115-
.withParallelism(10)
115+
.withParallelism(3)
116116
.withInsecure(true)
117117
.build();
118118

src/test/java/land/oras/RegistryTest.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import java.util.List;
3333
import java.util.Map;
3434
import java.util.Random;
35+
import java.util.concurrent.ExecutorService;
36+
import java.util.concurrent.Executors;
3537
import land.oras.exception.OrasException;
3638
import land.oras.utils.Const;
3739
import land.oras.utils.RegistryContainer;
@@ -325,6 +327,8 @@ void shouldPushUnsecure() {
325327
@Execution(ExecutionMode.SAME_THREAD)
326328
void shouldPushPullManifestsAndBlobsByUsingConfig(@TempDir Path homeDir) throws Exception {
327329

330+
ExecutorService customExecutor = Executors.newSingleThreadExecutor();
331+
328332
// language=toml
329333
String config =
330334
"""
@@ -336,7 +340,9 @@ void shouldPushPullManifestsAndBlobsByUsingConfig(@TempDir Path homeDir) throws
336340
TestUtils.createRegistriesConfFile(homeDir, config);
337341

338342
TestUtils.withHome(homeDir, () -> {
339-
Registry registry = Registry.Builder.builder().build(); // Use default
343+
Registry registry = Registry.Builder.builder()
344+
.withExecutorService(customExecutor)
345+
.build();
340346
ContainerRef containerRef = ContainerRef.parse(
341347
"%s/library/artifact-text-manifest-blobs".formatted(this.unsecureRegistry.getRegistry()));
342348

@@ -370,6 +376,8 @@ void shouldPushPullManifestsAndBlobsByUsingConfig(@TempDir Path homeDir) throws
370376
registry.deleteManifest(containerRef);
371377
registry.deleteBlob(containerRef.withDigest(otherDigest));
372378
});
379+
380+
customExecutor.shutdown();
373381
}
374382

375383
@Test

0 commit comments

Comments
 (0)