Skip to content

Commit 11b1280

Browse files
committed
Implement Category 5 unit tests and fix response-side closure bug
1 parent 51e1184 commit 11b1280

File tree

2 files changed

+209
-14
lines changed

2 files changed

+209
-14
lines changed

xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -842,24 +842,28 @@ public void onClose(io.grpc.Status status, Metadata trailers) {
842842
return;
843843
}
844844

845-
if (extProcClientCall.config.getProcessingMode().getResponseTrailerMode() != ProcessingMode.HeaderSendMode.SEND) {
846-
super.onClose(status, trailers);
847-
if (!extProcClientCall.config.getObservabilityMode()) {
848-
extProcClientCall.closeExtProcStream();
849-
}
850-
return;
851-
}
852-
853845
this.savedStatus = status;
854846
this.savedTrailers = trailers;
855847

856-
sendResponseBodyToExtProc(null, true);
848+
if (extProcClientCall.config.getProcessingMode().getResponseBodyMode() == ProcessingMode.BodySendMode.GRPC) {
849+
sendResponseBodyToExtProc(null, true);
850+
}
857851

858-
extProcClientCall.sendToExtProc(ProcessingRequest.newBuilder()
859-
.setResponseTrailers(io.envoyproxy.envoy.service.ext_proc.v3.HttpTrailers.newBuilder()
860-
.setTrailers(toHeaderMap(savedTrailers))
861-
.build())
862-
.build());
852+
if (extProcClientCall.config.getProcessingMode().getResponseTrailerMode() == ProcessingMode.HeaderSendMode.SEND) {
853+
extProcClientCall.sendToExtProc(ProcessingRequest.newBuilder()
854+
.setResponseTrailers(io.envoyproxy.envoy.service.ext_proc.v3.HttpTrailers.newBuilder()
855+
.setTrailers(toHeaderMap(savedTrailers))
856+
.build())
857+
.build());
858+
} else {
859+
// If we are not sending trailers, and not waiting for body EOS, proceed with close.
860+
if (extProcClientCall.config.getProcessingMode().getResponseBodyMode() != ProcessingMode.BodySendMode.GRPC) {
861+
proceedWithClose();
862+
if (!extProcClientCall.config.getObservabilityMode()) {
863+
extProcClientCall.closeExtProcStream();
864+
}
865+
}
866+
}
863867

864868
if (extProcClientCall.config.getObservabilityMode()) {
865869
super.onClose(status, trailers);

xds/src/test/java/io/grpc/xds/ExternalProcessorFilterTest.java

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -800,6 +800,197 @@ public void givenDeferredHalfClose_whenExtProcRespondsWithEndOfStream_thenSuperH
800800
Mockito.verify(mockRawCall).halfClose();
801801
}
802802

