Skip to content

Commit 5863dba

Browse files
Add Temporal Nexus Operation Handler
1 parent 05ee827 commit 5863dba

16 files changed

Lines changed: 1031 additions & 88 deletions
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package io.temporal.internal.nexus;
2+
3+
import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink;
4+
import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink;
5+
6+
import io.nexusrpc.handler.HandlerException;
7+
import io.nexusrpc.handler.OperationContext;
8+
import io.nexusrpc.handler.OperationStartDetails;
9+
import io.temporal.api.common.v1.Link;
10+
import io.temporal.api.common.v1.WorkflowExecution;
11+
import io.temporal.api.enums.v1.EventType;
12+
import io.temporal.internal.client.NexusStartWorkflowRequest;
13+
import io.temporal.internal.client.NexusStartWorkflowResponse;
14+
import java.net.URISyntaxException;
15+
import java.util.function.Function;
16+
17+
/**
18+
* Shared helper for starting a workflow from a Nexus operation and attaching workflow links to the
19+
* operation context. Used by both {@code WorkflowRunOperationImpl} and {@code TemporalNexusClient}.
20+
*/
21+
public class NexusStartWorkflowHelper {
22+
23+
/**
24+
* Starts a workflow via the provided invoker function, attaches workflow links to the operation
25+
* context, and returns the response.
26+
*
27+
* @param ctx the operation context (links will be attached as a side-effect)
28+
* @param details the operation start details containing requestId, callback, links
29+
* @param invoker function that starts the workflow given a {@link NexusStartWorkflowRequest}
30+
* @return the {@link NexusStartWorkflowResponse} containing the operation token and workflow
31+
* execution
32+
*/
33+
public static NexusStartWorkflowResponse startWorkflowAndAttachLinks(
34+
OperationContext ctx,
35+
OperationStartDetails details,
36+
Function<NexusStartWorkflowRequest, NexusStartWorkflowResponse> invoker) {
37+
InternalNexusOperationContext nexusCtx = CurrentNexusOperationContext.get();
38+
39+
NexusStartWorkflowRequest nexusRequest =
40+
new NexusStartWorkflowRequest(
41+
details.getRequestId(),
42+
details.getCallbackUrl(),
43+
details.getCallbackHeaders(),
44+
nexusCtx.getTaskQueue(),
45+
details.getLinks());
46+
47+
NexusStartWorkflowResponse response = invoker.apply(nexusRequest);
48+
WorkflowExecution workflowExec = response.getWorkflowExecution();
49+
50+
// If the start workflow response returned a link use it, otherwise
51+
// create the link information about the new workflow and return to the caller.
52+
Link.WorkflowEvent workflowEventLink =
53+
nexusCtx.getStartWorkflowResponseLink().hasWorkflowEvent()
54+
? nexusCtx.getStartWorkflowResponseLink().getWorkflowEvent()
55+
: null;
56+
if (workflowEventLink == null) {
57+
workflowEventLink =
58+
Link.WorkflowEvent.newBuilder()
59+
.setNamespace(nexusCtx.getNamespace())
60+
.setWorkflowId(workflowExec.getWorkflowId())
61+
.setRunId(workflowExec.getRunId())
62+
.setEventRef(
63+
Link.WorkflowEvent.EventReference.newBuilder()
64+
.setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED))
65+
.build();
66+
}
67+
io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink);
68+
if (nexusLink != null) {
69+
try {
70+
ctx.addLinks(nexusProtoLinkToLink(nexusLink));
71+
} catch (URISyntaxException e) {
72+
throw new HandlerException(HandlerException.ErrorType.INTERNAL, "failed to parse URI", e);
73+
}
74+
}
75+
76+
return response;
77+
}
78+
79+
private NexusStartWorkflowHelper() {}
80+
}

temporal-sdk/src/main/java/io/temporal/internal/nexus/WorkflowRunOperationToken.java renamed to temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationToken.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
import com.fasterxml.jackson.annotation.JsonInclude;
44
import com.fasterxml.jackson.annotation.JsonProperty;
55

