Skip to content

Commit 38cd43e

Browse files
Do some refactoring
1 parent d92092b commit 38cd43e

7 files changed

Lines changed: 49 additions & 93 deletions

File tree

temporal-sdk/src/main/java/io/temporal/internal/common/GrpcUtils.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
package io.temporal.internal.common;
22

3+
import com.google.common.util.concurrent.ListenableFuture;
34
import io.grpc.Status;
45
import io.grpc.StatusRuntimeException;
6+
import java.util.concurrent.CompletableFuture;
7+
import java.util.concurrent.ExecutionException;
8+
import java.util.concurrent.ForkJoinPool;
59

610
public class GrpcUtils {
711
/**
@@ -14,4 +18,20 @@ public static boolean isChannelShutdownException(StatusRuntimeException ex) {
1418
&& (description.startsWith("Channel shutdown")
1519
|| description.startsWith("Subchannel shutdown")));
1620
}
21+
22+
public static <T> CompletableFuture<T> toCompletableFuture(ListenableFuture<T> listenableFuture) {
23+
CompletableFuture<T> result = new CompletableFuture<>();
24+
listenableFuture.addListener(
25+
() -> {
26+
try {
27+
result.complete(listenableFuture.get());
28+
} catch (ExecutionException e) {
29+
result.completeExceptionally(e.getCause());
30+
} catch (Exception e) {
31+
result.completeExceptionally(e);
32+
}
33+
},
34+
ForkJoinPool.commonPool());
35+
return result;
36+
}
1737
}

temporal-sdk/src/main/java/io/temporal/internal/worker/AdjustableSemaphore.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@
66
/** A simple implementation of an adjustable semaphore. */
77
@ThreadSafe
88
public final class AdjustableSemaphore {
9-
10-
/** semaphore starts at 0 capacity; must be set by setMaxPermits before use */
11-
private final ResizeableSemaphore semaphore = new ResizeableSemaphore();
9+
private final ResizeableSemaphore semaphore;
1210

1311
/**
1412
* how many permits are allowed as governed by this semaphore. Access must be synchronized on this
@@ -17,13 +15,14 @@ public final class AdjustableSemaphore {
1715
private int maxPermits = 0;
1816

1917
/** New instances should be configured with setMaxPermits(). */
20-
public AdjustableSemaphore() {
21-
// no op
18+
public AdjustableSemaphore(int initialPermits) {
19+
if (initialPermits < 1) {
20+
throw new IllegalArgumentException(
21+
"Semaphore size must be at least 1," + " was " + initialPermits);
22+
}
23+
this.semaphore = new ResizeableSemaphore(initialPermits);
2224
}
2325

24-
/*
25-
* Must be synchronized because the underlying int is not thread safe
26-
*/
2726
/**
2827
* Set the max number of permits. Must be greater than zero.
2928
*
@@ -32,7 +31,7 @@ public AdjustableSemaphore() {
3231
* until enough permits have been released to have the number of outstanding permits fall below
3332
* the new maximum. In other words, it does what you probably think it should.
3433
*
35-
* @param newMax
34+
* @param newMax the new maximum number of permits
3635
*/
3736
synchronized void setMaxPermits(int newMax) {
3837
if (newMax < 1) {
@@ -56,7 +55,7 @@ synchronized void setMaxPermits(int newMax) {
5655
this.maxPermits = newMax;
5756
}
5857

59-
/** Release a permit back to the semaphore. Make sure not to double-release. */
58+
/** Release a permit back to the semaphore. */
6059
void release() {
6160
this.semaphore.release();
6261
}
@@ -80,8 +79,8 @@ private static final class ResizeableSemaphore extends Semaphore {
8079
private static final long serialVersionUID = 1L;
8180

8281
/** Create a new semaphore with 0 permits. */
83-
ResizeableSemaphore() {
84-
super(0);
82+
ResizeableSemaphore(int initialPermits) {
83+
super(initialPermits);
8584
}
8685

8786
@Override

temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncActivityPollTask.java

Lines changed: 5 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
44

5-
import com.google.common.util.concurrent.ListenableFuture;
65
import com.google.protobuf.DoubleValue;
76
import com.google.protobuf.Timestamp;
87
import com.uber.m3.tally.Scope;
@@ -12,6 +11,7 @@
1211
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
1312
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueRequest;
1413
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
14+
import io.temporal.internal.common.GrpcUtils;
1515
import io.temporal.internal.common.ProtobufTimeUtils;
1616
import io.temporal.serviceclient.MetricsTag;
1717
import io.temporal.serviceclient.WorkflowServiceStubs;
@@ -21,8 +21,6 @@
2121
import io.temporal.worker.tuning.SlotPermit;
2222
import io.temporal.worker.tuning.SlotReleaseReason;
2323
import java.util.concurrent.CompletableFuture;
24-
import java.util.concurrent.ExecutionException;
25-
import java.util.concurrent.ForkJoinPool;
2624
import java.util.concurrent.atomic.AtomicInteger;
2725
import java.util.function.Supplier;
2826
import javax.annotation.Nonnull;
@@ -77,23 +75,6 @@ public AsyncActivityPollTask(
7775
this.pollRequest = pollRequest.build();
7876
}
7977

80-
private static <T> CompletableFuture<T> toCompletableFuture(
81-
ListenableFuture<T> listenableFuture) {
82-
CompletableFuture<T> result = new CompletableFuture<>();
83-
listenableFuture.addListener(
84-
() -> {
85-
try {
86-
result.complete(listenableFuture.get());
87-
} catch (ExecutionException e) {
88-
result.completeExceptionally(e.getCause());
89-
} catch (Exception e) {
90-
result.completeExceptionally(e);
91-
}
92-
},
93-
ForkJoinPool.commonPool());
94-
return result;
95-
}
96-
9778
@Override
9879
@SuppressWarnings("deprecation")
9980
public CompletableFuture<ActivityTask> poll(SlotPermit permit) {
@@ -106,7 +87,7 @@ public CompletableFuture<ActivityTask> poll(SlotPermit permit) {
10687
.update(pollGauge.incrementAndGet());
10788

10889
CompletableFuture<PollActivityTaskQueueResponse> response =
109-
toCompletableFuture(
90+
GrpcUtils.toCompletableFuture(
11091
service
11192
.futureStub()
11293
.withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
@@ -130,11 +111,9 @@ public CompletableFuture<ActivityTask> poll(SlotPermit permit) {
130111
})
131112
.whenComplete(
132113
(r, e) -> {
133-
if (e != null) {
134-
MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.NEXUS_TASK)
135-
.gauge(MetricsType.NUM_POLLERS)
136-
.update(pollGauge.decrementAndGet());
137-
}
114+
MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.NEXUS_TASK)
115+
.gauge(MetricsType.NUM_POLLERS)
116+
.update(pollGauge.decrementAndGet());
138117
});
139118
}
140119

temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncNexusPollTask.java

Lines changed: 5 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22

33
import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
44

5-
import com.google.common.util.concurrent.ListenableFuture;
65
import com.google.protobuf.Timestamp;
76
import com.uber.m3.tally.Scope;
87
import io.temporal.api.common.v1.WorkerVersionCapabilities;
98
import io.temporal.api.taskqueue.v1.TaskQueue;
109
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
1110
import io.temporal.api.workflowservice.v1.PollNexusTaskQueueRequest;
1211
import io.temporal.api.workflowservice.v1.PollNexusTaskQueueResponse;
12+
import io.temporal.internal.common.GrpcUtils;
1313
import io.temporal.internal.common.ProtobufTimeUtils;
1414
import io.temporal.serviceclient.MetricsTag;
1515
import io.temporal.serviceclient.WorkflowServiceStubs;
@@ -19,8 +19,6 @@
1919
import io.temporal.worker.tuning.SlotReleaseReason;
2020
import java.util.Objects;
2121
import java.util.concurrent.CompletableFuture;
22-
import java.util.concurrent.ExecutionException;
23-
import java.util.concurrent.ForkJoinPool;
2422
import java.util.concurrent.atomic.AtomicInteger;
2523
import java.util.function.Supplier;
2624
import javax.annotation.Nonnull;
@@ -68,23 +66,6 @@ public AsyncNexusPollTask(
6866
this.pollRequest = pollRequest.build();
6967
}
7068

71-
private static <T> CompletableFuture<T> toCompletableFuture(
72-
ListenableFuture<T> listenableFuture) {
73-
CompletableFuture<T> result = new CompletableFuture<>();
74-
listenableFuture.addListener(
75-
() -> {
76-
try {
77-
result.complete(listenableFuture.get());
78-
} catch (ExecutionException e) {
79-
result.completeExceptionally(e.getCause());
80-
} catch (Exception e) {
81-
result.completeExceptionally(e);
82-
}
83-
},
84-
ForkJoinPool.commonPool());
85-
return result;
86-
}
87-
8869
@Override
8970
@SuppressWarnings("deprecation")
9071
public CompletableFuture<NexusTask> poll(SlotPermit permit) {
@@ -97,7 +78,7 @@ public CompletableFuture<NexusTask> poll(SlotPermit permit) {
9778
.update(pollGauge.incrementAndGet());
9879

9980
CompletableFuture<PollNexusTaskQueueResponse> response =
100-
toCompletableFuture(
81+
GrpcUtils.toCompletableFuture(
10182
service
10283
.futureStub()
10384
.withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
@@ -123,11 +104,9 @@ public CompletableFuture<NexusTask> poll(SlotPermit permit) {
123104
})
124105
.whenComplete(
125106
(r, e) -> {
126-
if (e != null) {
127-
MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.NEXUS_TASK)
128-
.gauge(MetricsType.NUM_POLLERS)
129-
.update(pollGauge.decrementAndGet());
130-
}
107+
MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.NEXUS_TASK)
108+
.gauge(MetricsType.NUM_POLLERS)
109+
.update(pollGauge.decrementAndGet());
131110
});
132111
}
133112

temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncPoller.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ public boolean start() {
9797
pollerOptions.getPollThreadNamePrefix(),
9898
pollerOptions.getUncaughtExceptionHandler()));
9999
for (PollTaskAsync<T> asyncTaskPoller : asyncTaskPollers) {
100-
AdjustableSemaphore pollerSemaphore = new AdjustableSemaphore();
101-
pollerSemaphore.setMaxPermits(pollerBehavior.getInitialMaxConcurrentTaskPollers());
100+
AdjustableSemaphore pollerSemaphore =
101+
new AdjustableSemaphore(pollerBehavior.getInitialMaxConcurrentTaskPollers());
102102
PollScaleReportHandle pollScaleReportHandle =
103103
new PollScaleReportHandle<>(
104104
pollerBehavior.getMinConcurrentTaskPollers(),

temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncWorkflowPollTask.java

Lines changed: 5 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22

33
import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
44

5-
import com.google.common.util.concurrent.ListenableFuture;
65
import com.google.protobuf.Timestamp;
76
import com.uber.m3.tally.Scope;
87
import io.temporal.api.common.v1.WorkerVersionCapabilities;
98
import io.temporal.api.enums.v1.TaskQueueKind;
109
import io.temporal.api.taskqueue.v1.TaskQueue;
1110
import io.temporal.api.workflowservice.v1.*;
11+
import io.temporal.internal.common.GrpcUtils;
1212
import io.temporal.internal.common.ProtobufTimeUtils;
1313
import io.temporal.serviceclient.MetricsTag;
1414
import io.temporal.serviceclient.WorkflowServiceStubs;
@@ -18,8 +18,6 @@
1818
import io.temporal.worker.tuning.WorkflowSlotInfo;
1919
import java.util.Objects;
2020
import java.util.concurrent.CompletableFuture;
21-
import java.util.concurrent.ExecutionException;
22-
import java.util.concurrent.ForkJoinPool;
2321
import java.util.concurrent.atomic.AtomicInteger;
2422
import java.util.function.Supplier;
2523
import javax.annotation.Nonnull;
@@ -101,23 +99,6 @@ public AsyncWorkflowPollTask(
10199
}
102100
}
103101

104-
private static <T> CompletableFuture<T> toCompletableFuture(
105-
ListenableFuture<T> listenableFuture) {
106-
CompletableFuture<T> result = new CompletableFuture<>();
107-
listenableFuture.addListener(
108-
() -> {
109-
try {
110-
result.complete(listenableFuture.get());
111-
} catch (ExecutionException e) {
112-
result.completeExceptionally(e.getCause());
113-
} catch (Exception e) {
114-
result.completeExceptionally(e);
115-
}
116-
},
117-
ForkJoinPool.commonPool());
118-
return result;
119-
}
120-
121102
@Override
122103
@SuppressWarnings("deprecation")
123104
public CompletableFuture<WorkflowTask> poll(SlotPermit permit) {
@@ -130,7 +111,7 @@ public CompletableFuture<WorkflowTask> poll(SlotPermit permit) {
130111
.update(pollGauge.incrementAndGet());
131112

132113
CompletableFuture<PollWorkflowTaskQueueResponse> response =
133-
toCompletableFuture(
114+
GrpcUtils.toCompletableFuture(
134115
service
135116
.futureStub()
136117
.withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
@@ -151,11 +132,9 @@ public CompletableFuture<WorkflowTask> poll(SlotPermit permit) {
151132
})
152133
.whenComplete(
153134
(r, e) -> {
154-
if (e != null) {
155-
MetricsTag.tagged(metricsScope, taskQueueTagValue)
156-
.gauge(MetricsType.NUM_POLLERS)
157-
.update(pollGauge.decrementAndGet());
158-
}
135+
MetricsTag.tagged(metricsScope, taskQueueTagValue)
136+
.gauge(MetricsType.NUM_POLLERS)
137+
.update(pollGauge.decrementAndGet());
159138
});
160139
}
161140
}

temporal-sdk/src/main/java/io/temporal/internal/worker/PollerOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public Builder setBackoffMaximumJitterCoefficient(double backoffMaximumJitterCoe
121121
return this;
122122
}
123123

124-
/** Number of parallel polling threads. */
124+
/** Set poller behavior. */
125125
public Builder setPollerBehavior(PollerBehavior pollerBehavior) {
126126
this.pollerBehavior = pollerBehavior;
127127
return this;

0 commit comments

Comments
 (0)