Skip to content

Commit dea9ce3

Browse files
Fix springboot support
1 parent 72f457a commit dea9ce3

6 files changed

Lines changed: 161 additions & 72 deletions

File tree

temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncPoller.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,12 @@ public boolean start() {
103103
for (PollTaskAsync<T> asyncTaskPoller : asyncTaskPollers) {
104104
log.info("Starting async poller: {}", asyncTaskPoller.getLabel());
105105
AdjustableSemaphore pollerSemaphore =
106-
new AdjustableSemaphore(pollerBehavior.getInitialMaxConcurrentTaskPollers());
106+
new AdjustableSemaphore(pollerBehavior.getInitialConcurrentTaskPollers());
107107
PollScaleReportHandle<T> pollScaleReportHandle =
108108
new PollScaleReportHandle<>(
109109
pollerBehavior.getMinConcurrentTaskPollers(),
110110
pollerBehavior.getMaxConcurrentTaskPollers(),
111-
pollerBehavior.getInitialMaxConcurrentTaskPollers(),
111+
pollerBehavior.getInitialConcurrentTaskPollers(),
112112
(newTarget) -> {
113113
log.debug(
114114
"Updating maximum number of pollers for {} to: {}",

temporal-sdk/src/main/java/io/temporal/worker/tuning/PollerBehaviorAutoscaling.java

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.temporal.common.Experimental;
44
import java.util.Objects;
5+
import javax.annotation.Nullable;
56

67
/**
78
* A poller behavior that will automatically scale the number of pollers based on feedback from the
@@ -19,27 +20,37 @@ public final class PollerBehaviorAutoscaling implements PollerBehavior {
1920
/**
2021
* Creates a new PollerBehaviorAutoscaling with default parameters.
2122
*
22-
* <p> Default parameters are:
23+
* <p>Default parameters are:
24+
*
2325
* <ul>
24-
* <li>minConcurrentTaskPollers = 1</li>
25-
* <li>maxConcurrentTaskPollers = 100</li>
26-
* <li>initialConcurrentTaskPollers = 5</li>
26+
* <li>minConcurrentTaskPollers = 1
27+
* <li>maxConcurrentTaskPollers = 100
28+
* <li>initialConcurrentTaskPollers = 5
2729
*/
2830
public PollerBehaviorAutoscaling() {
29-
this(1, 100, 5);
31+
this(null, null, null);
3032
}
3133

3234
/**
3335
* Creates a new PollerBehaviorAutoscaling with the specified parameters.
3436
*
35-
* @param minConcurrentTaskPollers Minimum number of concurrent task pollers.
36-
* @param maxConcurrentTaskPollers Maximum number of concurrent task pollers.
37-
* @param initialConcurrentTaskPollers Initial number of concurrent task pollers.
37+
* @param minConcurrentTaskPollers Minimum number of concurrent task pollers. Default is 1.
38+
* @param maxConcurrentTaskPollers Maximum number of concurrent task pollers. Default is 100.
39+
* @param initialConcurrentTaskPollers Initial number of concurrent task pollers. Default is 5.
3840
*/
3941
public PollerBehaviorAutoscaling(
40-
int minConcurrentTaskPollers,
41-
int maxConcurrentTaskPollers,
42-
int initialConcurrentTaskPollers) {
42+
@Nullable Integer minConcurrentTaskPollers,
43+
@Nullable Integer maxConcurrentTaskPollers,
44+
@Nullable Integer initialConcurrentTaskPollers) {
45+
if (minConcurrentTaskPollers == null) {
46+
minConcurrentTaskPollers = 1;
47+
}
48+
if (maxConcurrentTaskPollers == null) {
49+
maxConcurrentTaskPollers = 100;
50+
}
51+
if (initialConcurrentTaskPollers == null) {
52+
initialConcurrentTaskPollers = 5;
53+
}
4354
if (minConcurrentTaskPollers < 1) {
4455
throw new IllegalArgumentException("minConcurrentTaskPollers must be at least 1");
4556
}
@@ -80,7 +91,7 @@ public int getMaxConcurrentTaskPollers() {
8091
*
8192
* @return Initial number of concurrent task pollers.
8293
*/
83-
public int getInitialMaxConcurrentTaskPollers() {
94+
public int getInitialConcurrentTaskPollers() {
8495
return initialConcurrentTaskPollers;
8596
}
8697

temporal-spring-boot-autoconfigure/src/main/java/io/temporal/spring/boot/autoconfigure/properties/WorkerProperties.java

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import io.temporal.common.WorkerDeploymentVersion;
55
import io.temporal.worker.WorkerDeploymentOptions;
66
import io.temporal.worker.tuning.PollerBehavior;
7-
import io.temporal.worker.tuning.PollerBehaviorAutoscaling;
87
import java.util.Collection;
98
import javax.annotation.Nonnull;
109
import javax.annotation.Nullable;
@@ -97,19 +96,62 @@ public WorkerDeploymentConfigurationProperties getDeploymentProperties() {
9796
}
9897

9998
public static class PollerConfigurationProperties {
100-
private final @Nullable PollerBehaviorAutoscaling pollerBehaviorAutoscaling;
99+
public static class PollerBehaviorAutoscalingConfiguration {
100+
private final Boolean enabled;
101+
private final Integer minConcurrentTaskPollers;
102+
private final Integer maxConcurrentTaskPollers;
103+
private final Integer initialConcurrentTaskPollers;
104+
105+
@ConstructorBinding
106+
public PollerBehaviorAutoscalingConfiguration(
107+
@Nullable Boolean enabled,
108+
@Nullable Integer minConcurrentTaskPollers,
109+
@Nullable Integer maxConcurrentTaskPollers,
110+
@Nullable Integer initialConcurrentTaskPollers) {
111+
this.enabled = enabled;
112+
this.minConcurrentTaskPollers = minConcurrentTaskPollers;
113+
this.maxConcurrentTaskPollers = maxConcurrentTaskPollers;
114+
this.initialConcurrentTaskPollers = initialConcurrentTaskPollers;
115+
}
116+
117+
@Nullable
118+
public Boolean isEnabled() {
119+
// If enabled is true or any of the other parameters are set, then autoscaling is enabled.
120+
return Boolean.TRUE.equals(enabled)
121+
|| minConcurrentTaskPollers != null
122+
|| maxConcurrentTaskPollers != null
123+
|| initialConcurrentTaskPollers != null;
124+
}
125+
126+
@Nullable
127+
public Integer getMinConcurrentTaskPollers() {
128+
return minConcurrentTaskPollers;
129+
}
130+
131+
@Nullable
132+
public Integer getMaxConcurrentTaskPollers() {
133+
return maxConcurrentTaskPollers;
134+
}
135+
136+
@Nullable
137+
public Integer getInitialConcurrentTaskPollers() {
138+
return initialConcurrentTaskPollers;
139+
}
140+
}
141+
142+
private final @Nullable PollerBehaviorAutoscalingConfiguration pollerBehaviorAutoscaling;
101143

102144
/**
103145
* @param pollerBehaviorAutoscaling defines poller behavior for autoscaling
104146
*/
105147
@ConstructorBinding
106148
public PollerConfigurationProperties(
107-
@Nullable PollerBehaviorAutoscaling pollerBehaviorAutoscaling) {
149+
@Nullable PollerBehaviorAutoscalingConfiguration pollerBehaviorAutoscaling) {
108150
this.pollerBehaviorAutoscaling = pollerBehaviorAutoscaling;
109151
}
110152

111153
@Nullable
112-
public PollerBehaviorAutoscaling getPollerBehaviorAutoscaling() {
154+
public PollerBehaviorAutoscalingConfiguration getPollerBehaviorAutoscaling() {
113155
return pollerBehaviorAutoscaling;
114156
}
115157
}

temporal-spring-boot-autoconfigure/src/main/java/io/temporal/spring/boot/autoconfigure/template/WorkerOptionsTemplate.java

Lines changed: 73 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.temporal.spring.boot.autoconfigure.properties.WorkerProperties;
77
import io.temporal.worker.WorkerDeploymentOptions;
88
import io.temporal.worker.WorkerOptions;
9+
import io.temporal.worker.tuning.PollerBehaviorAutoscaling;
910
import java.util.Optional;
1011
import javax.annotation.Nonnull;
1112
import javax.annotation.Nullable;
@@ -50,70 +51,92 @@ WorkerOptions createWorkerOptions() {
5051
Optional.ofNullable(threadsConfiguration.getMaxConcurrentNexusTaskPollers())
5152
.ifPresent(options::setMaxConcurrentNexusTaskPollers);
5253
if (threadsConfiguration.getWorkflowTaskPollersConfiguration() != null) {
53-
Optional.ofNullable(
54+
WorkerProperties.PollerConfigurationProperties.PollerBehaviorAutoscalingConfiguration
55+
pollerBehaviorAutoscaling =
5456
threadsConfiguration
5557
.getWorkflowTaskPollersConfiguration()
56-
.getPollerBehaviorAutoscaling())
57-
.ifPresent(options::setWorkflowTaskPollersBehavior);
58+
.getPollerBehaviorAutoscaling();
59+
if (pollerBehaviorAutoscaling != null && pollerBehaviorAutoscaling.isEnabled()) {
60+
options.setWorkflowTaskPollersBehavior(
61+
new PollerBehaviorAutoscaling(
62+
pollerBehaviorAutoscaling.getMinConcurrentTaskPollers(),
63+
pollerBehaviorAutoscaling.getMaxConcurrentTaskPollers(),
64+
pollerBehaviorAutoscaling.getInitialConcurrentTaskPollers()));
65+
}
5866
}
5967
if (threadsConfiguration.getActivityTaskPollersConfiguration() != null) {
60-
Optional.ofNullable(
68+
WorkerProperties.PollerConfigurationProperties.PollerBehaviorAutoscalingConfiguration
69+
pollerBehaviorAutoscaling =
6170
threadsConfiguration
6271
.getActivityTaskPollersConfiguration()
63-
.getPollerBehaviorAutoscaling())
64-
.ifPresent(options::setActivityTaskPollersBehavior);
72+
.getPollerBehaviorAutoscaling();
73+
if (pollerBehaviorAutoscaling != null && pollerBehaviorAutoscaling.isEnabled()) {
74+
options.setActivityTaskPollersBehavior(
75+
new PollerBehaviorAutoscaling(
76+
pollerBehaviorAutoscaling.getMinConcurrentTaskPollers(),
77+
pollerBehaviorAutoscaling.getMaxConcurrentTaskPollers(),
78+
pollerBehaviorAutoscaling.getInitialConcurrentTaskPollers()));
79+
}
6580
}
6681
if (threadsConfiguration.getNexusTaskPollersConfiguration() != null) {
67-
Optional.ofNullable(
82+
WorkerProperties.PollerConfigurationProperties.PollerBehaviorAutoscalingConfiguration
83+
pollerBehaviorAutoscaling =
6884
threadsConfiguration
6985
.getNexusTaskPollersConfiguration()
70-
.getPollerBehaviorAutoscaling())
71-
.ifPresent(options::setNexusTaskPollersBehavior);
86+
.getPollerBehaviorAutoscaling();
87+
if (pollerBehaviorAutoscaling != null && pollerBehaviorAutoscaling.isEnabled()) {
88+
options.setNexusTaskPollersBehavior(
89+
new PollerBehaviorAutoscaling(
90+
pollerBehaviorAutoscaling.getMinConcurrentTaskPollers(),
91+
pollerBehaviorAutoscaling.getMaxConcurrentTaskPollers(),
92+
pollerBehaviorAutoscaling.getInitialConcurrentTaskPollers()));
93+
}
7294
}
73-
}
7495

75-
WorkerProperties.RateLimitsConfigurationProperties rateLimitConfiguration =
76-
workerProperties.getRateLimits();
77-
if (rateLimitConfiguration != null) {
78-
Optional.ofNullable(rateLimitConfiguration.getMaxWorkerActivitiesPerSecond())
79-
.ifPresent(options::setMaxWorkerActivitiesPerSecond);
80-
Optional.ofNullable(rateLimitConfiguration.getMaxTaskQueueActivitiesPerSecond())
81-
.ifPresent(options::setMaxTaskQueueActivitiesPerSecond);
82-
}
96+
WorkerProperties.RateLimitsConfigurationProperties rateLimitConfiguration =
97+
workerProperties.getRateLimits();
98+
if (rateLimitConfiguration != null) {
99+
Optional.ofNullable(rateLimitConfiguration.getMaxWorkerActivitiesPerSecond())
100+
.ifPresent(options::setMaxWorkerActivitiesPerSecond);
101+
Optional.ofNullable(rateLimitConfiguration.getMaxTaskQueueActivitiesPerSecond())
102+
.ifPresent(options::setMaxTaskQueueActivitiesPerSecond);
103+
}
83104

84-
WorkerProperties.BuildIdConfigurationProperties buildIdConfigurations =
85-
workerProperties.getBuildId();
86-
if (buildIdConfigurations != null) {
87-
Optional.ofNullable(buildIdConfigurations.getWorkerBuildId())
88-
.ifPresent(options::setBuildId);
89-
options.setUseBuildIdForVersioning(buildIdConfigurations.getEnabledWorkerVersioning());
90-
}
105+
WorkerProperties.BuildIdConfigurationProperties buildIdConfigurations =
106+
workerProperties.getBuildId();
107+
if (buildIdConfigurations != null) {
108+
Optional.ofNullable(buildIdConfigurations.getWorkerBuildId())
109+
.ifPresent(options::setBuildId);
110+
options.setUseBuildIdForVersioning(buildIdConfigurations.getEnabledWorkerVersioning());
111+
}
91112

92-
WorkerProperties.VirtualThreadConfigurationProperties virtualThreadConfiguration =
93-
workerProperties.getVirtualThreads();
94-
if (virtualThreadConfiguration != null) {
95-
Optional.ofNullable(virtualThreadConfiguration.isUsingVirtualThreads())
96-
.ifPresent(options::setUsingVirtualThreads);
97-
Optional.ofNullable(virtualThreadConfiguration.isUsingVirtualThreadsOnWorkflowWorker())
98-
.ifPresent(options::setUsingVirtualThreadsOnWorkflowWorker);
99-
Optional.ofNullable(virtualThreadConfiguration.isUsingVirtualThreadsOnActivityWorker())
100-
.ifPresent(options::setUsingVirtualThreadsOnActivityWorker);
101-
Optional.ofNullable(virtualThreadConfiguration.isUsingVirtualThreadsOnLocalActivityWorker())
102-
.ifPresent(options::setUsingVirtualThreadsOnLocalActivityWorker);
103-
Optional.ofNullable(virtualThreadConfiguration.isUsingVirtualThreadsOnNexusWorker())
104-
.ifPresent(options::setUsingVirtualThreadsOnNexusWorker);
105-
}
106-
WorkerProperties.WorkerDeploymentConfigurationProperties workerDeploymentConfiguration =
107-
workerProperties.getDeploymentProperties();
108-
if (workerDeploymentConfiguration != null) {
109-
WorkerDeploymentOptions.Builder opts = WorkerDeploymentOptions.newBuilder();
110-
Optional.ofNullable(workerDeploymentConfiguration.getUseVersioning())
111-
.ifPresent(opts::setUseVersioning);
112-
Optional.ofNullable(workerDeploymentConfiguration.getDeploymentVersion())
113-
.ifPresent((v) -> opts.setVersion(WorkerDeploymentVersion.fromCanonicalString(v)));
114-
Optional.ofNullable(workerDeploymentConfiguration.getDefaultVersioningBehavior())
115-
.ifPresent(opts::setDefaultVersioningBehavior);
116-
options.setDeploymentOptions(opts.build());
113+
WorkerProperties.VirtualThreadConfigurationProperties virtualThreadConfiguration =
114+
workerProperties.getVirtualThreads();
115+
if (virtualThreadConfiguration != null) {
116+
Optional.ofNullable(virtualThreadConfiguration.isUsingVirtualThreads())
117+
.ifPresent(options::setUsingVirtualThreads);
118+
Optional.ofNullable(virtualThreadConfiguration.isUsingVirtualThreadsOnWorkflowWorker())
119+
.ifPresent(options::setUsingVirtualThreadsOnWorkflowWorker);
120+
Optional.ofNullable(virtualThreadConfiguration.isUsingVirtualThreadsOnActivityWorker())
121+
.ifPresent(options::setUsingVirtualThreadsOnActivityWorker);
122+
Optional.ofNullable(
123+
virtualThreadConfiguration.isUsingVirtualThreadsOnLocalActivityWorker())
124+
.ifPresent(options::setUsingVirtualThreadsOnLocalActivityWorker);
125+
Optional.ofNullable(virtualThreadConfiguration.isUsingVirtualThreadsOnNexusWorker())
126+
.ifPresent(options::setUsingVirtualThreadsOnNexusWorker);
127+
}
128+
WorkerProperties.WorkerDeploymentConfigurationProperties workerDeploymentConfiguration =
129+
workerProperties.getDeploymentProperties();
130+
if (workerDeploymentConfiguration != null) {
131+
WorkerDeploymentOptions.Builder opts = WorkerDeploymentOptions.newBuilder();
132+
Optional.ofNullable(workerDeploymentConfiguration.getUseVersioning())
133+
.ifPresent(opts::setUseVersioning);
134+
Optional.ofNullable(workerDeploymentConfiguration.getDeploymentVersion())
135+
.ifPresent((v) -> opts.setVersion(WorkerDeploymentVersion.fromCanonicalString(v)));
136+
Optional.ofNullable(workerDeploymentConfiguration.getDefaultVersioningBehavior())
137+
.ifPresent(opts::setDefaultVersioningBehavior);
138+
options.setDeploymentOptions(opts.build());
139+
}
117140
}
118141
}
119142

temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/OptionalWorkerOptionsTest.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,23 @@ public TemporalOptionsCustomizer<WorkerOptions.Builder> workerCustomizer() {
114114
"Values from the Spring Config should be respected");
115115
assertEquals(
116116
5,
117-
autoscaling.getInitialMaxConcurrentTaskPollers(),
117+
autoscaling.getInitialConcurrentTaskPollers(),
118118
"Values from the Spring Config should be respected");
119+
assertNotNull(options.getActivityTaskPollersBehavior());
120+
assertInstanceOf(
121+
PollerBehaviorAutoscaling.class, options.getActivityTaskPollersBehavior());
122+
autoscaling = (PollerBehaviorAutoscaling) options.getActivityTaskPollersBehavior();
119123
assertEquals(
120124
1,
121-
options.getMaxConcurrentActivityTaskPollers(),
125+
autoscaling.getMinConcurrentTaskPollers(),
126+
"Values from the Spring Config should be respected");
127+
assertEquals(
128+
100,
129+
autoscaling.getMaxConcurrentTaskPollers(),
130+
"Values from the Spring Config should be respected");
131+
assertEquals(
132+
5,
133+
autoscaling.getInitialConcurrentTaskPollers(),
122134
"Values from the Spring Config should be respected");
123135
assertEquals(
124136
1,

temporal-spring-boot-autoconfigure/src/test/resources/application.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,13 +115,14 @@ spring:
115115
max-concurrent-nexus-task-executors: 1
116116
max-concurrent-activity-executors: 1
117117
max-concurrent-local-activity-executors: 1
118-
max-concurrent-activity-task-pollers: 1
119118
max-concurrent-nexus-task-pollers: 1
120119
workflow-task-pollers-configuration:
121120
poller-behavior-autoscaling:
122121
min-concurrent-task-pollers: 1
123122
max-concurrent-task-pollers: 10
124-
initial-concurrent-task-pollers: 5
123+
activity-task-pollers-configuration:
124+
poller-behavior-autoscaling:
125+
enabled: true
125126
rate-limits:
126127
max-worker-activities-per-second: 1.0
127128
max-task-queue-activities-per-second: 1.0

0 commit comments

Comments
 (0)