Skip to content

Commit 4c1b9a6

Browse files
Add support for workflow init in Springboot
1 parent b3b7806 commit 4c1b9a6

6 files changed

Lines changed: 106 additions & 26 deletions

File tree

temporal-sdk/src/main/java/io/temporal/internal/sync/POJOWorkflowImplementationFactory.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public final class POJOWorkflowImplementationFactory implements ReplayWorkflowFa
9191
workflowDefinitions = Collections.synchronizedMap(new HashMap<>());
9292

9393
/** Factories providing instances of workflow classes. */
94-
private final Map<Class<?>, Functions.Func<?>> workflowInstanceFactories =
94+
private final Map<Class<?>, Functions.Func1<EncodedValues, ?>> workflowInstanceFactories =
9595
Collections.synchronizedMap(new HashMap<>());
9696

9797
/** If present then it is called for any unknown workflow type. */
@@ -146,7 +146,9 @@ public void registerWorkflowImplementationTypes(
146146
*/
147147
@SuppressWarnings("unchecked")
148148
public <R> void addWorkflowImplementationFactory(
149-
WorkflowImplementationOptions options, Class<R> clazz, Functions.Func<R> factory) {
149+
WorkflowImplementationOptions options,
150+
Class<R> clazz,
151+
Functions.Func1<EncodedValues, R> factory) {
150152
if (DynamicWorkflow.class.isAssignableFrom(clazz)) {
151153
if (dynamicWorkflowImplementationFactory != null) {
152154
throw new TypeAlreadyRegisteredException(
@@ -433,16 +435,17 @@ public WorkflowOutput execute(WorkflowInput input) {
433435
}
434436

435437
protected void newInstance(Optional<Payloads> input) {
436-
Func<?> factory = workflowInstanceFactories.get(workflowImplementationClass);
438+
Functions.Func1<EncodedValues, ?> factory =
439+
workflowInstanceFactories.get(workflowImplementationClass);
437440
if (factory != null) {
438-
workflow = factory.apply();
441+
workflow = factory.apply(new EncodedValues(input, dataConverterWithWorkflowContext));
439442
} else {
440443
// Historically any exception thrown from the constructor was wrapped into Error causing a
441444
// workflow task failure.
442445
// This is not consistent with throwing exception from the workflow method which can
443446
// causes a workflow failure depending on the exception type.
444447
// To preserve backwards compatibility we only change behaviour if a constructor is
445-
// annotated with WorkflowInit.
448+
// annotated with @WorkflowInit.
446449
if (ctor != null) {
447450
try {
448451
workflow =

temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.temporal.api.taskqueue.v1.TaskQueue;
2727
import io.temporal.client.WorkflowClient;
2828
import io.temporal.common.converter.DataConverter;
29+
import io.temporal.common.converter.EncodedValues;
2930
import io.temporal.internal.activity.ActivityExecutionContextFactory;
3031
import io.temporal.internal.activity.ActivityTaskHandlerImpl;
3132
import io.temporal.internal.activity.LocalActivityExecutionContextFactoryImpl;
@@ -38,6 +39,7 @@
3839
import io.temporal.worker.tuning.SlotSupplier;
3940
import io.temporal.worker.tuning.WorkflowSlotInfo;
4041
import io.temporal.workflow.Functions.Func;
42+
import io.temporal.workflow.Functions.Func1;
4143
import java.lang.reflect.Type;
4244
import java.time.Duration;
4345
import java.util.Objects;
@@ -165,6 +167,11 @@ public void registerWorkflowImplementationTypes(
165167

166168
public <R> void registerWorkflowImplementationFactory(
167169
WorkflowImplementationOptions options, Class<R> clazz, Func<R> factory) {
170+
this.factory.addWorkflowImplementationFactory(options, clazz, unused -> factory.apply());
171+
}
172+
173+
public <R> void registerWorkflowImplementationFactory(
174+
WorkflowImplementationOptions options, Class<R> clazz, Func1<EncodedValues, R> factory) {
168175
this.factory.addWorkflowImplementationFactory(options, clazz, factory);
169176
}
170177

temporal-sdk/src/main/java/io/temporal/worker/Worker.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,14 @@
3131
import io.temporal.common.WorkflowExecutionHistory;
3232
import io.temporal.common.context.ContextPropagator;
3333
import io.temporal.common.converter.DataConverter;
34+
import io.temporal.common.converter.EncodedValues;
3435
import io.temporal.failure.TemporalFailure;
3536
import io.temporal.internal.sync.WorkflowInternal;
3637
import io.temporal.internal.sync.WorkflowThreadExecutor;
3738
import io.temporal.internal.worker.*;
3839
import io.temporal.serviceclient.MetricsTag;
3940
import io.temporal.worker.tuning.*;
41+
import io.temporal.workflow.Functions;
4042
import io.temporal.workflow.Functions.Func;
4143
import io.temporal.workflow.WorkflowMethod;
4244
import java.time.Duration;
@@ -325,6 +327,14 @@ public <R> void registerWorkflowImplementationFactory(
325327
workflowWorker.registerWorkflowImplementationFactory(options, workflowInterface, factory);
326328
}
327329

330+
@VisibleForTesting
331+
public <R> void registerWorkflowImplementationFactory(
332+
Class<R> workflowInterface,
333+
Functions.Func1<EncodedValues, R> factory,
334+
WorkflowImplementationOptions options) {
335+
workflowWorker.registerWorkflowImplementationFactory(options, workflowInterface, factory);
336+
}
337+
328338
/**
329339
* Configures a factory to use when an instance of a workflow implementation is created.
330340
*

temporal-spring-boot-autoconfigure/build.gradle

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ dependencies {
3434
testImplementation group: 'ch.qos.logback', name: 'logback-classic', version: "${logbackVersion}"
3535

3636
testImplementation "org.springframework.boot:spring-boot-starter-test"
37+
38+
testImplementation('org.slf4j:slf4j-api') {
39+
version {
40+
strictly "${slf4jVersion}"
41+
}
42+
}
3743
}
3844

3945
tasks.test {

temporal-spring-boot-autoconfigure/src/main/java/io/temporal/spring/boot/autoconfigure/template/WorkersTemplate.java

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import io.temporal.spring.boot.autoconfigure.properties.NamespaceProperties;
3838
import io.temporal.spring.boot.autoconfigure.properties.WorkerProperties;
3939
import io.temporal.worker.*;
40+
import java.lang.reflect.Constructor;
41+
import java.lang.reflect.InvocationTargetException;
4042
import java.util.ArrayList;
4143
import java.util.Collection;
4244
import java.util.HashMap;
@@ -510,7 +512,6 @@ private void configureWorkflowImplementationAutoDiscovery(
510512

511513
@SuppressWarnings("unchecked")
512514
private <T> void configureWorkflowImplementation(Worker worker, Class<?> clazz) {
513-
514515
POJOWorkflowImplMetadata workflowMetadata =
515516
POJOWorkflowImplMetadata.newInstanceForWorkflowFactory(clazz);
516517
List<POJOWorkflowMethodMetadata> workflowMethods = workflowMetadata.getWorkflowMethods();
@@ -524,10 +525,15 @@ private <T> void configureWorkflowImplementation(Worker worker, Class<?> clazz)
524525
WorkflowImplementationOptions workflowImplementationOptions =
525526
new WorkflowImplementationOptionsTemplate(workflowImplementationCustomizer)
526527
.createWorkflowImplementationOptions();
527-
528528
WorkerDeploymentOptions deploymentOptions = worker.getWorkerOptions().getDeploymentOptions();
529-
530-
for (POJOWorkflowMethodMetadata workflowMethod : workflowMetadata.getWorkflowMethods()) {
529+
if (workflowMetadata.getWorkflowInit() != null) {
530+
if (workflowMethods.size() > 1) {
531+
throw new BeanDefinitionValidationException(
532+
"Workflow implementation class "
533+
+ clazz
534+
+ " has more then one workflow method and a constructor annotated with @WorkflowInit.");
535+
}
536+
POJOWorkflowMethodMetadata workflowMethod = workflowMetadata.getWorkflowMethods().get(0);
531537
if (deploymentOptions != null && deploymentOptions.isUsingVersioning()) {
532538
POJOWorkflowImplementationFactory.validateVersioningBehavior(
533539
clazz,
@@ -538,10 +544,43 @@ private <T> void configureWorkflowImplementation(Worker worker, Class<?> clazz)
538544

539545
worker.registerWorkflowImplementationFactory(
540546
(Class<T>) workflowMethod.getWorkflowInterface(),
541-
() -> (T) beanFactory.createBean(clazz),
547+
(encodedValues) -> {
548+
try {
549+
Constructor<?> ctor = workflowMetadata.getWorkflowInit();
550+
Object[] parameters = new Object[ctor.getParameterCount()];
551+
for (int i = 0; i < ctor.getParameterCount(); i++) {
552+
parameters[i] =
553+
encodedValues.get(
554+
i, ctor.getParameterTypes()[i], ctor.getGenericParameterTypes()[i]);
555+
}
556+
T workflowInstance = (T) workflowMetadata.getWorkflowInit().newInstance(parameters);
557+
beanFactory.autowireBean(workflowInstance);
558+
return workflowInstance;
559+
} catch (InstantiationException
560+
| IllegalAccessException
561+
| InvocationTargetException e) {
562+
throw new RuntimeException(e);
563+
}
564+
},
542565
workflowImplementationOptions);
543566
addRegisteredWorkflowImpl(
544567
worker, workflowMethod.getWorkflowInterface().getName(), workflowMetadata);
568+
} else {
569+
for (POJOWorkflowMethodMetadata workflowMethod : workflowMetadata.getWorkflowMethods()) {
570+
if (deploymentOptions != null && deploymentOptions.isUsingVersioning()) {
571+
POJOWorkflowImplementationFactory.validateVersioningBehavior(
572+
clazz,
573+
workflowMethod,
574+
deploymentOptions.getDefaultVersioningBehavior(),
575+
deploymentOptions.isUsingVersioning());
576+
}
577+
worker.registerWorkflowImplementationFactory(
578+
(Class<T>) workflowMethod.getWorkflowInterface(),
579+
() -> (T) beanFactory.createBean(clazz),
580+
workflowImplementationOptions);
581+
addRegisteredWorkflowImpl(
582+
worker, workflowMethod.getWorkflowInterface().getName(), workflowMetadata);
583+
}
545584
}
546585
}
547586

temporal-spring-boot-autoconfigure/src/test/java/io/temporal/spring/boot/autoconfigure/bytaskqueue/TestWorkflowImpl.java

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,29 +26,44 @@
2626
import io.temporal.workflow.NexusOperationOptions;
2727
import io.temporal.workflow.NexusServiceOptions;
2828
import io.temporal.workflow.Workflow;
29+
import io.temporal.workflow.WorkflowInit;
2930
import java.time.Duration;
31+
import org.springframework.beans.factory.annotation.Autowired;
32+
import org.springframework.context.ConfigurableApplicationContext;
3033

3134
@WorkflowImpl(taskQueues = {"${default-queue.name:UnitTest}"})
3235
public class TestWorkflowImpl implements TestWorkflow {
36+
private final TestNexusService nexusService;
37+
private final TestActivity activity;
38+
39+
@Autowired private ConfigurableApplicationContext applicationContext;
40+
41+
@WorkflowInit
42+
public TestWorkflowImpl(String input) {
43+
nexusService =
44+
Workflow.newNexusServiceStub(
45+
TestNexusService.class,
46+
NexusServiceOptions.newBuilder()
47+
.setEndpoint("AutoDiscoveryByTaskQueueEndpoint")
48+
.setOperationOptions(
49+
NexusOperationOptions.newBuilder()
50+
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
51+
.build())
52+
.build());
53+
54+
activity =
55+
Workflow.newActivityStub(
56+
TestActivity.class,
57+
ActivityOptions.newBuilder()
58+
.setStartToCloseTimeout(Duration.ofSeconds(1))
59+
.validateAndBuildWithDefaults());
60+
}
61+
3362
@Override
3463
public String execute(String input) {
3564
if (input.equals("nexus")) {
36-
Workflow.newNexusServiceStub(
37-
TestNexusService.class,
38-
NexusServiceOptions.newBuilder()
39-
.setEndpoint("AutoDiscoveryByTaskQueueEndpoint")
40-
.setOperationOptions(
41-
NexusOperationOptions.newBuilder()
42-
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
43-
.build())
44-
.build())
45-
.operation(input);
65+
nexusService.operation(input);
4666
}
47-
return Workflow.newActivityStub(
48-
TestActivity.class,
49-
ActivityOptions.newBuilder()
50-
.setStartToCloseTimeout(Duration.ofSeconds(1))
51-
.validateAndBuildWithDefaults())
52-
.execute("done");
67+
return activity.execute("done");
5368
}
5469
}

0 commit comments

Comments
 (0)