33import static com .google .common .truth .Truth .assertThat ;
44
55import com .google .protobuf .Any ;
6+ import com .google .protobuf .ByteString ;
67import io .envoyproxy .envoy .config .core .v3 .GrpcService ;
78import io .envoyproxy .envoy .extensions .filters .http .ext_proc .v3 .ExternalProcessor ;
89import io .envoyproxy .envoy .extensions .filters .http .ext_proc .v3 .ProcessingMode ;
10+ import io .envoyproxy .envoy .service .ext_proc .v3 .BodyMutation ;
911import io .envoyproxy .envoy .service .ext_proc .v3 .BodyResponse ;
1012import io .envoyproxy .envoy .service .ext_proc .v3 .CommonResponse ;
1113import io .envoyproxy .envoy .service .ext_proc .v3 .ExternalProcessorGrpc ;
1214import io .envoyproxy .envoy .service .ext_proc .v3 .HeaderMutation ;
1315import io .envoyproxy .envoy .service .ext_proc .v3 .HeadersResponse ;
1416import io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest ;
1517import io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingResponse ;
18+ import io .envoyproxy .envoy .service .ext_proc .v3 .TrailersResponse ;
1619import io .grpc .CallOptions ;
1720import io .grpc .Channel ;
1821import io .grpc .ClientInterceptor ;
3538import io .grpc .xds .ExternalProcessorFilter .ExternalProcessorFilterConfig ;
3639import io .grpc .xds .ExternalProcessorFilter .ExternalProcessorInterceptor ;
3740import io .grpc .xds .internal .grpcservice .CachedChannelManager ;
41+ import io .grpc .xds .internal .grpcservice .ChannelCredsConfig ;
3842import io .grpc .xds .internal .grpcservice .ConfiguredChannelCredentials ;
3943import io .grpc .xds .internal .grpcservice .GrpcServiceXdsContext ;
4044import io .grpc .xds .internal .grpcservice .GrpcServiceXdsContextProvider ;
41- import io .grpc .xds .internal .grpcservice .ChannelCredsConfig ;
4245import java .io .ByteArrayInputStream ;
46+ import java .io .ByteArrayOutputStream ;
47+ import java .io .IOException ;
4348import java .io .InputStream ;
4449import java .util .ArrayList ;
4550import java .util .List ;
@@ -84,15 +89,15 @@ public InputStream stream(String value) {
8489 @ Override
8590 public String parse (InputStream stream ) {
8691 try {
87- java . io . ByteArrayOutputStream buffer = new java . io . ByteArrayOutputStream ();
92+ ByteArrayOutputStream buffer = new ByteArrayOutputStream ();
8893 int nRead ;
8994 byte [] data = new byte [1024 ];
9095 while ((nRead = stream .read (data , 0 , data .length )) != -1 ) {
9196 buffer .write (data , 0 , nRead );
9297 }
9398 buffer .flush ();
9499 return new String (buffer .toByteArray ());
95- } catch (java . io . IOException e ) {
100+ } catch (IOException e ) {
96101 throw new RuntimeException (e );
97102 }
98103 }
@@ -122,7 +127,7 @@ public void setUp() throws Exception {
122127 private ExternalProcessorFilterConfig createFilterConfig () {
123128 GrpcService grpcService = GrpcService .newBuilder ()
124129 .setGoogleGrpc (GrpcService .GoogleGrpc .newBuilder ()
125- .setTargetUri (extProcServerName )
130+ .setTargetUri ("in-process:" + extProcServerName )
126131 .setStatPrefix ("ext_proc" )
127132 .build ())
128133 .build ();
@@ -137,7 +142,6 @@ private ExternalProcessorFilterConfig createFilterConfig() {
137142
138143 ExternalProcessorFilter .Provider provider = new ExternalProcessorFilter .Provider ();
139144
140- // Provide a context that supplies Insecure credentials for testing
141145 GrpcServiceXdsContextProvider contextProvider = targetUri -> {
142146 ConfiguredChannelCredentials credentials = ConfiguredChannelCredentials .create (
143147 InsecureChannelCredentials .create (),
@@ -150,10 +154,8 @@ private ExternalProcessorFilterConfig createFilterConfig() {
150154 return GrpcServiceXdsContext .create (false , Optional .of (allowedGrpcService ), true );
151155 };
152156
153- // 1. Create the filter instance via the provider
154157 this .filter = provider .newInstance ("ext-proc" , contextProvider );
155158
156- // 2. Parse the config using the provider
157159 ConfigOrError <ExternalProcessorFilterConfig > configOrError =
158160 provider .parseFilterConfig (Any .pack (externalProcessor ));
159161
@@ -170,7 +172,7 @@ public void requestHeadersMutated() throws Exception {
170172 grpcCleanup .register (InProcessChannelBuilder .forName (extProcServerName ).directExecutor ().build ())
171173 );
172174 ClientInterceptor interceptor = new ExternalProcessorInterceptor (filterConfig , testChannelManager );
173-
175+
174176 Channel interceptedChannel = ClientInterceptors .intercept (dataPlaneChannel , interceptor );
175177
176178 // Data Plane Server
@@ -198,14 +200,12 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
198200 dataPlaneServiceRegistry .addService (interceptedServiceDef );
199201
200202 // Ext-Proc Server
201- List <ProcessingRequest > receivedRequests = new ArrayList <>();
202203 ExternalProcessorGrpc .ExternalProcessorImplBase extProcImpl = new ExternalProcessorGrpc .ExternalProcessorImplBase () {
203204 @ Override
204205 public StreamObserver <ProcessingRequest > process (StreamObserver <ProcessingResponse > responseObserver ) {
205206 return new StreamObserver <ProcessingRequest >() {
206207 @ Override
207208 public void onNext (ProcessingRequest request ) {
208- receivedRequests .add (request );
209209 if (request .hasRequestHeaders ()) {
210210 responseObserver .onNext (ProcessingResponse .newBuilder ()
211211 .setRequestHeaders (HeadersResponse .newBuilder ()
@@ -223,18 +223,39 @@ public void onNext(ProcessingRequest request) {
223223 .build ());
224224 } else if (request .hasRequestBody ()) {
225225 responseObserver .onNext (ProcessingResponse .newBuilder ()
226- .setRequestBody (BodyResponse .newBuilder ().build ())
226+ .setRequestBody (BodyResponse .newBuilder ()
227+ .setResponse (CommonResponse .newBuilder ()
228+ .setBodyMutation (BodyMutation .newBuilder ()
229+ .setBody (request .getRequestBody ().getBody ())
230+ .build ())
231+ .build ())
232+ .build ())
233+ .build ());
234+ } else if (request .hasResponseHeaders ()) {
235+ responseObserver .onNext (ProcessingResponse .newBuilder ()
236+ .setResponseHeaders (HeadersResponse .newBuilder ()
237+ .setResponse (CommonResponse .newBuilder ().build ())
238+ .build ())
239+ .build ());
240+ } else if (request .hasResponseBody ()) {
241+ responseObserver .onNext (ProcessingResponse .newBuilder ()
242+ .setResponseBody (BodyResponse .newBuilder ()
243+ .setResponse (CommonResponse .newBuilder ()
244+ .setBodyMutation (BodyMutation .newBuilder ()
245+ .setBody (request .getResponseBody ().getBody ())
246+ .build ())
247+ .build ())
248+ .build ())
249+ .build ());
250+ } else if (request .hasResponseTrailers ()) {
251+ responseObserver .onNext (ProcessingResponse .newBuilder ()
252+ .setResponseTrailers (TrailersResponse .newBuilder ().build ())
227253 .build ());
228254 }
229255 }
230256
231- @ Override
232- public void onError (Throwable t ) {}
233-
234- @ Override
235- public void onCompleted () {
236- responseObserver .onCompleted ();
237- }
257+ @ Override public void onError (Throwable t ) {}
258+ @ Override public void onCompleted () { responseObserver .onCompleted (); }
238259 };
239260 }
240261 };
0 commit comments