Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit 8e1de20

Browse files
lukecwikdavorbonaci
authored andcommitted
Handle exceptions from within the SDK while processing work
The DataflowWorkerHarness when processing work wasn't handling general exceptions from being thrown causing the processing threads to all die leaving a zombie worker. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=104664789
1 parent ca1ebdc commit 8e1de20

2 files changed

Lines changed: 38 additions & 10 deletions

File tree

sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/DataflowWorkerHarness.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -142,13 +142,7 @@ public Boolean call() {
142142
boolean success = true;
143143
try {
144144
do { // We loop getting and processing work.
145-
try {
146-
LOG.debug("Thread starting getAndPerformWork.");
147-
success = worker.getAndPerformWork();
148-
LOG.debug("{} processing one WorkItem.", success ? "Finished" : "Failed");
149-
} catch (IOException e) { // If there is a problem getting work.
150-
success = false;
151-
}
145+
success = doWork();
152146
if (success) {
153147
backOff.reset();
154148
}
@@ -158,10 +152,27 @@ public Boolean call() {
158152
LOG.error("Already tried several attempts at working on tasks. Aborting.", e);
159153
} catch (InterruptedException e) {
160154
LOG.error("Interrupted during thread execution or sleep.", e);
155+
} catch (Throwable t) {
156+
LOG.error("Thread {} died.", Thread.currentThread().getId(), t);
161157
}
162158
return false;
163159
}
164160

161+
private boolean doWork() {
162+
try {
163+
LOG.debug("Thread starting getAndPerformWork.");
164+
boolean success = worker.getAndPerformWork();
165+
LOG.debug("{} processing one WorkItem.", success ? "Finished" : "Failed");
166+
return success;
167+
} catch (IOException e) { // If there is a problem getting work.
168+
LOG.debug("There was a problem getting work.", e);
169+
return false;
170+
} catch (Exception e) { // These exceptions are caused by bugs within the SDK
171+
LOG.error("There was an unhandled error caused by the Dataflow SDK.", e);
172+
return false;
173+
}
174+
}
175+
165176
private final DataflowWorker worker;
166177
private final Sleeper sleeper;
167178
private final BackOff backOff;
@@ -183,6 +194,7 @@ static void processWork(DataflowWorkerHarnessOptions pipelineOptions,
183194
LOG.debug("Waiting for {} worker threads", numThreads);
184195
// We wait forever unless there is a big problem.
185196
executor.invokeAll(tasks);
197+
LOG.error("All threads died.");
186198
}
187199

188200
static DataflowWorker create(DataflowWorkerHarnessOptions options) {

sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/worker/DataflowWorkerHarnessTest.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,8 @@ public void setUp() throws Exception {
9494
pipelineOptions.setGcpCredential(new TestCredential());
9595
}
9696

97-
@Test
98-
public void testThatWeRetryIfTaskExecutionFailAgainAndAgain() throws Exception {
97+
public void runTestThatWeRetryIfTaskExecutionFailsAgainAndAgain() throws Exception {
9998
final int numWorkers = Math.max(Runtime.getRuntime().availableProcessors(), 1);
100-
when(mockDataflowWorker.getAndPerformWork()).thenReturn(false);
10199
final AtomicInteger sleepCount = new AtomicInteger(0);
102100
final AtomicInteger illegalIntervalCount = new AtomicInteger(0);
103101
DataflowWorkerHarness.processWork(
@@ -124,6 +122,24 @@ public void sleep(long millis) throws InterruptedException {
124122
assertEquals(0, illegalIntervalCount.get());
125123
}
126124

125+
@Test
126+
public void testThatWeRetryIfTaskExecutionFailAgainAndAgain() throws Exception {
127+
when(mockDataflowWorker.getAndPerformWork()).thenReturn(false);
128+
runTestThatWeRetryIfTaskExecutionFailsAgainAndAgain();
129+
}
130+
131+
@Test
132+
public void testThatWeRetryIfTaskExecutionFailAgainAndAgainByIOException() throws Exception {
133+
when(mockDataflowWorker.getAndPerformWork()).thenThrow(new IOException());
134+
runTestThatWeRetryIfTaskExecutionFailsAgainAndAgain();
135+
}
136+
137+
@Test
138+
public void testThatWeRetryIfTaskExecutionFailAgainAndAgainByUnknownException() throws Exception {
139+
when(mockDataflowWorker.getAndPerformWork()).thenThrow(new RuntimeException());
140+
runTestThatWeRetryIfTaskExecutionFailsAgainAndAgain();
141+
}
142+
127143
@Test
128144
public void testNumberOfWorkerHarnessThreadsIsHonored() throws Exception {
129145
final int expectedNumberOfThreads = 5;

0 commit comments

Comments
 (0)