1414import io .envoyproxy .envoy .service .ext_proc .v3 .ExternalProcessorGrpc ;
1515import io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest ;
1616import io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingResponse ;
17+ import io .grpc .Attributes ;
1718import io .grpc .CallOptions ;
1819import io .grpc .Channel ;
1920import io .grpc .ClientCall ;
@@ -141,6 +142,14 @@ static final class ExternalProcessorInterceptor implements ClientInterceptor {
141142 private final FilterConfig overrideConfig ;
142143 private final ScheduledExecutorService scheduler ;
143144
145+ private static final MethodDescriptor .Marshaller <InputStream > RAW_MARSHALLER =
146+ new MethodDescriptor .Marshaller <InputStream >() {
147+ @ Override
148+ public InputStream stream (InputStream value ) { return value ; }
149+ @ Override
150+ public InputStream parse (InputStream stream ) { return stream ; }
151+ };
152+
144153 ExternalProcessorInterceptor (ExternalProcessorFilter filter ,
145154 ExternalProcessorFilterConfig filterConfig ,
146155 @ Nullable FilterConfig overrideConfig , ScheduledExecutorService scheduler ) {
@@ -157,8 +166,73 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
157166 Channel next ) {
158167 ExternalProcessorGrpc .ExternalProcessorStub stub = filter .getExternalProcessorStub (filterConfig .externalProcessor );
159168 ExternalProcessor config = filterConfig .externalProcessor ;
160- // Wrap the outgoing call to intercept client events
161- return new ExtProcClientCall <>(next .newCall (method , callOptions ), stub , method , config );
169+
170+ MethodDescriptor <InputStream , InputStream > rawMethod = method .toBuilder (RAW_MARSHALLER , RAW_MARSHALLER ).build ();
171+ ClientCall <InputStream , InputStream > rawCall = next .newCall (rawMethod , callOptions );
172+
173+ ExtProcClientCall extProcCall = new ExtProcClientCall (rawCall , stub , config );
174+
175+ return new ClientCall <ReqT , RespT >() {
176+ @ Override
177+ public void start (Listener <RespT > responseListener , Metadata headers ) {
178+ extProcCall .start (new Listener <InputStream >() {
179+ @ Override
180+ public void onHeaders (Metadata headers ) {
181+ responseListener .onHeaders (headers );
182+ }
183+
184+ @ Override
185+ public void onMessage (InputStream message ) {
186+ responseListener .onMessage (method .getResponseMarshaller ().parse (message ));
187+ }
188+
189+ @ Override
190+ public void onClose (Status status , Metadata trailers ) {
191+ responseListener .onClose (status , trailers );
192+ }
193+
194+ @ Override
195+ public void onReady () {
196+ responseListener .onReady ();
197+ }
198+ }, headers );
199+ }
200+
201+ @ Override
202+ public void request (int numMessages ) {
203+ extProcCall .request (numMessages );
204+ }
205+
206+ @ Override
207+ public void cancel (@ Nullable String message , @ Nullable Throwable cause ) {
208+ extProcCall .cancel (message , cause );
209+ }
210+
211+ @ Override
212+ public void halfClose () {
213+ extProcCall .halfClose ();
214+ }
215+
216+ @ Override
217+ public void sendMessage (ReqT message ) {
218+ extProcCall .sendMessage (method .getRequestMarshaller ().stream (message ));
219+ }
220+
221+ @ Override
222+ public boolean isReady () {
223+ return extProcCall .isReady ();
224+ }
225+
226+ @ Override
227+ public void setMessageCompression (boolean enabled ) {
228+ extProcCall .setMessageCompression (enabled );
229+ }
230+
231+ @ Override
232+ public Attributes getAttributes () {
233+ return extProcCall .getAttributes ();
234+ }
235+ };
162236 }
163237
164238 // --- SHARED UTILITY METHODS ---
@@ -237,33 +311,30 @@ private static void applyHeaderMutations(Metadata headers, io.envoyproxy.envoy.s
237311 * Handles the bidirectional stream with the External Processor.
238312 * Buffers the actual RPC start until the Ext Proc header response is received.
239313 */
240- private static class ExtProcClientCall < ReqT , RespT > extends SimpleForwardingClientCall <ReqT , RespT > {
314+ private static class ExtProcClientCall extends SimpleForwardingClientCall <InputStream , InputStream > {
241315 private final ExternalProcessorGrpc .ExternalProcessorStub stub ;
242- private final MethodDescriptor <ReqT , RespT > method ;
243316 private final ExternalProcessor config ;
244317 private ClientCallStreamObserver <ProcessingRequest > extProcClientCallRequestObserver ;
245- private ExtProcListener < ReqT , RespT > wrappedListener ;
318+ private ExtProcListener wrappedListener ;
246319
247320 private boolean headersSent = false ;
248321 private Metadata requestHeaders ;
249322 private final java .util .Queue <Runnable > pendingActions = new java .util .concurrent .ConcurrentLinkedQueue <>();
250323 final AtomicBoolean extProcStreamFailed = new AtomicBoolean (false );
251324 final AtomicBoolean extProcStreamCompleted = new AtomicBoolean (false );
252325
253- protected ExtProcClientCall (ClientCall <ReqT , RespT > delegate ,
326+ protected ExtProcClientCall (ClientCall <InputStream , InputStream > delegate ,
254327 ExternalProcessorGrpc .ExternalProcessorStub stub ,
255- MethodDescriptor <ReqT , RespT > method ,
256328 ExternalProcessor config ) {
257329 super (delegate );
258330 this .stub = stub ;
259- this .method = method ;
260331 this .config = config ;
261332 }
262333
263334 @ Override
264- public void start (Listener <RespT > responseListener , Metadata headers ) {
335+ public void start (Listener <InputStream > responseListener , Metadata headers ) {
265336 this .requestHeaders = headers ;
266- this .wrappedListener = new ExternalProcessorInterceptor . ExtProcListener <> (responseListener , delegate (), method , this );
337+ this .wrappedListener = new ExtProcListener (responseListener , delegate (), this );
267338
268339 extProcClientCallRequestObserver = (ClientCallStreamObserver <ProcessingRequest >) stub .process (new io .grpc .stub .StreamObserver <ProcessingResponse >() {
269340 @ Override
@@ -396,33 +467,38 @@ public boolean isReady() {
396467 }
397468
398469 @ Override
399- public void sendMessage (ReqT message ) {
470+ public void sendMessage (InputStream message ) {
400471 if (extProcStreamCompleted .get ()) {
401472 super .sendMessage (message );
402473 return ;
403474 }
404475
405476 if (!headersSent && !config .getObservabilityMode ()) {
406477 // If headers haven't been cleared by ext_proc yet, buffer the whole action
407- pendingActions .add (() -> sendMessage (message ));
478+ try {
479+ byte [] bodyBytes = ByteStreams .toByteArray (message );
480+ pendingActions .add (() -> sendMessage (new ByteArrayInputStream (bodyBytes )));
481+ } catch (IOException e ) {
482+ delegate ().cancel ("Failed to read message" , e );
483+ }
408484 return ;
409485 }
410486
411- try ( InputStream is = method . streamRequest ( message )) {
412- byte [] bodyBytes = ByteStreams .toByteArray (is );
487+ try {
488+ byte [] bodyBytes = ByteStreams .toByteArray (message );
413489 extProcClientCallRequestObserver .onNext (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
414490 .setRequestBody (io .envoyproxy .envoy .service .ext_proc .v3 .HttpBody .newBuilder ()
415491 .setBody (com .google .protobuf .ByteString .copyFrom (bodyBytes ))
416492 .setEndOfStream (false )
417493 .build ())
418494 .build ());
495+
496+ if (config .getObservabilityMode ()) {
497+ super .sendMessage (new ByteArrayInputStream (bodyBytes ));
498+ }
419499 } catch (IOException e ) {
420500 delegate ().cancel ("Failed to serialize message for External Processor" , e );
421501 }
422-
423- if (config .getObservabilityMode ()) {
424- super .sendMessage (message );
425- }
426502 }
427503
428504 @ Override
@@ -446,29 +522,18 @@ private void handleRequestBodyResponse(io.envoyproxy.envoy.service.ext_proc.v3.B
446522 io .envoyproxy .envoy .service .ext_proc .v3 .BodyMutation mutation = bodyResponse .getResponse ().getBodyMutation ();
447523 if (mutation .hasBody ()) {
448524 byte [] mutatedBody = mutation .getBody ().toByteArray ();
449- try (InputStream is = new ByteArrayInputStream (mutatedBody )) {
450- ReqT mutatedMessage = method .parseRequest (is );
451- super .sendMessage (mutatedMessage );
452- } catch (IOException e ) {
453- delegate ().cancel ("Failed to parse mutated message from External Processor" , e );
454- }
525+ super .sendMessage (new ByteArrayInputStream (mutatedBody ));
455526 } else if (mutation .getClearBody ()) {
456527 // "clear_body" means we should send an empty message.
457- try (InputStream is = new ByteArrayInputStream (new byte [0 ])) {
458- ReqT emptyMessage = method .parseRequest (is );
459- super .sendMessage (emptyMessage );
460- } catch (IOException e ) {
461- // This should not happen with an empty stream.
462- delegate ().cancel ("Failed to create empty message" , e );
463- }
528+ super .sendMessage (new ByteArrayInputStream (new byte [0 ]));
464529 }
465530 // If body mutation is present but has no body and clear_body is false, do nothing.
466531 // This means the processor chose to drop the message.
467532 }
468533 // If no response is present, the processor chose to drop the message.
469534 }
470535
471- private void handleResponseBodyResponse (io .envoyproxy .envoy .service .ext_proc .v3 .BodyResponse bodyResponse , ExternalProcessorInterceptor . ExtProcListener < ReqT , RespT > listener ) {
536+ private void handleResponseBodyResponse (io .envoyproxy .envoy .service .ext_proc .v3 .BodyResponse bodyResponse , ExtProcListener listener ) {
472537 if (bodyResponse .hasResponse () && bodyResponse .getResponse ().hasBodyMutation ()) {
473538 io .envoyproxy .envoy .service .ext_proc .v3 .BodyMutation mutation = bodyResponse .getResponse ().getBodyMutation ();
474539 if (mutation .hasBody ()) {
@@ -484,14 +549,14 @@ private void drainQueue() {
484549 while ((action = pendingActions .poll ()) != null ) action .run ();
485550 }
486551
487- private void handleImmediateResponse (io .envoyproxy .envoy .service .ext_proc .v3 .ImmediateResponse immediate , Listener <RespT > listener ) {
552+ private void handleImmediateResponse (io .envoyproxy .envoy .service .ext_proc .v3 .ImmediateResponse immediate , Listener <InputStream > listener ) {
488553 io .grpc .Status status = io .grpc .Status .fromCodeValue (immediate .getGrpcStatus ().getStatus ());
489554 delegate ().cancel ("Rejected by ExtProc" , null );
490555 listener .onClose (status , new Metadata ());
491556 extProcClientCallRequestObserver .onCompleted ();
492557 }
493558
494- private void handleFailOpen (ExtProcListener < ReqT , RespT > listener ) {
559+ private void handleFailOpen (ExtProcListener listener ) {
495560 if (extProcStreamCompleted .compareAndSet (false , true )) {
496561 // The ext_proc stream is gone. "Fail open" means we proceed with the RPC
497562 // without any more processing.
@@ -505,19 +570,17 @@ private void handleFailOpen(ExtProcListener<ReqT, RespT> listener) {
505570 }
506571 }
507572
508- private static class ExtProcListener <ReqT , RespT > extends ForwardingClientCallListener .SimpleForwardingClientCallListener <RespT > {
509- private final MethodDescriptor <?, RespT > method ;
510- private final ClientCall <?, RespT > callDelegate ; // The actual RPC call
511- private final ExtProcClientCall <ReqT , RespT > extProcClientCall ;
573+ private static class ExtProcListener extends ForwardingClientCallListener .SimpleForwardingClientCallListener <InputStream > {
574+ private final ClientCall <?, ?> callDelegate ; // The actual RPC call
575+ private final ExtProcClientCall extProcClientCall ;
512576 private ClientCallStreamObserver <io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest > stream ;
513577 private Metadata savedHeaders ;
514578 private Metadata savedTrailers ;
515579 private io .grpc .Status savedStatus ;
516580
517- protected ExtProcListener (ClientCall .Listener <RespT > delegate , ClientCall <?, RespT > callDelegate ,
518- MethodDescriptor <?, RespT > method , ExtProcClientCall < ReqT , RespT > extProcClientCall ) {
581+ protected ExtProcListener (ClientCall .Listener <InputStream > delegate , ClientCall <?, ? > callDelegate ,
582+ ExtProcClientCall extProcClientCall ) {
519583 super (delegate );
520- this .method = method ;
521584 this .callDelegate = callDelegate ;
522585 this .extProcClientCall = extProcClientCall ;
523586 }
@@ -552,15 +615,21 @@ public void onHeaders(Metadata headers) {
552615 void proceedWithHeaders () { super .onHeaders (savedHeaders ); }
553616
554617 @ Override
555- public void onMessage (RespT message ) {
618+ public void onMessage (InputStream message ) {
556619 if (extProcClientCall .extProcStreamCompleted .get ()) {
557620 super .onMessage (message );
558621 return ;
559622 }
560- sendResponseBodyToExtProc (message , false );
561-
562- if (extProcClientCall .config .getObservabilityMode ()) {
563- super .onMessage (message );
623+
624+ try {
625+ byte [] bodyBytes = ByteStreams .toByteArray (message );
626+ sendResponseBodyToExtProc (bodyBytes , false );
627+
628+ if (extProcClientCall .config .getObservabilityMode ()) {
629+ super .onMessage (new ByteArrayInputStream (bodyBytes ));
630+ }
631+ } catch (IOException e ) {
632+ callDelegate .cancel ("Failed to read server response" , e );
564633 }
565634 }
566635
@@ -597,40 +666,21 @@ public void onClose(io.grpc.Status status, Metadata trailers) {
597666 }
598667 }
599668
600- private void sendResponseBodyToExtProc (@ Nullable RespT message , boolean endOfStream ) {
669+ private void sendResponseBodyToExtProc (@ Nullable byte [] bodyBytes , boolean endOfStream ) {
601670 if (extProcClientCall .extProcStreamCompleted .get ()) {
602671 return ;
603672 }
604- try {
605- io .envoyproxy .envoy .service .ext_proc .v3 .HttpBody .Builder bodyBuilder =
606- io .envoyproxy .envoy .service .ext_proc .v3 .HttpBody .newBuilder ();
607- if (message != null ) {
608- try (java .io .InputStream is = method .streamResponse (message )) {
609- byte [] bodyBytes = ByteStreams .toByteArray (is );
610- bodyBuilder .setBody (com .google .protobuf .ByteString .copyFrom (bodyBytes ));
611- }
612- }
613- bodyBuilder .setEndOfStream (endOfStream );
614-
615- extProcClientCall .extProcClientCallRequestObserver .onNext (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
616- .setResponseBody (bodyBuilder .build ())
617- .build ());
618673
619- } catch (java .io .IOException e ) {
620- // 1. Notify the external processor stream of the failure
621- stream .onError (io .grpc .Status .INTERNAL
622- .withDescription ("Failed to serialize server response for ExtProc" )
623- .withCause (e )
624- .asRuntimeException ());
625-
626- // 2. Kill the RPC toward the remote service
627- // This tells the transport to stop receiving/sending data immediately.
628- callDelegate .cancel ("Serialization error in interceptor" , e );
629-
630- // 3. Notify the local application
631- // This triggers the client's StreamObserver.onError()
632- super .onClose (io .grpc .Status .INTERNAL .withDescription ("Failed to process server response" ), new Metadata ());
674+ io .envoyproxy .envoy .service .ext_proc .v3 .HttpBody .Builder bodyBuilder =
675+ io .envoyproxy .envoy .service .ext_proc .v3 .HttpBody .newBuilder ();
676+ if (bodyBytes != null ) {
677+ bodyBuilder .setBody (com .google .protobuf .ByteString .copyFrom (bodyBytes ));
633678 }
679+ bodyBuilder .setEndOfStream (endOfStream );
680+
681+ extProcClientCall .extProcClientCallRequestObserver .onNext (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
682+ .setResponseBody (bodyBuilder .build ())
683+ .build ());
634684 }
635685
636686 /**
@@ -641,14 +691,7 @@ void proceedWithClose() {
641691 }
642692
643693 void onExternalBody (com .google .protobuf .ByteString body ) {
644- try (InputStream is = body .newInput ()) {
645- RespT message = method .parseResponse (is );
646- super .onMessage (message );
647- } catch (Exception e ) {
648- // This will happen if the ext_proc server sends invalid protobuf data.
649- // We should probably fail the call.
650- super .onClose (Status .INTERNAL .withDescription ("Failed to parse response from ext_proc" ).withCause (e ), new Metadata ());
651- }
694+ super .onMessage (body .newInput ());
652695 }
653696
654697 void unblockAfterStreamComplete () {
@@ -657,7 +700,6 @@ void unblockAfterStreamComplete() {
657700 if (savedHeaders != null ) {
658701 proceedWithHeaders ();
659702 }
660- // No message queue to flush anymore.
661703 if (savedStatus != null ) {
662704 proceedWithClose ();
663705 }
0 commit comments