Skip to content

Commit 9545fd2

Browse files
Respond to PR comments
1 parent 3d44cf5 commit 9545fd2

3 files changed

Lines changed: 29 additions & 41 deletions

File tree

temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java

Lines changed: 27 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ final class NexusWorker implements SuspendableWorker {
5252
private final GrpcRetryer grpcRetryer;
5353
private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
5454
private final TrackingSlotSupplier<NexusSlotInfo> slotSupplier;
55+
private final boolean forceOldFailureFormat;
5556

5657
public NexusWorker(
5758
@Nonnull WorkflowServiceStubs service,
@@ -76,6 +77,9 @@ public NexusWorker(
7677
DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null);
7778

7879
this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope);
80+
// Allow tests to force old format for backward compatibility testing
81+
String forceOldFormat = System.getProperty("temporal.nexus.forceOldFailureFormat");
82+
this.forceOldFailureFormat = "true".equalsIgnoreCase(forceOldFormat);
7983
}
8084

8185
@Override
@@ -341,10 +345,7 @@ private void handleNexusTask(NexusTask task, Scope metricsScope) {
341345
// Check if the server supports using the Failure directly in responses
342346
boolean supportTemporalFailure =
343347
task.getResponse().getRequest().getCapabilities().getTemporalFailureResponses();
344-
345-
// Allow tests to force old format for backward compatibility testing
346-
String forceOldFormat = System.getProperty("temporal.nexus.forceOldFailureFormat");
347-
if ("true".equalsIgnoreCase(forceOldFormat)) {
348+
if (forceOldFailureFormat) {
348349
supportTemporalFailure = false;
349350
}
350351

@@ -373,6 +374,27 @@ private void logExceptionDuringResultReporting(
373374
}
374375
}
375376

377+
private Response getResponseForOldServer(Response response) {
378+
Response.Builder b = response.toBuilder();
379+
Failure failure = response.getStartOperation().getFailure();
380+
String operationState;
381+
if (failure.hasApplicationFailureInfo()) {
382+
operationState = OperationState.FAILED.toString().toLowerCase();
383+
} else if (failure.hasCanceledFailureInfo()) {
384+
operationState = OperationState.CANCELED.toString().toLowerCase();
385+
} else {
386+
throw new IllegalArgumentException(
387+
"[BUG] Failure must have either ApplicationFailureInfo or CanceledFailureInfo");
388+
}
389+
return b.setStartOperation(
390+
response.getStartOperation().toBuilder()
391+
.setOperationError(
392+
UnsuccessfulOperationError.newBuilder()
393+
.setOperationState(operationState)
394+
.setFailure(NexusUtil.temporalFailureToNexusFailure(failure.getCause()))))
395+
.build();
396+
}
397+
376398
@SuppressWarnings("deprecation")
377399
private void sendReply(
378400
ByteString taskToken,
@@ -384,43 +406,7 @@ private void sendReply(
384406
// For old servers that do not support TemporalFailure in Failure proto,
385407
// we need to convert the Failure to a UnsuccessfulOperationError
386408
if (!supportTemporalFailure && taskResponse.getStartOperation().hasFailure()) {
387-
Response.Builder b = taskResponse.toBuilder();
388-
Failure failure = taskResponse.getStartOperation().getFailure();
389-
String operationState;
390-
if (failure.hasApplicationFailureInfo()) {
391-
operationState = OperationState.FAILED.toString().toLowerCase();
392-
} else if (failure.hasCanceledFailureInfo()) {
393-
operationState = OperationState.CANCELED.toString().toLowerCase();
394-
} else {
395-
RespondNexusTaskFailedRequest.Builder request =
396-
RespondNexusTaskFailedRequest.newBuilder()
397-
.setTaskToken(taskToken)
398-
.setIdentity(options.getIdentity())
399-
.setNamespace(namespace)
400-
.setError(
401-
NexusUtil.handlerErrorToNexusError(
402-
new HandlerException(
403-
HandlerException.ErrorType.INTERNAL,
404-
"Failure does not have applicationFailureInfo or canceledFailureInfo"),
405-
dataConverter));
406-
grpcRetryer.retry(
407-
() ->
408-
service
409-
.blockingStub()
410-
.withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
411-
.respondNexusTaskFailed(request.build()),
412-
replyGrpcRetryerOptions);
413-
return;
414-
}
415-
taskResponse =
416-
b.setStartOperation(
417-
taskResponse.getStartOperation().toBuilder()
418-
.setOperationError(
419-
UnsuccessfulOperationError.newBuilder()
420-
.setOperationState(operationState)
421-
.setFailure(
422-
NexusUtil.temporalFailureToNexusFailure(failure.getCause()))))
423-
.build();
409+
taskResponse = getResponseForOldServer(taskResponse);
424410
}
425411
RespondNexusTaskCompletedRequest request =
426412
RespondNexusTaskCompletedRequest.newBuilder()

temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,6 +1102,7 @@ private CompletableFuture<Request> completeNexusTask(
11021102
});
11031103
}
11041104

1105+
@SuppressWarnings("deprecation")
11051106
private CompletableFuture<RespondNexusTaskFailedResponse> failNexusTask(
11061107
ByteString taskToken, HandlerError err) {
11071108
return CompletableFuture.supplyAsync(

temporal-test-server/src/test/java/io/temporal/testserver/functional/WorkflowIdReusePolicyTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public void alreadyRunningWorkflowBlocksSecondEvenWithAllowDuplicate() {
8282
@Test
8383
public void secondWorkflowTerminatesFirst() {
8484
String workflowId = "terminate-if-running-1";
85+
@SuppressWarnings("deprecation")
8586
WorkflowOptions options =
8687
WorkflowOptions.newBuilder()
8788
.setWorkflowId(workflowId)

0 commit comments

Comments
 (0)