Skip to content

Commit 6b319a9

Browse files
committed
Review feedback
1 parent 4c51b7e commit 6b319a9

8 files changed

Lines changed: 175 additions & 39 deletions

File tree

temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,12 @@ public TrackingSlotSupplier(SlotSupplier<SI> inner, Scope metricsScope) {
5050
}
5151

5252
public CompletableFuture<SlotPermit> reserveSlot(SlotReservationData dat) {
53-
CompletableFuture<SlotPermit> future = inner.reserveSlot(createCtx(dat));
53+
CompletableFuture<SlotPermit> future = null;
54+
try {
55+
future = inner.reserveSlot(createCtx(dat));
56+
} catch (Exception e) {
57+
throw new RuntimeException(e);
58+
}
5459
future.thenAccept(permit -> issuedSlots.incrementAndGet());
5560
return future;
5661
}

temporal-sdk/src/main/java/io/temporal/worker/tuning/FixedSizeSlotSupplier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public FixedSizeSlotSupplier(int numSlots) {
110110
}
111111

112112
@Override
113-
public CompletableFuture<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) {
113+
public CompletableFuture<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) throws Exception {
114114
return executorSlotsSemaphore.acquire().thenApply(ignored -> new SlotPermit());
115115
}
116116

temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedSlotSupplier.java

Lines changed: 87 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,8 @@ public class ResourceBasedSlotSupplier<SI extends SlotInfo> implements SlotSuppl
3434
private final ResourceBasedSlotOptions options;
3535
private Instant lastSlotIssuedAt = Instant.EPOCH;
3636
// For slot reservations that are waiting to re-check resource usage
37-
private static final ScheduledExecutorService scheduler =
38-
Executors.newScheduledThreadPool(
39-
// Two threads seem needed here, so that reading PID decisions doesn't interfere overly
40-
// with firing off scheduled tasks or one another.
41-
2,
42-
r -> {
43-
Thread t = new Thread(r);
44-
t.setName("ResourceBasedSlotSupplier.scheduler");
45-
t.setDaemon(true);
46-
return t;
47-
});
37+
private final ScheduledExecutorService scheduler;
38+
private static ScheduledExecutorService defaultScheduler;
4839

4940
/**
5041
* Construct a slot supplier for workflow tasks with the given resource controller and options.
@@ -55,7 +46,20 @@ public class ResourceBasedSlotSupplier<SI extends SlotInfo> implements SlotSuppl
5546
public static ResourceBasedSlotSupplier<WorkflowSlotInfo> createForWorkflow(
5647
ResourceBasedController resourceBasedController, ResourceBasedSlotOptions options) {
5748
return new ResourceBasedSlotSupplier<>(
58-
WorkflowSlotInfo.class, resourceBasedController, options);
49+
WorkflowSlotInfo.class, resourceBasedController, options, null);
50+
}
51+
52+
/**
53+
* As {@link #createForWorkflow(ResourceBasedController, ResourceBasedSlotOptions)}, but allows
54+
* overriding the internal thread pool. It is recommended to share the same executor across all
55+
* resource based slot suppliers in a worker.
56+
*/
57+
public static ResourceBasedSlotSupplier<WorkflowSlotInfo> createForWorkflow(
58+
ResourceBasedController resourceBasedController,
59+
ResourceBasedSlotOptions options,
60+
ScheduledExecutorService scheduler) {
61+
return new ResourceBasedSlotSupplier<>(
62+
WorkflowSlotInfo.class, resourceBasedController, options, scheduler);
5963
}
6064

6165
/**
@@ -67,7 +71,20 @@ public static ResourceBasedSlotSupplier<WorkflowSlotInfo> createForWorkflow(
6771
public static ResourceBasedSlotSupplier<ActivitySlotInfo> createForActivity(
6872
ResourceBasedController resourceBasedController, ResourceBasedSlotOptions options) {
6973
return new ResourceBasedSlotSupplier<>(
70-
ActivitySlotInfo.class, resourceBasedController, options);
74+
ActivitySlotInfo.class, resourceBasedController, options, null);
75+
}
76+
77+
/**
78+
* As {@link #createForActivity(ResourceBasedController, ResourceBasedSlotOptions)}, but allows
79+
* overriding the internal thread pool. It is recommended to share the same executor across all
80+
* resource based slot suppliers in a worker.
81+
*/
82+
public static ResourceBasedSlotSupplier<ActivitySlotInfo> createForActivity(
83+
ResourceBasedController resourceBasedController,
84+
ResourceBasedSlotOptions options,
85+
ScheduledExecutorService scheduler) {
86+
return new ResourceBasedSlotSupplier<>(
87+
ActivitySlotInfo.class, resourceBasedController, options, scheduler);
7188
}
7289

