Skip to content

Commit a76b6e8

Browse files
committed
Use a custom future type for slot suppliers
1 parent 8727900 commit a76b6e8

13 files changed

Lines changed: 256 additions & 154 deletions

File tree

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@
3535
import io.temporal.worker.MetricsType;
3636
import io.temporal.worker.tuning.*;
3737
import java.util.Objects;
38-
import java.util.concurrent.ExecutionException;
39-
import java.util.concurrent.Future;
4038
import java.util.function.Supplier;
4139
import javax.annotation.Nonnull;
4240
import javax.annotation.Nullable;
@@ -97,8 +95,8 @@ public ActivityTask poll() {
9795
}
9896
PollActivityTaskQueueResponse response;
9997
SlotPermit permit;
98+
SlotSupplierFuture future;
10099
boolean isSuccessful = false;
101-
Future<SlotPermit> future;
102100
try {
103101
future =
104102
slotSupplier.reserveSlot(
@@ -110,16 +108,8 @@ public ActivityTask poll() {
110108
log.warn("Error while trying to reserve a slot for an activity", e.getCause());
111109
return null;
112110
}
113-
try {
114-
permit = future.get();
115-
} catch (InterruptedException e) {
116-
future.cancel(true);
117-
Thread.currentThread().interrupt();
118-
return null;
119-
} catch (ExecutionException e) {
120-
log.warn("Error while trying to reserve a slot for an activity", e.getCause());
121-
return null;
122-
}
111+
permit = Poller.getSlotPermitAndHandleInterrupts(future, slotSupplier);
112+
if (permit == null) return null;
123113

124114
try {
125115
response =

temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivitySlotSupplierQueue.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.temporal.worker.tuning.LocalActivitySlotInfo;
2424
import io.temporal.worker.tuning.SlotPermit;
2525
import io.temporal.worker.tuning.SlotReleaseReason;
26+
import io.temporal.worker.tuning.SlotSupplierFuture;
2627
import io.temporal.workflow.Functions;
2728
import java.util.concurrent.*;
2829
import javax.annotation.Nullable;
@@ -84,11 +85,14 @@ private void processQueue() {
8485
try {
8586
request = requestQueue.take();
8687

87-
Future<SlotPermit> future = slotSupplier.reserveSlot(request.data);
88+
SlotSupplierFuture future = slotSupplier.reserveSlot(request.data);
8889
try {
8990
slotPermit = future.get();
9091
} catch (InterruptedException e) {
91-
future.cancel(true);
92+
SlotPermit maybePermitAnyway = future.abortReservation();
93+
if (maybePermitAnyway != null) {
94+
slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), maybePermitAnyway);
95+
}
9296
Thread.currentThread().interrupt();
9397
return;
9498
} catch (ExecutionException e) {

temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@
3232
import io.temporal.worker.MetricsType;
3333
import io.temporal.worker.tuning.*;
3434
import java.util.Objects;
35-
import java.util.concurrent.ExecutionException;
36-
import java.util.concurrent.Future;
3735
import java.util.function.Supplier;
3836
import javax.annotation.Nonnull;
3937
import javax.annotation.Nullable;
@@ -87,23 +85,21 @@ public NexusTask poll() {
8785
}
8886
PollNexusTaskQueueResponse response;
8987
SlotPermit permit;
88+
SlotSupplierFuture future;
9089
boolean isSuccessful = false;
91-
Future<SlotPermit> future =
92-
slotSupplier.reserveSlot(
93-
new SlotReservationData(
94-
pollRequest.getTaskQueue().getName(),
95-
pollRequest.getIdentity(),
96-
pollRequest.getWorkerVersionCapabilities().getBuildId()));
9790
try {
98-
permit = future.get();
99-
} catch (InterruptedException e) {
100-
future.cancel(true);
101-
Thread.currentThread().interrupt();
102-
return null;
103-
} catch (ExecutionException e) {
91+
future =
92+
slotSupplier.reserveSlot(
93+
new SlotReservationData(
94+
pollRequest.getTaskQueue().getName(),
95+
pollRequest.getIdentity(),
96+
pollRequest.getWorkerVersionCapabilities().getBuildId()));
97+
} catch (Exception e) {
10498
log.warn("Error while trying to reserve a slot for a nexus task", e.getCause());
10599
return null;
106100
}
101+
permit = Poller.getSlotPermitAndHandleInterrupts(future, slotSupplier);
102+
if (permit == null) return null;
107103

108104
try {
109105
response =

temporal-sdk/src/main/java/io/temporal/internal/worker/Poller.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import io.temporal.internal.common.GrpcUtils;
2828
import io.temporal.internal.task.VirtualThreadDelegate;
2929
import io.temporal.worker.MetricsType;
30+
import io.temporal.worker.tuning.SlotPermit;
31+
import io.temporal.worker.tuning.SlotReleaseReason;
32+
import io.temporal.worker.tuning.SlotSupplierFuture;
3033
import java.time.Duration;
3134
import java.util.Objects;
3235
import java.util.concurrent.*;
@@ -222,6 +225,25 @@ public WorkerLifecycleState getLifecycleState() {
222225
return WorkerLifecycleState.ACTIVE;
223226
}
224227

228+
static SlotPermit getSlotPermitAndHandleInterrupts(
229+
SlotSupplierFuture future, TrackingSlotSupplier<?> slotSupplier) {
230+
SlotPermit permit;
231+
try {
232+
permit = future.get();
233+
} catch (InterruptedException e) {
234+
SlotPermit maybePermitAnyway = future.abortReservation();
235+
if (maybePermitAnyway != null) {
236+
slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), maybePermitAnyway);
237+
}
238+
Thread.currentThread().interrupt();
239+
return null;
240+
} catch (ExecutionException e) {
241+
log.warn("Error while trying to reserve a slot", e.getCause());
242+
return null;
243+
}
244+
return permit;
245+
}
246+
225247
@Override
226248
public String toString() {
227249
// TODO using pollThreadNamePrefix here is ugly. We should consider introducing some concept of

temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java

Lines changed: 7 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.util.Map;
2828
import java.util.Optional;
2929
import java.util.concurrent.*;
30-
import java.util.concurrent.atomic.AtomicBoolean;
3130
import java.util.concurrent.atomic.AtomicInteger;
3231

3332
/**
@@ -49,56 +48,20 @@ public TrackingSlotSupplier(SlotSupplier<SI> inner, Scope metricsScope) {
4948
publishSlotsMetric();
5049
}
5150

52-
public Future<SlotPermit> reserveSlot(SlotReservationData dat) {
53-
final Future<SlotPermit> originalFuture;
51+
public SlotSupplierFuture reserveSlot(SlotReservationData data) {
52+
final SlotSupplierFuture future;
5453
try {
55-
originalFuture = inner.reserveSlot(createCtx(dat));
54+
future = inner.reserveSlot(createCtx(data));
5655
} catch (Exception e) {
5756
throw new RuntimeException(e);
5857
}
5958

60-
return new Future<SlotPermit>() {
61-
private final AtomicBoolean callbackInvoked = new AtomicBoolean(false);
62-
63-
private SlotPermit executeCallbackIfNeeded(SlotPermit permit) {
64-
if (callbackInvoked.compareAndSet(false, true)) {
65-
issuedSlots.incrementAndGet();
66-
}
67-
return permit;
68-
}
69-
70-
@Override
71-
public boolean cancel(boolean mayInterruptIfRunning) {
72-
return originalFuture.cancel(mayInterruptIfRunning);
73-
}
74-
75-
@Override
76-
public boolean isCancelled() {
77-
return originalFuture.isCancelled();
78-
}
79-
80-
@Override
81-
public boolean isDone() {
82-
return originalFuture.isDone();
83-
}
84-
85-
@Override
86-
public SlotPermit get() throws InterruptedException, ExecutionException {
87-
SlotPermit permit = originalFuture.get();
88-
return executeCallbackIfNeeded(permit);
89-
}
90-
91-
@Override
92-
public SlotPermit get(long timeout, TimeUnit unit)
93-
throws InterruptedException, ExecutionException, TimeoutException {
94-
SlotPermit permit = originalFuture.get(timeout, unit);
95-
return executeCallbackIfNeeded(permit);
96-
}
97-
};
59+
future.thenRun(issuedSlots::incrementAndGet);
60+
return future;
9861
}
9962

100-
public Optional<SlotPermit> tryReserveSlot(SlotReservationData dat) {
101-
Optional<SlotPermit> p = inner.tryReserveSlot(createCtx(dat));
63+
public Optional<SlotPermit> tryReserveSlot(SlotReservationData data) {
64+
Optional<SlotPermit> p = inner.tryReserveSlot(createCtx(data));
10265
if (p.isPresent()) {
10366
issuedSlots.incrementAndGet();
10467
}

temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,9 @@
3737
import io.temporal.worker.MetricsType;
3838
import io.temporal.worker.tuning.SlotPermit;
3939
import io.temporal.worker.tuning.SlotReleaseReason;
40+
import io.temporal.worker.tuning.SlotSupplierFuture;
4041
import io.temporal.worker.tuning.WorkflowSlotInfo;
4142
import java.util.Objects;
42-
import java.util.concurrent.ExecutionException;
43-
import java.util.concurrent.Future;
4443
import java.util.function.Supplier;
4544
import javax.annotation.Nonnull;
4645
import javax.annotation.Nullable;
@@ -125,25 +124,24 @@ public WorkflowPollTask(
125124
@Override
126125
@SuppressWarnings("deprecation")
127126
public WorkflowTask poll() {
128-
boolean isSuccessful = false;
129127
SlotPermit permit;
130-
Future<SlotPermit> future =
131-
slotSupplier.reserveSlot(
132-
new SlotReservationData(
133-
pollRequest.getTaskQueue().getName(),
134-
pollRequest.getIdentity(),
135-
pollRequest.getWorkerVersionCapabilities().getBuildId()));
128+
SlotSupplierFuture future;
129+
boolean isSuccessful = false;
136130
try {
137-
permit = future.get();
138-
} catch (InterruptedException e) {
139-
future.cancel(true);
140-
Thread.currentThread().interrupt();
141-
return null;
142-
} catch (ExecutionException e) {
143-
log.warn("Error while trying to reserve a slot for workflow task", e.getCause());
131+
future =
132+
slotSupplier.reserveSlot(
133+
new SlotReservationData(
134+
pollRequest.getTaskQueue().getName(),
135+
pollRequest.getIdentity(),
136+
pollRequest.getWorkerVersionCapabilities().getBuildId()));
137+
} catch (Exception e) {
138+
log.warn("Error while trying to reserve a slot for a workflow", e.getCause());
144139
return null;
145140
}
146141

142+
permit = Poller.getSlotPermitAndHandleInterrupts(future, slotSupplier);
143+
if (permit == null) return null;
144+
147145
TaskQueueKind taskQueueKind = stickyQueueBalancer.makePoll();
148146
boolean isSticky = TaskQueueKind.TASK_QUEUE_KIND_STICKY.equals(taskQueueKind);
149147
PollWorkflowTaskQueueRequest request = isSticky ? stickyPollRequest : pollRequest;

temporal-sdk/src/main/java/io/temporal/worker/tuning/FixedSizeSlotSupplier.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.Optional;
2626
import java.util.Queue;
2727
import java.util.concurrent.CompletableFuture;
28-
import java.util.concurrent.Future;
2928
import java.util.concurrent.locks.ReentrantLock;
3029

3130
/**
@@ -94,7 +93,11 @@ public void release() {
9493
try {
9594
CompletableFuture<Void> waiter = waiters.poll();
9695
if (waiter != null) {
97-
waiter.complete(null);
96+
if (!waiter.complete(null) && waiter.isCancelled()) {
97+
// If this waiter was cancelled, we need to release another permit, since this waiter
98+
// is now useless
99+
release();
100+
}
98101
} else {
99102
permits++;
100103
}
@@ -111,8 +114,10 @@ public FixedSizeSlotSupplier(int numSlots) {
111114
}
112115

113116
@Override
114-
public Future<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) throws Exception {
115-
return executorSlotsSemaphore.acquire().thenApply(ignored -> new SlotPermit());
117+
public SlotSupplierFuture reserveSlot(SlotReserveContext<SI> ctx) throws Exception {
118+
CompletableFuture<Void> slotFuture = executorSlotsSemaphore.acquire();
119+
return SlotSupplierFuture.fromCompletableFuture(
120+
slotFuture.thenApply(ignored -> new SlotPermit()), () -> slotFuture.cancel(true));
116121
}
117122

118123
@Override

temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedSlotSupplier.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -202,16 +202,16 @@ private ResourceBasedSlotSupplier(
202202
}
203203

204204
@Override
205-
public Future<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) throws Exception {
205+
public SlotSupplierFuture reserveSlot(SlotReserveContext<SI> ctx) throws Exception {
206206
if (ctx.getNumIssuedSlots() < options.getMinimumSlots()) {
207-
return CompletableFuture.completedFuture(new SlotPermit());
207+
return SlotSupplierFuture.completedFuture(new SlotPermit());
208208
}
209209
return tryReserveSlot(ctx)
210-
.map(CompletableFuture::completedFuture)
210+
.map(SlotSupplierFuture::completedFuture)
211211
.orElseGet(() -> scheduleSlotAcquisition(ctx));
212212
}
213213

214-
private CompletableFuture<SlotPermit> scheduleSlotAcquisition(SlotReserveContext<SI> ctx) {
214+
private SlotSupplierFuture scheduleSlotAcquisition(SlotReserveContext<SI> ctx) {
215215
Duration mustWaitFor;
216216
try {
217217
mustWaitFor = options.getRampThrottle().minus(timeSinceLastSlotIssued());
@@ -228,17 +228,19 @@ private CompletableFuture<SlotPermit> scheduleSlotAcquisition(SlotReserveContext
228228
}
229229

230230
// After the delay, try to reserve the slot
231-
return permitFuture.thenCompose(
232-
ignored -> {
233-
Optional<SlotPermit> permit = tryReserveSlot(ctx);
234-
// If we couldn't get a slot this time, delay for a short period and try again
235-
return permit
236-
.map(CompletableFuture::completedFuture)
237-
.orElseGet(
238-
() ->
239-
CompletableFuture.supplyAsync(() -> null, delayedExecutor(10))
240-
.thenCompose(ig -> scheduleSlotAcquisition(ctx)));
241-
});
231+
return SlotSupplierFuture.fromCompletableFuture(
232+
permitFuture.thenCompose(
233+
ignored -> {
234+
Optional<SlotPermit> permit = tryReserveSlot(ctx);
235+
// If we couldn't get a slot this time, delay for a short period and try again
236+
return permit
237+
.map(CompletableFuture::completedFuture)
238+
.orElseGet(
239+
() ->
240+
CompletableFuture.supplyAsync(() -> null, delayedExecutor(10))
241+
.thenCompose(ig -> scheduleSlotAcquisition(ctx)));
242+
}),
243+
() -> permitFuture.cancel(true));
242244
}
243245

244246
@Override

temporal-sdk/src/main/java/io/temporal/worker/tuning/SlotSupplier.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
import io.temporal.common.Experimental;
2424
import java.util.Optional;
25-
import java.util.concurrent.Future;
2625

2726
/**
2827
* A SlotSupplier is responsible for managing the number of slots available for a given type of
@@ -49,7 +48,7 @@ public interface SlotSupplier<SI extends SlotInfo> {
4948
* @return A future that will be completed with a permit to use the slot when one becomes
5049
* available. Never return null, or complete the future with null.
5150
*/
52-
Future<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) throws Exception;
51+
SlotSupplierFuture reserveSlot(SlotReserveContext<SI> ctx) throws Exception;
5352

5453
/**
5554
* This function is called when trying to reserve slots for "eager" workflow and activity tasks.

0 commit comments

Comments
 (0)