Skip to content

Commit aa093f1

Browse files
some clean up
1 parent 0af1921 commit aa093f1

7 files changed

Lines changed: 131 additions & 23 deletions

File tree

temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationToken.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,23 @@ public class OperationToken {
2424
@JsonInclude(JsonInclude.Include.NON_NULL)
2525
private final String activityId;
2626

27+
@JsonProperty("rid")
28+
@JsonInclude(JsonInclude.Include.NON_NULL)
29+
private final String runId;
30+
2731
@JsonCreator
2832
public OperationToken(
2933
@JsonProperty("t") Integer type,
3034
@JsonProperty("ns") String namespace,
3135
@JsonProperty("wid") String workflowId,
3236
@JsonProperty("aid") String activityId,
37+
@JsonProperty("rid") String runId,
3338
@JsonProperty("v") Integer version) {
3439
this.type = OperationTokenType.fromValue(type);
3540
this.namespace = namespace;
3641
this.workflowId = workflowId;
3742
this.activityId = activityId;
43+
this.runId = runId;
3844
this.version = version;
3945
}
4046

@@ -43,15 +49,26 @@ public OperationToken(OperationTokenType type, String namespace, String workflow
4349
this.namespace = namespace;
4450
this.workflowId = workflowId;
4551
this.activityId = null;
52+
this.runId = null;
4653
this.version = null;
4754
}
4855

4956
public OperationToken(
5057
OperationTokenType type, String namespace, String workflowId, String activityId) {
58+
this(type, namespace, workflowId, activityId, null);
59+
}
60+
61+
public OperationToken(
62+
OperationTokenType type,
63+
String namespace,
64+
String workflowId,
65+
String activityId,
66+
String runId) {
5167
this.type = type;
5268
this.namespace = namespace;
5369
this.workflowId = workflowId;
5470
this.activityId = activityId;
71+
this.runId = runId;
5572
this.version = null;
5673
}
5774

@@ -74,4 +91,15 @@ public String getWorkflowId() {
7491
public String getActivityId() {
7592
return activityId;
7693
}
94+
95+
/**
96+
* Returns the activity run ID embedded in the token, or {@code null} if absent.
97+
*
98+
* <p>Run ID is only present on activity-execution tokens that were generated AFTER the start
99+
* activity RPC completed (so the run ID was known). Tokens written into the Nexus operation-token
100+
* callback header are generated before that point and therefore do not carry a run ID.
101+
*/
102+
public String getRunId() {
103+
return runId;
104+
}
77105
}

temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenUtil.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,33 @@ public static String generateWorkflowRunOperationToken(String workflowId, String
107107
return encoder.encodeToString(json.getBytes());
108108
}
109109

110-
/** Generate an activity execution operation token from an activity ID and namespace. */
110+
/**
111+
* Generate an activity execution operation token from an activity ID and namespace.
112+
*
113+
* <p>This overload omits the run ID. Use it when writing the token into the Nexus operation-token
114+
* callback header — that token is generated before the start RPC completes, so the run ID is not
115+
* yet known.
116+
*/
111117
public static String generateActivityExecutionOperationToken(String activityId, String namespace)
112118
throws JsonProcessingException {
119+
return generateActivityExecutionOperationToken(activityId, null, namespace);
120+
}
121+
122+
/**
123+
* Generate an activity execution operation token from an activity ID, run ID, and namespace. The
124+
* {@code runId} is included only when non-null.
125+
*
126+
* <p>This overload is used for the operation token returned to the Nexus caller from a start
127+
* operation — at that point the start RPC has completed and the run ID is known. The header token
128+
* written into the activity completion callback must NOT carry a run ID; use {@link
129+
* #generateActivityExecutionOperationToken(String, String)} for that path.
130+
*/
131+
public static String generateActivityExecutionOperationToken(
132+
String activityId, String runId, String namespace) throws JsonProcessingException {
113133
String json =
114134
ow.writeValueAsString(
115-
new OperationToken(OperationTokenType.ACTIVITY_EXECUTION, namespace, null, activityId));
135+
new OperationToken(
136+
OperationTokenType.ACTIVITY_EXECUTION, namespace, null, activityId, runId));
116137
return encoder.encodeToString(json.getBytes());
117138
}
118139

