Skip to content

Commit 3af25d9

Browse files
Add support for activity reset
1 parent f919926 commit 3af25d9

5 files changed

Lines changed: 145 additions & 3 deletions

File tree

temporal-sdk/src/main/java/io/temporal/client/ActivityPausedException.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package io.temporal.client;
22

33
import io.temporal.activity.ActivityInfo;
4+
import io.temporal.common.Experimental;
45

56
/***
67
* Indicates that the activity was paused by the user.
78
*
89
* <p>Catching this exception directly is discouraged and catching the parent class {@link ActivityCompletionException} is recommended instead.<br>
910
*/
11+
@Experimental
1012
public final class ActivityPausedException extends ActivityCompletionException {
1113
public ActivityPausedException(ActivityInfo info) {
1214
super(info);
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package io.temporal.client;
2+
3+
import io.temporal.activity.ActivityInfo;
4+
import io.temporal.common.Experimental;
5+
6+
/***
7+
* Indicates that the activity attempt was reset by the user.
8+
*
9+
* <p>Catching this exception directly is discouraged and catching the parent class {@link ActivityCompletionException} is recommended instead.<br>
10+
*/
11+
@Experimental
12+
public final class ActivityResetException extends ActivityCompletionException {
13+
public ActivityResetException(ActivityInfo info) {
14+
super(info);
15+
}
16+
17+
public ActivityResetException() {
18+
super();
19+
}
20+
}

temporal-sdk/src/main/java/io/temporal/internal/activity/HeartbeatContextImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,8 @@ private void sendHeartbeatRequest(Object details) {
228228
lastException = new ActivityCanceledException(info);
229229
} else if (status.getActivityPaused()) {
230230
lastException = new ActivityPausedException(info);
231+
} else if (status.getActivityReset()) {
232+
lastException = new ActivityResetException(info);
231233
} else {
232234
lastException = null;
233235
}

temporal-sdk/src/main/java/io/temporal/internal/client/external/ManualActivityCompletionClientImpl.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@
1111
import io.temporal.api.common.v1.Payloads;
1212
import io.temporal.api.common.v1.WorkflowExecution;
1313
import io.temporal.api.workflowservice.v1.*;
14-
import io.temporal.client.ActivityCanceledException;
15-
import io.temporal.client.ActivityCompletionFailureException;
16-
import io.temporal.client.ActivityNotExistsException;
14+
import io.temporal.client.*;
1715
import io.temporal.common.converter.DataConverter;
1816
import io.temporal.failure.CanceledFailure;
1917
import io.temporal.internal.client.ActivityClientHelper;
@@ -190,6 +188,10 @@ public void recordHeartbeat(@Nullable Object details) throws CanceledFailure {
190188
metricsScope);
191189
if (status.getCancelRequested()) {
192190
throw new ActivityCanceledException();
191+
} else if (status.getActivityPaused()) {
192+
throw new ActivityPausedException();
193+
} else if (status.getActivityReset()) {
194+
throw new ActivityResetException();
193195
}
194196
} else {
195197
RecordActivityTaskHeartbeatByIdResponse status =
@@ -203,6 +205,10 @@ public void recordHeartbeat(@Nullable Object details) throws CanceledFailure {
203205
metricsScope);
204206
if (status.getCancelRequested()) {
205207
throw new ActivityCanceledException();
208+
} else if (status.getActivityPaused()) {
209+
throw new ActivityPausedException();
210+
} else if (status.getActivityReset()) {
211+
throw new ActivityResetException();
206212
}
207213
}
208214
} catch (Exception e) {
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package io.temporal.activity;
2+
3+
import io.temporal.api.common.v1.WorkflowExecution;
4+
import io.temporal.api.workflow.v1.PendingActivityInfo;
5+
import io.temporal.api.workflowservice.v1.ResetActivityRequest;
6+
import io.temporal.client.ActivityResetException;
7+
import io.temporal.client.WorkflowStub;
8+
import io.temporal.common.converter.GlobalDataConverter;
9+
import io.temporal.testing.internal.SDKTestOptions;
10+
import io.temporal.testing.internal.SDKTestWorkflowRule;
11+
import io.temporal.workflow.Async;
12+
import io.temporal.workflow.Workflow;
13+
import io.temporal.workflow.shared.TestActivities;
14+
import io.temporal.workflow.shared.TestWorkflows;
15+
import java.time.Duration;
16+
import java.util.concurrent.atomic.AtomicInteger;
17+
import org.junit.Assert;
18+
import org.junit.Rule;
19+
import org.junit.Test;
20+
21+
public class ActivityResetTest {
22+
23+
@Rule
24+
public SDKTestWorkflowRule testWorkflowRule =
25+
SDKTestWorkflowRule.newBuilder()
26+
.setWorkflowTypes(TestWorkflowImpl.class)
27+
.setActivityImplementations(new HeartBeatingActivityImpl())
28+
.setUseExternalService(true)
29+
.build();
30+
31+
@Test
32+
public void activityReset() {
33+
// assumeTrue(
34+
// "Test Server doesn't support activity pause", SDKTestWorkflowRule.useExternalService);
35+
36+
TestWorkflows.TestWorkflowReturnString workflow =
37+
testWorkflowRule.newWorkflowStub(TestWorkflows.TestWorkflowReturnString.class);
38+
Assert.assertEquals("I am stopped after reset", workflow.execute());
39+
Assert.assertEquals(
40+
1,
41+
WorkflowStub.fromTyped(workflow)
42+
.describe()
43+
.getRawDescription()
44+
.getPendingActivitiesCount());
45+
PendingActivityInfo activityInfo =
46+
WorkflowStub.fromTyped(workflow).describe().getRawDescription().getPendingActivities(0);
47+
Assert.assertEquals(
48+
"1",
49+
GlobalDataConverter.get()
50+
.fromPayload(
51+
activityInfo.getHeartbeatDetails().getPayloads(0), String.class, String.class));
52+
}
53+
54+
public static class TestWorkflowImpl implements TestWorkflows.TestWorkflowReturnString {
55+
56+
private final TestActivities.TestActivity1 activities =
57+
Workflow.newActivityStub(
58+
TestActivities.TestActivity1.class,
59+
SDKTestOptions.newActivityOptions20sScheduleToClose());
60+
61+
@Override
62+
public String execute() {
63+
Async.function(activities::execute, "");
64+
Workflow.sleep(Duration.ofSeconds(1));
65+
return activities.execute("CompleteOnPause");
66+
}
67+
}
68+
69+
public static class HeartBeatingActivityImpl implements TestActivities.TestActivity1 {
70+
private final AtomicInteger resetCounter = new AtomicInteger(0);
71+
72+
@Override
73+
public String execute(String arg) {
74+
ActivityInfo info = Activity.getExecutionContext().getInfo();
75+
// Have the activity pause itself
76+
Activity.getExecutionContext()
77+
.getWorkflowClient()
78+
.getWorkflowServiceStubs()
79+
.blockingStub()
80+
.resetActivity(
81+
ResetActivityRequest.newBuilder()
82+
.setNamespace(info.getNamespace())
83+
.setExecution(
84+
WorkflowExecution.newBuilder()
85+
.setWorkflowId(info.getWorkflowId())
86+
.setRunId(info.getRunId())
87+
.build())
88+
.setId(info.getActivityId())
89+
.build());
90+
while (true) {
91+
try {
92+
Thread.sleep(1000);
93+
// Check if the activity has been reset, and the activity info shows we are on the 1st
94+
// attempt.
95+
if (resetCounter.get() >= 1
96+
&& Activity.getExecutionContext().getInfo().getAttempt() == 1) {
97+
return "I am stopped after reset";
98+
}
99+
// Heartbeat and verify that the correct exception is thrown
100+
Activity.getExecutionContext().heartbeat("1");
101+
} catch (ActivityResetException pe) {
102+
// Counter is incremented to track the number of resets
103+
resetCounter.addAndGet(1);
104+
// This will fail the attempt, and the activity will be retried.
105+
throw pe;
106+
} catch (InterruptedException e) {
107+
throw new RuntimeException(e);
108+
}
109+
}
110+
}
111+
}
112+
}

0 commit comments

Comments
 (0)