Skip to content

Commit 4182485

Browse files
authored
Merge branch 'master' into add_application_err_category
2 parents b25c482 + 2807771 commit 4182485

10 files changed

Lines changed: 175 additions & 59 deletions

File tree

CONTRIBUTING.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ before we can merge in any of your changes
88

99
## Development Environment
1010

11-
* Java 11+
11+
* Java 21+
1212
* Docker to run Temporal Server
1313

1414
## Build

temporal-sdk/src/main/java/io/temporal/internal/nexus/OperationTokenUtil.java

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,15 @@ public class OperationTokenUtil {
3737
/**
3838
* Load a workflow run operation token from an operation token.
3939
*
40-
* @throws FallbackToWorkflowIdException if the operation token is not a workflow run token
4140
* @throws IllegalArgumentException if the operation token is invalid
4241
*/
43-
public static WorkflowRunOperationToken loadWorkflowRunOperationToken(String operationToken)
44-
throws FallbackToWorkflowIdException {
42+
public static WorkflowRunOperationToken loadWorkflowRunOperationToken(String operationToken) {
4543
WorkflowRunOperationToken token;
4644
try {
4745
JavaType reference = mapper.getTypeFactory().constructType(WorkflowRunOperationToken.class);
4846
token = mapper.readValue(decoder.decode(operationToken), reference);
4947
} catch (Exception e) {
50-
throw new FallbackToWorkflowIdException("Failed to parse operation token: " + e.getMessage());
48+
throw new IllegalArgumentException("Failed to parse operation token: " + e.getMessage());
5149
}
5250
if (!token.getType().equals(OperationTokenType.WORKFLOW_RUN)) {
5351
throw new IllegalArgumentException(
@@ -68,16 +66,7 @@ public static WorkflowRunOperationToken loadWorkflowRunOperationToken(String ope
6866
* @throws IllegalArgumentException if the operation token is invalid
6967
*/
7068
public static String loadWorkflowIdFromOperationToken(String operationToken) {
71-
try {
72-
WorkflowRunOperationToken token = loadWorkflowRunOperationToken(operationToken);
73-
return token.getWorkflowId();
74-
} catch (OperationTokenUtil.FallbackToWorkflowIdException e) {
75-
// Previous versions of the SDK simply used the workflow ID as the operation token
76-
// This fallback is provided for backwards compatibility for those cases.
77-
// This fallback will be removed in a future release.
78-
// See: https://github.com/temporalio/sdk-java/issues/2423
79-
return operationToken;
80-
}
69+
return loadWorkflowRunOperationToken(operationToken).getWorkflowId();
8170
}
8271

8372
/** Generate a workflow run operation token from a workflow ID and namespace. */
@@ -87,11 +76,5 @@ public static String generateWorkflowRunOperationToken(String workflowId, String
8776
return encoder.encodeToString(json.getBytes());
8877
}
8978

90-
public static class FallbackToWorkflowIdException extends RuntimeException {
91-
public FallbackToWorkflowIdException(String message) {
92-
super(message);
93-
}
94-
}
95-
9679
private OperationTokenUtil() {}
9780
}

temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ private WorkflowThread newRootThread(Runnable runnable) {
449449
"newRootThread can be called only if there is no existing root workflow thread");
450450
}
451451
rootWorkflowThread =
452-
new WorkflowThreadImpl(
452+
new RootWorkflowThreadImpl(
453453
workflowThreadExecutor,
454454
workflowContext,
455455
this,
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.sync;
22+
23+
import io.temporal.common.context.ContextPropagator;
24+
import io.temporal.internal.worker.WorkflowExecutorCache;
25+
import java.util.List;
26+
import java.util.Map;
27+
import java.util.function.Supplier;
28+
import javax.annotation.Nonnull;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
public class RootWorkflowThreadImpl extends WorkflowThreadImpl {
33+
private static final Logger log = LoggerFactory.getLogger(RootWorkflowThreadImpl.class);
34+
35+
RootWorkflowThreadImpl(
36+
WorkflowThreadExecutor workflowThreadExecutor,
37+
SyncWorkflowContext syncWorkflowContext,
38+
DeterministicRunnerImpl runner,
39+
@Nonnull String name,
40+
int priority,
41+
boolean detached,
42+
CancellationScopeImpl parentCancellationScope,
43+
Runnable runnable,
44+
WorkflowExecutorCache cache,
45+
List<ContextPropagator> contextPropagators,
46+
Map<String, Object> propagatedContexts) {
47+
super(
48+
workflowThreadExecutor,
49+
syncWorkflowContext,
50+
runner,
51+
name,
52+
priority,
53+
detached,
54+
parentCancellationScope,
55+
runnable,
56+
cache,
57+
contextPropagators,
58+
propagatedContexts);
59+
}
60+
61+
@Override
62+
public void yield(String reason, Supplier<Boolean> unblockCondition)
63+
throws DestroyWorkflowThreadError {
64+
log.warn(
65+
"Detected root workflow thread yielding {}. This can happen by making blocking calls during workflow instance creating, such as executing an activity inside a @WorkflowInit constructor. This can cause issues, like Queries or Updates rejection because the workflow instance creation is delayed",
66+
getName());
67+
super.yield(reason, unblockCondition);
68+
}
69+
}

temporal-sdk/src/main/java/io/temporal/workflow/WorkflowInit.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
* thrown by the constructor are treated the same as exceptions thrown by the workflow method.
3535
*
3636
* <p>Workflow initialization methods are called before the workflow method, signal handlers, update
37-
* handlers or query handlers.
37+
* handlers or query handlers. Users should be careful not to block in constructors. Blocking in the
38+
* constructor will delay handling of the workflow method and signal handlers, and cause rejection
39+
* of the update and query handlers.
3840
*
3941
* <p>This annotation applies only to workflow implementation constructors.
4042
*/

temporal-sdk/src/test/java/io/temporal/internal/nexus/WorkflowRunTokenTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,11 @@ public void deserializeWorkflowRunToken() throws IOException {
7676
}
7777

7878
@Test
79-
public void loadOldWorkflowRunToken() {
79+
public void failLoadOldWorkflowRunToken() {
8080
String operationToken = "AAAAA-BBBBB-CCCCC";
81-
Assert.assertEquals(
82-
operationToken, OperationTokenUtil.loadWorkflowIdFromOperationToken(operationToken));
81+
Assert.assertThrows(
82+
IllegalArgumentException.class,
83+
() -> OperationTokenUtil.loadWorkflowIdFromOperationToken(operationToken));
8384
}
8485

8586
@Test

temporal-sdk/src/test/java/io/temporal/internal/testing/ActivityTestingTest.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.temporal.client.ActivityCanceledException;
3232
import io.temporal.failure.ActivityFailure;
3333
import io.temporal.failure.ApplicationFailure;
34+
import io.temporal.testing.ActivityRequestedAsyncCompletion;
3435
import io.temporal.testing.TestActivityEnvironment;
3536
import java.io.IOException;
3637
import java.util.ArrayList;
@@ -39,10 +40,7 @@
3940
import java.util.concurrent.ConcurrentHashMap;
4041
import java.util.concurrent.atomic.AtomicInteger;
4142
import java.util.concurrent.atomic.AtomicReference;
42-
import org.junit.After;
43-
import org.junit.Before;
44-
import org.junit.Rule;
45-
import org.junit.Test;
43+
import org.junit.*;
4644
import org.junit.rules.Timeout;
4745

4846
public class ActivityTestingTest {
@@ -111,6 +109,21 @@ public void testFailure() {
111109
}
112110
}
113111

112+
private static class AsyncActivityImpl implements TestActivity {
113+
@Override
114+
public String activity1(String input) {
115+
Activity.getExecutionContext().doNotCompleteOnReturn();
116+
return "";
117+
}
118+
}
119+
120+
@Test
121+
public void testAsyncActivity() {
122+
testEnvironment.registerActivitiesImplementations(new AsyncActivityImpl());
123+
TestActivity activity = testEnvironment.newActivityStub(TestActivity.class);
124+
Assert.assertThrows(ActivityRequestedAsyncCompletion.class, () -> activity.activity1("input1"));
125+
}
126+
114127
private static class HeartbeatActivityImpl implements TestActivity {
115128

116129
@Override
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.testing;
22+
23+
/**
24+
* Exception thrown when an activity request to complete asynchronously in the {@link
25+
* TestActivityEnvironment}. Intended to be used in unit tests to assert an activity requested async
26+
* completion.
27+
*/
28+
public final class ActivityRequestedAsyncCompletion extends RuntimeException {
29+
private final String activityId;
30+
private final boolean manualCompletion;
31+
32+
public ActivityRequestedAsyncCompletion(String activityId, boolean manualCompletion) {
33+
super("activity requested async completion");
34+
this.activityId = activityId;
35+
this.manualCompletion = manualCompletion;
36+
}
37+
38+
public String getActivityId() {
39+
return activityId;
40+
}
41+
42+
public boolean isManualCompletion() {
43+
return manualCompletion;
44+
}
45+
}

temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironment.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ static TestActivityEnvironment newInstance(TestEnvironmentOptions options) {
8282
* Creates a stub that can be used to invoke activities registered through {@link
8383
* #registerActivitiesImplementations(Object...)}.
8484
*
85+
* <p>Activity methods may throw {@link ActivityRequestedAsyncCompletion} if the activity
86+
* requested async completion.
87+
*
8588
* @param activityInterface activity interface class that the object under test implements.
8689
* @param <T> Type of the activity interface.
8790
* @return The stub that implements the activity interface.
@@ -92,6 +95,9 @@ static TestActivityEnvironment newInstance(TestEnvironmentOptions options) {
9295
* Creates a stub that can be used to invoke activities registered through {@link
9396
* #registerActivitiesImplementations(Object...)}.
9497
*
98+
* <p>Activity methods may throw {@link ActivityRequestedAsyncCompletion} if the activity
99+
* requested async completion.
100+
*
95101
* @param <T> Type of the activity interface.
96102
* @param activityInterface activity interface class that the object under test implements
97103
* @param options options that specify the activity invocation parameters

temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java

Lines changed: 26 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
package io.temporal.testing;
2222

23-
import com.google.common.base.Defaults;
2423
import com.google.protobuf.ByteString;
2524
import com.uber.m3.tally.NoopScope;
2625
import com.uber.m3.tally.Scope;
@@ -511,40 +510,38 @@ private <T> T getReply(
511510
Type resultType) {
512511
DataConverter dataConverter =
513512
testEnvironmentOptions.getWorkflowClientOptions().getDataConverter();
514-
RespondActivityTaskCompletedRequest taskCompleted = response.getTaskCompleted();
515-
if (taskCompleted != null) {
513+
if (response.getTaskCompleted() != null) {
514+
RespondActivityTaskCompletedRequest taskCompleted = response.getTaskCompleted();
516515
Optional<Payloads> result =
517516
taskCompleted.hasResult() ? Optional.of(taskCompleted.getResult()) : Optional.empty();
518517
return dataConverter.fromPayloads(0, result, resultClass, resultType);
519-
} else {
518+
} else if (response.getTaskFailed() != null) {
520519
RespondActivityTaskFailedRequest taskFailed =
521520
response.getTaskFailed().getTaskFailedRequest();
522-
if (taskFailed != null) {
523-
Exception cause = dataConverter.failureToException(taskFailed.getFailure());
524-
throw new ActivityFailure(
525-
taskFailed.getFailure().getMessage(),
526-
0,
527-
0,
528-
task.getActivityType().getName(),
529-
task.getActivityId(),
530-
RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE,
531-
"TestActivityEnvironment",
532-
cause);
533-
} else {
534-
RespondActivityTaskCanceledRequest taskCanceled = response.getTaskCanceled();
535-
if (taskCanceled != null) {
536-
throw new CanceledFailure(
537-
"canceled",
538-
new EncodedValues(
539-
taskCanceled.hasDetails()
540-
? Optional.of(taskCanceled.getDetails())
541-
: Optional.empty(),
542-
dataConverter),
543-
null);
544-
}
545-
}
521+
Exception cause = dataConverter.failureToException(taskFailed.getFailure());
522+
throw new ActivityFailure(
523+
taskFailed.getFailure().getMessage(),
524+
0,
525+
0,
526+
task.getActivityType().getName(),
527+
task.getActivityId(),
528+
RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE,
529+
"TestActivityEnvironment",
530+
cause);
531+
} else if (response.getTaskCanceled() != null) {
532+
RespondActivityTaskCanceledRequest taskCanceled = response.getTaskCanceled();
533+
throw new CanceledFailure(
534+
"canceled",
535+
new EncodedValues(
536+
taskCanceled.hasDetails()
537+
? Optional.of(taskCanceled.getDetails())
538+
: Optional.empty(),
539+
dataConverter),
540+
null);
541+
} else {
542+
throw new ActivityRequestedAsyncCompletion(
543+
task.getActivityId(), response.isManualCompletion());
546544
}
547-
return Defaults.defaultValue(resultClass);
548545
}
549546

550547
@Override

0 commit comments

Comments
 (0)