7390
/**
@@ -79,7 +96,20 @@ public static ResourceBasedSlotSupplier<ActivitySlotInfo> createForActivity(
7996
public static ResourceBasedSlotSupplier<LocalActivitySlotInfo> createForLocalActivity(
8097
ResourceBasedController resourceBasedController, ResourceBasedSlotOptions options) {
8198
return new ResourceBasedSlotSupplier<>(
82-
LocalActivitySlotInfo.class, resourceBasedController, options);
99+
LocalActivitySlotInfo.class, resourceBasedController, options, null);
100+
}
101+
102+
/**
103+
* As {@link #createForLocalActivity(ResourceBasedController, ResourceBasedSlotOptions)}, but
104+
* allows overriding the internal thread pool. It is recommended to share the same executor across
105+
* all resource based slot suppliers in a worker.
106+
*/
107+
public static ResourceBasedSlotSupplier<LocalActivitySlotInfo> createForLocalActivity(
108+
ResourceBasedController resourceBasedController,
109+
ResourceBasedSlotOptions options,
110+
ScheduledExecutorService scheduler) {
111+
return new ResourceBasedSlotSupplier<>(
112+
LocalActivitySlotInfo.class, resourceBasedController, options, scheduler);
83113
}
84114

85115
/**
@@ -90,14 +120,34 @@ public static ResourceBasedSlotSupplier<LocalActivitySlotInfo> createForLocalAct
90120
*/
91121
public static ResourceBasedSlotSupplier<NexusSlotInfo> createForNexus(
92122
ResourceBasedController resourceBasedController, ResourceBasedSlotOptions options) {
93-
return new ResourceBasedSlotSupplier<>(NexusSlotInfo.class, resourceBasedController, options);
123+
return new ResourceBasedSlotSupplier<>(
124+
NexusSlotInfo.class, resourceBasedController, options, null);
125+
}
126+
127+
/**
128+
* As {@link #createForNexus(ResourceBasedController, ResourceBasedSlotOptions)}, but allows
129+
* overriding the internal thread pool. It is recommended to share the same executor across all
130+
* resource based slot suppliers in a worker.
131+
*/
132+
public static ResourceBasedSlotSupplier<NexusSlotInfo> createForNexus(
133+
ResourceBasedController resourceBasedController,
134+
ResourceBasedSlotOptions options,
135+
ScheduledExecutorService scheduler) {
136+
return new ResourceBasedSlotSupplier<>(
137+
NexusSlotInfo.class, resourceBasedController, options, scheduler);
94138
}
95139

