@@ -485,8 +485,14 @@ private void handleRequestBodyResponse(io.envoyproxy.envoy.service.ext_proc.v3.B
485485 }
486486
487487 private void handleResponseBodyResponse (io .envoyproxy .envoy .service .ext_proc .v3 .BodyResponse bodyResponse , ExternalProcessorInterceptor .ExtProcListener <ReqT , RespT > listener ) {
488- // Pass the (potentially modified) message to the real listener
489- listener .proceedWithNextMessage ();
488+ if (bodyResponse .hasResponse () && bodyResponse .getResponse ().hasBodyMutation ()) {
489+ io .envoyproxy .envoy .service .ext_proc .v3 .BodyMutation mutation = bodyResponse .getResponse ().getBodyMutation ();
490+ if (mutation .hasBody ()) {
491+ listener .onExternalBody (mutation .getBody ());
492+ } else if (mutation .getClearBody ()) {
493+ listener .onExternalBody (com .google .protobuf .ByteString .EMPTY );
494+ }
495+ }
490496 }
491497
492498 private void drainQueue () {
@@ -523,7 +529,6 @@ private static class ExtProcListener<ReqT, RespT> extends ForwardingClientCallLi
523529 private Metadata savedHeaders ;
524530 private Metadata savedTrailers ;
525531 private io .grpc .Status savedStatus ;
526- private final java .util .Queue <RespT > messageQueue = new java .util .concurrent .ConcurrentLinkedQueue <>();
527532
528533 protected ExtProcListener (ClientCall .Listener <RespT > delegate , ClientCall <?, RespT > callDelegate ,
529534 MethodDescriptor <?, RespT > method , ExtProcClientCall <ReqT , RespT > call ) {
@@ -565,8 +570,6 @@ public void onMessage(RespT message) {
565570
566571 if (call .config .getObservabilityMode ()) {
567572 super .onMessage (message );
568- } else {
569- messageQueue .add (message );
570573 }
571574 }
572575
@@ -646,9 +649,15 @@ void proceedWithClose() {
646649 super .onClose (savedStatus , savedTrailers );
647650 }
648651
649- void proceedWithNextMessage () {
650- RespT msg = messageQueue .poll ();
651- if (msg != null ) super .onMessage (msg );
652+ void onExternalBody (com .google .protobuf .ByteString body ) {
653+ try (InputStream is = body .newInput ()) {
654+ RespT message = method .parseResponse (is );
655+ super .onMessage (message );
656+ } catch (Exception e ) {
657+ // This will happen if the ext_proc server sends invalid protobuf data.
658+ // We should probably fail the call.
659+ super .onClose (Status .INTERNAL .withDescription ("Failed to parse response from ext_proc" ).withCause (e ), new Metadata ());
660+ }
652661 }
653662
654663 void unblockAfterStreamComplete () {
@@ -657,9 +666,7 @@ void unblockAfterStreamComplete() {
657666 if (savedHeaders != null ) {
658667 proceedWithHeaders ();
659668 }
660- while (messageQueue .peek () != null ) {
661- proceedWithNextMessage ();
662- }
669+ // No message queue to flush anymore.
663670 if (savedStatus != null ) {
664671 proceedWithClose ();
665672 }
0 commit comments