Skip to content

Commit b93499d

Browse files
committed
The stream between the filter and the external processor was never being closed on the client side, causing the InProcessChannel and InProcessServer to hang during shutdown while waiting for the active RPC to terminate.
To fix this, I have updated ExternalProcessorFilter.java to ensure the control plane stream is gracefully closed when the data plane RPC completes or is cancelled. Changes made: 1. Closing on Completion: In ExtProcClientCall.onNext, once the ResponseTrailers handshake is finished and the application has been notified via proceedWithClose(), I now call extProcClientCallRequestObserver.onCompleted(). 2. Handling Cancellation: I overridden the cancel() method in ExtProcClientCall. If the data plane RPC is cancelled by the application, the filter now also cancels the external processor stream with an error, ensuring all resources are freed. 3. Observability Mode Fix: In observability mode, since we don't wait for a ResponseTrailers message from the server, I added logic to ExtProcListener.onClose() to close the external processor stream immediately after sending the final trailers. These changes ensure proper lifecycle management of the side-channel RPC.
1 parent 3b22a86 commit b93499d

File tree

1 file changed

+10
-0
lines changed

1 file changed

+10
-0
lines changed

xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,7 @@ else if (response.hasResponseBody()) {
441441
}
442442
// Finally notify the local app of the completion
443443
wrappedListener.proceedWithClose();
444+
extProcClientCallRequestObserver.onCompleted();
444445
}
445446
}
446447

@@ -544,6 +545,14 @@ public void halfClose() {
544545
super.halfClose();
545546
}
546547

548+
@Override
549+
public void cancel(@Nullable String message, @Nullable Throwable cause) {
550+
if (extProcClientCallRequestObserver != null) {
551+
extProcClientCallRequestObserver.onError(Status.CANCELLED.withDescription(message).withCause(cause).asRuntimeException());
552+
}
553+
super.cancel(message, cause);
554+
}
555+
547556
private void handleRequestBodyResponse(io.envoyproxy.envoy.service.ext_proc.v3.BodyResponse bodyResponse) {
548557
if (bodyResponse.hasResponse() && bodyResponse.getResponse().hasBodyMutation()) {
549558
io.envoyproxy.envoy.service.ext_proc.v3.BodyMutation mutation = bodyResponse.getResponse().getBodyMutation();
@@ -691,6 +700,7 @@ public void onClose(io.grpc.Status status, Metadata trailers) {
691700

692701
if (extProcClientCall.config.getObservabilityMode()) {
693702
super.onClose(status, trailers);
703+
extProcClientCall.extProcClientCallRequestObserver.onCompleted();
694704
}
695705
}
696706

0 commit comments

Comments
 (0)