Skip to content

Commit 7561d0b

Browse files
jnowjack-lucidchartkannanjgithubCopilot
authored
Cancel DelayedClientCall when application listener throws (#12761)
Align DelayedClientCall.DelayedListener with ClientCallImpl's existing behavior for listener exceptions. When the application listener throws from onHeaders/onMessage/onReady, catch the Throwable, cancel the call with CANCELLED (cause = the throwable), and swallow subsequent callbacks. When onClose throws, log and continue, matching ClientCallImpl.closeObserver. If onClose arrives from the transport after a prior callback threw, override its status/trailers with the captured CANCELLED so a server-supplied OK can't mask the local failure. Previously, a throw from the application listener escaped to the callExecutor's uncaught-exception handler. The real call was not cancelled and the transport kept delivering callbacks to an already broken listener, different from how the same bug behaves on a normal ClientCallImpl, and a timing-dependent inconsistency depending on whether callbacks arrived before or after setCall + drain completed. Trade-off: listener-callback throws are no longer visible to the executor's UncaughtExceptionHandler (they're attached as Status.cause instead). This matches ClientCallImpl and is the intended behavior. Exception handling for the outer drainPendingCalls loop (realCall.sendMessage/request/halfClose/cancel) remains unaddressed; that TODO is preserved. **Note:** This change only handles exceptions thrown by the application listener. I don't try and solve the problems that #12737 is attempting to fix. My motivation is to fix the root cause behind bazelbuild/bazel#29316 --------- Co-authored-by: Kannan J <kannanjgithub@google.com> Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
1 parent ec10992 commit 7561d0b

2 files changed

Lines changed: 302 additions & 10 deletions

File tree

core/src/main/java/io/grpc/internal/DelayedClientCall.java

Lines changed: 76 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ public final void start(Listener<RespT> listener, final Metadata headers) {
206206
savedError = error;
207207
savedPassThrough = passThrough;
208208
if (!savedPassThrough) {
209-
listener = delayedListener = new DelayedListener<>(listener);
209+
listener = delayedListener = new DelayedListener<>(this, listener);
210210
startHeaders = headers;
211211
}
212212
}
@@ -445,15 +445,33 @@ public void runInContext() {
445445
}
446446

447447
private static final class DelayedListener<RespT> extends Listener<RespT> {
448+
private final DelayedClientCall<?, RespT> call;
448449
private final Listener<RespT> realListener;
449450
private volatile boolean passThrough;
451+
private volatile Status exceptionStatus;
450452
@GuardedBy("this")
451453
private List<Runnable> pendingCallbacks = new ArrayList<>();
452454

453-
public DelayedListener(Listener<RespT> listener) {
455+
public DelayedListener(DelayedClientCall<?, RespT> call, Listener<RespT> listener) {
456+
this.call = call;
454457
this.realListener = listener;
455458
}
456459

460+
/**
461+
* Cancels call and schedules onClose() notification. May only be called from within a
462+
* DelayedListener callback dispatch (either queued drain or passThrough). Visibility of the
463+
* write to {@code exceptionStatus} does not rely on a single callback executor; it is a
464+
* {@code volatile} field, and callback queuing/pass-through transitions are coordinated by
465+
* this listener's synchronization so subsequent callbacks observe the updated status.
466+
*/
467+
private void exceptionThrown(Throwable t, String description) {
468+
// onClose() must be delivered exactly once and last. Other callbacks may already be queued
469+
// ahead of realCall's eventual onClose, so we can't call onClose() here. We set the status
470+
// and overwrite the onClose() details when it arrives.
471+
exceptionStatus = Status.CANCELLED.withCause(t).withDescription(description);
472+
call.cancel(description, t);
473+
}
474+
457475
private void delayOrExecute(Runnable runnable) {
458476
synchronized (this) {
459477
if (!passThrough) {
@@ -467,55 +485,104 @@ private void delayOrExecute(Runnable runnable) {
467485
@Override
468486
public void onHeaders(final Metadata headers) {
469487
if (passThrough) {
470-
realListener.onHeaders(headers);
488+
deliverHeaders(headers);
471489
} else {
472490
delayOrExecute(new Runnable() {
473491
@Override
474492
public void run() {
475-
realListener.onHeaders(headers);
493+
deliverHeaders(headers);
476494
}
477495
});
478496
}
479497
}
480498

499+
private void deliverHeaders(Metadata headers) {
500+
if (exceptionStatus != null) {
501+
return;
502+
}
503+
try {
504+
realListener.onHeaders(headers);
505+
} catch (Throwable t) {
506+
exceptionThrown(t, "Failed to read headers");
507+
}
508+
}
509+
481510
@Override
482511
public void onMessage(final RespT message) {
483512
if (passThrough) {
484-
realListener.onMessage(message);
513+
deliverMessage(message);
485514
} else {
486515
delayOrExecute(new Runnable() {
487516
@Override
488517
public void run() {
489-
realListener.onMessage(message);
518+
deliverMessage(message);
490519
}
491520
});
492521
}
493522
}
494523

524+
private void deliverMessage(RespT message) {
525+
if (exceptionStatus != null) {
526+
return;
527+
}
528+
try {
529+
realListener.onMessage(message);
530+
} catch (Throwable t) {
531+
exceptionThrown(t, "Failed to read message.");
532+
}
533+
}
534+
495535
@Override
496536
public void onClose(final Status status, final Metadata trailers) {
497537
delayOrExecute(new Runnable() {
498538
@Override
499539
public void run() {
500-
realListener.onClose(status, trailers);
540+
Status effectiveStatus = status;
541+
Metadata effectiveTrailers = trailers;
542+
if (exceptionStatus != null) {
543+
// Ideally status matches exceptionStatus, since exceptionStatus was used to cancel
544+
// the call. However, cancel() may reconstruct a new Status instance, and the cancel
545+
// is racy so this onClose may have already been queued when the cancellation
546+
// occurred. Since other callbacks throw away data if exceptionStatus != null, it is
547+
// semantically essential that we _not_ use a status provided by the server.
548+
effectiveStatus = exceptionStatus;
549+
// Replace trailers to prevent mixing sources of status and trailers.
550+
effectiveTrailers = new Metadata();
551+
}
552+
try {
553+
realListener.onClose(effectiveStatus, effectiveTrailers);
554+
} catch (RuntimeException ex) {
555+
logger.log(Level.WARNING, "Exception thrown by onClose() in ClientCall", ex);
556+
}
501557
}
502558
});
503559
}
504560

505561
@Override
506562
public void onReady() {
507563
if (passThrough) {
508-
realListener.onReady();
564+
deliverOnReady();
509565
} else {
510566
delayOrExecute(new Runnable() {
511567
@Override
512568
public void run() {
513-
realListener.onReady();
569+
deliverOnReady();
514570
}
515571
});
516572
}
517573
}
518574

575+
private void deliverOnReady() {
576+
if (exceptionStatus != null) {
577+
return;
578+
}
579+
try {
580+
realListener.onReady();
581+
} catch (Throwable t) {
582+
exceptionThrown(t, "Failed to call onReady.");
583+
}
584+
}
585+
519586
void drainPendingCallbacks() {
520587
assert !passThrough;
521588
List<Runnable> toRun = new ArrayList<>();
@@ -535,7 +602,6 @@ void drainPendingCallbacks() {
535602
}
536603
for (Runnable runnable : toRun) {
537604
// Avoid calling listener while lock is held to prevent deadlocks.
538-
// TODO(ejona): exception handling
539605
runnable.run();
540606
}
541607
toRun.clear();

core/src/test/java/io/grpc/internal/DelayedClientCallTest.java

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,232 @@ public void delayedCallsRunUnderContext() throws Exception {
229229
assertThat(contextKey.get(readyContext.get())).isEqualTo(goldenValue);
230230
}
231231

232+
@Test
233+
public void listenerThrowsInPendingCallback_cancelsRealCall() {
234+
DelayedClientCall<String, Integer> delayedClientCall = new DelayedClientCall<>(
235+
callExecutor, fakeClock.getScheduledExecutorService(), null);
236+
final RuntimeException boom = new RuntimeException("boom");
237+
ClientCall.Listener<Integer> throwingListener = new ClientCall.Listener<Integer>() {
238+
@Override
239+
public void onMessage(Integer msg) {
240+
throw boom;
241+
}
242+
};
243+
delayedClientCall.start(throwingListener, new Metadata());
244+
// Deliver onMessage while the wrapping DelayedListener is still buffering, by firing
245+
// it from within realCall.start() — drainPendingCalls has not yet flipped the listener
246+
// to pass-through. The queued onMessage is then drained and throws; the fix must catch
247+
// the throwable and cancel the real call rather than let it escape.
248+
Runnable r = delayedClientCall.setCall(new SimpleForwardingClientCall<String, Integer>(
249+
mockRealCall) {
250+
@Override
251+
public void start(Listener<Integer> listener, Metadata metadata) {
252+
super.start(listener, metadata);
253+
listener.onMessage(42);
254+
}
255+
});
256+
assertThat(r).isNotNull();
257+
r.run(); // Must not propagate `boom`.
258+
verify(mockRealCall).cancel(eq("Failed to read message."), eq(boom));
259+
}
260+
261+
@Test
262+
public void listenerThrowsInPendingOnHeaders_cancelsRealCall() {
263+
DelayedClientCall<String, Integer> delayedClientCall = new DelayedClientCall<>(
264+
callExecutor, fakeClock.getScheduledExecutorService(), null);
265+
final RuntimeException boom = new RuntimeException("boom");
266+
ClientCall.Listener<Integer> throwingListener = new ClientCall.Listener<Integer>() {
267+
@Override
268+
public void onHeaders(Metadata headers) {
269+
throw boom;
270+
}
271+
};
272+
delayedClientCall.start(throwingListener, new Metadata());
273+
Runnable r = delayedClientCall.setCall(new SimpleForwardingClientCall<String, Integer>(
274+
mockRealCall) {
275+
@Override
276+
public void start(Listener<Integer> listener, Metadata metadata) {
277+
super.start(listener, metadata);
278+
listener.onHeaders(new Metadata());
279+
}
280+
});
281+
assertThat(r).isNotNull();
282+
r.run();
283+
verify(mockRealCall).cancel(eq("Failed to read headers"), eq(boom));
284+
}
285+
286+
@Test
287+
public void listenerThrowsInPendingOnReady_cancelsRealCall() {
288+
DelayedClientCall<String, Integer> delayedClientCall = new DelayedClientCall<>(
289+
callExecutor, fakeClock.getScheduledExecutorService(), null);
290+
final RuntimeException boom = new RuntimeException("boom");
291+
ClientCall.Listener<Integer> throwingListener = new ClientCall.Listener<Integer>() {
292+
@Override
293+
public void onReady() {
294+
throw boom;
295+
}
296+
};
297+
delayedClientCall.start(throwingListener, new Metadata());
298+
Runnable r = delayedClientCall.setCall(new SimpleForwardingClientCall<String, Integer>(
299+
mockRealCall) {
300+
@Override
301+
public void start(Listener<Integer> listener, Metadata metadata) {
302+
super.start(listener, metadata);
303+
listener.onReady();
304+
}
305+
});
306+
assertThat(r).isNotNull();
307+
r.run();
308+
verify(mockRealCall).cancel(eq("Failed to call onReady."), eq(boom));
309+
}
310+
311+
@Test
312+
public void onCloseExceptionCaughtAndLogged() {
313+
DelayedClientCall<String, Integer> delayedClientCall = new DelayedClientCall<>(
314+
callExecutor, fakeClock.getScheduledExecutorService(), null);
315+
final RuntimeException boom = new RuntimeException("boom");
316+
final AtomicReference<Status> observed = new AtomicReference<>();
317+
ClientCall.Listener<Integer> throwingListener = new ClientCall.Listener<Integer>() {
318+
@Override
319+
public void onClose(Status status, Metadata trailers) {
320+
observed.set(status);
321+
throw boom;
322+
}
323+
};
324+
delayedClientCall.start(throwingListener, new Metadata());
325+
Runnable r = delayedClientCall.setCall(new SimpleForwardingClientCall<String, Integer>(
326+
mockRealCall) {
327+
@Override
328+
public void start(Listener<Integer> listener, Metadata metadata) {
329+
super.start(listener, metadata);
330+
listener.onClose(Status.DATA_LOSS, new Metadata());
331+
}
332+
});
333+
assertThat(r).isNotNull();
334+
r.run(); // Must not propagate `boom`.
335+
assertThat(observed.get().getCode()).isEqualTo(Status.Code.DATA_LOSS);
336+
verify(mockRealCall, never()).cancel(any(), any());
337+
}
338+
339+
@Test
340+
public void listenerThrowsInPassThroughOnMessage_cancelsRealCall() {
341+
DelayedClientCall<String, Integer> delayedClientCall = new DelayedClientCall<>(
342+
callExecutor, fakeClock.getScheduledExecutorService(), null);
343+
final RuntimeException boom = new RuntimeException("boom");
344+
ClientCall.Listener<Integer> throwingListener = new ClientCall.Listener<Integer>() {
345+
@Override
346+
public void onMessage(Integer msg) {
347+
throw boom;
348+
}
349+
};
350+
delayedClientCall.start(throwingListener, new Metadata());
351+
Runnable r = delayedClientCall.setCall(mockRealCall);
352+
assertThat(r).isNotNull();
353+
r.run(); // drain completes, listener transitions to passThrough
354+
@SuppressWarnings("unchecked")
355+
ArgumentCaptor<Listener<Integer>> listenerCaptor = ArgumentCaptor.forClass(Listener.class);
356+
verify(mockRealCall).start(listenerCaptor.capture(), any(Metadata.class));
357+
Listener<Integer> realCallListener = listenerCaptor.getValue();
358+
realCallListener.onMessage(42); // dispatched on passThrough fast path
359+
verify(mockRealCall).cancel(eq("Failed to read message."), eq(boom));
360+
}
361+
362+
@Test
363+
public void listenerThrowsInPassThroughOnHeaders_cancelsRealCall() {
364+
DelayedClientCall<String, Integer> delayedClientCall = new DelayedClientCall<>(
365+
callExecutor, fakeClock.getScheduledExecutorService(), null);
366+
final RuntimeException boom = new RuntimeException("boom");
367+
ClientCall.Listener<Integer> throwingListener = new ClientCall.Listener<Integer>() {
368+
@Override
369+
public void onHeaders(Metadata headers) {
370+
throw boom;
371+
}
372+
};
373+
delayedClientCall.start(throwingListener, new Metadata());
374+
Runnable r = delayedClientCall.setCall(mockRealCall);
375+
assertThat(r).isNotNull();
376+
r.run();
377+
@SuppressWarnings("unchecked")
378+
ArgumentCaptor<Listener<Integer>> listenerCaptor = ArgumentCaptor.forClass(Listener.class);
379+
verify(mockRealCall).start(listenerCaptor.capture(), any(Metadata.class));
380+
Listener<Integer> realCallListener = listenerCaptor.getValue();
381+
realCallListener.onHeaders(new Metadata());
382+
verify(mockRealCall).cancel(eq("Failed to read headers"), eq(boom));
383+
}
384+
385+
@Test
386+
public void listenerThrowsInPassThroughOnReady_cancelsRealCall() {
387+
DelayedClientCall<String, Integer> delayedClientCall = new DelayedClientCall<>(
388+
callExecutor, fakeClock.getScheduledExecutorService(), null);
389+
final RuntimeException boom = new RuntimeException("boom");
390+
ClientCall.Listener<Integer> throwingListener = new ClientCall.Listener<Integer>() {
391+
@Override
392+
public void onReady() {
393+
throw boom;
394+
}
395+
};
396+
delayedClientCall.start(throwingListener, new Metadata());
397+
Runnable r = delayedClientCall.setCall(mockRealCall);
398+
assertThat(r).isNotNull();
399+
r.run();
400+
@SuppressWarnings("unchecked")
401+
ArgumentCaptor<Listener<Integer>> listenerCaptor = ArgumentCaptor.forClass(Listener.class);
402+
verify(mockRealCall).start(listenerCaptor.capture(), any(Metadata.class));
403+
Listener<Integer> realCallListener = listenerCaptor.getValue();
404+
realCallListener.onReady();
405+
verify(mockRealCall).cancel(eq("Failed to call onReady."), eq(boom));
406+
}
407+
408+
@Test
409+
public void listenerThrowsInPassThrough_subsequentCallbacksSwallowedAndOnCloseOverridden() {
410+
DelayedClientCall<String, Integer> delayedClientCall = new DelayedClientCall<>(
411+
callExecutor, fakeClock.getScheduledExecutorService(), null);
412+
final RuntimeException boom = new RuntimeException("boom");
413+
final AtomicReference<Integer> lastMessage = new AtomicReference<>();
414+
final AtomicReference<Status> closeStatus = new AtomicReference<>();
415+
final AtomicReference<Metadata> closeTrailers = new AtomicReference<>();
416+
ClientCall.Listener<Integer> throwingListener = new ClientCall.Listener<Integer>() {
417+
@Override
418+
public void onMessage(Integer msg) {
419+
lastMessage.set(msg);
420+
if (msg == 1) {
421+
throw boom;
422+
}
423+
}
424+
425+
@Override
426+
public void onClose(Status status, Metadata trailers) {
427+
closeStatus.set(status);
428+
closeTrailers.set(trailers);
429+
}
430+
};
431+
delayedClientCall.start(throwingListener, new Metadata());
432+
Runnable r = delayedClientCall.setCall(mockRealCall);
433+
assertThat(r).isNotNull();
434+
r.run();
435+
@SuppressWarnings("unchecked")
436+
ArgumentCaptor<Listener<Integer>> listenerCaptor = ArgumentCaptor.forClass(Listener.class);
437+
verify(mockRealCall).start(listenerCaptor.capture(), any(Metadata.class));
438+
Listener<Integer> realCallListener = listenerCaptor.getValue();
439+
440+
realCallListener.onMessage(1); // throws -> exceptionStatus captured
441+
assertThat(lastMessage.get()).isEqualTo(1);
442+
verify(mockRealCall).cancel(eq("Failed to read message."), eq(boom));
443+
444+
// Later callbacks are swallowed — the listener must not see message 2.
445+
realCallListener.onMessage(2);
446+
assertThat(lastMessage.get()).isEqualTo(1);
447+
448+
// Transport onClose with OK must be overridden by the captured CANCELLED status.
449+
Metadata serverTrailers = new Metadata();
450+
serverTrailers.put(Metadata.Key.of("k", Metadata.ASCII_STRING_MARSHALLER), "v");
451+
realCallListener.onClose(Status.OK, serverTrailers);
452+
assertThat(closeStatus.get().getCode()).isEqualTo(Status.Code.CANCELLED);
453+
assertThat(closeStatus.get().getCause()).isEqualTo(boom);
454+
// Trailers replaced to avoid mixing sources.
455+
assertThat(closeTrailers.get()).isNotSameInstanceAs(serverTrailers);
456+
}
457+
232458
private void callMeMaybe(Runnable r) {
233459
if (r != null) {
234460
r.run();

0 commit comments

Comments
 (0)