96140
private ResourceBasedSlotSupplier(
97141
Class<SI> clazz,
98142
ResourceBasedController resourceBasedController,
99-
ResourceBasedSlotOptions options) {
143+
ResourceBasedSlotOptions options,
144+
ScheduledExecutorService scheduler) {
100145
this.resourceController = resourceBasedController;
146+
if (scheduler == null) {
147+
this.scheduler = getDefaultScheduler();
148+
} else {
149+
this.scheduler = scheduler;
150+
}
101151
// Merge default options for any unset fields
102152
if (WorkflowSlotInfo.class.isAssignableFrom(clazz)) {
103153
this.options =
@@ -152,7 +202,7 @@ private ResourceBasedSlotSupplier(
152202
}
153203

154204
@Override
155-
public CompletableFuture<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) {
205+
public CompletableFuture<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) throws Exception {
156206
if (ctx.getNumIssuedSlots() < options.getMinimumSlots()) {
157207
return CompletableFuture.completedFuture(new SlotPermit());
158208
}
@@ -219,7 +269,26 @@ private Duration timeSinceLastSlotIssued() {
219269
}
220270

221271
// Polyfill for Java 9 delayedExecutor
222-
private static Executor delayedExecutor(long delay) {
272+
private Executor delayedExecutor(long delay) {
223273
return r -> scheduler.schedule(() -> scheduler.execute(r), delay, TimeUnit.MILLISECONDS);
224274
}
275+
276+
private static ScheduledExecutorService getDefaultScheduler() {
277+
synchronized (ResourceBasedSlotSupplier.class) {
278+
if (defaultScheduler == null) {
279+
defaultScheduler =
280+
Executors.newScheduledThreadPool(
281+
// Two threads seem needed here, so that reading PID decisions doesn't interfere
282+
// overly with firing off scheduled tasks or one another.
283+
2,
284+
r -> {
285+
Thread t = new Thread(r);
286+
t.setName("ResourceBasedSlotSupplier.scheduler");
287+
t.setDaemon(true);
288+
return t;
289+
});
290+
}
291+
return defaultScheduler;
292+
}
293+
}
225294
}

temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedTuner.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import io.temporal.common.Experimental;
2424
import java.time.Duration;
25+
import java.util.concurrent.ScheduledExecutorService;
2526
import javax.annotation.Nonnull;
2627

2728
/** A {@link WorkerTuner} that attempts to allocate slots based on available system resources. */
@@ -51,6 +52,7 @@ public class ResourceBasedTuner implements WorkerTuner {
5152
private final ResourceBasedSlotOptions activitySlotOptions;
5253
private final ResourceBasedSlotOptions localActivitySlotOptions;
5354
private final ResourceBasedSlotOptions nexusSlotOptions;
55+
private final ScheduledExecutorService executor;
5456

5557
public static Builder newBuilder() {
5658
return new Builder();
@@ -63,6 +65,7 @@ public static final class Builder {
6365
private @Nonnull ResourceBasedSlotOptions localActivitySlotOptions =
6466
DEFAULT_ACTIVITY_SLOT_OPTIONS;
6567
private @Nonnull ResourceBasedSlotOptions nexusSlotOptions = DEFAULT_NEXUS_SLOT_OPTIONS;
68+
private @Nonnull ScheduledExecutorService executor;
6669

6770
private Builder() {}
6871

@@ -115,13 +118,23 @@ public Builder setNexusSlotOptions(@Nonnull ResourceBasedSlotOptions nexusSlotOp
115118
return this;
116119
}
117120

121+
/**
122+
* Set the executor used for checking resource usage periodically. Defaults to a two-thread
123+
* pool.
124+
*/
125+
public Builder setExecutor(@Nonnull ScheduledExecutorService executor) {
126+
this.executor = executor;
127+
return this;
128+
}
129+
118130
public ResourceBasedTuner build() {
119131
return new ResourceBasedTuner(
120132
controllerOptions,
121133
workflowSlotOptions,
122134
activitySlotOptions,
123135
localActivitySlotOptions,
124-
nexusSlotOptions);
136+
nexusSlotOptions,
137+
executor);
125138
}
126139
}
127140

@@ -133,35 +146,38 @@ public ResourceBasedTuner(
133146
ResourceBasedSlotOptions workflowSlotOptions,
134147
ResourceBasedSlotOptions activitySlotOptions,
135148
ResourceBasedSlotOptions localActivitySlotOptions,
136-
ResourceBasedSlotOptions nexusSlotOptions) {
149+
ResourceBasedSlotOptions nexusSlotOptions,
150+
ScheduledExecutorService executor) {
137151
this.controller = ResourceBasedController.newSystemInfoController(controllerOptions);
138152
this.workflowSlotOptions = workflowSlotOptions;
139153
this.activitySlotOptions = activitySlotOptions;
140154
this.localActivitySlotOptions = localActivitySlotOptions;
141155
this.nexusSlotOptions = nexusSlotOptions;
156+
this.executor = executor;
142157
}
143158

144159
@Nonnull
145160
@Override
146161
public SlotSupplier<WorkflowSlotInfo> getWorkflowTaskSlotSupplier() {
147-
return ResourceBasedSlotSupplier.createForWorkflow(controller, workflowSlotOptions);
162+
return ResourceBasedSlotSupplier.createForWorkflow(controller, workflowSlotOptions, executor);
148163
}
149164

150165
@Nonnull
151166
@Override
152167
public SlotSupplier<ActivitySlotInfo> getActivityTaskSlotSupplier() {
153-
return ResourceBasedSlotSupplier.createForActivity(controller, activitySlotOptions);
168+
return ResourceBasedSlotSupplier.createForActivity(controller, activitySlotOptions, executor);
154169
}
155170

156171
@Nonnull
157172
@Override
158173
public SlotSupplier<LocalActivitySlotInfo> getLocalActivitySlotSupplier() {
159-
return ResourceBasedSlotSupplier.createForLocalActivity(controller, localActivitySlotOptions);
174+
return ResourceBasedSlotSupplier.createForLocalActivity(
175+
controller, localActivitySlotOptions, executor);
160176
}
161177

162178
@Nonnull
163179
@Override
164180
public SlotSupplier<NexusSlotInfo> getNexusSlotSupplier() {
165-
return ResourceBasedSlotSupplier.createForNexus(controller, nexusSlotOptions);
181+
return ResourceBasedSlotSupplier.createForNexus(controller, nexusSlotOptions, executor);
166182
}
167183
}

temporal-sdk/src/main/java/io/temporal/worker/tuning/SlotSupplier.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,13 @@ public interface SlotSupplier<SI extends SlotInfo> {
4343
* <p>These futures may be cancelled if the worker is shutting down or otherwise abandons the
4444
* reservation. This can cause an {@link InterruptedException} to be thrown, in the thread running
4545
* your implementation. You may want to catch it to perform any necessary cleanup, and then you
46-
* should rethrow the exception.
46+
* should rethrow the exception. Other thrown exceptions will be logged.
4747
*
4848
* @param ctx The context for slot reservation.
4949
* @return A future that will be completed with a permit to use the slot when one becomes
5050
* available. Never return null, or complete the future with null.
5151
*/
52-
CompletableFuture<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx);
52+
CompletableFuture<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) throws Exception;
5353

5454
/**
5555
* This function is called when trying to reserve slots for "eager" workflow and activity tasks.

temporal-sdk/src/test/java/io/temporal/internal/worker/SlotSupplierTest.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,17 @@ public void supplierIsCalledAppropriately() {
7171

7272
SlotSupplier<WorkflowSlotInfo> mockSupplier = mock(SlotSupplier.class);
7373
AtomicInteger usedSlotsWhenCalled = new AtomicInteger(-1);
74-
when(mockSupplier.reserveSlot(
75-
argThat(
76-
src -> {
77-
usedSlotsWhenCalled.set(src.getUsedSlots().size());
78-
return true;
79-
})))
80-
.thenReturn(CompletableFuture.completedFuture(new SlotPermit()));
74+
try {
75+
when(mockSupplier.reserveSlot(
76+
argThat(
77+
src -> {
78+
usedSlotsWhenCalled.set(src.getUsedSlots().size());
79+
return true;
80+
})))
81+
.thenReturn(CompletableFuture.completedFuture(new SlotPermit()));
82+
} catch (Exception e) {
83+
throw new RuntimeException(e);
84+
}
8185

8286
StickyQueueBalancer stickyQueueBalancer = new StickyQueueBalancer(5, true);
8387
Scope metricsScope =
@@ -119,7 +123,11 @@ public void supplierIsCalledAppropriately() {
119123

120124
if (throwOnPoll) {
121125
assertThrows(RuntimeException.class, poller::poll);
122-
verify(mockSupplier, times(1)).reserveSlot(any());
126+
try {
127+
verify(mockSupplier, times(1)).reserveSlot(any());
128+
} catch (Exception e) {
129+
throw new RuntimeException(e);
130+
}
123131
verify(mockSupplier, times(1)).releaseSlot(any());
124132
assertEquals(0, trackingSS.getUsedSlots().size());
125133
} else {
@@ -128,8 +136,12 @@ public void supplierIsCalledAppropriately() {
128136
// We can't test this in the verifier, since it will get an up-to-date reference to the map
129137
// where the slot *is* used.
130138
assertEquals(0, usedSlotsWhenCalled.get());
131-
verify(mockSupplier, times(1))
132-
.reserveSlot(argThat(arg -> Objects.equals(arg.getTaskQueue(), TASK_QUEUE)));
139+
try {
140+
verify(mockSupplier, times(1))
141+
.reserveSlot(argThat(arg -> Objects.equals(arg.getTaskQueue(), TASK_QUEUE)));
142+
} catch (Exception e) {
143+
throw new RuntimeException(e);
144+
}
133145
verify(mockSupplier, times(0)).releaseSlot(any());
134146
assertEquals(1, trackingSS.getUsedSlots().size());
135147
}

temporal-sdk/src/test/java/io/temporal/testUtils/CountingSlotSupplier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public CountingSlotSupplier(int numSlots) {
3838
}
3939

4040
@Override
41-
public CompletableFuture<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) {
41+
public CompletableFuture<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) throws Exception {
4242
CompletableFuture<SlotPermit> p = super.reserveSlot(ctx);
4343
return p.thenApply(
4444
permit -> {

0 commit comments

Comments
 (0)