Skip to content

Commit 4c51b7e

Browse files
committed
Asyncify slot suppliers
1 parent 59bbabb commit 4c51b7e

11 files changed

Lines changed: 197 additions & 85 deletions

File tree

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import io.temporal.worker.MetricsType;
3636
import io.temporal.worker.tuning.*;
3737
import java.util.Objects;
38+
import java.util.concurrent.CompletableFuture;
39+
import java.util.concurrent.ExecutionException;
3840
import java.util.function.Supplier;
3941
import javax.annotation.Nonnull;
4042
import javax.annotation.Nullable;
@@ -96,18 +98,19 @@ public ActivityTask poll() {
9698
PollActivityTaskQueueResponse response;
9799
SlotPermit permit;
98100
boolean isSuccessful = false;
99-
101+
CompletableFuture<SlotPermit> future =
102+
slotSupplier.reserveSlot(
103+
new SlotReservationData(
104+
pollRequest.getTaskQueue().getName(),
105+
pollRequest.getIdentity(),
106+
pollRequest.getWorkerVersionCapabilities().getBuildId()));
100107
try {
101-
permit =
102-
slotSupplier.reserveSlot(
103-
new SlotReservationData(
104-
pollRequest.getTaskQueue().getName(),
105-
pollRequest.getIdentity(),
106-
pollRequest.getWorkerVersionCapabilities().getBuildId()));
108+
permit = future.get();
107109
} catch (InterruptedException e) {
110+
future.cancel(true);
108111
Thread.currentThread().interrupt();
109112
return null;
110-
} catch (Exception e) {
113+
} catch (ExecutionException e) {
111114
log.warn("Error while trying to reserve a slot for an activity", e.getCause());
112115
return null;
113116
}

temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivitySlotSupplierQueue.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,18 +83,22 @@ private void processQueue() {
8383
QueuedLARequest request = null;
8484
try {
8585
request = requestQueue.take();
86+
87+
CompletableFuture<SlotPermit> future = slotSupplier.reserveSlot(request.data);
8688
try {
87-
slotPermit = slotSupplier.reserveSlot(request.data);
89+
slotPermit = future.get();
8890
} catch (InterruptedException e) {
91+
future.cancel(true);
8992
Thread.currentThread().interrupt();
9093
return;
91-
} catch (Exception e) {
94+
} catch (ExecutionException e) {
9295
log.error(
9396
"Error reserving local activity slot, dropped activity id {}",
9497
request.task.getActivityId(),
9598
e);
9699
continue;
97100
}
101+
98102
request.task.getExecutionContext().setPermit(slotPermit);
99103
afterReservedCallback.apply(request.task);
100104
} catch (InterruptedException e) {

temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import io.temporal.worker.MetricsType;
3333
import io.temporal.worker.tuning.*;
3434
import java.util.Objects;
35+
import java.util.concurrent.CompletableFuture;
36+
import java.util.concurrent.ExecutionException;
3537
import java.util.function.Supplier;
3638
import javax.annotation.Nonnull;
3739
import javax.annotation.Nullable;
@@ -86,18 +88,19 @@ public NexusTask poll() {
8688
PollNexusTaskQueueResponse response;
8789
SlotPermit permit;
8890
boolean isSuccessful = false;
89-
91+
CompletableFuture<SlotPermit> future =
92+
slotSupplier.reserveSlot(
93+
new SlotReservationData(
94+
pollRequest.getTaskQueue().getName(),
95+
pollRequest.getIdentity(),
96+
pollRequest.getWorkerVersionCapabilities().getBuildId()));
9097
try {
91-
permit =
92-
slotSupplier.reserveSlot(
93-
new SlotReservationData(
94-
pollRequest.getTaskQueue().getName(),
95-
pollRequest.getIdentity(),
96-
pollRequest.getWorkerVersionCapabilities().getBuildId()));
98+
permit = future.get();
9799
} catch (InterruptedException e) {
100+
future.cancel(true);
98101
Thread.currentThread().interrupt();
99102
return null;
100-
} catch (Exception e) {
103+
} catch (ExecutionException e) {
101104
log.warn("Error while trying to reserve a slot for a nexus task", e.getCause());
102105
return null;
103106
}

temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Collections;
2727
import java.util.Map;
2828
import java.util.Optional;
29+
import java.util.concurrent.CompletableFuture;
2930
import java.util.concurrent.ConcurrentHashMap;
3031
import java.util.concurrent.atomic.AtomicInteger;
3132

@@ -48,10 +49,10 @@ public TrackingSlotSupplier(SlotSupplier<SI> inner, Scope metricsScope) {
4849
publishSlotsMetric();
4950
}
5051

51-
public SlotPermit reserveSlot(SlotReservationData dat) throws InterruptedException {
52-
SlotPermit p = inner.reserveSlot(createCtx(dat));
53-
issuedSlots.incrementAndGet();
54-
return p;
52+
public CompletableFuture<SlotPermit> reserveSlot(SlotReservationData dat) {
53+
CompletableFuture<SlotPermit> future = inner.reserveSlot(createCtx(dat));
54+
future.thenAccept(permit -> issuedSlots.incrementAndGet());
55+
return future;
5556
}
5657

5758
public Optional<SlotPermit> tryReserveSlot(SlotReservationData dat) {

temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,12 @@
3535
import io.temporal.serviceclient.MetricsTag;
3636
import io.temporal.serviceclient.WorkflowServiceStubs;
3737
import io.temporal.worker.MetricsType;
38-
import io.temporal.worker.tuning.*;
38+
import io.temporal.worker.tuning.SlotPermit;
39+
import io.temporal.worker.tuning.SlotReleaseReason;
40+
import io.temporal.worker.tuning.WorkflowSlotInfo;
3941
import java.util.Objects;
42+
import java.util.concurrent.CompletableFuture;
43+
import java.util.concurrent.ExecutionException;
4044
import java.util.function.Supplier;
4145
import javax.annotation.Nonnull;
4246
import javax.annotation.Nullable;
@@ -123,17 +127,19 @@ public WorkflowPollTask(
123127
public WorkflowTask poll() {
124128
boolean isSuccessful = false;
125129
SlotPermit permit;
130+
CompletableFuture<SlotPermit> future =
131+
slotSupplier.reserveSlot(
132+
new SlotReservationData(
133+
pollRequest.getTaskQueue().getName(),
134+
pollRequest.getIdentity(),
135+
pollRequest.getWorkerVersionCapabilities().getBuildId()));
126136
try {
127-
permit =
128-
slotSupplier.reserveSlot(
129-
new SlotReservationData(
130-
pollRequest.getTaskQueue().getName(),
131-
pollRequest.getIdentity(),
132-
pollRequest.getWorkerVersionCapabilities().getBuildId()));
137+
permit = future.get();
133138
} catch (InterruptedException e) {
139+
future.cancel(true);
134140
Thread.currentThread().interrupt();
135141
return null;
136-
} catch (Exception e) {
142+
} catch (ExecutionException e) {
137143
log.warn("Error while trying to reserve a slot for workflow task", e.getCause());
138144
return null;
139145
}

temporal-sdk/src/main/java/io/temporal/worker/tuning/FixedSizeSlotSupplier.java

Lines changed: 74 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@
2121
package io.temporal.worker.tuning;
2222

2323
import com.google.common.base.Preconditions;
24+
import java.util.ArrayDeque;
2425
import java.util.Optional;
25-
import java.util.concurrent.*;
26+
import java.util.Queue;
27+
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.locks.ReentrantLock;
2629

2730
/**
2831
* This implementation of {@link SlotSupplier} provides a fixed number of slots backed by a
@@ -32,18 +35,83 @@
3235
*/
3336
public class FixedSizeSlotSupplier<SI extends SlotInfo> implements SlotSupplier<SI> {
3437
private final int numSlots;
35-
private final Semaphore executorSlotsSemaphore;
38+
private final AsyncSemaphore executorSlotsSemaphore;
39+
40+
/**
41+
* A simple version of an async semaphore. Unfortunately, we could not find a readily available,
42+
* properly licensed library for this, which is a bit surprising, but this implementation should
43+
* be suitable for our needs.
44+
*/
45+
static class AsyncSemaphore {
46+
private final ReentrantLock lock = new ReentrantLock();
47+
private final Queue<CompletableFuture<Void>> waiters = new ArrayDeque<>();
48+
private int permits;
49+
50+
AsyncSemaphore(int initialPermits) {
51+
this.permits = initialPermits;
52+
}
53+
54+
/**
55+
* Acquire a permit asynchronously. If a permit is available, returns a completed future;
56+
* otherwise, returns a future that will be completed when a permit is released.
57+
*/
58+
public CompletableFuture<Void> acquire() {
59+
lock.lock();
60+
try {
61+
if (permits > 0) {
62+
permits--;
63+
return CompletableFuture.completedFuture(null);
64+
} else {
65+
CompletableFuture<Void> waiter = new CompletableFuture<>();
66+
waiters.add(waiter);
67+
return waiter;
68+
}
69+
} finally {
70+
lock.unlock();
71+
}
72+
}
73+
74+
public boolean tryAcquire() {
75+
lock.lock();
76+
try {
77+
if (permits > 0) {
78+
permits--;
79+
return true;
80+
}
81+
return false;
82+
} finally {
83+
lock.unlock();
84+
}
85+
}
86+
87+
/**
88+
* Release a permit. If there are waiting futures, completes the next one instead of
89+
* incrementing the permit count.
90+
*/
91+
public void release() {
92+
lock.lock();
93+
try {
94+
CompletableFuture<Void> waiter = waiters.poll();
95+
if (waiter != null) {
96+
waiter.complete(null);
97+
} else {
98+
permits++;
99+
}
100+
} finally {
101+
lock.unlock();
102+
}
103+
}
104+
}
36105

37106
public FixedSizeSlotSupplier(int numSlots) {
38107
Preconditions.checkArgument(numSlots > 0, "FixedSizeSlotSupplier must have at least one slot");
39108
this.numSlots = numSlots;
40-
executorSlotsSemaphore = new Semaphore(numSlots);
109+
executorSlotsSemaphore = new AsyncSemaphore(numSlots);
41110
}
42111

43112
@Override
44-
public SlotPermit reserveSlot(SlotReserveContext<SI> ctx) throws InterruptedException {
45-
executorSlotsSemaphore.acquire();
46-
return new SlotPermit();
113+
public CompletableFuture<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) {
114+
return executorSlotsSemaphore.acquire().thenApply(ignored -> new SlotPermit());
47115
}
48116

49117
@Override

temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedSlotSupplier.java

Lines changed: 54 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.time.Duration;
2525
import java.time.Instant;
2626
import java.util.Optional;
27+
import java.util.concurrent.*;
2728

2829
/** Implements a {@link SlotSupplier} based on resource usage for a particular slot type. */
2930
@Experimental
@@ -32,6 +33,18 @@ public class ResourceBasedSlotSupplier<SI extends SlotInfo> implements SlotSuppl
3233
private final ResourceBasedController resourceController;
3334
private final ResourceBasedSlotOptions options;
3435
private Instant lastSlotIssuedAt = Instant.EPOCH;
36+
// For slot reservations that are waiting to re-check resource usage
37+
private static final ScheduledExecutorService scheduler =
38+
Executors.newScheduledThreadPool(
39+
// Two threads appear to be needed here so that reading PID decisions doesn't unduly interfere
40+
// with firing off scheduled tasks, or with one another.
41+
2,
42+
r -> {
43+
Thread t = new Thread(r);
44+
t.setName("ResourceBasedSlotSupplier.scheduler");
45+
t.setDaemon(true);
46+
return t;
47+
});
3548

3649
/**
3750
* Construct a slot supplier for workflow tasks with the given resource controller and options.
@@ -139,29 +152,43 @@ private ResourceBasedSlotSupplier(
139152
}
140153

141154
@Override
142-
public SlotPermit reserveSlot(SlotReserveContext<SI> ctx) throws InterruptedException {
143-
while (true) {
144-
if (ctx.getNumIssuedSlots() < options.getMinimumSlots()) {
145-
return new SlotPermit();
146-
} else {
147-
Duration mustWaitFor;
148-
try {
149-
mustWaitFor = options.getRampThrottle().minus(timeSinceLastSlotIssued());
150-
} catch (ArithmeticException e) {
151-
mustWaitFor = Duration.ZERO;
152-
}
153-
if (mustWaitFor.compareTo(Duration.ZERO) > 0) {
154-
Thread.sleep(mustWaitFor.toMillis());
155-
}
156-
157-
Optional<SlotPermit> permit = tryReserveSlot(ctx);
158-
if (permit.isPresent()) {
159-
return permit.get();
160-
} else {
161-
Thread.sleep(10);
162-
}
163-
}
155+
public CompletableFuture<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) {
156+
if (ctx.getNumIssuedSlots() < options.getMinimumSlots()) {
157+
return CompletableFuture.completedFuture(new SlotPermit());
164158
}
159+
return tryReserveSlot(ctx)
160+
.map(CompletableFuture::completedFuture)
161+
.orElseGet(() -> scheduleSlotAcquisition(ctx));
162+
}
163+
164+
private CompletableFuture<SlotPermit> scheduleSlotAcquisition(SlotReserveContext<SI> ctx) {
165+
Duration mustWaitFor;
166+
try {
167+
mustWaitFor = options.getRampThrottle().minus(timeSinceLastSlotIssued());
168+
} catch (ArithmeticException e) {
169+
mustWaitFor = Duration.ZERO;
170+
}
171+
172+
CompletableFuture<Void> permitFuture;
173+
if (mustWaitFor.compareTo(Duration.ZERO) > 0) {
174+
permitFuture =
175+
CompletableFuture.supplyAsync(() -> null, delayedExecutor(mustWaitFor.toMillis()));
176+
} else {
177+
permitFuture = CompletableFuture.completedFuture(null);
178+
}
179+
180+
// After the delay, try to reserve the slot
181+
return permitFuture.thenCompose(
182+
ignored -> {
183+
Optional<SlotPermit> permit = tryReserveSlot(ctx);
184+
// If we couldn't get a slot this time, delay for a short period and try again
185+
return permit
186+
.map(CompletableFuture::completedFuture)
187+
.orElseGet(
188+
() ->
189+
CompletableFuture.supplyAsync(() -> null, delayedExecutor(10))
190+
.thenCompose(ig -> scheduleSlotAcquisition(ctx)));
191+
});
165192
}
166193

167194
@Override
@@ -190,4 +217,9 @@ public ResourceBasedController getResourceController() {
190217
private Duration timeSinceLastSlotIssued() {
191218
return Duration.between(lastSlotIssuedAt, Instant.now());
192219
}
220+
221+
// Polyfill for Java 9's CompletableFuture.delayedExecutor; `delay` is in milliseconds
222+
private static Executor delayedExecutor(long delay) {
223+
return r -> scheduler.schedule(() -> scheduler.execute(r), delay, TimeUnit.MILLISECONDS);
224+
}
193225
}

0 commit comments

Comments
 (0)