Skip to content

Commit 8727900

Browse files
committed
Return Future instead of CompletableFuture
1 parent 6f192ac commit 8727900

9 files changed

Lines changed: 98 additions & 26 deletions

File tree

temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@
3535
import io.temporal.worker.MetricsType;
3636
import io.temporal.worker.tuning.*;
3737
import java.util.Objects;
38-
import java.util.concurrent.CompletableFuture;
3938
import java.util.concurrent.ExecutionException;
39+
import java.util.concurrent.Future;
4040
import java.util.function.Supplier;
4141
import javax.annotation.Nonnull;
4242
import javax.annotation.Nullable;
@@ -98,7 +98,7 @@ public ActivityTask poll() {
9898
PollActivityTaskQueueResponse response;
9999
SlotPermit permit;
100100
boolean isSuccessful = false;
101-
CompletableFuture<SlotPermit> future;
101+
Future<SlotPermit> future;
102102
try {
103103
future =
104104
slotSupplier.reserveSlot(

temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivitySlotSupplierQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ private void processQueue() {
8484
try {
8585
request = requestQueue.take();
8686

87-
CompletableFuture<SlotPermit> future = slotSupplier.reserveSlot(request.data);
87+
Future<SlotPermit> future = slotSupplier.reserveSlot(request.data);
8888
try {
8989
slotPermit = future.get();
9090
} catch (InterruptedException e) {

temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@
3232
import io.temporal.worker.MetricsType;
3333
import io.temporal.worker.tuning.*;
3434
import java.util.Objects;
35-
import java.util.concurrent.CompletableFuture;
3635
import java.util.concurrent.ExecutionException;
36+
import java.util.concurrent.Future;
3737
import java.util.function.Supplier;
3838
import javax.annotation.Nonnull;
3939
import javax.annotation.Nullable;
@@ -88,7 +88,7 @@ public NexusTask poll() {
8888
PollNexusTaskQueueResponse response;
8989
SlotPermit permit;
9090
boolean isSuccessful = false;
91-
CompletableFuture<SlotPermit> future =
91+
Future<SlotPermit> future =
9292
slotSupplier.reserveSlot(
9393
new SlotReservationData(
9494
pollRequest.getTaskQueue().getName(),

temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@
2626
import java.util.Collections;
2727
import java.util.Map;
2828
import java.util.Optional;
29-
import java.util.concurrent.CompletableFuture;
30-
import java.util.concurrent.ConcurrentHashMap;
29+
import java.util.concurrent.*;
30+
import java.util.concurrent.atomic.AtomicBoolean;
3131
import java.util.concurrent.atomic.AtomicInteger;
3232

3333
/**
@@ -49,15 +49,52 @@ public TrackingSlotSupplier(SlotSupplier<SI> inner, Scope metricsScope) {
4949
publishSlotsMetric();
5050
}
5151

52-
public CompletableFuture<SlotPermit> reserveSlot(SlotReservationData dat) {
53-
CompletableFuture<SlotPermit> future = null;
52+
public Future<SlotPermit> reserveSlot(SlotReservationData dat) {
53+
final Future<SlotPermit> originalFuture;
5454
try {
55-
future = inner.reserveSlot(createCtx(dat));
55+
originalFuture = inner.reserveSlot(createCtx(dat));
5656
} catch (Exception e) {
5757
throw new RuntimeException(e);
5858
}
59-
future.thenAccept(permit -> issuedSlots.incrementAndGet());
60-
return future;
59+
60+
return new Future<SlotPermit>() {
61+
private final AtomicBoolean callbackInvoked = new AtomicBoolean(false);
62+
63+
private SlotPermit executeCallbackIfNeeded(SlotPermit permit) {
64+
if (callbackInvoked.compareAndSet(false, true)) {
65+
issuedSlots.incrementAndGet();
66+
}
67+
return permit;
68+
}
69+
70+
@Override
71+
public boolean cancel(boolean mayInterruptIfRunning) {
72+
return originalFuture.cancel(mayInterruptIfRunning);
73+
}
74+
75+
@Override
76+
public boolean isCancelled() {
77+
return originalFuture.isCancelled();
78+
}
79+
80+
@Override
81+
public boolean isDone() {
82+
return originalFuture.isDone();
83+
}
84+
85+
@Override
86+
public SlotPermit get() throws InterruptedException, ExecutionException {
87+
SlotPermit permit = originalFuture.get();
88+
return executeCallbackIfNeeded(permit);
89+
}
90+
91+
@Override
92+
public SlotPermit get(long timeout, TimeUnit unit)
93+
throws InterruptedException, ExecutionException, TimeoutException {
94+
SlotPermit permit = originalFuture.get(timeout, unit);
95+
return executeCallbackIfNeeded(permit);
96+
}
97+
};
6198
}
6299

63100
public Optional<SlotPermit> tryReserveSlot(SlotReservationData dat) {

temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@
3939
import io.temporal.worker.tuning.SlotReleaseReason;
4040
import io.temporal.worker.tuning.WorkflowSlotInfo;
4141
import java.util.Objects;
42-
import java.util.concurrent.CompletableFuture;
4342
import java.util.concurrent.ExecutionException;
43+
import java.util.concurrent.Future;
4444
import java.util.function.Supplier;
4545
import javax.annotation.Nonnull;
4646
import javax.annotation.Nullable;
@@ -127,7 +127,7 @@ public WorkflowPollTask(
127127
public WorkflowTask poll() {
128128
boolean isSuccessful = false;
129129
SlotPermit permit;
130-
CompletableFuture<SlotPermit> future =
130+
Future<SlotPermit> future =
131131
slotSupplier.reserveSlot(
132132
new SlotReservationData(
133133
pollRequest.getTaskQueue().getName(),

temporal-sdk/src/main/java/io/temporal/worker/tuning/FixedSizeSlotSupplier.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Optional;
2626
import java.util.Queue;
2727
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.Future;
2829
import java.util.concurrent.locks.ReentrantLock;
2930

3031
/**
@@ -110,7 +111,7 @@ public FixedSizeSlotSupplier(int numSlots) {
110111
}
111112

112113
@Override
113-
public CompletableFuture<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) throws Exception {
114+
public Future<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) throws Exception {
114115
return executorSlotsSemaphore.acquire().thenApply(ignored -> new SlotPermit());
115116
}
116117

temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedSlotSupplier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ private ResourceBasedSlotSupplier(
202202
}
203203

204204
@Override
205-
public CompletableFuture<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) throws Exception {
205+
public Future<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) throws Exception {
206206
if (ctx.getNumIssuedSlots() < options.getMinimumSlots()) {
207207
return CompletableFuture.completedFuture(new SlotPermit());
208208
}

temporal-sdk/src/main/java/io/temporal/worker/tuning/SlotSupplier.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
import io.temporal.common.Experimental;
2424
import java.util.Optional;
25-
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.Future;
2626

2727
/**
2828
* A SlotSupplier is responsible for managing the number of slots available for a given type of
@@ -49,7 +49,7 @@ public interface SlotSupplier<SI extends SlotInfo> {
4949
* @return A future that will be completed with a permit to use the slot when one becomes
5050
* available. Never return null, or complete the future with null.
5151
*/
52-
CompletableFuture<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) throws Exception;
52+
Future<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) throws Exception;
5353

5454
/**
5555
* This function is called when trying to reserve slots for "eager" workflow and activity tasks.

temporal-sdk/src/test/java/io/temporal/testUtils/CountingSlotSupplier.java

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222

2323
import io.temporal.worker.tuning.*;
2424
import java.util.Optional;
25-
import java.util.concurrent.CompletableFuture;
26-
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.*;
26+
import java.util.concurrent.atomic.AtomicBoolean;
2727
import java.util.concurrent.atomic.AtomicInteger;
2828

2929
public class CountingSlotSupplier<SI extends SlotInfo> extends FixedSizeSlotSupplier<SI> {
@@ -38,13 +38,47 @@ public CountingSlotSupplier(int numSlots) {
3838
}
3939

4040
@Override
41-
public CompletableFuture<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) throws Exception {
42-
CompletableFuture<SlotPermit> p = super.reserveSlot(ctx);
43-
return p.thenApply(
44-
permit -> {
41+
public Future<SlotPermit> reserveSlot(SlotReserveContext<SI> ctx) throws Exception {
42+
Future<SlotPermit> originalFuture = super.reserveSlot(ctx);
43+
44+
return new Future<SlotPermit>() {
45+
private final AtomicBoolean callbackInvoked = new AtomicBoolean(false);
46+
47+
private SlotPermit executeCallbackIfNeeded(SlotPermit permit) {
48+
if (callbackInvoked.compareAndSet(false, true)) {
4549
reservedCount.incrementAndGet();
46-
return permit;
47-
});
50+
}
51+
return permit;
52+
}
53+
54+
@Override
55+
public boolean cancel(boolean mayInterruptIfRunning) {
56+
return originalFuture.cancel(mayInterruptIfRunning);
57+
}
58+
59+
@Override
60+
public boolean isCancelled() {
61+
return originalFuture.isCancelled();
62+
}
63+
64+
@Override
65+
public boolean isDone() {
66+
return originalFuture.isDone();
67+
}
68+
69+
@Override
70+
public SlotPermit get() throws InterruptedException, ExecutionException {
71+
SlotPermit permit = originalFuture.get();
72+
return executeCallbackIfNeeded(permit);
73+
}
74+
75+
@Override
76+
public SlotPermit get(long timeout, TimeUnit unit)
77+
throws InterruptedException, ExecutionException, TimeoutException {
78+
SlotPermit permit = originalFuture.get(timeout, unit);
79+
return executeCallbackIfNeeded(permit);
80+
}
81+
};
4882
}
4983

5084
@Override

0 commit comments

Comments
 (0)