Skip to content

Commit a333d53

Browse files
committed
The reported hang grpc#12109 in blockingUnaryCall was caused by an unhandled framing exception during the draining of DelayedStream. When MessageFramer throws an
exception (e.g., RESOURCE_EXHAUSTED), it bubbles up through DelayedStream.drainPendingCalls and is eventually caught and swallowed by ThreadlessExecutor.runQuietly. This leaves the DelayedStream in an inconsistent state (where passThrough is still false), and the responseFuture never completes, causing the blocking call to hang forever. I have implemented a fix that adds proper exception handling to the draining loops in both DelayedStream and DelayedClientCall. When an exception occurs during draining: 1. The realStream (or realCall) is explicitly cancelled with the error. 2. The pending calls are cleared. 3. The stream/call transitions to passThrough = true to prevent getting stuck. 4. The listener's pending callbacks are drained, ensuring that any closure notifications are delivered to the application.
1 parent 7c45aac commit a333d53

File tree

4 files changed

+103
-8
lines changed

4 files changed

+103
-8
lines changed

core/src/main/java/io/grpc/internal/DelayedClientCall.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ private void drainPendingCalls() {
294294
assert !passThrough;
295295
List<Runnable> toRun = new ArrayList<>();
296296
DelayedListener<RespT> delayedListener ;
297+
drainOut:
297298
while (true) {
298299
synchronized (this) {
299300
if (pendingRunnables.isEmpty()) {
@@ -311,8 +312,18 @@ private void drainPendingCalls() {
311312
}
312313
for (Runnable runnable : toRun) {
313314
// Must not call transport while lock is held to prevent deadlocks.
314-
// TODO(ejona): exception handling
315-
runnable.run();
315+
try {
316+
runnable.run();
317+
} catch (Throwable t) {
318+
Status status = Status.fromThrowable(t).withDescription("Failed to drain pending calls");
319+
realCall.cancel(status.getDescription(), status.getCause());
320+
synchronized (this) {
321+
pendingRunnables = null;
322+
passThrough = true;
323+
delayedListener = this.delayedListener;
324+
}
325+
break drainOut;
326+
}
316327
}
317328
toRun.clear();
318329
}
@@ -519,6 +530,7 @@ public void run() {
519530
void drainPendingCallbacks() {
520531
assert !passThrough;
521532
List<Runnable> toRun = new ArrayList<>();
533+
drainOut:
522534
while (true) {
523535
synchronized (this) {
524536
if (pendingCallbacks.isEmpty()) {
@@ -535,8 +547,15 @@ void drainPendingCallbacks() {
535547
}
536548
for (Runnable runnable : toRun) {
537549
// Avoid calling listener while lock is held to prevent deadlocks.
538-
// TODO(ejona): exception handling
539-
runnable.run();
550+
try {
551+
runnable.run();
552+
} catch (Throwable t) {
553+
synchronized (this) {
554+
pendingCallbacks = null;
555+
passThrough = true;
556+
}
557+
throw t;
558+
}
540559
}
541560
toRun.clear();
542561
}

core/src/main/java/io/grpc/internal/DelayedStream.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ private void drainPendingCalls() {
172172
assert !passThrough;
173173
List<Runnable> toRun = new ArrayList<>();
174174
DelayedStreamListener delayedListener = null;
175+
drainOut:
175176
while (true) {
176177
synchronized (this) {
177178
if (pendingCalls.isEmpty()) {
@@ -189,8 +190,18 @@ private void drainPendingCalls() {
189190
}
190191
for (Runnable runnable : toRun) {
191192
// Must not call transport while lock is held to prevent deadlocks.
192-
// TODO(ejona): exception handling
193-
runnable.run();
193+
try {
194+
runnable.run();
195+
} catch (Throwable t) {
196+
Status status = Status.fromThrowable(t).withDescription("Failed to drain pending calls");
197+
realStream.cancel(status);
198+
synchronized (this) {
199+
pendingCalls = null;
200+
passThrough = true;
201+
delayedListener = this.delayedListener;
202+
}
203+
break drainOut;
204+
}
194205
}
195206
toRun.clear();
196207
}
@@ -525,6 +536,7 @@ public void run() {
525536
public void drainPendingCallbacks() {
526537
assert !passThrough;
527538
List<Runnable> toRun = new ArrayList<>();
539+
drainOut:
528540
while (true) {
529541
synchronized (this) {
530542
if (pendingCallbacks.isEmpty()) {
@@ -541,8 +553,15 @@ public void drainPendingCallbacks() {
541553
}
542554
for (Runnable runnable : toRun) {
543555
// Avoid calling listener while lock is held to prevent deadlocks.
544-
// TODO(ejona): exception handling
545-
runnable.run();
556+
try {
557+
runnable.run();
558+
} catch (Throwable t) {
559+
synchronized (this) {
560+
pendingCallbacks = null;
561+
passThrough = true;
562+
}
563+
throw t;
564+
}
546565
}
547566
toRun.clear();
548567
}

core/src/test/java/io/grpc/internal/DelayedClientCallTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static java.util.concurrent.TimeUnit.SECONDS;
2121
import static org.mockito.ArgumentMatchers.any;
2222
import static org.mockito.ArgumentMatchers.eq;
23+
import static org.mockito.ArgumentMatchers.same;
2324
import static org.mockito.Mockito.mock;
2425
import static org.mockito.Mockito.never;
2526
import static org.mockito.Mockito.verify;
@@ -229,6 +230,32 @@ public void delayedCallsRunUnderContext() throws Exception {
229230
assertThat(contextKey.get(readyContext.get())).isEqualTo(goldenValue);
230231
}
231232

233+
@Test
234+
public void drainPendingCallFails() {
235+
DelayedClientCall<String, Integer> delayedClientCall =
236+
new DelayedClientCall<>(callExecutor, fakeClock.getScheduledExecutorService(), null);
237+
delayedClientCall.start(listener, new Metadata());
238+
delayedClientCall.request(1);
239+
240+
final RuntimeException error = new RuntimeException("fail");
241+
org.mockito.Mockito.doAnswer(new org.mockito.stubbing.Answer<Void>() {
242+
@Override
243+
public Void answer(org.mockito.invocation.InvocationOnMock invocation) {
244+
throw error;
245+
}
246+
}).when(mockRealCall).request(1);
247+
248+
Runnable runnable = delayedClientCall.setCall(mockRealCall);
249+
assertThat(runnable).isNotNull();
250+
try {
251+
runnable.run();
252+
} catch (RuntimeException e) {
253+
assertThat(e).isSameInstanceAs(error);
254+
}
255+
256+
verify(mockRealCall).cancel(eq("Failed to drain pending calls"), same(error));
257+
}
258+
232259
private void callMeMaybe(Runnable r) {
233260
if (r != null) {
234261
r.run();

core/src/test/java/io/grpc/internal/DelayedStreamTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,36 @@ public Void answer(InvocationOnMock in) {
472472
.matches("\\[test_op_delay=[0-9]+ns, remote_addr=127\\.0\\.0\\.1:443\\]");
473473
}
474474

475+
@Test
476+
public void drainPendingCallFails() {
477+
stream.start(listener);
478+
stream.request(1);
479+
final RuntimeException error = new RuntimeException("fail");
480+
doAnswer(new Answer<Void>() {
481+
@Override
482+
public Void answer(InvocationOnMock invocation) {
483+
throw error;
484+
}
485+
}).when(realStream).request(1);
486+
487+
Runnable runnable = stream.setStream(realStream);
488+
assertNotNull(runnable);
489+
try {
490+
runnable.run();
491+
} catch (RuntimeException e) {
492+
assertThat(e).isSameInstanceAs(error);
493+
}
494+
495+
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
496+
verify(realStream).cancel(statusCaptor.capture());
497+
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Status.Code.UNKNOWN);
498+
assertThat(statusCaptor.getValue().getCause()).isSameInstanceAs(error);
499+
500+
verify(realStream).start(listenerCaptor.capture());
501+
listenerCaptor.getValue().closed(statusCaptor.getValue(), RpcProgress.PROCESSED, new Metadata());
502+
verify(listener).closed(same(statusCaptor.getValue()), any(RpcProgress.class), any(Metadata.class));
503+
}
504+
475505
private void callMeMaybe(Runnable r) {
476506
if (r != null) {
477507
r.run();

0 commit comments

Comments
 (0)