Skip to content

Commit dbd648b

Browse files
Update Nexus failure conversion (#2773)
1 parent 20fb852 commit dbd648b

File tree

23 files changed: +870 additions, -152 deletions.

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ ext {
3030
// Platforms
3131
grpcVersion = '1.75.0' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager
3232
jacksonVersion = '2.15.4' // [2.9.0,)
33-
nexusVersion = '0.4.0-alpha'
33+
nexusVersion = '0.5.0-alpha'
3434
// we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though.
3535
micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.13.6' : '1.9.9' // [1.0.0,)
3636

temporal-sdk/src/main/java/io/temporal/failure/DefaultFailureConverter.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.temporal.common.converter.FailureConverter;
1616
import io.temporal.internal.activity.ActivityTaskHandlerImpl;
1717
import io.temporal.internal.common.FailureUtils;
18+
import io.temporal.internal.common.NexusUtil;
1819
import io.temporal.internal.common.ProtobufTimeUtils;
1920
import io.temporal.internal.sync.POJOWorkflowImplementationFactory;
2021
import io.temporal.serviceclient.CheckedExceptionWrapper;
@@ -192,7 +193,19 @@ private RuntimeException failureToExceptionImpl(Failure failure, DataConverter d
192193
retryBehavior = HandlerException.RetryBehavior.NON_RETRYABLE;
193194
break;
194195
}
195-
return new HandlerException(info.getType(), cause, retryBehavior);
196+
if (failure.getMessage().isEmpty()
197+
|| failure
198+
.getMessage()
199+
.startsWith(String.format("handler error (%s)", info.getType()))) {
200+
return new HandlerException(info.getType(), cause, retryBehavior);
201+
} else {
202+
return new HandlerException(
203+
info.getType(),
204+
failure.getMessage(),
205+
cause,
206+
retryBehavior,
207+
NexusUtil.temporalFailureToNexusFailureInfo(failure));
208+
}
196209
}
197210
case FAILUREINFO_NOT_SET:
198211
default:
@@ -324,6 +337,9 @@ private Failure exceptionToFailure(Throwable throwable) {
324337
failure.setNexusOperationExecutionFailureInfo(op);
325338
} else if (throwable instanceof HandlerException) {
326339
HandlerException he = (HandlerException) throwable;
340+
if (he.getOriginalFailure() != null) {
341+
return NexusUtil.nexusFailureToTemporalFailure(he.getOriginalFailure(), true);
342+
}
327343
NexusHandlerErrorRetryBehavior retryBehavior =
328344
NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED;
329345
switch (he.getRetryBehavior()) {

temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,7 @@ public static NexusWorkflowStarter createNexusBoundStub(
8686
} catch (JsonProcessingException e) {
8787
// Not expected as the link is constructed by the SDK.
8888
throw new HandlerException(
89-
HandlerException.ErrorType.BAD_REQUEST,
90-
new IllegalArgumentException("failed to generate workflow operation token", e));
89+
HandlerException.ErrorType.BAD_REQUEST, "failed to generate workflow operation token", e);
9190
}
9291
List<Link> links =
9392
request.getLinks() == null

temporal-sdk/src/main/java/io/temporal/internal/common/NexusUtil.java

Lines changed: 140 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
11
package io.temporal.internal.common;
22

3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.fasterxml.jackson.databind.ObjectWriter;
36
import com.google.protobuf.ByteString;
47
import com.google.protobuf.InvalidProtocolBufferException;
58
import com.google.protobuf.util.JsonFormat;
9+
import io.nexusrpc.FailureInfo;
610
import io.nexusrpc.Link;
11+
import io.nexusrpc.handler.HandlerException;
12+
import io.temporal.api.common.v1.Payload;
13+
import io.temporal.api.enums.v1.NexusHandlerErrorRetryBehavior;
714
import io.temporal.api.nexus.v1.Failure;
15+
import io.temporal.api.nexus.v1.HandlerError;
816
import io.temporal.common.converter.DataConverter;
917
import java.net.URI;
1018
import java.net.URISyntaxException;
@@ -13,7 +21,8 @@
1321
import java.util.Map;
1422

1523
public class NexusUtil {
16-
private static final JsonFormat.Printer JSON_PRINTER =
24+
private static final ObjectWriter JSON_OBJECT_WRITER = new ObjectMapper().writer();
25+
private static final JsonFormat.Printer PROTO_JSON_PRINTER =
1726
JsonFormat.printer().omittingInsignificantWhitespace();
1827
private static final String TEMPORAL_FAILURE_TYPE_STRING =
1928
io.temporal.api.failure.v1.Failure.getDescriptor().getFullName();
@@ -47,23 +56,146 @@ public static Link nexusProtoLinkToLink(io.temporal.api.nexus.v1.Link nexusLink)
4756
.build();
4857
}
4958

50-
public static Failure exceptionToNexusFailure(Throwable exception, DataConverter dataConverter) {
51-
io.temporal.api.failure.v1.Failure failure = dataConverter.exceptionToFailure(exception);
59+
public static Failure temporalFailureToNexusFailure(
60+
io.temporal.api.failure.v1.Failure temporalFailure) {
5261
String details;
5362
try {
54-
details = JSON_PRINTER.print(failure.toBuilder().setMessage("").build());
63+
details =
64+
PROTO_JSON_PRINTER.print(
65+
temporalFailure.toBuilder().setMessage("").setStackTrace("").build());
5566
} catch (InvalidProtocolBufferException e) {
5667
return Failure.newBuilder()
5768
.setMessage("Failed to serialize failure details")
5869
.setDetails(ByteString.copyFromUtf8(e.getMessage()))
5970
.build();
6071
}
61-
return Failure.newBuilder()
62-
.setMessage(failure.getMessage())
63-
.setDetails(ByteString.copyFromUtf8(details))
64-
.putAllMetadata(NEXUS_FAILURE_METADATA)
72+
Failure.Builder failureBuilder =
73+
Failure.newBuilder()
74+
.setMessage(temporalFailure.getMessage())
75+
.setDetails(ByteString.copyFromUtf8(details))
76+
.putAllMetadata(NEXUS_FAILURE_METADATA);
77+
if (!temporalFailure.getStackTrace().isEmpty()) {
78+
failureBuilder.setStackTrace(temporalFailure.getStackTrace());
79+
}
80+
return failureBuilder.build();
81+
}
82+
83+
public static io.temporal.api.failure.v1.Failure nexusFailureToTemporalFailure(
84+
FailureInfo failureInfo, boolean retryable) {
85+
io.temporal.api.failure.v1.Failure.Builder apiFailure =
86+
io.temporal.api.failure.v1.Failure.newBuilder();
87+
88+
if (failureInfo.getMetadata().containsKey("type")
89+
&& failureInfo.getMetadata().get("type").equals(TEMPORAL_FAILURE_TYPE_STRING)) {
90+
// Details contains a JSON-serialized Temporal failure
91+
try {
92+
JsonFormat.parser().ignoringUnknownFields().merge(failureInfo.getDetailsJson(), apiFailure);
93+
} catch (InvalidProtocolBufferException e) {
94+
throw new RuntimeException(e);
95+
}
96+
} else {
97+
// Create an ApplicationFailure with the Nexus failure data
98+
io.temporal.api.common.v1.Payloads payloads = nexusFailureMetadataToPayloads(failureInfo);
99+
io.temporal.api.failure.v1.ApplicationFailureInfo.Builder appFailureInfo =
100+
io.temporal.api.failure.v1.ApplicationFailureInfo.newBuilder()
101+
.setType("NexusFailure")
102+
.setNonRetryable(!retryable);
103+
if (payloads != null) {
104+
appFailureInfo.setDetails(payloads);
105+
}
106+
apiFailure.setApplicationFailureInfo(appFailureInfo.build());
107+
}
108+
109+
// Ensure these always get written
110+
apiFailure.setMessage(failureInfo.getMessage());
111+
if (failureInfo.getStackTrace() != null && !failureInfo.getStackTrace().isEmpty()) {
112+
apiFailure.setStackTrace(failureInfo.getStackTrace());
113+
}
114+
115+
return apiFailure.build();
116+
}
117+
118+
private static io.temporal.api.common.v1.Payloads nexusFailureMetadataToPayloads(
119+
FailureInfo failureInfo) {
120+
if (failureInfo.getMetadata().isEmpty() && failureInfo.getDetailsJson().isEmpty()) {
121+
return null;
122+
}
123+
124+
// Create a copy without the message before serializing
125+
FailureInfo failureCopy = FailureInfo.newBuilder(failureInfo).setMessage("").build();
126+
String json = null;
127+
try {
128+
json = JSON_OBJECT_WRITER.writeValueAsString(failureCopy);
129+
} catch (JsonProcessingException e) {
130+
throw new RuntimeException(e);
131+
}
132+
133+
return io.temporal.api.common.v1.Payloads.newBuilder()
134+
.addPayloads(
135+
Payload.newBuilder()
136+
.putMetadata("encoding", ByteString.copyFromUtf8("json/plain"))
137+
.setData(ByteString.copyFromUtf8(json))
138+
.build())
139+
.build();
140+
}
141+
142+
public static FailureInfo temporalFailureToNexusFailureInfo(
143+
io.temporal.api.failure.v1.Failure temporalFailure) {
144+
String details;
145+
try {
146+
details =
147+
PROTO_JSON_PRINTER.print(
148+
temporalFailure.toBuilder().setMessage("").setStackTrace("").build());
149+
} catch (InvalidProtocolBufferException e) {
150+
return FailureInfo.newBuilder()
151+
.setMessage("Failed to serialize failure details")
152+
.setDetailsJson(e.getMessage())
153+
.build();
154+
}
155+
return FailureInfo.newBuilder()
156+
.setMessage(temporalFailure.getMessage())
157+
.setDetailsJson(details)
158+
.putMetadata("type", TEMPORAL_FAILURE_TYPE_STRING)
65159
.build();
66160
}
67161

162+
public static Failure exceptionToNexusFailure(Throwable exception, DataConverter dataConverter) {
163+
io.temporal.api.failure.v1.Failure failure = dataConverter.exceptionToFailure(exception);
164+
return temporalFailureToNexusFailure(failure);
165+
}
166+
167+
/**
168+
* Convert a HandlerException to the legacy HandlerError format used by Nexus, including
169+
* converting the cause to a Failure.
170+
*/
171+
public static HandlerError handlerErrorToNexusError(
172+
HandlerException e, DataConverter dataConverter) {
173+
HandlerError.Builder handlerError =
174+
HandlerError.newBuilder()
175+
.setErrorType(e.getErrorType().toString())
176+
.setRetryBehavior(mapRetryBehavior(e.getRetryBehavior()));
177+
if (e.getCause() != null) {
178+
handlerError.setFailure(exceptionToNexusFailure(e.getCause(), dataConverter));
179+
} else if (e.getMessage() != null && !e.getMessage().isEmpty()) {
180+
// Generate a failure from the message if no cause is provided, to ensure the error is not
181+
// empty
182+
handlerError.setFailure(
183+
exceptionToNexusFailure(new RuntimeException(e.getMessage()), dataConverter));
184+
}
185+
return handlerError.build();
186+
}
187+
188+
private static NexusHandlerErrorRetryBehavior mapRetryBehavior(
189+
HandlerException.RetryBehavior retryBehavior) {
190+
switch (retryBehavior) {
191+
case RETRYABLE:
192+
return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE;
193+
case NON_RETRYABLE:
194+
return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE;
195+
default:
196+
return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED;
197+
}
198+
}
199+
68200
private NexusUtil() {}
69201
}

temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java

Lines changed: 29 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,24 @@
11
package io.temporal.internal.nexus;
22

3-
import static io.temporal.internal.common.NexusUtil.exceptionToNexusFailure;
43
import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink;
54

65
import com.uber.m3.tally.Scope;
76
import io.grpc.StatusRuntimeException;
87
import io.nexusrpc.Header;
98
import io.nexusrpc.OperationException;
9+
import io.nexusrpc.OperationState;
1010
import io.nexusrpc.handler.*;
1111
import io.temporal.api.common.v1.Payload;
12-
import io.temporal.api.enums.v1.NexusHandlerErrorRetryBehavior;
1312
import io.temporal.api.nexus.v1.*;
1413
import io.temporal.client.WorkflowClient;
1514
import io.temporal.client.WorkflowException;
1615
import io.temporal.client.WorkflowNotFoundException;
1716
import io.temporal.common.converter.DataConverter;
17+
import io.temporal.common.converter.EncodedValues;
1818
import io.temporal.common.interceptors.WorkerInterceptor;
1919
import io.temporal.failure.ApplicationFailure;
20+
import io.temporal.failure.CanceledFailure;
21+
import io.temporal.failure.TemporalFailure;
2022
import io.temporal.internal.common.InternalUtils;
2123
import io.temporal.internal.common.NexusUtil;
2224
import io.temporal.internal.worker.NexusTask;
@@ -26,6 +28,7 @@
2628
import io.temporal.worker.TypeAlreadyRegisteredException;
2729
import java.net.URISyntaxException;
2830
import java.time.Duration;
31+
import java.time.Instant;
2932
import java.util.*;
3033
import java.util.concurrent.*;
3134
import java.util.concurrent.atomic.AtomicBoolean;
@@ -78,9 +81,6 @@ public boolean start() {
7881
public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException {
7982
Request request = task.getResponse().getRequest();
8083
Map<String, String> headers = request.getHeaderMap();
81-
if (headers == null) {
82-
headers = Collections.emptyMap();
83-
}
8484

8585
OperationContext.Builder ctx = OperationContext.newBuilder();
8686
headers.forEach(ctx::putHeader);
@@ -104,10 +104,10 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException
104104
},
105105
timeout.toMillis(),
106106
java.util.concurrent.TimeUnit.MILLISECONDS);
107+
ctx.setDeadline(Instant.now().plus(timeout));
107108
} catch (IllegalArgumentException e) {
108109
throw new HandlerException(
109-
HandlerException.ErrorType.BAD_REQUEST,
110-
new RuntimeException("Invalid request timeout header", e));
110+
HandlerException.ErrorType.BAD_REQUEST, "Invalid request timeout header", e);
111111
}
112112
}
113113