6-
public class WorkflowRunOperationToken {
6+
/** Deserialized representation of a Nexus operation token. */
7+
public class OperationToken {
78
@JsonProperty("v")
89
@JsonInclude(JsonInclude.Include.NON_NULL)
910
private final Integer version;
@@ -17,7 +18,7 @@ public class WorkflowRunOperationToken {
1718
@JsonProperty("wid")
1819
private final String workflowId;
1920

20-
public WorkflowRunOperationToken(
21+
public OperationToken(
2122
@JsonProperty("t") Integer type,
2223
@JsonProperty("ns") String namespace,
2324
@JsonProperty("wid") String workflowId,
@@ -28,8 +29,8 @@ public WorkflowRunOperationToken(
2829
this.version = version;
2930
}
3031

31-
public WorkflowRunOperationToken(String namespace, String workflowId) {
32-
this.type = OperationTokenType.WORKFLOW_RUN;
32+
public OperationToken(OperationTokenType type, String namespace, String workflowId) {
33+
this.type = type;
3334
this.namespace = namespace;
3435
this.workflowId = workflowId;
3536
this.version = null;

temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenUtil.java

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,33 +15,45 @@ public class OperationTokenUtil {
1515
private static final Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding();
1616

1717
/**
18-
* Load a workflow run operation token from an operation token.
18+
* Load and validate an operation token without asserting the token type. Use this for cancel
19+
* dispatch where the token type determines the cancel behavior.
1920
*
20-
* @throws IllegalArgumentException if the operation token is invalid
21+
* @throws IllegalArgumentException if the operation token is malformed or has invalid structure
2122
*/
22-
public static WorkflowRunOperationToken loadWorkflowRunOperationToken(String operationToken) {
23-
WorkflowRunOperationToken token;
23+
public static OperationToken loadOperationToken(String operationToken) {
24+
OperationToken token;
2425
try {
25-
JavaType reference = mapper.getTypeFactory().constructType(WorkflowRunOperationToken.class);
26+
JavaType reference = mapper.getTypeFactory().constructType(OperationToken.class);
2627
token = mapper.readValue(decoder.decode(operationToken), reference);
2728
} catch (Exception e) {
2829
throw new IllegalArgumentException("Failed to parse operation token: " + e.getMessage());
2930
}
30-
if (!token.getType().equals(OperationTokenType.WORKFLOW_RUN)) {
31-
throw new IllegalArgumentException(
32-
"Invalid workflow run token: incorrect operation token type: " + token.getType());
33-
}
3431
if (token.getVersion() != null && token.getVersion() != 0) {
35-
throw new IllegalArgumentException("Invalid workflow run token: unexpected version field");
32+
throw new IllegalArgumentException("Invalid operation token: unexpected version field");
3633
}
3734
if (Strings.isNullOrEmpty(token.getWorkflowId())) {
38-
throw new IllegalArgumentException("Invalid workflow run token: missing workflow ID (wid)");
35+
throw new IllegalArgumentException("Invalid operation token: missing workflow ID (wid)");
36+
}
37+
return token;
38+
}
39+
40+
/**
41+
* Load a workflow run operation token, asserting that the token type is {@link
42+
* OperationTokenType#WORKFLOW_RUN}.
43+
*
44+
* @throws IllegalArgumentException if the operation token is invalid or not a workflow run token
45+
*/
46+
public static OperationToken loadWorkflowRunOperationToken(String operationToken) {
47+
OperationToken token = loadOperationToken(operationToken);
48+
if (!token.getType().equals(OperationTokenType.WORKFLOW_RUN)) {
49+
throw new IllegalArgumentException(
50+
"Invalid workflow run token: incorrect operation token type: " + token.getType());
3951
}
4052
return token;
4153
}
4254

4355
/**
44-
* Attempt to extract the workflow Id from an operation token.
56+
* Extract the workflow ID from a workflow run operation token.
4557
*
4658
* @throws IllegalArgumentException if the operation token is invalid
4759
*/
@@ -52,7 +64,9 @@ public static String loadWorkflowIdFromOperationToken(String operationToken) {
5264
/** Generate a workflow run operation token from a workflow ID and namespace. */
5365
public static String generateWorkflowRunOperationToken(String workflowId, String namespace)
5466
throws JsonProcessingException {
55-
String json = ow.writeValueAsString(new WorkflowRunOperationToken(namespace, workflowId));
67+
String json =
68+
ow.writeValueAsString(
69+
new OperationToken(OperationTokenType.WORKFLOW_RUN, namespace, workflowId));
5670
return encoder.encodeToString(json.getBytes());
5771
}
5872

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package io.temporal.nexus;
2+
3+
import io.nexusrpc.handler.OperationContext;
4+
import io.nexusrpc.handler.OperationStartDetails;
5+
import io.temporal.client.WorkflowClient;
6+
import io.temporal.client.WorkflowOptions;
7+
import io.temporal.client.WorkflowStub;
8+
import io.temporal.common.Experimental;
9+
import io.temporal.internal.client.NexusStartWorkflowResponse;
10+
import io.temporal.internal.nexus.NexusStartWorkflowHelper;
11+
import io.temporal.workflow.Functions;
12+
import java.util.Objects;
13+
import java.util.function.Consumer;
14+
15+
/**
16+
* Nexus-aware client wrapping {@link WorkflowClient}. Provides methods for interacting with
17+
* Temporal workflows from within a Nexus operation handler.
18+
*
19+
* <p>Obtained via the {@link TemporalOperationHandler.StartFunction} parameter. The client creates
20+
* workflow stubs internally — users pass the workflow class, a lambda that calls the workflow
21+
* method, and workflow options.
22+
*
23+
* <p>Usage example:
24+
*
25+
* <pre>{@code
26+
* @OperationImpl
27+
* public OperationHandler<OrderInput, OrderResult> createOrder() {
28+
* return TemporalOperationHandler.from((context, client, input) -> {
29+
* return client.startWorkflow(
30+
* OrderWorkflow.class,
31+
* wf -> wf.processOrder(input),
32+
* WorkflowOptions.newBuilder()
33+
* .setWorkflowId("order-" + context.getRequestId())
34+
* .build());
35+
* });
36+
* }
37+
* }</pre>
38+
*
39+
* <p>For advanced use cases, the underlying {@link WorkflowClient} can be accessed via {@link
40+
* #getWorkflowClient()}. For example, to send a signal and return a synchronous result:
41+
*
42+
* <pre>{@code
43+
* @OperationImpl
44+
* public OperationHandler<CancelOrderInput, Void> cancelOrder() {
45+
* return TemporalOperationHandler.from((context, client, input) -> {
46+
* client.getWorkflowClient()
47+
* .newUntypedWorkflowStub("order-" + input.getOrderId())
48+
* .signal("requestCancellation", input);
49+
* return TemporalOperationResult.sync(null);
50+
* });
51+
* }
52+
* }</pre>
53+
*/
54+
@Experimental
55+
public final class TemporalNexusClient {
56+
57+
private final WorkflowClient client;
58+
private final OperationContext operationContext;
59+
private final OperationStartDetails operationStartDetails;
60+
61+
TemporalNexusClient(
62+
WorkflowClient client,
63+
OperationContext operationContext,
64+
OperationStartDetails operationStartDetails) {
65+
this.client = Objects.requireNonNull(client);
66+
this.operationContext = Objects.requireNonNull(operationContext);
67+
this.operationStartDetails = Objects.requireNonNull(operationStartDetails);
68+
}
69+
70+
/** Returns the underlying {@link WorkflowClient} for advanced use cases. */
71+
public WorkflowClient getWorkflowClient() {
72+
return client;
73+
}
74+
75+
/**
76+
* Starts a workflow by invoking a method on a workflow stub. The client creates the stub from the
77+
* given class and options, then passes it to the provided consumer which should call exactly one
78+
* workflow method. Works for both returning and void workflow methods.
79+
*
80+
* <p>Example (returning):
81+
*
82+
* <pre>{@code
83+
* client.startWorkflow(MyWorkflow.class, wf -> wf.run(input), options)
84+
* }</pre>
85+
*
86+
* <p>Example (void):
87+
*
88+
* <pre>{@code
89+
* client.startWorkflow(MyWorkflow.class, wf -> wf.execute(input), options)
90+
* }</pre>
91+
*
92+
* @param workflowClass the workflow interface class
93+
* @param workflowInvocation receives the workflow stub and calls exactly one workflow method
94+
* @param options workflow start options (must include workflowId)
95+
* @param <T> the workflow interface type
96+
* @param <R> the workflow return type (inferred from calling context)
97+
* @return an async {@link TemporalOperationResult} with the workflow-run operation token
98+
*/
99+
public <T, R> TemporalOperationResult<R> startWorkflow(
100+
Class<T> workflowClass, Consumer<T> workflowInvocation, WorkflowOptions options) {
101+
T stub = client.newWorkflowStub(workflowClass, options);
102+
Functions.Proc bound = () -> workflowInvocation.accept(stub);
103+
return invokeAndReturn(WorkflowHandle.fromWorkflowMethod(bound));
104+
}
105+
106+
/**
107+
* Starts a workflow using an untyped workflow type name.
108+
*
109+
* @param workflowType the workflow type name string
110+
* @param resultClass the expected result class
111+
* @param args workflow arguments
112+
* @param options workflow start options (must include workflowId)
113+
* @param <R> the workflow return type
114+
* @return an async {@link TemporalOperationResult} with the workflow-run operation token
115+
*/
116+
public <R> TemporalOperationResult<R> startWorkflow(
117+
String workflowType, Class<R> resultClass, Object[] args, WorkflowOptions options) {
118+
WorkflowStub stub = client.newUntypedWorkflowStub(workflowType, options);
119+
WorkflowHandle<R> handle = WorkflowHandle.fromWorkflowStub(stub, resultClass, args);
120+
return invokeAndReturn(handle);
121+
}
122+
123+
private <R> TemporalOperationResult<R> invokeAndReturn(WorkflowHandle<?> handle) {
124+
NexusStartWorkflowResponse response =
125+
NexusStartWorkflowHelper.startWorkflowAndAttachLinks(
126+
operationContext,
127+
operationStartDetails,
128+
request -> handle.getInvoker().invoke(request));
129+
return TemporalOperationResult.async(response.getOperationToken());
130+
}
131+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package io.temporal.nexus;
2+
3+
import io.nexusrpc.handler.OperationCancelDetails;
4+
import io.nexusrpc.handler.OperationContext;
5+
import io.temporal.common.Experimental;
6+
import java.util.Objects;
7+
8+
/**
9+
* Context for a Nexus cancel operation. Combines the {@link OperationContext} and {@link
10+
* OperationCancelDetails} into a single object passed to cancel methods on {@link
11+
* TemporalOperationHandler}.
12+
*/
13+
@Experimental
14+
public final class TemporalOperationCancelContext {
15+
16+
private final OperationContext operationContext;
17+
private final OperationCancelDetails operationCancelDetails;
18+
19+
TemporalOperationCancelContext(
20+
OperationContext operationContext, OperationCancelDetails operationCancelDetails) {
21+
this.operationContext = Objects.requireNonNull(operationContext);
22+
this.operationCancelDetails = Objects.requireNonNull(operationCancelDetails);
23+
}
24+
25+
/** Returns the service name for this operation. */
26+
public String getService() {
27+
return operationContext.getService();
28+
}
29+
30+
/** Returns the operation name. */
31+
public String getOperation() {
32+
return operationContext.getOperation();
33+
}
34+
35+
/** Returns the operation token identifying the operation to cancel. */
36+
public String getOperationToken() {
37+
return operationCancelDetails.getOperationToken();
38+
}
39+
40+
/** Returns the underlying {@link OperationContext} for advanced use cases. */
41+
public OperationContext getOperationContext() {
42+
return operationContext;
43+
}
44+
45+
/** Returns the underlying {@link OperationCancelDetails} for advanced use cases. */
46+
public OperationCancelDetails getOperationCancelDetails() {
47+
return operationCancelDetails;
48+
}
49+
}

0 commit comments

Comments
 (0)