Skip to content

Commit 27cfa7d

Browse files
Add Temporal Nexus Operation Handler (#2842)
Add Temporal Nexus Operation Handler
1 parent 5f25aad commit 27cfa7d

19 files changed

Lines changed: 2032 additions & 87 deletions
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package io.temporal.internal.nexus;
2+
3+
import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink;
4+
import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink;
5+
6+
import io.nexusrpc.handler.HandlerException;
7+
import io.nexusrpc.handler.OperationContext;
8+
import io.nexusrpc.handler.OperationStartDetails;
9+
import io.temporal.api.common.v1.Link;
10+
import io.temporal.api.common.v1.WorkflowExecution;
11+
import io.temporal.api.enums.v1.EventType;
12+
import io.temporal.internal.client.NexusStartWorkflowRequest;
13+
import io.temporal.internal.client.NexusStartWorkflowResponse;
14+
import java.net.URISyntaxException;
15+
import java.util.function.Function;
16+
17+
/**
18+
* Shared helper for starting a workflow from a Nexus operation and attaching workflow links to the
19+
* operation context. Used by both {@code WorkflowRunOperationImpl} and {@code
20+
* TemporalNexusClientImpl}.
21+
*/
22+
public class NexusStartWorkflowHelper {
23+
24+
/**
25+
* Starts a workflow via the provided invoker function, attaches workflow links to the operation
26+
* context, and returns the response.
27+
*
28+
* @param ctx the operation context (links will be attached as a side-effect)
29+
* @param details the operation start details containing requestId, callback, links
30+
* @param invoker function that starts the workflow given a {@link NexusStartWorkflowRequest}
31+
* @return the {@link NexusStartWorkflowResponse} containing the operation token and workflow
32+
* execution
33+
*/
34+
public static NexusStartWorkflowResponse startWorkflowAndAttachLinks(
35+
OperationContext ctx,
36+
OperationStartDetails details,
37+
Function<NexusStartWorkflowRequest, NexusStartWorkflowResponse> invoker) {
38+
InternalNexusOperationContext nexusCtx = CurrentNexusOperationContext.get();
39+
40+
NexusStartWorkflowRequest nexusRequest =
41+
new NexusStartWorkflowRequest(
42+
details.getRequestId(),
43+
details.getCallbackUrl(),
44+
details.getCallbackHeaders(),
45+
nexusCtx.getTaskQueue(),
46+
details.getLinks());
47+
48+
NexusStartWorkflowResponse response = invoker.apply(nexusRequest);
49+
WorkflowExecution workflowExec = response.getWorkflowExecution();
50+
51+
// If the start workflow response returned a link use it, otherwise
52+
// create the link information about the new workflow and return to the caller.
53+
Link.WorkflowEvent workflowEventLink =
54+
nexusCtx.getStartWorkflowResponseLink().hasWorkflowEvent()
55+
? nexusCtx.getStartWorkflowResponseLink().getWorkflowEvent()
56+
: null;
57+
if (workflowEventLink == null) {
58+
workflowEventLink =
59+
Link.WorkflowEvent.newBuilder()
60+
.setNamespace(nexusCtx.getNamespace())
61+
.setWorkflowId(workflowExec.getWorkflowId())
62+
.setRunId(workflowExec.getRunId())
63+
.setEventRef(
64+
Link.WorkflowEvent.EventReference.newBuilder()
65+
.setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED))
66+
.build();
67+
}
68+
io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink);
69+
if (nexusLink != null) {
70+
try {
71+
ctx.addLinks(nexusProtoLinkToLink(nexusLink));
72+
} catch (URISyntaxException e) {
73+
throw new HandlerException(HandlerException.ErrorType.INTERNAL, "failed to parse URI", e);
74+
}
75+
}
76+
77+
return response;
78+
}
79+
80+
private NexusStartWorkflowHelper() {}
81+
}

temporal-sdk/src/main/java/io/temporal/internal/nexus/WorkflowRunOperationToken.java renamed to temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationToken.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
import com.fasterxml.jackson.annotation.JsonInclude;
44
import com.fasterxml.jackson.annotation.JsonProperty;
55

