Skip to content

Commit 8106ebd

Browse files
authored
Merge branch 'master' into activity-heartbeat-timeout-local
2 parents fe0df3c + 6ba0947 commit 8106ebd

36 files changed

Lines changed: 1362 additions & 222 deletions

.github/workflows/ci.yml

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ jobs:
7272

7373
- name: Start containerized server and dependencies
7474
env:
75-
TEMPORAL_CLI_VERSION: 1.4.1-cloud-v1-29-0-139-2.0
75+
TEMPORAL_CLI_VERSION: 1.6.1-server-1.31.0-151.0
7676
run: |
7777
wget -O temporal_cli.tar.gz https://github.com/temporalio/cli/releases/download/v${TEMPORAL_CLI_VERSION}/temporal_cli_${TEMPORAL_CLI_VERSION}_linux_amd64.tar.gz
7878
tar -xzf temporal_cli.tar.gz
@@ -95,9 +95,8 @@ jobs:
9595
--dynamic-config-value system.enableActivityEagerExecution=true \
9696
--dynamic-config-value history.MaxBufferedQueryCount=10000 \
9797
--dynamic-config-value frontend.workerVersioningDataAPIs=true \
98-
--dynamic-config-value component.nexusoperations.recordCancelRequestCompletionEvents=true \
99-
--dynamic-config-value component.callbacks.allowedAddresses='[{"Pattern":"*","AllowInsecure":true}]' \
100-
--dynamic-config-value history.enableRequestIdRefLinks=true &
98+
--dynamic-config-value history.enableRequestIdRefLinks=true \
99+
--dynamic-config-value 'component.callbacks.allowedAddresses=[{"Pattern":"localhost:7243","AllowInsecure":true}]' &
101100
sleep 10s
102101
103102
- name: Run unit tests
@@ -187,4 +186,4 @@ jobs:
187186
name: Build native test server
188187
uses: ./.github/workflows/build-native-image.yml
189188
with:
190-
ref: ${{ github.event.pull_request.head.sha }}
189+
ref: ${{ github.event.pull_request.head.sha }}

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ ext {
3030
// Platforms
3131
grpcVersion = '1.75.0' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager
3232
jacksonVersion = '2.15.4' // [2.9.0,)
33-
nexusVersion = '0.4.0-alpha'
33+
nexusVersion = '0.5.0-alpha'
3434
// we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though.
3535
micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.13.6' : '1.9.9' // [1.0.0,)
3636

temporal-sdk/src/main/java/io/temporal/failure/DefaultFailureConverter.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import io.temporal.common.converter.FailureConverter;
1616
import io.temporal.internal.activity.ActivityTaskHandlerImpl;
1717
import io.temporal.internal.common.FailureUtils;
18+
import io.temporal.internal.common.NexusUtil;
1819
import io.temporal.internal.common.ProtobufTimeUtils;
1920
import io.temporal.internal.sync.POJOWorkflowImplementationFactory;
2021
import io.temporal.serviceclient.CheckedExceptionWrapper;
@@ -192,7 +193,19 @@ private RuntimeException failureToExceptionImpl(Failure failure, DataConverter d
192193
retryBehavior = HandlerException.RetryBehavior.NON_RETRYABLE;
193194
break;
194195
}
195-
return new HandlerException(info.getType(), cause, retryBehavior);
196+
if (failure.getMessage().isEmpty()
197+
|| failure
198+
.getMessage()
199+
.startsWith(String.format("handler error (%s)", info.getType()))) {
200+
return new HandlerException(info.getType(), cause, retryBehavior);
201+
} else {
202+
return new HandlerException(
203+
info.getType(),
204+
failure.getMessage(),
205+
cause,
206+
retryBehavior,
207+
NexusUtil.temporalFailureToNexusFailureInfo(failure));
208+
}
196209
}
197210
case FAILUREINFO_NOT_SET:
198211
default:
@@ -324,6 +337,9 @@ private Failure exceptionToFailure(Throwable throwable) {
324337
failure.setNexusOperationExecutionFailureInfo(op);
325338
} else if (throwable instanceof HandlerException) {
326339
HandlerException he = (HandlerException) throwable;
340+
if (he.getOriginalFailure() != null) {
341+
return NexusUtil.nexusFailureToTemporalFailure(he.getOriginalFailure(), true);
342+
}
327343
NexusHandlerErrorRetryBehavior retryBehavior =
328344
NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED;
329345
switch (he.getRetryBehavior()) {

temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,7 @@ public static NexusWorkflowStarter createNexusBoundStub(
8686
} catch (JsonProcessingException e) {
8787
// Not expected as the link is constructed by the SDK.
8888
throw new HandlerException(
89-
HandlerException.ErrorType.BAD_REQUEST,
90-
new IllegalArgumentException("failed to generate workflow operation token", e));
89+
HandlerException.ErrorType.BAD_REQUEST, "failed to generate workflow operation token", e);
9190
}
9291
List<Link> links =
9392
request.getLinks() == null

temporal-sdk/src/main/java/io/temporal/internal/common/NexusUtil.java

Lines changed: 140 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
11
package io.temporal.internal.common;
22

3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.fasterxml.jackson.databind.ObjectWriter;
36
import com.google.protobuf.ByteString;
47
import com.google.protobuf.InvalidProtocolBufferException;
58
import com.google.protobuf.util.JsonFormat;
9+
import io.nexusrpc.FailureInfo;
610
import io.nexusrpc.Link;
11+
import io.nexusrpc.handler.HandlerException;
12+
import io.temporal.api.common.v1.Payload;
13+
import io.temporal.api.enums.v1.NexusHandlerErrorRetryBehavior;
714
import io.temporal.api.nexus.v1.Failure;
15+
import io.temporal.api.nexus.v1.HandlerError;
816
import io.temporal.common.converter.DataConverter;
917
import java.net.URI;
1018
import java.net.URISyntaxException;
@@ -13,7 +21,8 @@
1321
import java.util.Map;
1422

1523
public class NexusUtil {
16-
private static final JsonFormat.Printer JSON_PRINTER =
24+
private static final ObjectWriter JSON_OBJECT_WRITER = new ObjectMapper().writer();
25+
private static final JsonFormat.Printer PROTO_JSON_PRINTER =
1726
JsonFormat.printer().omittingInsignificantWhitespace();
1827
private static final String TEMPORAL_FAILURE_TYPE_STRING =
1928
io.temporal.api.failure.v1.Failure.getDescriptor().getFullName();
@@ -47,23 +56,146 @@ public static Link nexusProtoLinkToLink(io.temporal.api.nexus.v1.Link nexusLink)
4756
.build();
4857
}
4958

50-
public static Failure exceptionToNexusFailure(Throwable exception, DataConverter dataConverter) {
51-
io.temporal.api.failure.v1.Failure failure = dataConverter.exceptionToFailure(exception);
59+
public static Failure temporalFailureToNexusFailure(
60+
io.temporal.api.failure.v1.Failure temporalFailure) {
5261
String details;
5362
try {
54-
details = JSON_PRINTER.print(failure.toBuilder().setMessage("").build());
63+
details =
64+
PROTO_JSON_PRINTER.print(
65+
temporalFailure.toBuilder().setMessage("").setStackTrace("").build());
5566
} catch (InvalidProtocolBufferException e) {
5667
return Failure.newBuilder()
5768
.setMessage("Failed to serialize failure details")
5869
.setDetails(ByteString.copyFromUtf8(e.getMessage()))
5970
.build();
6071
}
61-
return Failure.newBuilder()
62-
.setMessage(failure.getMessage())
63-
.setDetails(ByteString.copyFromUtf8(details))
64-
.putAllMetadata(NEXUS_FAILURE_METADATA)
72+
Failure.Builder failureBuilder =
73+
Failure.newBuilder()
74+
.setMessage(temporalFailure.getMessage())
75+
.setDetails(ByteString.copyFromUtf8(details))
76+
.putAllMetadata(NEXUS_FAILURE_METADATA);
77+
if (!temporalFailure.getStackTrace().isEmpty()) {
78+
failureBuilder.setStackTrace(temporalFailure.getStackTrace());
79+
}
80+
return failureBuilder.build();
81+
}
82+
83+
public static io.temporal.api.failure.v1.Failure nexusFailureToTemporalFailure(
84+
FailureInfo failureInfo, boolean retryable) {
85+
io.temporal.api.failure.v1.Failure.Builder apiFailure =
86+
io.temporal.api.failure.v1.Failure.newBuilder();
87+
88+
if (failureInfo.getMetadata().containsKey("type")
89+
&& failureInfo.getMetadata().get("type").equals(TEMPORAL_FAILURE_TYPE_STRING)) {
90+
// Details contains a JSON-serialized Temporal failure
91+
try {
92+
JsonFormat.parser().ignoringUnknownFields().merge(failureInfo.getDetailsJson(), apiFailure);
93+
} catch (InvalidProtocolBufferException e) {
94+
throw new RuntimeException(e);
95+
}
96+
} else {
97+
// Create an ApplicationFailure with the Nexus failure data
98+
io.temporal.api.common.v1.Payloads payloads = nexusFailureMetadataToPayloads(failureInfo);
99+
io.temporal.api.failure.v1.ApplicationFailureInfo.Builder appFailureInfo =
100+
io.temporal.api.failure.v1.ApplicationFailureInfo.newBuilder()
101+
.setType("NexusFailure")
102+
.setNonRetryable(!retryable);
103+
if (payloads != null) {
104+
appFailureInfo.setDetails(payloads);
105+
}
106+
apiFailure.setApplicationFailureInfo(appFailureInfo.build());
107+
}
108+
109+
// Ensure these always get written
110+
apiFailure.setMessage(failureInfo.getMessage());
111+
if (failureInfo.getStackTrace() != null && !failureInfo.getStackTrace().isEmpty()) {
112+
apiFailure.setStackTrace(failureInfo.getStackTrace());
113+
}
114+
115+
return apiFailure.build();
116+
}
117+
118+
private static io.temporal.api.common.v1.Payloads nexusFailureMetadataToPayloads(
119+
FailureInfo failureInfo) {
120+
if (failureInfo.getMetadata().isEmpty() && failureInfo.getDetailsJson().isEmpty()) {
121+
return null;
122+
}
123+
124+
// Create a copy without the message before serializing
125+
FailureInfo failureCopy = FailureInfo.newBuilder(failureInfo).setMessage("").build();
126+
String json = null;
127+
try {
128+
json = JSON_OBJECT_WRITER.writeValueAsString(failureCopy);
129+
} catch (JsonProcessingException e) {
130+
throw new RuntimeException(e);
131+
}
132+
133+
return io.temporal.api.common.v1.Payloads.newBuilder()
134+
.addPayloads(
135+
Payload.newBuilder()
136+
.putMetadata("encoding", ByteString.copyFromUtf8("json/plain"))
137+
.setData(ByteString.copyFromUtf8(json))
138+
.build())
139+
.build();
140+
}
141+
142+
public static FailureInfo temporalFailureToNexusFailureInfo(
143+
io.temporal.api.failure.v1.Failure temporalFailure) {
144+
String details;
145+
try {
146+
details =
147+
PROTO_JSON_PRINTER.print(
148+
temporalFailure.toBuilder().setMessage("").setStackTrace("").build());
149+
} catch (InvalidProtocolBufferException e) {
150+
return FailureInfo.newBuilder()
151+
.setMessage("Failed to serialize failure details")
152+
.setDetailsJson(e.getMessage())
153+
.build();
154+
}
155+
return FailureInfo.newBuilder()
156+
.setMessage(temporalFailure.getMessage())
157+
.setDetailsJson(details)
158+
.putMetadata("type", TEMPORAL_FAILURE_TYPE_STRING)
65159
.build();
66160
}
67161

162+
public static Failure exceptionToNexusFailure(Throwable exception, DataConverter dataConverter) {
163+
io.temporal.api.failure.v1.Failure failure = dataConverter.exceptionToFailure(exception);
164+
return temporalFailureToNexusFailure(failure);
165+
}
166+
167+
/**
168+
* Convert a HandlerException to the legacy HandlerError format used by Nexus, including
169+
* converting the cause to a Failure.
170+
*/
171+
public static HandlerError handlerErrorToNexusError(
172+
HandlerException e, DataConverter dataConverter) {
173+
HandlerError.Builder handlerError =
174+
HandlerError.newBuilder()
175+
.setErrorType(e.getErrorType().toString())
176+
.setRetryBehavior(mapRetryBehavior(e.getRetryBehavior()));
177+
if (e.getCause() != null) {
178+
handlerError.setFailure(exceptionToNexusFailure(e.getCause(), dataConverter));
179+
} else if (e.getMessage() != null && !e.getMessage().isEmpty()) {
180+
// Generate a failure from the message if no cause is provided, to ensure the error is not
181+
// empty
182+
handlerError.setFailure(
183+
exceptionToNexusFailure(new RuntimeException(e.getMessage()), dataConverter));
184+
}
185+
return handlerError.build();
186+
}
187+
188+
private static NexusHandlerErrorRetryBehavior mapRetryBehavior(
189+
HandlerException.RetryBehavior retryBehavior) {
190+
switch (retryBehavior) {
191+
case RETRYABLE:
192+
return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE;
193+
case NON_RETRYABLE:
194+
return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE;
195+
default:
196+
return NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED;
197+
}
198+
}
199+
68200
private NexusUtil() {}
69201
}

0 commit comments

Comments
 (0)