803+
// --- Category 5: Body Mutation: Inbound/Response (GRPC Mode) ---
804+
805+
@Test
806+
@SuppressWarnings("unchecked")
807+
public void givenResponseBodyModeGrpc_whenOnMessageCalled_thenMessageIsSentToExtProc() throws Exception {
808+
ExternalProcessor proto = ExternalProcessor.newBuilder()
809+
.setGrpcService(GrpcService.newBuilder()
810+
.setGoogleGrpc(GrpcService.GoogleGrpc.newBuilder()
811+
.setTargetUri("in-process:///sidecar")
812+
.addChannelCredentialsPlugin(Any.newBuilder()
813+
.setTypeUrl("type.googleapis.com/envoy.extensions.grpc_service.channel_credentials.insecure.v3.InsecureCredentials")
814+
.build())
815+
.build())
816+
.build())
817+
.setProcessingMode(ProcessingMode.newBuilder()
818+
.setRequestHeaderMode(ProcessingMode.HeaderSendMode.SKIP)
819+
.setResponseHeaderMode(ProcessingMode.HeaderSendMode.SKIP)
820+
.setResponseBodyMode(ProcessingMode.BodySendMode.GRPC).build())
821+
.build();
822+
ExternalProcessorFilterConfig filterConfig = provider.parseFilterConfig(Any.pack(proto), filterContext).config;
823+
824+
ManagedChannel mockSidecarChannel = Mockito.mock(ManagedChannel.class);
825+
ClientCall<ProcessingRequest, ProcessingResponse> mockSidecarCall = Mockito.mock(ClientCall.class);
826+
Mockito.when(mockSidecarChannel.newCall(Mockito.any(MethodDescriptor.class), Mockito.any(CallOptions.class)))
827+
.thenReturn(mockSidecarCall);
828+
829+
CachedChannelManager mockChannelManager = Mockito.mock(CachedChannelManager.class);
830+
Mockito.when(mockChannelManager.getChannel(Mockito.any())).thenReturn(mockSidecarChannel);
831+
832+
ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor(
833+
filterConfig, mockChannelManager, scheduler);
834+
835+
Channel mockNextChannel = Mockito.mock(Channel.class);
836+
ClientCall<InputStream, InputStream> mockRawCall = Mockito.mock(ClientCall.class);
837+
Mockito.when(mockNextChannel.newCall(Mockito.any(MethodDescriptor.class), Mockito.any(CallOptions.class)))
838+
.thenReturn(mockRawCall);
839+
840+
ArgumentCaptor<ClientCall.Listener<InputStream>> rawListenerCaptor = ArgumentCaptor.forClass(ClientCall.Listener.class);
841+
842+
CallOptions callOptions = CallOptions.DEFAULT.withExecutor(Executors.newSingleThreadExecutor());
843+
ClientCall<String, String> proxyCall = interceptor.interceptCall(METHOD_SAY_HELLO, callOptions, mockNextChannel);
844+
proxyCall.start(Mockito.mock(ClientCall.Listener.class), new Metadata());
845+
846+
Mockito.verify(mockRawCall).start(rawListenerCaptor.capture(), Mockito.any());
847+
848+
// Simulate server response message
849+
rawListenerCaptor.getValue().onMessage(new ByteArrayInputStream("Server Message".getBytes()));
850+
851+
// Verify sent to sidecar
852+
ArgumentCaptor<ProcessingRequest> requestCaptor = ArgumentCaptor.forClass(ProcessingRequest.class);
853+
Mockito.verify(mockSidecarCall).sendMessage(requestCaptor.capture());
854+
assertThat(requestCaptor.getValue().hasResponseBody()).isTrue();
855+
assertThat(requestCaptor.getValue().getResponseBody().getBody().toStringUtf8()).isEqualTo("Server Message");
856+
}
857+
858+
@Test
859+
@SuppressWarnings("unchecked")
860+
public void givenResponseBodyModeGrpc_whenExtProcRespondsWithMutatedBody_thenMutatedBodyIsDeliveredToClient() throws Exception {
861+
ExternalProcessor proto = ExternalProcessor.newBuilder()
862+
.setGrpcService(GrpcService.newBuilder()
863+
.setGoogleGrpc(GrpcService.GoogleGrpc.newBuilder()
864+
.setTargetUri("in-process:///sidecar")
865+
.addChannelCredentialsPlugin(Any.newBuilder()
866+
.setTypeUrl("type.googleapis.com/envoy.extensions.grpc_service.channel_credentials.insecure.v3.InsecureCredentials")
867+
.build())
868+
.build())
869+
.build())
870+
.setProcessingMode(ProcessingMode.newBuilder()
871+
.setRequestHeaderMode(ProcessingMode.HeaderSendMode.SKIP)
872+
.setResponseHeaderMode(ProcessingMode.HeaderSendMode.SKIP)
873+
.setResponseBodyMode(ProcessingMode.BodySendMode.GRPC).build())
874+
.build();
875+
ExternalProcessorFilterConfig filterConfig = provider.parseFilterConfig(Any.pack(proto), filterContext).config;
876+
877+
ManagedChannel mockSidecarChannel = Mockito.mock(ManagedChannel.class);
878+
ClientCall<ProcessingRequest, ProcessingResponse> mockSidecarCall = Mockito.mock(ClientCall.class);
879+
Mockito.when(mockSidecarChannel.newCall(Mockito.any(MethodDescriptor.class), Mockito.any(CallOptions.class)))
880+
.thenReturn(mockSidecarCall);
881+
882+
CachedChannelManager mockChannelManager = Mockito.mock(CachedChannelManager.class);
883+
Mockito.when(mockChannelManager.getChannel(Mockito.any())).thenReturn(mockSidecarChannel);
884+
885+
ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor(
886+
filterConfig, mockChannelManager, scheduler);
887+
888+
Channel mockNextChannel = Mockito.mock(Channel.class);
889+
ClientCall<InputStream, InputStream> mockRawCall = Mockito.mock(ClientCall.class);
890+
Mockito.when(mockNextChannel.newCall(Mockito.any(MethodDescriptor.class), Mockito.any(CallOptions.class)))
891+
.thenReturn(mockRawCall);
892+
893+
ArgumentCaptor<ClientCall.Listener<InputStream>> rawListenerCaptor = ArgumentCaptor.forClass(ClientCall.Listener.class);
894+
ArgumentCaptor<ClientCall.Listener<ProcessingResponse>> sidecarListenerCaptor = ArgumentCaptor.forClass(ClientCall.Listener.class);
895+
ClientCall.Listener<String> mockAppListener = Mockito.mock(ClientCall.Listener.class);
896+
897+
CallOptions callOptions = CallOptions.DEFAULT.withExecutor(Executors.newSingleThreadExecutor());
898+
ClientCall<String, String> proxyCall = interceptor.interceptCall(METHOD_SAY_HELLO, callOptions, mockNextChannel);
899+
proxyCall.start(mockAppListener, new Metadata());
900+
901+
Mockito.verify(mockRawCall).start(rawListenerCaptor.capture(), Mockito.any());
902+
Mockito.verify(mockSidecarCall).start(sidecarListenerCaptor.capture(), Mockito.any());
903+
904+
rawListenerCaptor.getValue().onMessage(new ByteArrayInputStream("Original".getBytes()));
905+
906+
// Simulate sidecar response with mutated body
907+
ProcessingResponse resp = ProcessingResponse.newBuilder()
908+
.setResponseBody(BodyResponse.newBuilder()
909+
.setResponse(CommonResponse.newBuilder()
910+
.setBodyMutation(BodyMutation.newBuilder()
911+
.setStreamedResponse(StreamedBodyResponse.newBuilder()
912+
.setBody(ByteString.copyFromUtf8("Mutated Server"))
913+
.build())
914+
.build())
915+
.build())
916+
.build())
917+
.build();
918+
sidecarListenerCaptor.getValue().onMessage(resp);
919+
920+
// Verify app listener received mutated body
921+
Mockito.verify(mockAppListener).onMessage("Mutated Server");
922+
}
923+
924+
@Test
925+
@SuppressWarnings("unchecked")
926+
public void givenResponseBodyModeGrpc_whenExtProcRespondsWithEndOfStream_thenClientListenerCloseIsPropagated() throws Exception {
927+
ExternalProcessor proto = ExternalProcessor.newBuilder()
928+
.setGrpcService(GrpcService.newBuilder()
929+
.setGoogleGrpc(GrpcService.GoogleGrpc.newBuilder()
930+
.setTargetUri("in-process:///sidecar")
931+
.addChannelCredentialsPlugin(Any.newBuilder()
932+
.setTypeUrl("type.googleapis.com/envoy.extensions.grpc_service.channel_credentials.insecure.v3.InsecureCredentials")
933+
.build())
934+
.build())
935+
.build())
936+
.setProcessingMode(ProcessingMode.newBuilder()
937+
.setRequestHeaderMode(ProcessingMode.HeaderSendMode.SKIP)
938+
.setResponseHeaderMode(ProcessingMode.HeaderSendMode.SKIP)
939+
.setResponseBodyMode(ProcessingMode.BodySendMode.GRPC).build())
940+
.build();
941+
ExternalProcessorFilterConfig filterConfig = provider.parseFilterConfig(Any.pack(proto), filterContext).config;
942+
943+
ManagedChannel mockSidecarChannel = Mockito.mock(ManagedChannel.class);
944+
ClientCall<ProcessingRequest, ProcessingResponse> mockSidecarCall = Mockito.mock(ClientCall.class);
945+
Mockito.when(mockSidecarChannel.newCall(Mockito.any(MethodDescriptor.class), Mockito.any(CallOptions.class)))
946+
.thenReturn(mockSidecarCall);
947+
948+
CachedChannelManager mockChannelManager = Mockito.mock(CachedChannelManager.class);
949+
Mockito.when(mockChannelManager.getChannel(Mockito.any())).thenReturn(mockSidecarChannel);
950+
951+
ExternalProcessorInterceptor interceptor = new ExternalProcessorInterceptor(
952+
filterConfig, mockChannelManager, scheduler);
953+
954+
Channel mockNextChannel = Mockito.mock(Channel.class);
955+
ClientCall<InputStream, InputStream> mockRawCall = Mockito.mock(ClientCall.class);
956+
Mockito.when(mockNextChannel.newCall(Mockito.any(MethodDescriptor.class), Mockito.any(CallOptions.class)))
957+
.thenReturn(mockRawCall);
958+
959+
ArgumentCaptor<ClientCall.Listener<InputStream>> rawListenerCaptor = ArgumentCaptor.forClass(ClientCall.Listener.class);
960+
ArgumentCaptor<ClientCall.Listener<ProcessingResponse>> sidecarListenerCaptor = ArgumentCaptor.forClass(ClientCall.Listener.class);
961+
ClientCall.Listener<String> mockAppListener = Mockito.mock(ClientCall.Listener.class);
962+
963+
CallOptions callOptions = CallOptions.DEFAULT.withExecutor(Executors.newSingleThreadExecutor());
964+
ClientCall<String, String> proxyCall = interceptor.interceptCall(METHOD_SAY_HELLO, callOptions, mockNextChannel);
965+
proxyCall.start(mockAppListener, new Metadata());
966+
967+
Mockito.verify(mockRawCall).start(rawListenerCaptor.capture(), Mockito.any());
968+
Mockito.verify(mockSidecarCall).start(sidecarListenerCaptor.capture(), Mockito.any());
969+
970+
// Simulate server closing call
971+
rawListenerCaptor.getValue().onClose(Status.OK, new Metadata());
972+
973+
// Verify app listener NOT closed yet (waiting for sidecar EOS)
974+
Mockito.verify(mockAppListener, Mockito.never()).onClose(Mockito.any(), Mockito.any());
975+
976+
// Sidecar confirms EOS
977+
ProcessingResponse resp = ProcessingResponse.newBuilder()
978+
.setResponseBody(BodyResponse.newBuilder()
979+
.setResponse(CommonResponse.newBuilder()
980+
.setBodyMutation(BodyMutation.newBuilder()
981+
.setStreamedResponse(StreamedBodyResponse.newBuilder()
982+
.setEndOfStreamWithoutMessage(true)
983+
.build())
984+
.build())
985+
.build())
986+
.build())
987+
.build();
988+
sidecarListenerCaptor.getValue().onMessage(resp);
989+
990+
// Verify app listener finally closed
991+
Mockito.verify(mockAppListener).onClose(Mockito.eq(Status.OK), Mockito.any());
992+
}
993+
803994
@Test
804995
public void requestHeadersMutated() throws Exception {
805996
ExternalProcessor proto = ExternalProcessor.newBuilder()

0 commit comments

Comments
 (0)