Skip to content

Commit 3c2d938

Browse files
Standalone Activities start delay (temporalio#2906)
* wip * fqn not needed * test timing fixes/tweaks * consistent javadoc * 1 hour start delay for terminate/cancel; disable time-sensitive tests. * retrigger opengrep
1 parent f3edb10 commit 3c2d938

5 files changed

Lines changed: 232 additions & 3 deletions

File tree

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ jobs:
114114
--dynamic-config-value 'component.callbacks.allowedAddresses=[{"Pattern":"localhost:7243","AllowInsecure":true}]' \
115115
--dynamic-config-value frontend.activityAPIsEnabled=true \
116116
--dynamic-config-value activity.enableStandalone=true \
117+
--dynamic-config-value activity.startDelayEnabled=true \
117118
--dynamic-config-value nexusoperation.enableStandalone=true \
118119
--dynamic-config-value history.enableChasm=true \
119120
--dynamic-config-value history.enableTransitionHistory=true &

temporal-sdk/src/main/java/io/temporal/client/StartActivityOptions.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public static final class Builder {
4545
private @Nullable String staticSummary;
4646
private @Nullable String staticDetails;
4747
private @Nullable Priority priority;
48+
private @Nullable Duration startDelay;
4849

4950
private Builder() {}
5051

@@ -65,6 +66,7 @@ private Builder(StartActivityOptions options) {
6566
this.staticSummary = options.staticSummary;
6667
this.staticDetails = options.staticDetails;
6768
this.priority = options.priority;
69+
this.startDelay = options.startDelay;
6870
}
6971

7072
/** Required. A unique identifier for this activity in the namespace. */
@@ -159,6 +161,20 @@ public Builder setPriority(Priority priority) {
159161
return this;
160162
}
161163

164+
/**
165+
* Time to wait before dispatching the first activity task. The delay is one-shot — retry
166+
* attempts do not re-apply it. {@code ScheduleToStart} and {@code ScheduleToClose} timeouts
167+
* begin counting only after the delay elapses. Must be non-negative; {@code null} or {@link
168+
* Duration#ZERO} mean no delay.
169+
*/
170+
public Builder setStartDelay(Duration startDelay) {
171+
if (startDelay != null && startDelay.isNegative()) {
172+
throw new IllegalArgumentException("startDelay must be non-negative, got " + startDelay);
173+
}
174+
this.startDelay = startDelay;
175+
return this;
176+
}
177+
162178
public StartActivityOptions build() {
163179
Preconditions.checkArgument(!Strings.isNullOrEmpty(id), "id must not be null or empty");
164180
Preconditions.checkArgument(
@@ -183,6 +199,7 @@ public StartActivityOptions build() {
183199
private final @Nullable String staticSummary;
184200
private final @Nullable String staticDetails;
185201
private final @Nullable Priority priority;
202+
private final @Nullable Duration startDelay;
186203

187204
private StartActivityOptions(Builder builder) {
188205
this.id = builder.id;
@@ -198,6 +215,7 @@ private StartActivityOptions(Builder builder) {
198215
this.staticSummary = builder.staticSummary;
199216
this.staticDetails = builder.staticDetails;
200217
this.priority = builder.priority;
218+
this.startDelay = builder.startDelay;
201219
}
202220

203221
public Builder toBuilder() {
@@ -265,6 +283,11 @@ public Priority getPriority() {
265283
return priority;
266284
}
267285

286+
@Nullable
287+
public Duration getStartDelay() {
288+
return startDelay;
289+
}
290+
268291
@Override
269292
public boolean equals(Object o) {
270293
if (this == o) return true;
@@ -282,7 +305,8 @@ public boolean equals(Object o) {
282305
&& Objects.equals(typedSearchAttributes, that.typedSearchAttributes)
283306
&& Objects.equals(staticSummary, that.staticSummary)
284307
&& Objects.equals(staticDetails, that.staticDetails)
285-
&& Objects.equals(priority, that.priority);
308+
&& Objects.equals(priority, that.priority)
309+
&& Objects.equals(startDelay, that.startDelay);
286310
}
287311

288312
@Override
@@ -300,7 +324,8 @@ public int hashCode() {
300324
typedSearchAttributes,
301325
staticSummary,
302326
staticDetails,
303-
priority);
327+
priority,
328+
startDelay);
304329
}
305330

306331
@Override
@@ -332,6 +357,8 @@ public String toString() {
332357
+ staticDetails
333358
+ "', priority="
334359
+ priority
360+
+ ", startDelay="
361+
+ startDelay
335362
+ '}';
336363
}
337364
}

temporal-sdk/src/main/java/io/temporal/internal/client/RootActivityClientInvoker.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,9 @@ public StartActivityOutput startActivity(StartActivityInput input) {
9797
if (options.getPriority() != null) {
9898
request.setPriority(ProtoConverters.toProto(options.getPriority()));
9999
}
100+
if (options.getStartDelay() != null) {
101+
request.setStartDelay(ProtobufTimeUtils.toProtoDuration(options.getStartDelay()));
102+
}
100103

101104
io.temporal.api.common.v1.Header grpcHeader = HeaderUtils.toHeaderGrpc(input.getHeader(), null);
102105
request.setHeader(grpcHeader);

temporal-sdk/src/test/java/io/temporal/client/StartActivityOptionsTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,23 @@ public void testMissingTimeoutFails() {
3232
StartActivityOptions.newBuilder().setId("id").setTaskQueue("q").build();
3333
}
3434

35+
@Test(expected = IllegalArgumentException.class)
36+
public void testNegativeStartDelayFails() {
37+
StartActivityOptions.newBuilder().setStartDelay(Duration.ofSeconds(-1));
38+
}
39+
40+
@Test
41+
public void testZeroStartDelayAccepted() {
42+
StartActivityOptions opts =
43+
StartActivityOptions.newBuilder()
44+
.setId("id")
45+
.setTaskQueue("q")
46+
.setStartToCloseTimeout(Duration.ofSeconds(5))
47+
.setStartDelay(Duration.ZERO)
48+
.build();
49+
assertEquals(Duration.ZERO, opts.getStartDelay());
50+
}
51+
3552
@Test
3653
public void testToBuilder() {
3754
StartActivityOptions original =
@@ -64,6 +81,7 @@ public void testToBuilderPreservesAllFields() {
6481
.setStaticSummary("summary")
6582
.setStaticDetails("details")
6683
.setPriority(priority)
84+
.setStartDelay(Duration.ofSeconds(7))
6785
.build();
6886

6987
StartActivityOptions copy = original.toBuilder().build();
@@ -80,5 +98,6 @@ public void testToBuilderPreservesAllFields() {
8098
assertEquals("summary", copy.getStaticSummary());
8199
assertEquals("details", copy.getStaticDetails());
82100
assertEquals(priority, copy.getPriority());
101+
assertEquals(Duration.ofSeconds(7), copy.getStartDelay());
83102
}
84103
}

temporal-sdk/src/test/java/io/temporal/client/functional/StandaloneActivityTest.java

Lines changed: 180 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
* test server may not support the standalone activity APIs.
4141
*/
4242
public class StandaloneActivityTest {
43+
// TODO: enable tests disabled with ths when time-skipping is available
44+
private static boolean RUN_TIME_SENSITIVE_TESTS = false;
4345

4446
// ---------------------------------------------------------------------------
4547
// Activity interfaces and implementations
@@ -96,6 +98,12 @@ public interface AlwaysFailActivity {
9698
void alwaysFail();
9799
}
98100

101+
@ActivityInterface
102+
public interface RetryThenSucceedActivity {
103+
@ActivityMethod(name = "RetryThenSucceed")
104+
int run();
105+
}
106+
99107
/** Snapshot of {@link ActivityInfo} fields captured inside an activity body. */
100108
public static class ActivityInfoSnapshot {
101109
public String activityId;
@@ -200,6 +208,17 @@ public void alwaysFail() {
200208
}
201209
}
202210

211+
public static class RetryThenSucceedActivityImpl implements RetryThenSucceedActivity {
212+
@Override
213+
public int run() {
214+
int attempt = Activity.getExecutionContext().getInfo().getAttempt();
215+
if (attempt == 1) {
216+
throw ApplicationFailure.newFailure("fail on attempt 1", "test-type");
217+
}
218+
return attempt;
219+
}
220+
}
221+
203222
// ---------------------------------------------------------------------------
204223
// Test rule
205224
// ---------------------------------------------------------------------------
@@ -215,7 +234,8 @@ public void alwaysFail() {
215234
new InspectInfoActivityImpl(),
216235
new EchoVoidActivityImpl(),
217236
new ConcatActivityImpl(),
218-
new AlwaysFailActivityImpl())
237+
new AlwaysFailActivityImpl(),
238+
new RetryThenSucceedActivityImpl())
219239
.build();
220240

221241
// ---------------------------------------------------------------------------
@@ -986,6 +1006,165 @@ public void testOnlyStartToCloseTimeoutIsValid() {
9861006
newActivityClient().execute(SimpleActivity.class, SimpleActivity::execute, opts, "x"));
9871007
}
9881008

1009+
// ---------------------------------------------------------------------------
1010+
// Start delay
1011+
// ---------------------------------------------------------------------------
1012+
1013+
@Test
1014+
public void testStartDelayDelaysFirstDispatch() {
1015+
assumeTrue(SDKTestWorkflowRule.useExternalService);
1016+
Duration delay = Duration.ofSeconds(2);
1017+
StartActivityOptions opts =
1018+
StartActivityOptions.newBuilder()
1019+
.setId(uniqueId())
1020+
.setTaskQueue(testWorkflowRule.getTaskQueue())
1021+
.setScheduleToCloseTimeout(Duration.ofMinutes(5))
1022+
.setStartDelay(delay)
1023+
.build();
1024+
1025+
ActivityHandle<String> handle =
1026+
newActivityClient().start(SimpleActivity.class, SimpleActivity::execute, opts, "hello");
1027+
assertEquals("echo:hello", handle.getResult());
1028+
1029+
ActivityExecutionDescription desc = handle.describe();
1030+
Duration between = Duration.between(desc.getScheduledTime(), desc.getLastStartedTime());
1031+
assertTrue(
1032+
"lastStartedTime - scheduledTime should be >= startDelay - 500ms, was " + between,
1033+
between.compareTo(delay.minusMillis(500)) >= 0);
1034+
}
1035+
1036+
@Test
1037+
public void testStartDelayPreservesScheduleToStartTimeout() {
1038+
assumeTrue(RUN_TIME_SENSITIVE_TESTS);
1039+
assumeTrue(SDKTestWorkflowRule.useExternalService);
1040+
StartActivityOptions opts =
1041+
StartActivityOptions.newBuilder()
1042+
.setId(uniqueId())
1043+
.setTaskQueue(testWorkflowRule.getTaskQueue())
1044+
.setStartToCloseTimeout(Duration.ofMinutes(5))
1045+
.setScheduleToStartTimeout(Duration.ofSeconds(1))
1046+
.setStartDelay(Duration.ofSeconds(2))
1047+
.build();
1048+
String result =
1049+
newActivityClient().execute(SimpleActivity.class, SimpleActivity::execute, opts, "x");
1050+
assertEquals("echo:x", result);
1051+
}
1052+
1053+
@Test
1054+
public void testStartDelayPreservesScheduleToCloseTimeout() {
1055+
assumeTrue(RUN_TIME_SENSITIVE_TESTS);
1056+
assumeTrue(SDKTestWorkflowRule.useExternalService);
1057+
StartActivityOptions opts =
1058+
StartActivityOptions.newBuilder()
1059+
.setId(uniqueId())
1060+
.setTaskQueue(testWorkflowRule.getTaskQueue())
1061+
.setScheduleToCloseTimeout(Duration.ofSeconds(1))
1062+
.setStartDelay(Duration.ofSeconds(2))
1063+
.build();
1064+
String result =
1065+
newActivityClient().execute(SimpleActivity.class, SimpleActivity::execute, opts, "x");
1066+
assertEquals("echo:x", result);
1067+
}
1068+
1069+
@Test
1070+
public void testStartDelayNotReappliedOnRetry() {
1071+
assumeTrue(RUN_TIME_SENSITIVE_TESTS);
1072+
assumeTrue(SDKTestWorkflowRule.useExternalService);
1073+
StartActivityOptions opts =
1074+
StartActivityOptions.newBuilder()
1075+
.setId(uniqueId())
1076+
.setTaskQueue(testWorkflowRule.getTaskQueue())
1077+
.setScheduleToCloseTimeout(Duration.ofMinutes(5))
1078+
.setStartDelay(Duration.ofSeconds(2))
1079+
.setRetryOptions(
1080+
RetryOptions.newBuilder()
1081+
.setInitialInterval(Duration.ofMillis(100))
1082+
.setMaximumAttempts(5)
1083+
.build())
1084+
.build();
1085+
long startMs = System.currentTimeMillis();
1086+
int finalAttempt =
1087+
newActivityClient()
1088+
.execute(RetryThenSucceedActivity.class, RetryThenSucceedActivity::run, opts);
1089+
long elapsedMs = System.currentTimeMillis() - startMs;
1090+
1091+
// Bug-trap: confirm the activity actually retried rather than succeeding silently on attempt 1.
1092+
assertTrue(
1093+
"activity should have retried at least once (final attempt was " + finalAttempt + ")",
1094+
finalAttempt >= 2);
1095+
1096+
// If start delay were re-applied to retries, elapsed would be ~2 * startDelay (~4000ms).
1097+
// Without re-application: ~2000ms delay + ~100ms retry interval + worker overhead.
1098+
assertTrue(
1099+
"retry should not re-apply startDelay; elapsed was " + elapsedMs + "ms", elapsedMs < 3500);
1100+
}
1101+
1102+
@Test
1103+
public void testCancelDuringStartDelay() {
1104+
assumeTrue(SDKTestWorkflowRule.useExternalService);
1105+
StartActivityOptions opts =
1106+
StartActivityOptions.newBuilder()
1107+
.setId(uniqueId())
1108+
.setTaskQueue(testWorkflowRule.getTaskQueue())
1109+
.setScheduleToCloseTimeout(Duration.ofMinutes(5))
1110+
.setStartDelay(Duration.ofHours(1))
1111+
.build();
1112+
ActivityHandle<String> handle =
1113+
newActivityClient().start(SimpleActivity.class, SimpleActivity::execute, opts, "x");
1114+
handle.cancel("test cancel during start delay");
1115+
1116+
assertEventually(
1117+
Duration.ofSeconds(10),
1118+
() ->
1119+
assertEquals(
1120+
ActivityExecutionStatus.ACTIVITY_EXECUTION_STATUS_CANCELED,
1121+
handle.describe().getStatus()));
1122+
}
1123+
1124+
@Test
1125+
public void testTerminateDuringStartDelay() {
1126+
assumeTrue(SDKTestWorkflowRule.useExternalService);
1127+
StartActivityOptions opts =
1128+
StartActivityOptions.newBuilder()
1129+
.setId(uniqueId())
1130+
.setTaskQueue(testWorkflowRule.getTaskQueue())
1131+
.setScheduleToCloseTimeout(Duration.ofMinutes(5))
1132+
.setStartDelay(Duration.ofHours(1))
1133+
.build();
1134+
ActivityHandle<String> handle =
1135+
newActivityClient().start(SimpleActivity.class, SimpleActivity::execute, opts, "x");
1136+
handle.terminate("test terminate during start delay");
1137+
1138+
assertEventually(
1139+
Duration.ofSeconds(10),
1140+
() ->
1141+
assertEquals(
1142+
ActivityExecutionStatus.ACTIVITY_EXECUTION_STATUS_TERMINATED,
1143+
handle.describe().getStatus()));
1144+
}
1145+
1146+
@Test
1147+
public void testZeroStartDelayBehavesAsUnset() {
1148+
assumeTrue(RUN_TIME_SENSITIVE_TESTS);
1149+
assumeTrue(SDKTestWorkflowRule.useExternalService);
1150+
StartActivityOptions opts =
1151+
StartActivityOptions.newBuilder()
1152+
.setId(uniqueId())
1153+
.setTaskQueue(testWorkflowRule.getTaskQueue())
1154+
.setScheduleToCloseTimeout(Duration.ofMinutes(5))
1155+
.setStartDelay(Duration.ZERO)
1156+
.build();
1157+
ActivityHandle<String> handle =
1158+
newActivityClient().start(SimpleActivity.class, SimpleActivity::execute, opts, "x");
1159+
assertEquals("echo:x", handle.getResult());
1160+
1161+
ActivityExecutionDescription desc = handle.describe();
1162+
Duration between = Duration.between(desc.getScheduledTime(), desc.getLastStartedTime());
1163+
assertTrue(
1164+
"Duration.ZERO should not introduce dispatch latency, was " + between,
1165+
between.compareTo(Duration.ofSeconds(1)) < 0);
1166+
}
1167+
9891168
// ---------------------------------------------------------------------------
9901169
// Interceptor helpers
9911170
// ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)