@@ -126,21 +126,14 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException
126126
default:
127127
throw new HandlerException(
128128
HandlerException.ErrorType.NOT_IMPLEMENTED,
129-
new RuntimeException("Unknown request type: " + request.getVariantCase()));
129+
"Unknown request type: " + request.getVariantCase(),
130+
(Throwable) null);
130131
}
131132
} catch (HandlerException e) {
132-
return new Result(
133-
HandlerError.newBuilder()
134-
.setErrorType(e.getErrorType().toString())
135-
.setFailure(exceptionToNexusFailure(e.getCause(), dataConverter))
136-
.setRetryBehavior(mapRetryBehavior(e.getRetryBehavior()))
137-
.build());
133+
return new Result(e);
138134
} catch (Throwable e) {
139135
return new Result(
140-
HandlerError.newBuilder()
141-
.setErrorType(HandlerException.ErrorType.INTERNAL.toString())
142-
.setFailure(exceptionToNexusFailure(e, dataConverter))
143-
.build());
136+
new HandlerException(HandlerException.ErrorType.INTERNAL, "internal handler error", e));
144137
} finally {
145138
// If the task timed out, we should not send a response back to the server
146139
if (timedOut.get()) {
@@ -154,18 +147,6 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException
154147
}
155148
}
156149

