@@ -104,8 +104,8 @@ public ConfigOrError<? extends FilterConfig> parseFilterConfigOverride(Message r
104104 @ Nullable
105105 @ Override
106106 public ClientInterceptor buildClientInterceptor (FilterConfig filterConfig ,
107- @ Nullable FilterConfig overrideConfig , ScheduledExecutorService scheduler ) {
108- return new ExternalProcessorInterceptor ((ExternalProcessorFilterConfig ) filterConfig );
107+ @ Nullable FilterConfig overrideConfig , java . util . concurrent . ScheduledExecutorService scheduler ) {
108+ return new ExternalProcessorInterceptor ((ExternalProcessorFilterConfig ) filterConfig , scheduler );
109109 }
110110
111111 static final class ExternalProcessorFilterConfig implements FilterConfig {
@@ -127,6 +127,7 @@ public String typeUrl() {
127127 static final class ExternalProcessorInterceptor implements ClientInterceptor {
128128 private final CachedChannelManager cachedChannelManager ;
129129 private final ExternalProcessorFilterConfig filterConfig ;
130+ private final java .util .concurrent .ScheduledExecutorService scheduler ;
130131
131132 private static final MethodDescriptor .Marshaller <InputStream > RAW_MARSHALLER =
132133 new MethodDescriptor .Marshaller <InputStream >() {
@@ -136,14 +137,17 @@ static final class ExternalProcessorInterceptor implements ClientInterceptor {
136137 public InputStream parse (InputStream stream ) { return stream ; }
137138 };
138139
139- ExternalProcessorInterceptor (ExternalProcessorFilterConfig filterConfig ) {
140- this (filterConfig , new CachedChannelManager ());
140+ ExternalProcessorInterceptor (ExternalProcessorFilterConfig filterConfig ,
141+ java .util .concurrent .ScheduledExecutorService scheduler ) {
142+ this (filterConfig , new CachedChannelManager (), scheduler );
141143 }
142144
143145 ExternalProcessorInterceptor (ExternalProcessorFilterConfig filterConfig ,
144- CachedChannelManager cachedChannelManager ) {
146+ CachedChannelManager cachedChannelManager ,
147+ java .util .concurrent .ScheduledExecutorService scheduler ) {
145148 this .filterConfig = filterConfig ;
146149 this .cachedChannelManager = checkNotNull (cachedChannelManager , "cachedChannelManager" );
150+ this .scheduler = checkNotNull (scheduler , "scheduler" );
147151 }
148152
149153 @ Override
@@ -197,7 +201,12 @@ public void start(Listener<ExtRespT> responseListener, Metadata headers) {
197201 MethodDescriptor <InputStream , InputStream > rawMethod = method .toBuilder (RAW_MARSHALLER , RAW_MARSHALLER ).build ();
198202 ClientCall <InputStream , InputStream > rawCall = next .newCall (rawMethod , callOptions );
199203
200- ExtProcClientCall extProcCall = new ExtProcClientCall (rawCall , stub , config );
204+ // Create a local subclass instance to buffer outbound actions
205+ ExtProcDelayedCall <InputStream , InputStream > delayedCall =
206+ new ExtProcDelayedCall <>(
207+ callOptions .getExecutor (), scheduler , callOptions .getDeadline ());
208+
209+ ExtProcClientCall extProcCall = new ExtProcClientCall (delayedCall , rawCall , stub , config );
201210
202211 return new ClientCall <ReqT , RespT >() {
203212 @ Override
@@ -334,48 +343,60 @@ private static void applyHeaderMutations(Metadata headers, io.envoyproxy.envoy.s
334343 }
335344 }
336345
346+ /**
347+ * A local subclass to expose the protected constructor of DelayedClientCall.
348+ */
349+ private static class ExtProcDelayedCall <ReqT , RespT > extends io .grpc .internal .DelayedClientCall <ReqT , RespT > {
350+ ExtProcDelayedCall (java .util .concurrent .Executor executor , java .util .concurrent .ScheduledExecutorService scheduler , @ Nullable io .grpc .Deadline deadline ) {
351+ super (executor , scheduler , deadline );
352+ }
353+ }
354+
337355 /**
338356 * Handles the bidirectional stream with the External Processor.
339357 * Buffers the actual RPC start until the Ext Proc header response is received.
340358 */
341359 private static class ExtProcClientCall extends SimpleForwardingClientCall <InputStream , InputStream > {
342360 private final ExternalProcessorGrpc .ExternalProcessorStub stub ;
343361 private final ExternalProcessor config ;
344- private final Object requestLock = new Object ();
362+ private final ClientCall <InputStream , InputStream > rawCall ;
363+ private final ExtProcDelayedCall <InputStream , InputStream > delayedCall ;
345364 private final Object responseLock = new Object ();
346365 private final Object streamLock = new Object ();
347366 private ClientCallStreamObserver <ProcessingRequest > extProcClientCallRequestObserver ;
348367 private ExtProcListener wrappedListener ;
349368
350- private volatile boolean headersSent = false ;
351369 private Metadata requestHeaders ;
352- private final java .util .Queue <Runnable > pendingActions = new java .util .concurrent .ConcurrentLinkedQueue <>();
353370 final AtomicBoolean extProcStreamFailed = new AtomicBoolean (false );
354371 final AtomicBoolean extProcStreamCompleted = new AtomicBoolean (false );
355372 final AtomicBoolean drainingExtProcStream = new AtomicBoolean (false );
356373
357- protected ExtProcClientCall (ClientCall <InputStream , InputStream > delegate ,
374+ protected ExtProcClientCall (
375+ ExtProcDelayedCall <InputStream , InputStream > delayedCall ,
376+ ClientCall <InputStream , InputStream > rawCall ,
358377 ExternalProcessorGrpc .ExternalProcessorStub stub ,
359378 ExternalProcessor config ) {
360- super (delegate );
379+ super (delayedCall );
380+ this .delayedCall = delayedCall ;
381+ this .rawCall = rawCall ;
361382 this .stub = stub ;
362383 this .config = config ;
363384 }
364385
365- private void sendToDataPlane (Runnable action ) {
366- synchronized (requestLock ) {
367- if (headersSent ) {
368- action .run ();
369- } else {
370- pendingActions .add (action );
371- }
386+ private void activateCall () {
387+ Runnable toRun = delayedCall .setCall (rawCall );
388+ if (toRun != null ) {
389+ toRun .run ();
372390 }
373391 }
374392
375393 @ Override
376394 public void start (Listener <InputStream > responseListener , Metadata headers ) {
377395 this .requestHeaders = headers ;
378- this .wrappedListener = new ExtProcListener (responseListener , delegate (), this );
396+ this .wrappedListener = new ExtProcListener (responseListener , rawCall , this );
397+
398+ // DelayedClientCall.start will buffer the listener and headers until setCall is called.
399+ super .start (wrappedListener , headers );
379400
380401 extProcClientCallRequestObserver = (ClientCallStreamObserver <ProcessingRequest >) stub .process (new io .grpc .stub .StreamObserver <ProcessingResponse >() {
381402 @ Override
@@ -404,11 +425,7 @@ public void onNext(io.envoyproxy.envoy.service.ext_proc.v3.ProcessingResponse re
404425 if (response .getRequestHeaders ().hasResponse ()) {
405426 applyHeaderMutations (requestHeaders , response .getRequestHeaders ().getResponse ().getHeaderMutation ());
406427 }
407- synchronized (requestLock ) {
408- headersSent = true ;
409- delegate ().start (wrappedListener , requestHeaders );
410- drainQueue ();
411- }
428+ activateCall ();
412429 }
413430 // 2. Client Message (Request Body)
414431 else if (response .hasRequestBody ()) {
@@ -475,7 +492,7 @@ public void onError(Throwable t) {
475492 handleFailOpen (wrappedListener );
476493 } else {
477494 if (extProcStreamFailed .compareAndSet (false , true )) {
478- delegate () .cancel ("External processor stream failed" , t );
495+ rawCall .cancel ("External processor stream failed" , t );
479496 }
480497 }
481498 }
@@ -502,10 +519,7 @@ public void onCompleted() {
502519 }
503520
504521 if (config .getObservabilityMode ()) {
505- synchronized (requestLock ) {
506- headersSent = true ;
507- delegate ().start (wrappedListener , headers );
508- }
522+ activateCall ();
509523 }
510524 }
511525
@@ -561,10 +575,10 @@ public void sendMessage(InputStream message) {
561575 }
562576
563577 if (config .getObservabilityMode ()) {
564- sendToDataPlane (() -> super .sendMessage (new ByteArrayInputStream (bodyBytes ) ));
578+ super .sendMessage (new ByteArrayInputStream (bodyBytes ));
565579 }
566580 } catch (IOException e ) {
567- delegate () .cancel ("Failed to serialize message for External Processor" , e );
581+ rawCall .cancel ("Failed to serialize message for External Processor" , e );
568582 }
569583 }
570584
@@ -586,7 +600,7 @@ public void halfClose() {
586600 }
587601 }
588602
589- sendToDataPlane ( super :: halfClose );
603+ super . halfClose ( );
590604 }
591605
592606 @ Override
@@ -604,9 +618,9 @@ private void handleRequestBodyResponse(io.envoyproxy.envoy.service.ext_proc.v3.B
604618 io .envoyproxy .envoy .service .ext_proc .v3 .BodyMutation mutation = bodyResponse .getResponse ().getBodyMutation ();
605619 if (mutation .hasBody () && !mutation .getBody ().isEmpty ()) { // Mutation present
606620 byte [] mutatedBody = mutation .getBody ().toByteArray ();
607- sendToDataPlane (() -> super .sendMessage (new ByteArrayInputStream (mutatedBody ) ));
621+ super .sendMessage (new ByteArrayInputStream (mutatedBody ));
608622 } else if (mutation .getClearBody ()) { // Explicitly clear body
609- sendToDataPlane (() -> super .sendMessage (new ByteArrayInputStream (new byte [0 ]) ));
623+ super .sendMessage (new ByteArrayInputStream (new byte [0 ]));
610624 }
611625 }
612626 }
@@ -622,14 +636,9 @@ private void handleResponseBodyResponse(io.envoyproxy.envoy.service.ext_proc.v3.
622636 }
623637 }
624638
625- private void drainQueue () {
626- Runnable action ;
627- while ((action = pendingActions .poll ()) != null ) action .run ();
628- }
629-
630639 private void handleImmediateResponse (io .envoyproxy .envoy .service .ext_proc .v3 .ImmediateResponse immediate , Listener <InputStream > listener ) {
631640 io .grpc .Status status = io .grpc .Status .fromCodeValue (immediate .getGrpcStatus ().getStatus ());
632- delegate () .cancel ("Rejected by ExtProc" , null );
641+ rawCall .cancel ("Rejected by ExtProc" , null );
633642 synchronized (responseLock ) {
634643 listener .onClose (status , new Metadata ());
635644 }
@@ -642,30 +651,24 @@ private void handleFailOpen(ExtProcListener listener) {
642651 if (extProcStreamCompleted .compareAndSet (false , true )) {
643652 // The ext_proc stream is gone. "Fail open" means we proceed with the RPC
644653 // without any more processing.
645- synchronized (requestLock ) {
646- if (!headersSent ) {
647- headersSent = true ;
648- delegate ().start (listener , requestHeaders );
649- }
650- drainQueue ();
651- }
654+ activateCall ();
652655 listener .unblockAfterStreamComplete ();
653656 }
654657 }
655658 }
656659
657660 private static class ExtProcListener extends ForwardingClientCallListener .SimpleForwardingClientCallListener <InputStream > {
658- private final ClientCall <?, ?> callDelegate ; // The actual RPC call
661+ private final ClientCall <?, ?> rawCall ; // The actual RPC call
659662 private final ExtProcClientCall extProcClientCall ;
660663 private ClientCallStreamObserver <io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest > stream ;
661664 private Metadata savedHeaders ;
662665 private Metadata savedTrailers ;
663666 private io .grpc .Status savedStatus ;
664667
665- protected ExtProcListener (ClientCall .Listener <InputStream > delegate , ClientCall <?, ?> callDelegate ,
668+ protected ExtProcListener (ClientCall .Listener <InputStream > delegate , ClientCall <?, ?> rawCall ,
666669 ExtProcClientCall extProcClientCall ) {
667670 super (delegate );
668- this .callDelegate = callDelegate ;
671+ this .rawCall = rawCall ;
669672 this .extProcClientCall = extProcClientCall ;
670673 }
671674
@@ -737,7 +740,7 @@ public void onMessage(InputStream message) {
737740 }
738741 }
739742 } catch (IOException e ) {
740- callDelegate .cancel ("Failed to read server response" , e );
743+ rawCall .cancel ("Failed to read server response" , e );
741744 }
742745 }
743746
0 commit comments