Skip to content

Commit 1527de4

Browse files
committed
Nexus caller timeouts
1 parent 2322bd0 commit 1527de4

8 files changed

Lines changed: 257 additions & 10 deletions

File tree

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -799,6 +799,10 @@ public <R> ExecuteNexusOperationOutput<R> executeNexusOperation(
799799
input.getHeaders().forEach((k, v) -> attributes.putNexusHeader(k.toLowerCase(), v));
800800
attributes.setScheduleToCloseTimeout(
801801
ProtobufTimeUtils.toProtoDuration(input.getOptions().getScheduleToCloseTimeout()));
802+
attributes.setScheduleToStartTimeout(
803+
ProtobufTimeUtils.toProtoDuration(input.getOptions().getScheduleToStartTimeout()));
804+
attributes.setStartToCloseTimeout(
805+
ProtobufTimeUtils.toProtoDuration(input.getOptions().getStartToCloseTimeout()));
802806

803807
@Nullable
804808
UserMetadata userMetadata =

temporal-sdk/src/main/java/io/temporal/workflow/NexusOperationOptions.java

Lines changed: 85 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ public static NexusOperationOptions getDefaultInstance() {
3131

3232
public static final class Builder {
3333
private Duration scheduleToCloseTimeout;
34+
private Duration scheduleToStartTimeout;
35+
private Duration startToCloseTimeout;
3436
private NexusOperationCancellationType cancellationType;
3537
private String summary;
3638

@@ -46,6 +48,45 @@ public NexusOperationOptions.Builder setScheduleToCloseTimeout(
4648
return this;
4749
}
4850

51+
/**
52+
* Sets the schedule to start timeout for the Nexus operation.
53+
*
54+
* <p>Maximum time to wait for the operation to be started (or completed if synchronous) by the
55+
* handler. If the operation is not started within this timeout, it will fail with
56+
* TIMEOUT_TYPE_SCHEDULE_TO_START.
57+
*
58+
* <p>Requires Temporal Server 1.31.0 or later.
59+
*
60+
* @param scheduleToStartTimeout the schedule to start timeout for the Nexus operation
61+
* @return this
62+
*/
63+
@Experimental
64+
public NexusOperationOptions.Builder setScheduleToStartTimeout(
65+
Duration scheduleToStartTimeout) {
66+
this.scheduleToStartTimeout = scheduleToStartTimeout;
67+
return this;
68+
}
69+
70+
/**
71+
* Sets the start to close timeout for the Nexus operation.
72+
*
73+
* <p>Maximum time to wait for an asynchronous operation to complete after it has been started.
74+
* If the operation does not complete within this timeout after starting, it will fail with
75+
* TIMEOUT_TYPE_START_TO_CLOSE.
76+
*
77+
* <p>Only applies to asynchronous operations. Synchronous operations ignore this timeout.
78+
*
79+
* <p>Requires Temporal Server 1.31.0 or later.
80+
*
81+
* @param startToCloseTimeout the start to close timeout for the Nexus operation
82+
* @return this
83+
*/
84+
@Experimental
85+
public NexusOperationOptions.Builder setStartToCloseTimeout(Duration startToCloseTimeout) {
86+
this.startToCloseTimeout = startToCloseTimeout;
87+
return this;
88+
}
89+
4990
/**
5091
* Sets the cancellation type for the Nexus operation. Defaults to WAIT_COMPLETED.
5192
*
@@ -78,12 +119,19 @@ private Builder(NexusOperationOptions options) {
78119
return;
79120
}
80121
this.scheduleToCloseTimeout = options.getScheduleToCloseTimeout();
122+
this.scheduleToStartTimeout = options.getScheduleToStartTimeout();
123+
this.startToCloseTimeout = options.getStartToCloseTimeout();
81124
this.cancellationType = options.getCancellationType();
82125
this.summary = options.getSummary();
83126
}
84127

85128
public NexusOperationOptions build() {
86-
return new NexusOperationOptions(scheduleToCloseTimeout, cancellationType, summary);
129+
return new NexusOperationOptions(
130+
scheduleToCloseTimeout,
131+
scheduleToStartTimeout,
132+
startToCloseTimeout,
133+
cancellationType,
134+
summary);
87135
}
88136

89137
public NexusOperationOptions.Builder mergeNexusOperationOptions(
@@ -95,6 +143,14 @@ public NexusOperationOptions.Builder mergeNexusOperationOptions(
95143
(override.scheduleToCloseTimeout == null)
96144
? this.scheduleToCloseTimeout
97145
: override.scheduleToCloseTimeout;
146+
this.scheduleToStartTimeout =
147+
(override.scheduleToStartTimeout == null)
148+
? this.scheduleToStartTimeout
149+
: override.scheduleToStartTimeout;
150+
this.startToCloseTimeout =
151+
(override.startToCloseTimeout == null)
152+
? this.startToCloseTimeout
153+
: override.startToCloseTimeout;
98154
this.cancellationType =
99155
(override.cancellationType == null) ? this.cancellationType : override.cancellationType;
100156
this.summary = (override.summary == null) ? this.summary : override.summary;
@@ -104,9 +160,13 @@ public NexusOperationOptions.Builder mergeNexusOperationOptions(
104160

105161
private NexusOperationOptions(
106162
Duration scheduleToCloseTimeout,
163+
Duration scheduleToStartTimeout,
164+
Duration startToCloseTimeout,
107165
NexusOperationCancellationType cancellationType,
108166
String summary) {
109167
this.scheduleToCloseTimeout = scheduleToCloseTimeout;
168+
this.scheduleToStartTimeout = scheduleToStartTimeout;
169+
this.startToCloseTimeout = startToCloseTimeout;
110170
this.cancellationType = cancellationType;
111171
this.summary = summary;
112172
}
@@ -116,13 +176,25 @@ public NexusOperationOptions.Builder toBuilder() {
116176
}
117177

118178
private final Duration scheduleToCloseTimeout;
179+
private final Duration scheduleToStartTimeout;
180+
private final Duration startToCloseTimeout;
119181
private final NexusOperationCancellationType cancellationType;
120182
private final String summary;
121183

122184
public Duration getScheduleToCloseTimeout() {
123185
return scheduleToCloseTimeout;
124186
}
125187

188+
@Experimental
189+
public Duration getScheduleToStartTimeout() {
190+
return scheduleToStartTimeout;
191+
}
192+
193+
@Experimental
194+
public Duration getStartToCloseTimeout() {
195+
return startToCloseTimeout;
196+
}
197+
126198
public NexusOperationCancellationType getCancellationType() {
127199
return cancellationType;
128200
}
@@ -138,20 +210,31 @@ public boolean equals(Object o) {
138210
if (o == null || getClass() != o.getClass()) return false;
139211
NexusOperationOptions that = (NexusOperationOptions) o;
140212
return Objects.equals(scheduleToCloseTimeout, that.scheduleToCloseTimeout)
213+
&& Objects.equals(scheduleToStartTimeout, that.scheduleToStartTimeout)
214+
&& Objects.equals(startToCloseTimeout, that.startToCloseTimeout)
141215
&& Objects.equals(cancellationType, that.cancellationType)
142216
&& Objects.equals(summary, that.summary);
143217
}
144218

145219
@Override
146220
public int hashCode() {
147-
return Objects.hash(scheduleToCloseTimeout, cancellationType, summary);
221+
return Objects.hash(
222+
scheduleToCloseTimeout,
223+
scheduleToStartTimeout,
224+
startToCloseTimeout,
225+
cancellationType,
226+
summary);
148227
}
149228

150229
@Override
151230
public String toString() {
152231
return "NexusOperationOptions{"
153232
+ "scheduleToCloseTimeout="
154233
+ scheduleToCloseTimeout
234+
+ ", scheduleToStartTimeout="
235+
+ scheduleToStartTimeout
236+
+ ", startToCloseTimeout="
237+
+ startToCloseTimeout
155238
+ ", cancellationType="
156239
+ cancellationType
157240
+ ", summary='"

temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,8 @@ private static void scheduleNexusOperation(
668668
.setOperation(attr.getOperation())
669669
.setInput(attr.getInput())
670670
.setScheduleToCloseTimeout(attr.getScheduleToCloseTimeout())
671+
.setScheduleToStartTimeout(attr.getScheduleToStartTimeout())
672+
.setStartToCloseTimeout(attr.getStartToCloseTimeout())
671673
.putAllNexusHeader(attr.getNexusHeaderMap())
672674
.setRequestId(UUID.randomUUID().toString())
673675
.setWorkflowTaskCompletedEventId(workflowTaskCompletedId);
@@ -778,11 +780,27 @@ private static void completeNexusOperation(
778780

779781
private static void timeoutNexusOperation(
780782
RequestContext ctx, NexusOperationData data, TimeoutType timeoutType, long notUsed) {
781-
if (timeoutType != TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE) {
783+
if (timeoutType != TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE
784+
&& timeoutType != TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_START
785+
&& timeoutType != TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE) {
782786
throw new IllegalArgumentException(
783787
"Timeout type not supported for Nexus operations: " + timeoutType);
784788
}
785789

790+
String timeoutMessage;
791+
switch (timeoutType) {
792+
case TIMEOUT_TYPE_SCHEDULE_TO_START:
793+
timeoutMessage = "operation timed out before starting";
794+
break;
795+
case TIMEOUT_TYPE_START_TO_CLOSE:
796+
timeoutMessage = "operation timed out after starting";
797+
break;
798+
case TIMEOUT_TYPE_SCHEDULE_TO_CLOSE:
799+
default:
800+
timeoutMessage = "operation timed out";
801+
break;
802+
}
803+
786804
Failure failure =
787805
Failure.newBuilder()
788806
.setMessage("nexus operation completed unsuccessfully")
@@ -795,7 +813,7 @@ private static void timeoutNexusOperation(
795813
.setScheduledEventId(data.scheduledEventId))
796814
.setCause(
797815
Failure.newBuilder()
798-
.setMessage("operation timed out")
816+
.setMessage(timeoutMessage)
799817
.setTimeoutFailureInfo(
800818
TimeoutFailureInfo.newBuilder().setTimeoutType(timeoutType)))
801819
.build();

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -852,6 +852,18 @@ private void processScheduleNexusOperation(
852852
operation.getData().getAttempt()),
853853
"NexusOperation ScheduleToCloseTimeout");
854854
}
855+
if (attr.hasScheduleToStartTimeout()
856+
&& Durations.toMillis(attr.getScheduleToStartTimeout()) > 0) {
857+
// ScheduleToStartTimeout is the time from schedule to start (or completion if synchronous)
858+
ctx.addTimer(
859+
ProtobufTimeUtils.toJavaDuration(attr.getScheduleToStartTimeout()),
860+
() ->
861+
timeoutNexusOperation(
862+
scheduleEventId,
863+
TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_START,
864+
operation.getData().getAttempt()),
865+
"NexusOperation ScheduleToStartTimeout");
866+
}
855867
ctx.lockTimer("processScheduleNexusOperation");
856868
}
857869

@@ -2309,6 +2321,23 @@ public void startNexusOperation(
23092321
StateMachine<NexusOperationData> operation = getPendingNexusOperation(scheduledEventId);
23102322
operation.action(StateMachines.Action.START, ctx, resp, 0);
23112323
operation.getData().identity = clientIdentity;
2324+
2325+
// Add start-to-close timeout timer if configured
2326+
NexusOperationScheduledEventAttributes scheduledEvent =
2327+
operation.getData().scheduledEvent;
2328+
if (scheduledEvent.hasStartToCloseTimeout()
2329+
&& Durations.toMillis(scheduledEvent.getStartToCloseTimeout()) > 0) {
2330+
// StartToCloseTimeout measures from when the operation started to when it completes
2331+
ctx.addTimer(
2332+
ProtobufTimeUtils.toJavaDuration(scheduledEvent.getStartToCloseTimeout()),
2333+
() ->
2334+
timeoutNexusOperation(
2335+
scheduledEventId,
2336+
TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE,
2337+
operation.getData().getAttempt()),
2338+
"NexusOperation StartToCloseTimeout");
2339+
}
2340+
23122341
scheduleWorkflowTask(ctx);
23132342
});
23142343
}

temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22

33
import static io.temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED;
44
import static io.temporal.api.enums.v1.WorkflowExecutionStatus.*;
5-
import static io.temporal.api.enums.v1.WorkflowIdReusePolicy.*;
5+
import static io.temporal.api.enums.v1.WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE;
6+
import static io.temporal.api.enums.v1.WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY;
7+
import static io.temporal.api.enums.v1.WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE;
8+
import static io.temporal.api.enums.v1.WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_UNSPECIFIED;
69
import static io.temporal.api.workflowservice.v1.ExecuteMultiOperationRequest.Operation.OperationCase.START_WORKFLOW;
710
import static io.temporal.api.workflowservice.v1.ExecuteMultiOperationRequest.Operation.OperationCase.UPDATE_WORKFLOW;
811
import static io.temporal.internal.testservice.CronUtils.getBackoffInterval;
@@ -21,6 +24,7 @@
2124
import io.temporal.api.common.v1.*;
2225
import io.temporal.api.common.v1.Link;
2326
import io.temporal.api.enums.v1.*;
27+
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
2428
import io.temporal.api.errordetails.v1.MultiOperationExecutionFailure;
2529
import io.temporal.api.errordetails.v1.WorkflowExecutionAlreadyStartedFailure;
2630
import io.temporal.api.failure.v1.*;
@@ -285,6 +289,8 @@ private TestWorkflowMutableState getMutableState(
285289
return getMutableState(executionId, failNotExists);
286290
}
287291

292+
@SuppressWarnings("deprecation") // Backwards compatibility for
293+
// WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING
288294
@Override
289295
public void startWorkflowExecution(
290296
StartWorkflowExecutionRequest request,
@@ -310,6 +316,8 @@ public void startWorkflowExecution(
310316
}
311317
}
312318

319+
@SuppressWarnings("deprecation") // Backwards compatibility for
320+
// WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING
313321
StartWorkflowExecutionResponse startWorkflowExecutionImpl(
314322
StartWorkflowExecutionRequest startRequest,
315323
Duration backoffStartInterval,
@@ -325,8 +333,9 @@ StartWorkflowExecutionResponse startWorkflowExecutionImpl(
325333
validateWorkflowIdReusePolicy(reusePolicy, conflictPolicy);
326334
validateOnConflictOptions(startRequest);
327335

328-
// Backwards compatibility: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING is deprecated
329-
if (reusePolicy == WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
336+
// Backwards compatibility: WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING
337+
// is deprecated
338+
if (reusePolicy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
330339
conflictPolicy = WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING;
331340
reusePolicy = WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE;
332341
}
@@ -475,12 +484,14 @@ private StartWorkflowExecutionResponse throwDuplicatedWorkflow(
475484
WorkflowExecutionAlreadyStartedFailure.getDescriptor());
476485
}
477486

487+
@SuppressWarnings("deprecation") // Backwards compatibility for
488+
// WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING
478489
private void validateWorkflowIdReusePolicy(
479490
WorkflowIdReusePolicy reusePolicy, WorkflowIdConflictPolicy conflictPolicy) {
480491
if (conflictPolicy != WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED
481-
&& reusePolicy == WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
492+
&& reusePolicy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
482493
throw createInvalidArgument(
483-
"Invalid WorkflowIDReusePolicy: WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy.");
494+
"Invalid WorkflowIDReusePolicy: WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING cannot be used together with a WorkflowIDConflictPolicy.");
484495
}
485496
if (conflictPolicy == WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING
486497
&& reusePolicy == WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE) {

0 commit comments

Comments
 (0)