157-
private NexusHandlerErrorRetryBehavior mapRetryBehavior(
158-
HandlerException.RetryBehavior retryBehavior) {
159-
switch (retryBehavior) {
160-
case RETRYABLE:
161-
return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE;
162-
case NON_RETRYABLE:
163-
return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE;
164-
default:
165-
return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED;
166-
}
167-
}
168-
169150
private void cancelOperation(OperationContext context, OperationCancelDetails details) {
170151
try {
171152
serviceHandler.cancelOperation(context, details);
@@ -215,6 +196,7 @@ private void convertKnownFailures(Throwable e) {
215196
if (((ApplicationFailure) failure).isNonRetryable()) {
216197
throw new HandlerException(
217198
HandlerException.ErrorType.INTERNAL,
199+
"Handler failed with non-retryable application error",
218200
failure,
219201
HandlerException.RetryBehavior.NON_RETRYABLE);
220202
}
@@ -306,7 +288,8 @@ private StartOperationResponse handleStartOperation(
306288
log.error("failed to parse link url: " + link.getUrl(), e);
307289
throw new HandlerException(
308290
HandlerException.ErrorType.BAD_REQUEST,
309-
new RuntimeException("Invalid link URL: " + link.getUrl(), e));
291+
"Invalid link URL: " + link.getUrl(),
292+
e);
310293
}
311294
});
312295