6-
public class WorkflowRunOperationToken {
6+
/** Deserialized representation of a Nexus operation token. */
7+
public class OperationToken {
78
@JsonProperty("v")
89
@JsonInclude(JsonInclude.Include.NON_NULL)
910
private final Integer version;
@@ -17,7 +18,7 @@ public class WorkflowRunOperationToken {
1718
@JsonProperty("wid")
1819
private final String workflowId;
1920

20-
public WorkflowRunOperationToken(
21+
public OperationToken(
2122
@JsonProperty("t") Integer type,
2223
@JsonProperty("ns") String namespace,
2324
@JsonProperty("wid") String workflowId,
@@ -28,8 +29,8 @@ public WorkflowRunOperationToken(
2829
this.version = version;
2930
}
3031

31-
public WorkflowRunOperationToken(String namespace, String workflowId) {
32-
this.type = OperationTokenType.WORKFLOW_RUN;
32+
public OperationToken(OperationTokenType type, String namespace, String workflowId) {
33+
this.type = type;
3334
this.namespace = namespace;
3435
this.workflowId = workflowId;
3536
this.version = null;

temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenUtil.java

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,33 +15,45 @@ public class OperationTokenUtil {
1515
private static final Base64.Encoder encoder = Base64.getUrlEncoder().withoutPadding();
1616

1717
/**
18-
* Load a workflow run operation token from an operation token.
18+
* Load and validate an operation token without asserting the token type. Use this for cancel
19+
* dispatch where the token type determines the cancel behavior.
1920
*
20-
* @throws IllegalArgumentException if the operation token is invalid
21+
* @throws IllegalArgumentException if the operation token is malformed or has invalid structure
2122
*/
22-
public static WorkflowRunOperationToken loadWorkflowRunOperationToken(String operationToken) {
23-
WorkflowRunOperationToken token;
23+
public static OperationToken loadOperationToken(String operationToken) {
24+
OperationToken token;
2425
try {
25-
JavaType reference = mapper.getTypeFactory().constructType(WorkflowRunOperationToken.class);
26+
JavaType reference = mapper.getTypeFactory().constructType(OperationToken.class);
2627
token = mapper.readValue(decoder.decode(operationToken), reference);
2728
} catch (Exception e) {
2829
throw new IllegalArgumentException("Failed to parse operation token: " + e.getMessage());
2930
}
30-
if (!token.getType().equals(OperationTokenType.WORKFLOW_RUN)) {
31-
throw new IllegalArgumentException(
32-
"Invalid workflow run token: incorrect operation token type: " + token.getType());
33-
}
3431
if (token.getVersion() != null && token.getVersion() != 0) {
35-
throw new IllegalArgumentException("Invalid workflow run token: unexpected version field");
32+
throw new IllegalArgumentException("Invalid operation token: unexpected version field");
3633
}
3734
if (Strings.isNullOrEmpty(token.getWorkflowId())) {
38-
throw new IllegalArgumentException("Invalid workflow run token: missing workflow ID (wid)");
35+
throw new IllegalArgumentException("Invalid operation token: missing workflow ID (wid)");
36+
}
37+
return token;
38+
}
39+
40+
/**
41+
* Load a workflow run operation token, asserting that the token type is {@link
42+
* OperationTokenType#WORKFLOW_RUN}.
43+
*
44+
* @throws IllegalArgumentException if the operation token is invalid or not a workflow run token
45+
*/
46+
public static OperationToken loadWorkflowRunOperationToken(String operationToken) {
47+
OperationToken token = loadOperationToken(operationToken);
48+
if (!token.getType().equals(OperationTokenType.WORKFLOW_RUN)) {
49+
throw new IllegalArgumentException(
50+
"Invalid workflow run token: incorrect operation token type: " + token.getType());
3951
}
4052
return token;
4153
}
4254

4355
/**
44-
* Attempt to extract the workflow Id from an operation token.
56+
* Extract the workflow ID from a workflow run operation token.
4557
*
4658
* @throws IllegalArgumentException if the operation token is invalid
4759
*/
@@ -52,7 +64,9 @@ public static String loadWorkflowIdFromOperationToken(String operationToken) {
5264
/** Generate a workflow run operation token from a workflow ID and namespace. */
5365
public static String generateWorkflowRunOperationToken(String workflowId, String namespace)
5466
throws JsonProcessingException {
55-
String json = ow.writeValueAsString(new WorkflowRunOperationToken(namespace, workflowId));
67+
String json =
68+
ow.writeValueAsString(
69+
new OperationToken(OperationTokenType.WORKFLOW_RUN, namespace, workflowId));
5670
return encoder.encodeToString(json.getBytes());
5771
}
5872

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package io.temporal.nexus;
2+
3+
import io.temporal.common.Experimental;
4+
import java.util.Objects;
5+
6+
/**
7+
* Input to {@link TemporalOperationHandler#cancelWorkflowRun} describing the workflow run to
8+
* cancel.
9+
*/
10+
@Experimental
11+
public final class CancelWorkflowRunInput {
12+
13+
private final String workflowId;
14+
15+
public CancelWorkflowRunInput(String workflowId) {
16+
this.workflowId = Objects.requireNonNull(workflowId);
17+
}
18+
19+
/** Returns the workflow ID extracted from the operation token. */
20+
public String getWorkflowId() {
21+
return workflowId;
22+
}
23+
}

0 commit comments

Comments
 (0)