temporal-sdk/src/main/java/io/temporal/nexus/CancelActivityExecutionInput.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.temporal.common.Experimental;
44
import java.util.Objects;
5+
import javax.annotation.Nullable;
56

67
/**
78
* Input to {@link TemporalOperationHandler#cancelActivityExecution} describing the activity
@@ -11,13 +12,27 @@
1112
public final class CancelActivityExecutionInput {
1213

1314
private final String activityId;
15+
private final @Nullable String runId;
1416

15-
public CancelActivityExecutionInput(String activityId) {
17+
public CancelActivityExecutionInput(String activityId, @Nullable String runId) {
1618
this.activityId = Objects.requireNonNull(activityId);
19+
this.runId = runId;
1720
}
1821

1922
/** Returns the activity ID extracted from the operation token. */
2023
public String getActivityId() {
2124
return activityId;
2225
}
26+
27+
/**
28+
* Returns the activity run ID extracted from the operation token, or {@code null} if absent.
29+
*
30+
* <p>Run ID is only present on operation tokens that were generated by this SDK AFTER the start
31+
* activity RPC completed. Tokens originating from the activity completion callback header do not
32+
* carry a run ID.
33+
*/
34+
@Nullable
35+
public String getRunId() {
36+
return runId;
37+
}
2338
}

temporal-sdk/src/main/java/io/temporal/nexus/TemporalNexusClientImpl.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.temporal.nexus;
22

3+
import com.fasterxml.jackson.core.JsonProcessingException;
34
import io.nexusrpc.handler.HandlerException;
45
import io.nexusrpc.handler.OperationContext;
56
import io.nexusrpc.handler.OperationStartDetails;
@@ -17,6 +18,7 @@
1718
import io.temporal.internal.client.NexusStartWorkflowResponse;
1819
import io.temporal.internal.nexus.NexusStartActivityHelper;
1920
import io.temporal.internal.nexus.NexusStartWorkflowHelper;
21+
import io.temporal.internal.nexus.OperationTokenUtil;
2022
import io.temporal.internal.util.MethodExtractor;
2123
import io.temporal.workflow.Functions;
2224
import java.lang.reflect.Method;
@@ -257,7 +259,7 @@ public <R> TemporalOperationResult<R> startWorkflow(
257259
}
258260