@@ -346,11 +329,21 @@ private StartOperationResponse handleStartOperation(
346329
convertKnownFailures(failure);
347330
}
348331
} catch (OperationException e) {
349-
startResponseBuilder.setOperationError(
350-
UnsuccessfulOperationError.newBuilder()
351-
.setOperationState(e.getState().toString().toLowerCase())
352-
.setFailure(exceptionToNexusFailure(e.getCause(), dataConverter))
353-
.build());
332+
TemporalFailure temporalFailure;
333+
if (e.getState() == OperationState.FAILED) {
334+
temporalFailure =
335+
ApplicationFailure.newFailureWithCause(e.getMessage(), "OperationError", e.getCause());
336+
temporalFailure.setStackTrace(e.getStackTrace());
337+
} else if (e.getState() == OperationState.CANCELED) {
338+
temporalFailure =
339+
new CanceledFailure(e.getMessage(), new EncodedValues(null), e.getCause());
340+
temporalFailure.setStackTrace(e.getStackTrace());
341+
} else {
342+
throw new HandlerException(
343+
HandlerException.ErrorType.INTERNAL,
344+
new RuntimeException("Unknown operation state: " + e.getState()));
345+
}
346+
startResponseBuilder.setFailure(dataConverter.exceptionToFailure(temporalFailure));
354347
}
355348
return startResponseBuilder.build();
356349
}

0 commit comments

Comments
 (0)