88import com .google .protobuf .InvalidProtocolBufferException ;
99import com .google .protobuf .Message ;
1010import io .envoyproxy .envoy .config .core .v3 .GrpcService ;
11+ import io .envoyproxy .envoy .config .core .v3 .HeaderValueOption ;
1112import io .envoyproxy .envoy .extensions .filters .http .ext_proc .v3 .ExternalProcessor ;
1213import io .envoyproxy .envoy .service .ext_proc .v3 .ExternalProcessorGrpc ;
1314import io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest ;
@@ -202,6 +203,7 @@ private static class ExtProcClientCall<ReqT, RespT> extends SimpleForwardingClie
202203 private final java .util .Queue <Runnable > pendingActions = new java .util .concurrent .ConcurrentLinkedQueue <>();
203204 private ReqT lastRequestMessage ;
204205 final AtomicBoolean extProcStreamFailed = new AtomicBoolean (false );
206+ final AtomicBoolean extProcStreamCompleted = new AtomicBoolean (false );
205207
206208 protected ExtProcClientCall (ClientCall <ReqT , RespT > delegate ,
207209 ExternalProcessorGrpc .ExternalProcessorStub stub ,
@@ -272,7 +274,23 @@ public void onError(Throwable t) {
272274 }
273275 }
274276
275- @ Override public void onCompleted () {}
277+ @ Override
278+ public void onCompleted () {
279+ if (extProcStreamCompleted .compareAndSet (false , true )) {
280+ // The ext_proc server has gracefully closed the stream.
281+ // Unblock any part of the interceptor that is currently waiting.
282+ if (!headersSent ) {
283+ headersSent = true ;
284+ delegate ().start (wrappedListener , requestHeaders );
285+ drainQueue ();
286+ }
287+ if (lastRequestMessage != null ) {
288+ super .sendMessage (lastRequestMessage );
289+ lastRequestMessage = null ;
290+ }
291+ wrappedListener .unblockAfterStreamComplete ();
292+ }
293+ }
276294 });
277295
278296 wrappedListener .setStream (requestObserver );
@@ -286,6 +304,15 @@ public void onError(Throwable t) {
286304
287305 @ Override
288306 public void sendMessage (ReqT message ) {
307+ if (extProcStreamCompleted .get ()) {
308+ if (lastRequestMessage != null ) {
309+ super .sendMessage (lastRequestMessage );
310+ lastRequestMessage = null ;
311+ }
312+ super .sendMessage (message );
313+ return ;
314+ }
315+
289316 if (!headersSent ) {
290317 // If headers haven't been cleared by ext_proc yet, buffer the whole action
291318 pendingActions .add (() -> sendMessage (message ));
@@ -299,6 +326,9 @@ public void sendMessage(ReqT message) {
299326 }
300327
301328 private void sendRequestBodyToExtProc (ReqT message , boolean endOfStream ) {
329+ if (extProcStreamCompleted .get ()) {
330+ return ;
331+ }
302332 try (InputStream is = method .streamRequest (message )) {
303333 byte [] bodyBytes = ByteStreams .toByteArray (is );
304334 requestObserver .onNext (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
@@ -358,6 +388,15 @@ private void handleImmediateResponse(io.envoyproxy.envoy.service.ext_proc.v3.Imm
358388
359389 @ Override
360390 public void halfClose () {
391+ if (extProcStreamCompleted .get ()) {
392+ if (lastRequestMessage != null ) {
393+ super .sendMessage (lastRequestMessage );
394+ lastRequestMessage = null ;
395+ }
396+ super .halfClose ();
397+ return ;
398+ }
399+
361400 if (lastRequestMessage != null ) {
362401 sendRequestBodyToExtProc (lastRequestMessage , true );
363402 lastRequestMessage = null ;
@@ -394,6 +433,10 @@ protected ExtProcListener(ClientCall.Listener<RespT> delegate, ClientCall<?, Res
394433
395434 @ Override
396435 public void onHeaders (Metadata headers ) {
436+ if (call .extProcStreamCompleted .get ()) {
437+ super .onHeaders (headers );
438+ return ;
439+ }
397440 this .savedHeaders = headers ;
398441 stream .onNext (ProcessingRequest .newBuilder ()
399442 .setResponseHeaders (io .envoyproxy .envoy .service .ext_proc .v3 .HttpHeaders .newBuilder ()
@@ -406,6 +449,15 @@ public void onHeaders(Metadata headers) {
406449
407450 @ Override
408451 public void onMessage (RespT message ) {
452+ if (call .extProcStreamCompleted .get ()) {
453+ if (lastMessage != null ) {
454+ super .onMessage (lastMessage );
455+ lastMessage = null ;
456+ }
457+ super .onMessage (message );
458+ return ;
459+ }
460+
409461 if (lastMessage != null ) {
410462 sendResponseBodyToExtProc (lastMessage , false );
411463 }
@@ -423,6 +475,15 @@ public void onClose(io.grpc.Status status, Metadata trailers) {
423475 super .onClose (Status .UNAVAILABLE .withDescription ("External processor stream failed" ).withCause (status .getCause ()), new Metadata ());
424476 return ;
425477 }
478+ if (call .extProcStreamCompleted .get ()) {
479+ if (lastMessage != null ) {
480+ super .onMessage (lastMessage );
481+ lastMessage = null ;
482+ }
483+ super .onClose (status , trailers );
484+ return ;
485+ }
486+
426487 this .savedStatus = status ;
427488 this .savedTrailers = trailers ;
428489
@@ -440,6 +501,9 @@ public void onClose(io.grpc.Status status, Metadata trailers) {
440501 }
441502
442503 private void sendResponseBodyToExtProc (RespT message , boolean endOfStream ) {
504+ if (call .extProcStreamCompleted .get ()) {
505+ return ;
506+ }
443507 try (java .io .InputStream is = method .streamResponse (message )) {
444508 // Use Guava to convert the server's response message to bytes
445509 byte [] bodyBytes = ByteStreams .toByteArray (is );
@@ -480,6 +544,20 @@ void proceedWithNextMessage() {
480544 RespT msg = messageQueue .poll ();
481545 if (msg != null ) super .onMessage (msg );
482546 }
547+
548+ void unblockAfterStreamComplete () {
549+ // This is called when the ext_proc stream is gracefully completed.
550+ // We need to flush any pending state that is waiting for a response from ext_proc.
551+ if (savedHeaders != null ) {
552+ proceedWithHeaders ();
553+ }
554+ while (messageQueue .peek () != null ) {
555+ proceedWithNextMessage ();
556+ }
557+ if (savedStatus != null ) {
558+ proceedWithClose ();
559+ }
560+ }
483561 }
484562
485563 @ VisibleForTesting
0 commit comments