259261
private <R> TemporalOperationResult<R> invokeAndReturn(WorkflowHandle<R> handle) {
260-
claimAsyncSlot();
262+
markAsyncOperationStarted();
261263
try {
262264
NexusStartWorkflowResponse response =
263265
NexusStartWorkflowHelper.startWorkflowAndAttachLinks(
@@ -436,7 +438,7 @@ public <R> TemporalOperationResult<R> startActivity(
436438

437439
private <R> TemporalOperationResult<R> startActivityImpl(
438440
String activityType, List<Object> args, StartActivityOptions options) {
439-
claimAsyncSlot();
441+
markAsyncOperationStarted();
440442
try {
441443
NexusStartActivityResponse response =
442444
NexusStartActivityHelper.startActivityAndAttachLinks(
@@ -473,18 +475,34 @@ private <R> TemporalOperationResult<R> startActivityImpl(
473475
.build());
474476
ActivityClientCallsInterceptor.StartActivityOutput out =
475477
((ActivityClientInternal) activityClient).getInvoker().startActivity(input);
476-
// The invoker generated and injected the token into the callback headers before
477-
// the start RPC; read it back from the output instead of regenerating it.
478-
String token = out.getNexusOperationToken();
479-
if (token == null) {
478+
// The invoker generated and injected the runId-free token into the callback
479+
// headers before the start RPC fired. The header token cannot include a run ID
480+
// because the run ID isn't known until after the start RPC returns. The operation
481+
// token returned to the Nexus caller can — and should — include it, so it's
482+
// regenerated here from the same activity ID + the run ID the start RPC produced.
483+
String headerToken = out.getNexusOperationToken();
484+
if (headerToken == null) {
480485
throw new HandlerException(
481486
HandlerException.ErrorType.INTERNAL,
482487
"invoker did not return a Nexus operation token for activity start with callback",
483488
new IllegalStateException(
484489
"nexusOperationToken is null on StartActivityOutput when CompletionCallback was set"));
485490
}
491+
String returnToken;
492+
try {
493+
returnToken =
494+
OperationTokenUtil.generateActivityExecutionOperationToken(
495+
out.getActivityId(),
496+
out.getActivityRunId(),
497+
client.getOptions().getNamespace());
498+
} catch (JsonProcessingException e) {
499+
throw new HandlerException(
500+
HandlerException.ErrorType.INTERNAL,
501+
"failed to generate activity operation token",
502+
e);
503+
}
486504
return new NexusStartActivityResponse(
487-
out.getActivityId(), out.getActivityRunId(), token);
505+
out.getActivityId(), out.getActivityRunId(), returnToken);
488506
});
489507
return TemporalOperationResult.async(response.getOperationToken());
490508
} catch (Throwable t) {
@@ -495,7 +513,7 @@ private <R> TemporalOperationResult<R> startActivityImpl(
495513
}
496514
}
497515

498-
private void claimAsyncSlot() {
516+
private void markAsyncOperationStarted() {
499517
if (!asyncOperationStarted.compareAndSet(false, true)) {
500518
throw new HandlerException(
501519
HandlerException.ErrorType.BAD_REQUEST,

temporal-sdk/src/main/java/io/temporal/nexus/TemporalOperationHandler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ public final void cancel(OperationContext ctx, OperationCancelDetails details) {
110110
break;
111111
case ACTIVITY_EXECUTION:
112112
cancelActivityExecution(
113-
cancelContext, new CancelActivityExecutionInput(token.getActivityId()));
113+
cancelContext,
114+
new CancelActivityExecutionInput(token.getActivityId(), token.getRunId()));
114115
break;
115116
default:
116117
throw new HandlerException(
@@ -154,6 +155,6 @@ protected void cancelActivityExecution(
154155
.setDataConverter(wc.getOptions().getDataConverter())
155156
.setIdentity(wc.getOptions().getIdentity())
156157
.build());
157-
ac.getHandle(input.getActivityId(), null).cancel();
158+
ac.getHandle(input.getActivityId(), input.getRunId()).cancel();
158159
}
159160
}

temporal-sdk/src/test/java/io/temporal/internal/nexus/WorkflowRunTokenTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,36 @@ public void roundTripActivityExecutionToken() throws JsonProcessingException {
100100
Assert.assertEquals("act-1", token.getActivityId());
101101
Assert.assertEquals("ns", token.getNamespace());
102102
Assert.assertNull(token.getWorkflowId());
103+
Assert.assertNull(token.getRunId());
103104
Assert.assertNull(token.getVersion());
104105

105106
// Also exercise the symmetric activityId loader.
106107
Assert.assertEquals("act-1", OperationTokenUtil.loadActivityIdFromOperationToken(encoded));
107108
}
108109

110+
@Test
111+
public void roundTripActivityExecutionTokenWithRunId() throws JsonProcessingException {
112+
String encoded =
113+
OperationTokenUtil.generateActivityExecutionOperationToken("act-1", "run-1", "ns");
114+
OperationToken token = OperationTokenUtil.loadOperationToken(encoded);
115+
Assert.assertEquals(OperationTokenType.ACTIVITY_EXECUTION, token.getType());
116+
Assert.assertEquals("act-1", token.getActivityId());
117+
Assert.assertEquals("run-1", token.getRunId());
118+
Assert.assertEquals("ns", token.getNamespace());
119+
Assert.assertNull(token.getWorkflowId());
120+
Assert.assertNull(token.getVersion());
121+
}
122+
123+
@Test
124+
public void activityExecutionTokenOmitsRunIdWhenNull() throws JsonProcessingException {
125+
// The header-token path passes runId=null and must produce a payload byte-identical to the
126+
// two-arg overload — the runId-aware overload is the *only* one used in code now.
127+
String withRunId =
128+
OperationTokenUtil.generateActivityExecutionOperationToken("act-1", null, "ns");
129+
String withoutRunId = OperationTokenUtil.generateActivityExecutionOperationToken("act-1", "ns");
130+
Assert.assertEquals(withoutRunId, withRunId);
131+
}
132+
109133
@Test
110134
public void workflowRunTokenBytesByteIdenticalSnapshot() throws JsonProcessingException {
111135
String encoded = OperationTokenUtil.generateWorkflowRunOperationToken("wf-1", "ns");

temporal-sdk/src/test/java/io/temporal/nexus/TemporalNexusClientImplTest.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
import org.junit.Test;
2121

2222
/**
23-
* Pure unit tests for {@link TemporalNexusClientImpl#claimAsyncSlot()} semantics. These run without
24-
* a Temporal server (no {@link io.temporal.testing.internal.SDKTestWorkflowRule}).
23+
* Pure unit tests for {@link TemporalNexusClientImpl#markAsyncOperationStarted()} semantics. These
24+
* run without a Temporal server (no {@link io.temporal.testing.internal.SDKTestWorkflowRule}).
2525
*
26-
* <p>The {@code claimAsyncSlot} guard fires as the very first statement in both {@code
26+
* <p>The {@code markAsyncOperationStarted} guard fires as the very first statement in both {@code
2727
* startActivityImpl} and {@code invokeAndReturn}, so the second call always throws {@link
2828
* HandlerException}({@link HandlerException.ErrorType#BAD_REQUEST}) regardless of whether the first
2929
* call's downstream RPC succeeded.
@@ -80,22 +80,23 @@ public void doubleStartActivity_secondCallThrowsBadRequest() {
8080
.setStartToCloseTimeout(Duration.ofSeconds(10))
8181
.build();
8282

83-
// First call: claimAsyncSlot() succeeds (sets flag), RPC may throw — we don't care.
83+
// First call: markAsyncOperationStarted() succeeds (sets flag), RPC may throw — we don't care.
8484
try {
8585
client.startActivity(TestActivity.class, TestActivity::doSomething, options);
8686
} catch (HandlerException e) {
8787
// If a HandlerException leaks out of the first call it must NOT be BAD_REQUEST
88-
// from claimAsyncSlot — that would mean the flag was already set before setUp.
88+
// from markAsyncOperationStarted — that would mean the flag was already set before setUp.
8989
Assert.assertNotEquals(
90-
"First startActivity must not fail with BAD_REQUEST (claimAsyncSlot guard)",
90+
"First startActivity must not fail with BAD_REQUEST (markAsyncOperationStarted guard)",
9191
HandlerException.ErrorType.BAD_REQUEST,
9292
e.getErrorType());
9393
} catch (Exception ignored) {
9494
// Any other exception from the RPC layer is expected; the important thing is
95-
// claimAsyncSlot already ran and set asyncOperationStarted = true.
95+
// markAsyncOperationStarted already ran and set asyncOperationStarted = true.
9696
}
9797

98-
// Second call: claimAsyncSlot() sees the flag and must throw BAD_REQUEST immediately.
98+
// Second call: markAsyncOperationStarted() sees the flag and must throw BAD_REQUEST
99+
// immediately.
99100
HandlerException ex =
100101
Assert.assertThrows(
101102
HandlerException.class,
@@ -125,12 +126,12 @@ public void doubleStartWorkflow_secondCallThrowsBadRequest() {
125126
WorkflowOptions options =
126127
WorkflowOptions.newBuilder().setWorkflowId("wf-1").setTaskQueue(TASK_QUEUE).build();
127128

128-
// First call: claimAsyncSlot() succeeds, downstream RPC may throw — we don't care.
129+
// First call: markAsyncOperationStarted() succeeds, downstream RPC may throw — we don't care.
129130
try {
130131
client.startWorkflow(BlockingWorkflow.class, BlockingWorkflow::execute, "input", options);
131132
} catch (HandlerException e) {
132133
Assert.assertNotEquals(
133-
"First startWorkflow must not fail with BAD_REQUEST (claimAsyncSlot guard)",
134+
"First startWorkflow must not fail with BAD_REQUEST (markAsyncOperationStarted guard)",
134135
HandlerException.ErrorType.BAD_REQUEST,
135136
e.getErrorType());
136137
} catch (Exception ignored) {

0 commit comments

Comments
 (0)