Skip to content

Commit 29ff5dc

Browse files
committed
fixes for activity failures, added test for activity failures
1 parent 6875557 commit 29ff5dc

2 files changed

Lines changed: 330 additions & 5 deletions

File tree

temporal-sdk/src/main/java/io/temporal/failure/ApplicationFailure.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,6 @@ public static boolean isBenignApplicationFailure(@Nullable Throwable t) {
283283
if (t instanceof ApplicationFailure
284284
&& ((ApplicationFailure) t).getApplicationErrorCategory()
285285
== ApplicationErrorCategory.BENIGN) {
286-
return true;
287286
}
288287

289288
// Handle WorkflowExecutionException, which wraps a protobuf Failure
@@ -298,11 +297,20 @@ public static boolean isBenignApplicationFailure(@Nullable Throwable t) {
298297
}
299298
}
300299

300+
// Handle ActivityFailure, which wraps the actual ApplicationFailure
301+
if (t instanceof io.temporal.failure.ActivityFailure) {
302+
Throwable cause = t.getCause();
303+
boolean result = cause != null && isBenignApplicationFailure(cause);
304+
return result;
305+
}
306+
301307
// Check the immediate cause.
302308
Throwable cause = t.getCause();
303-
return cause != null
304-
&& cause instanceof ApplicationFailure
305-
&& ((ApplicationFailure) cause).getApplicationErrorCategory()
306-
== ApplicationErrorCategory.BENIGN;
309+
boolean result =
310+
cause != null
311+
&& cause instanceof ApplicationFailure
312+
&& ((ApplicationFailure) cause).getApplicationErrorCategory()
313+
== ApplicationErrorCategory.BENIGN;
314+
return result;
307315
}
308316
}
Lines changed: 317 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.worker;
22+
23+
import static org.junit.Assert.assertEquals;
24+
import static org.junit.Assert.assertFalse;
25+
import static org.junit.Assert.assertThrows;
26+
import static org.junit.Assert.assertTrue;
27+
28+
import ch.qos.logback.classic.Level;
29+
import ch.qos.logback.classic.LoggerContext;
30+
import ch.qos.logback.classic.spi.ILoggingEvent;
31+
import ch.qos.logback.core.read.ListAppender;
32+
import com.uber.m3.tally.RootScopeBuilder;
33+
import com.uber.m3.tally.Scope;
34+
import io.temporal.activity.ActivityInterface;
35+
import io.temporal.activity.ActivityMethod;
36+
import io.temporal.activity.ActivityOptions;
37+
import io.temporal.activity.LocalActivityOptions;
38+
import io.temporal.client.WorkflowClient;
39+
import io.temporal.client.WorkflowFailedException;
40+
import io.temporal.client.WorkflowOptions;
41+
import io.temporal.common.reporter.TestStatsReporter;
42+
import io.temporal.failure.ApplicationErrorCategory;
43+
import io.temporal.failure.ApplicationFailure;
44+
import io.temporal.failure.TemporalFailure;
45+
import io.temporal.testing.internal.SDKTestWorkflowRule;
46+
import io.temporal.worker.MetricsType;
47+
import io.temporal.workflow.Workflow;
48+
import io.temporal.workflow.WorkflowInterface;
49+
import io.temporal.workflow.WorkflowMethod;
50+
import java.time.Duration;
51+
import java.util.ArrayList;
52+
import java.util.HashMap;
53+
import java.util.List;
54+
import java.util.Map;
55+
import org.junit.Before;
56+
import org.junit.Rule;
57+
import org.junit.Test;
58+
import org.slf4j.LoggerFactory;
59+
60+
public class ActivityFailedMetricsTests {
61+
private final TestStatsReporter reporter = new TestStatsReporter();
62+
63+
private static final ListAppender<ILoggingEvent> listAppender = new ListAppender<>();
64+
65+
static {
66+
LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
67+
ch.qos.logback.classic.Logger logger = context.getLogger("io.temporal.internal.activity");
68+
listAppender.setContext(context);
69+
listAppender.start();
70+
logger.addAppender(listAppender);
71+
logger.setLevel(Level.DEBUG); // Ensure we capture both debug and warn levels
72+
}
73+
74+
Scope metricsScope =
75+
new RootScopeBuilder().reporter(reporter).reportEvery(com.uber.m3.util.Duration.ofMillis(1));
76+
77+
@Rule
78+
public SDKTestWorkflowRule testWorkflowRule =
79+
SDKTestWorkflowRule.newBuilder()
80+
.setMetricsScope(metricsScope)
81+
.setWorkflowTypes(ActivityWorkflowImpl.class, LocalActivityWorkflowImpl.class)
82+
.setActivityImplementations(new TestActivityImpl())
83+
.build();
84+
85+
@Before
86+
public void setup() {
87+
reporter.flush();
88+
listAppender.list.clear();
89+
}
90+
91+
@ActivityInterface
92+
public interface TestActivity {
93+
@ActivityMethod
94+
void execute(boolean isBenign);
95+
}
96+
97+
@WorkflowInterface
98+
public interface ActivityWorkflow {
99+
@WorkflowMethod
100+
void execute(boolean isBenign);
101+
}
102+
103+
@WorkflowInterface
104+
public interface LocalActivityWorkflow {
105+
@WorkflowMethod
106+
void execute(boolean isBenign);
107+
}
108+
109+
public static class TestActivityImpl implements TestActivity {
110+
@Override
111+
public void execute(boolean isBenign) {
112+
if (!isBenign) {
113+
throw ApplicationFailure.newFailure("Non-benign activity failure", "NonBenignType");
114+
} else {
115+
throw ApplicationFailure.newFailureWithCategory(
116+
"Benign activity failure", "BenignType", ApplicationErrorCategory.BENIGN, null);
117+
}
118+
}
119+
}
120+
121+
public static class ActivityWorkflowImpl implements ActivityWorkflow {
122+
@Override
123+
public void execute(boolean isBenign) {
124+
TestActivity activity =
125+
Workflow.newActivityStub(
126+
TestActivity.class,
127+
ActivityOptions.newBuilder()
128+
.setStartToCloseTimeout(Duration.ofSeconds(3))
129+
.setRetryOptions(
130+
io.temporal.common.RetryOptions.newBuilder().setMaximumAttempts(1).build())
131+
.build());
132+
activity.execute(isBenign);
133+
}
134+
}
135+
136+
public static class LocalActivityWorkflowImpl implements LocalActivityWorkflow {
137+
@Override
138+
public void execute(boolean isBenign) {
139+
TestActivity activity =
140+
Workflow.newLocalActivityStub(
141+
TestActivity.class,
142+
LocalActivityOptions.newBuilder()
143+
.setStartToCloseTimeout(Duration.ofSeconds(3))
144+
.setRetryOptions(
145+
io.temporal.common.RetryOptions.newBuilder().setMaximumAttempts(1).build())
146+
.build());
147+
activity.execute(isBenign);
148+
}
149+
}
150+
151+
private Map<String, String> getActivityTagsWithWorkerType(
152+
String workerType, String workflowType) {
153+
Map<String, String> tags = new HashMap<>();
154+
tags.put("task_queue", testWorkflowRule.getTaskQueue());
155+
tags.put("namespace", "UnitTest");
156+
tags.put("activity_type", "Execute");
157+
tags.put("exception", "ApplicationFailure");
158+
tags.put("worker_type", workerType);
159+
tags.put("workflow_type", workflowType);
160+
return tags;
161+
}
162+
163+
private int countLogMessages(String message, Level level) {
164+
int count = 0;
165+
List<ILoggingEvent> list = new ArrayList<>(listAppender.list);
166+
for (ILoggingEvent event : list) {
167+
if (event.getFormattedMessage().contains(message) && event.getLevel() == level) {
168+
count++;
169+
}
170+
}
171+
return count;
172+
}
173+
174+
@Test
175+
public void activityFailureMetricBenignApplicationError() {
176+
reporter.assertNoMetric(
177+
MetricsType.ACTIVITY_EXEC_FAILED_COUNTER,
178+
getActivityTagsWithWorkerType("ActivityWorker", "ActivityWorkflow"));
179+
180+
WorkflowClient client = testWorkflowRule.getWorkflowClient();
181+
ActivityWorkflow nonBenignStub =
182+
client.newWorkflowStub(
183+
ActivityWorkflow.class,
184+
WorkflowOptions.newBuilder()
185+
.setTaskQueue(testWorkflowRule.getTaskQueue())
186+
.validateBuildWithDefaults());
187+
188+
WorkflowFailedException e1 =
189+
assertThrows(WorkflowFailedException.class, () -> nonBenignStub.execute(false));
190+
191+
assertTrue(
192+
"Cause should be ActivityFailure",
193+
e1.getCause() instanceof io.temporal.failure.ActivityFailure);
194+
assertTrue(
195+
"Inner cause should be ApplicationFailure",
196+
e1.getCause().getCause() instanceof ApplicationFailure);
197+
assertFalse(
198+
"Failure should not be benign",
199+
ApplicationFailure.isBenignApplicationFailure(e1.getCause().getCause()));
200+
assertEquals(
201+
"Non-benign activity failure",
202+
((TemporalFailure) e1.getCause().getCause()).getOriginalMessage());
203+
204+
reporter.assertCounter(
205+
MetricsType.ACTIVITY_EXEC_FAILED_COUNTER,
206+
getActivityTagsWithWorkerType("ActivityWorker", "ActivityWorkflow"),
207+
1);
208+
209+
// Execute workflow with benign activity failure
210+
WorkflowFailedException e2 =
211+
assertThrows(
212+
WorkflowFailedException.class,
213+
() ->
214+
client
215+
.newWorkflowStub(
216+
ActivityWorkflow.class,
217+
WorkflowOptions.newBuilder()
218+
.setTaskQueue(testWorkflowRule.getTaskQueue())
219+
.validateBuildWithDefaults())
220+
.execute(true));
221+
222+
assertTrue(
223+
"Cause should be ActivityFailure",
224+
e2.getCause() instanceof io.temporal.failure.ActivityFailure);
225+
assertTrue(
226+
"Inner cause should be ApplicationFailure",
227+
e2.getCause().getCause() instanceof ApplicationFailure);
228+
assertTrue(
229+
"Failure should be benign",
230+
ApplicationFailure.isBenignApplicationFailure(e2.getCause().getCause()));
231+
assertEquals(
232+
"Benign activity failure",
233+
((TemporalFailure) e2.getCause().getCause()).getOriginalMessage());
234+
235+
// Expect metrics to remain unchanged for benign failure
236+
reporter.assertCounter(
237+
MetricsType.ACTIVITY_EXEC_FAILED_COUNTER,
238+
getActivityTagsWithWorkerType("ActivityWorker", "ActivityWorkflow"),
239+
1);
240+
241+
// Verify log levels
242+
assertEquals(countLogMessages("Activity failure.", Level.WARN), 1);
243+
assertEquals(countLogMessages("Activity failure.", Level.DEBUG), 1);
244+
}
245+
246+
@Test
247+
public void localActivityFailureMetricBenignApplicationError() {
248+
reporter.assertNoMetric(
249+
MetricsType.LOCAL_ACTIVITY_EXEC_FAILED_COUNTER,
250+
getActivityTagsWithWorkerType("LocalActivityWorker", "LocalActivityWorkflow"));
251+
252+
WorkflowClient client = testWorkflowRule.getWorkflowClient();
253+
LocalActivityWorkflow nonBenignStub =
254+
client.newWorkflowStub(
255+
LocalActivityWorkflow.class,
256+
WorkflowOptions.newBuilder()
257+
.setTaskQueue(testWorkflowRule.getTaskQueue())
258+
.validateBuildWithDefaults());
259+
260+
WorkflowFailedException e1 =
261+
assertThrows(WorkflowFailedException.class, () -> nonBenignStub.execute(false));
262+
263+
assertTrue(
264+
"Cause should be ActivityFailure",
265+
e1.getCause() instanceof io.temporal.failure.ActivityFailure);
266+
assertTrue(
267+
"Inner cause should be ApplicationFailure",
268+
e1.getCause().getCause() instanceof ApplicationFailure);
269+
assertFalse(
270+
"Failure should not be benign",
271+
ApplicationFailure.isBenignApplicationFailure(e1.getCause().getCause()));
272+
assertEquals(
273+
"Non-benign activity failure",
274+
((TemporalFailure) e1.getCause().getCause()).getOriginalMessage());
275+
276+
// Expect metrics to be incremented for non-benign failure
277+
reporter.assertCounter(
278+
MetricsType.LOCAL_ACTIVITY_EXEC_FAILED_COUNTER,
279+
getActivityTagsWithWorkerType("LocalActivityWorker", "LocalActivityWorkflow"),
280+
1);
281+
282+
WorkflowFailedException e2 =
283+
assertThrows(
284+
WorkflowFailedException.class,
285+
() ->
286+
client
287+
.newWorkflowStub(
288+
LocalActivityWorkflow.class,
289+
WorkflowOptions.newBuilder()
290+
.setTaskQueue(testWorkflowRule.getTaskQueue())
291+
.validateBuildWithDefaults())
292+
.execute(true));
293+
294+
assertTrue(
295+
"Cause should be ActivityFailure",
296+
e2.getCause() instanceof io.temporal.failure.ActivityFailure);
297+
assertTrue(
298+
"Inner cause should be ApplicationFailure",
299+
e2.getCause().getCause() instanceof ApplicationFailure);
300+
assertTrue(
301+
"Failure should be benign",
302+
ApplicationFailure.isBenignApplicationFailure(e2.getCause().getCause()));
303+
assertEquals(
304+
"Benign activity failure",
305+
((TemporalFailure) e2.getCause().getCause()).getOriginalMessage());
306+
307+
// Expect metrics to remain unchanged for benign failure
308+
reporter.assertCounter(
309+
MetricsType.LOCAL_ACTIVITY_EXEC_FAILED_COUNTER,
310+
getActivityTagsWithWorkerType("LocalActivityWorker", "LocalActivityWorkflow"),
311+
1);
312+
313+
// Verify log levels
314+
assertEquals(countLogMessages("Activity failure.", Level.WARN), 1);
315+
assertEquals(countLogMessages("Activity failure.", Level.DEBUG), 1);
316+
}
317+
}

0 commit comments

Comments
 (0)