44
55import com .google .common .collect .ImmutableList ;
66import com .google .common .io .ByteStreams ;
7+ import com .google .common .util .concurrent .MoreExecutors ;
78import com .google .protobuf .Any ;
89import com .google .protobuf .InvalidProtocolBufferException ;
910import com .google .protobuf .Message ;
2526import io .grpc .MethodDescriptor ;
2627import io .grpc .Status ;
2728import io .grpc .stub .ClientCallStreamObserver ;
29+ import io .grpc .stub .ClientResponseObserver ;
2830import io .grpc .xds .internal .grpcservice .CachedChannelManager ;
2931import io .grpc .xds .internal .grpcservice .GrpcServiceConfig ;
3032import io .grpc .xds .internal .grpcservice .GrpcServiceConfigParser ;
3537import java .io .IOException ;
3638import java .io .InputStream ;
3739import java .util .List ;
40+ import java .util .Locale ;
3841import java .util .concurrent .Executor ;
3942import java .util .concurrent .ScheduledExecutorService ;
4043import java .util .concurrent .TimeUnit ;
@@ -156,16 +159,17 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
156159 MethodDescriptor <ReqT , RespT > method ,
157160 CallOptions callOptions ,
158161 Channel next ) {
159- Executor callExecutor = callOptions .getExecutor ();
162+ Executor callExecutor = callOptions .getExecutor () != null ? callOptions .getExecutor () : MoreExecutors .directExecutor ();
163+
160164 ExternalProcessorGrpc .ExternalProcessorStub stub = ExternalProcessorGrpc .newStub (
161165 cachedChannelManager .getChannel (filterConfig .grpcServiceConfig ))
162166 .withExecutor (callExecutor );
163167
164168 if (filterConfig .grpcServiceConfig .timeout () != null && filterConfig .grpcServiceConfig .timeout ().isPresent ()) {
165- long timeoutNanos = filterConfig .grpcServiceConfig .timeout ().get ().getSeconds () * 1_000_000_000L
166- + filterConfig .grpcServiceConfig .timeout ().get ().getNano ();
167- if (timeoutNanos > 0 ) {
168- stub = stub .withDeadlineAfter (timeoutNanos , TimeUnit .NANOSECONDS );
169+ long timeoutSeconds = filterConfig .grpcServiceConfig .timeout ().get ().getSeconds ();
170+ int timeoutNanos = filterConfig .grpcServiceConfig .timeout ().get ().getNano ();
171+ if (timeoutSeconds > 0 || timeoutNanos > 0 ) {
172+ stub = stub .withDeadlineAfter (timeoutSeconds * 1_000_000_000L + timeoutNanos , TimeUnit .NANOSECONDS );
169173 }
170174 }
171175
@@ -283,14 +287,17 @@ private static io.envoyproxy.envoy.config.core.v3.HeaderMap toHeaderMap(Metadata
283287 // Skip binary headers for this basic mapping
284288 if (key .endsWith (Metadata .BINARY_HEADER_SUFFIX )) {
285289 Metadata .Key <byte []> binKey = Metadata .Key .of (key , Metadata .BINARY_BYTE_MARSHALLER );
286- for (byte [] binValue : metadata .getAll (binKey )) {
287- String encoded = com .google .common .io .BaseEncoding .base64 ().encode (binValue );
288- io .envoyproxy .envoy .config .core .v3 .HeaderValue headerValue =
289- io .envoyproxy .envoy .config .core .v3 .HeaderValue .newBuilder ()
290- .setKey (key .toLowerCase ())
291- .setValue (encoded )
292- .build ();
293- builder .addHeaders (headerValue );
290+ Iterable <byte []> values = metadata .getAll (binKey );
291+ if (values != null ) {
292+ for (byte [] binValue : values ) {
293+ String encoded = com .google .common .io .BaseEncoding .base64 ().encode (binValue );
294+ io .envoyproxy .envoy .config .core .v3 .HeaderValue headerValue =
295+ io .envoyproxy .envoy .config .core .v3 .HeaderValue .newBuilder ()
296+ .setKey (key .toLowerCase (Locale .ROOT ))
297+ .setValue (encoded )
298+ .build ();
299+ builder .addHeaders (headerValue );
300+ }
294301 }
295302 } else {
296303 Metadata .Key <String > asciiKey = Metadata .Key .of (key , Metadata .ASCII_STRING_MARSHALLER );
@@ -299,7 +306,7 @@ private static io.envoyproxy.envoy.config.core.v3.HeaderMap toHeaderMap(Metadata
299306 for (String value : values ) {
300307 io .envoyproxy .envoy .config .core .v3 .HeaderValue headerValue =
301308 io .envoyproxy .envoy .config .core .v3 .HeaderValue .newBuilder ()
302- .setKey (key .toLowerCase ())
309+ .setKey (key .toLowerCase (Locale . ROOT ))
303310 .setValue (value )
304311 .build ();
305312 builder .addHeaders (headerValue );
@@ -310,34 +317,27 @@ private static io.envoyproxy.envoy.config.core.v3.HeaderMap toHeaderMap(Metadata
310317 return builder .build ();
311318 }
312319
313- private static void applyHeaderMutations (Metadata headers , io .envoyproxy .envoy .service .ext_proc .v3 .HeaderMutation mutation ) {
314- for (io .envoyproxy .envoy .config .core .v3 .HeaderValueOption opt : mutation .getSetHeadersList ()) {
315- String keyStr = opt .getHeader ().getKey ().toLowerCase ();
316- String valueStr = opt .getHeader ().getValue ();
317- boolean isBinary = keyStr .endsWith (Metadata .BINARY_HEADER_SUFFIX );
318-
319- if (isBinary ) {
320- Metadata .Key <byte []> key = Metadata .Key .of (keyStr , Metadata .BINARY_BYTE_MARSHALLER );
321- if (!opt .getAppend ().getValue ()) {
322- headers .discardAll (key );
323- }
324- byte [] decodedValue = com .google .common .io .BaseEncoding .base64 ().decode (valueStr );
325- headers .put (key , decodedValue );
326- } else {
327- Metadata .Key <String > key = Metadata .Key .of (keyStr , Metadata .ASCII_STRING_MARSHALLER );
328- if (!opt .getAppend ().getValue ()) {
329- headers .discardAll (key );
320+ private static void applyHeaderMutations (Metadata metadata , io .envoyproxy .envoy .service .ext_proc .v3 .HeaderMutation mutation ) {
321+ for (io .envoyproxy .envoy .config .core .v3 .HeaderValueOption setHeader : mutation .getSetHeadersList ()) {
322+ String key = setHeader .getHeader ().getKey ();
323+ String value = setHeader .getHeader ().getValue ();
324+ try {
325+ Metadata .Key <String > metadataKey = Metadata .Key .of (key , Metadata .ASCII_STRING_MARSHALLER );
326+ if (setHeader .getAppendAction () == io .envoyproxy .envoy .config .core .v3 .HeaderValueOption .HeaderAppendAction .APPEND_IF_EXISTS_OR_ADD
327+ || setHeader .getAppendAction () == io .envoyproxy .envoy .config .core .v3 .HeaderValueOption .HeaderAppendAction .OVERWRITE_IF_EXISTS_OR_ADD ) {
328+ metadata .removeAll (metadataKey );
330329 }
331- headers .put (key , valueStr );
330+ metadata .put (metadataKey , value );
331+ } catch (IllegalArgumentException e ) {
332+ // Skip
332333 }
333334 }
334-
335- for (String keyToRemove : mutation .getRemoveHeadersList ()) {
336- String lowKey = keyToRemove .toLowerCase ();
337- if (lowKey .endsWith (Metadata .BINARY_HEADER_SUFFIX )) {
338- headers .discardAll (Metadata .Key .of (lowKey , Metadata .BINARY_BYTE_MARSHALLER ));
339- } else {
340- headers .discardAll (Metadata .Key .of (lowKey , Metadata .ASCII_STRING_MARSHALLER ));
335+ for (String removeHeader : mutation .getRemoveHeadersList ()) {
336+ try {
337+ Metadata .Key <String > metadataKey = Metadata .Key .of (removeHeader , Metadata .ASCII_STRING_MARSHALLER );
338+ metadata .removeAll (metadataKey );
339+ } catch (IllegalArgumentException e ) {
340+ // Skip
341341 }
342342 }
343343 }
@@ -361,13 +361,14 @@ private static class ExtProcClientCall extends SimpleForwardingClientCall<InputS
361361 private final ClientCall <InputStream , InputStream > rawCall ;
362362 private final ExtProcDelayedCall <InputStream , InputStream > delayedCall ;
363363 private final Object streamLock = new Object ();
364- private ClientCallStreamObserver <ProcessingRequest > extProcClientCallRequestObserver ;
364+ private io . grpc . stub . ClientCallStreamObserver <ProcessingRequest > extProcClientCallRequestObserver ;
365365 private ExtProcListener wrappedListener ;
366366
367367 private Metadata requestHeaders ;
368368 final AtomicBoolean extProcStreamFailed = new AtomicBoolean (false );
369369 final AtomicBoolean extProcStreamCompleted = new AtomicBoolean (false );
370370 final AtomicBoolean drainingExtProcStream = new AtomicBoolean (false );
371+ final AtomicBoolean halfClosed = new AtomicBoolean (false );
371372
372373 protected ExtProcClientCall (
373374 ExtProcDelayedCall <InputStream , InputStream > delayedCall ,
@@ -396,86 +397,101 @@ public void start(Listener<InputStream> responseListener, Metadata headers) {
396397 // DelayedClientCall.start will buffer the listener and headers until setCall is called.
397398 super .start (wrappedListener , headers );
398399
399- extProcClientCallRequestObserver = ( ClientCallStreamObserver < ProcessingRequest >) stub .process (new io . grpc . stub . StreamObserver < ProcessingResponse >() {
400+ stub .process (new ClientResponseObserver < ProcessingRequest , ProcessingResponse >() {
400401 @ Override
401- public void onNext (ProcessingResponse response ) {
402- if (response .hasImmediateResponse ()) {
403- handleImmediateResponse (response .getImmediateResponse (), responseListener );
404- return ;
405- }
406-
407- if (config .getObservabilityMode ()) {
408- return ;
409- }
402+ public void beforeStart (ClientCallStreamObserver <ProcessingRequest > requestStream ) {
403+ extProcClientCallRequestObserver = requestStream ;
404+ }
410405
411- if (response .getRequestDrain ()) {
412- drainingExtProcStream .set (true );
413- synchronized (streamLock ) {
414- extProcClientCallRequestObserver .onCompleted ();
406+ @ Override
407+ public void onNext (ProcessingResponse response ) {
408+ try {
409+ if (response .hasImmediateResponse ()) {
410+ handleImmediateResponse (response .getImmediateResponse (), responseListener );
411+ return ;
415412 }
416- return ;
417- }
418413
419- // 1. Client Headers
420- if (response .hasRequestHeaders ()) {
421- if (response .getRequestHeaders ().hasResponse ()) {
422- applyHeaderMutations (requestHeaders , response .getRequestHeaders ().getResponse ().getHeaderMutation ());
414+ if (config .getObservabilityMode ()) {
415+ return ;
423416 }
424- activateCall ();
425- }
426- // 2. Client Message (Request Body)
427- else if (response .hasRequestBody ()) {
428- if (response .getRequestBody ().hasResponse ()
429- && response .getRequestBody ().getResponse ().hasBodyMutation ()
430- && response .getRequestBody ().getResponse ().getBodyMutation ().hasStreamedResponse ()
431- && response .getRequestBody ().getResponse ().getBodyMutation ().getStreamedResponse ().getGrpcMessageCompressed ()) {
432- io .grpc .StatusRuntimeException ex = io .grpc .Status .INTERNAL
433- .withDescription ("gRPC message compression not supported in ext_proc" )
434- .asRuntimeException ();
417+
418+ if (response .getRequestDrain ()) {
419+ drainingExtProcStream .set (true );
435420 synchronized (streamLock ) {
436- extProcClientCallRequestObserver .onError ( ex );
421+ extProcClientCallRequestObserver .onCompleted ( );
437422 }
438- onError (ex );
439423 return ;
440424 }
441- handleRequestBodyResponse (response .getRequestBody ());
442- }
443- // 4. Server Headers
444- else if (response .hasResponseHeaders ()) {
445- if (response .getResponseHeaders ().hasResponse ()) {
446- applyHeaderMutations (wrappedListener .savedHeaders , response .getResponseHeaders ().getResponse ().getHeaderMutation ());
425+
426+ // 1. Client Headers
427+ if (response .hasRequestHeaders ()) {
428+ if (response .getRequestHeaders ().hasResponse ()) {
429+ applyHeaderMutations (requestHeaders , response .getRequestHeaders ().getResponse ().getHeaderMutation ());
430+ }
431+ activateCall ();
447432 }
448- wrappedListener .proceedWithHeaders ();
449- }
450- // 5. Server Message (Response Body)
451- else if (response .hasResponseBody ()) {
452- if (response .getResponseBody ().hasResponse ()
453- && response .getResponseBody ().getResponse ().hasBodyMutation ()
454- && response .getResponseBody ().getResponse ().getBodyMutation ().hasStreamedResponse ()
455- && response .getResponseBody ().getResponse ().getBodyMutation ().getStreamedResponse ().getGrpcMessageCompressed ()) {
456- io .grpc .StatusRuntimeException ex = io .grpc .Status .INTERNAL
457- .withDescription ("gRPC message compression not supported in ext_proc" )
458- .asRuntimeException ();
459- synchronized (streamLock ) {
460- extProcClientCallRequestObserver .onError (ex );
433+ // 2. Client Message (Request Body)
434+ else if (response .hasRequestBody ()) {
435+ if (response .getRequestBody ().hasResponse ()
436+ && response .getRequestBody ().getResponse ().hasBodyMutation ()) {
437+ io .envoyproxy .envoy .service .ext_proc .v3 .BodyMutation mutation =
438+ response .getRequestBody ().getResponse ().getBodyMutation ();
439+ if (mutation .hasStreamedResponse ()
440+ && mutation .getStreamedResponse ().getGrpcMessageCompressed ()) {
441+ io .grpc .StatusRuntimeException ex = io .grpc .Status .INTERNAL
442+ .withDescription ("gRPC message compression not supported in ext_proc" )
443+ .asRuntimeException ();
444+ synchronized (streamLock ) {
445+ extProcClientCallRequestObserver .onError (ex );
446+ }
447+ onError (ex );
448+ return ;
449+ }
461450 }
462- onError (ex );
463- return ;
451+ handleRequestBodyResponse (response .getRequestBody ());
464452 }
465- handleResponseBodyResponse (response .getResponseBody (), wrappedListener );
466- }
467- // 6. Response Trailers
468- if (response .hasResponseTrailers ()) {
469- if (response .getResponseTrailers ().hasHeaderMutation ()) {
470- applyHeaderMutations (
471- wrappedListener .savedTrailers ,
472- response .getResponseTrailers ().getHeaderMutation ()
473- );
453+ // 4. Server Headers
454+ else if (response .hasResponseHeaders ()) {
455+ if (response .getResponseHeaders ().hasResponse ()) {
456+ applyHeaderMutations (wrappedListener .savedHeaders , response .getResponseHeaders ().getResponse ().getHeaderMutation ());
457+ }
458+ wrappedListener .proceedWithHeaders ();
474459 }
475- wrappedListener .proceedWithClose ();
476- synchronized (streamLock ) {
477- extProcClientCallRequestObserver .onCompleted ();
460+ // 5. Server Message (Response Body)
461+ else if (response .hasResponseBody ()) {
462+ if (response .getResponseBody ().hasResponse ()
463+ && response .getResponseBody ().getResponse ().hasBodyMutation ()) {
464+ io .envoyproxy .envoy .service .ext_proc .v3 .BodyMutation mutation =
465+ response .getResponseBody ().getResponse ().getBodyMutation ();
466+ if (mutation .hasStreamedResponse ()
467+ && mutation .getStreamedResponse ().getGrpcMessageCompressed ()) {
468+ io .grpc .StatusRuntimeException ex = io .grpc .Status .INTERNAL
469+ .withDescription ("gRPC message compression not supported in ext_proc" )
470+ .asRuntimeException ();
471+ synchronized (streamLock ) {
472+ extProcClientCallRequestObserver .onError (ex );
473+ }
474+ onError (ex );
475+ return ;
476+ }
477+ }
478+ handleResponseBodyResponse (response .getResponseBody (), wrappedListener );
478479 }
480+ // 6. Response Trailers
481+ if (response .hasResponseTrailers ()) {
482+ if (response .getResponseTrailers ().hasHeaderMutation ()) {
483+ applyHeaderMutations (
484+ wrappedListener .savedTrailers ,
485+ response .getResponseTrailers ().getHeaderMutation ()
486+ );
487+ }
488+ wrappedListener .proceedWithClose ();
489+ synchronized (streamLock ) {
490+ extProcClientCallRequestObserver .onCompleted ();
491+ }
492+ }
493+ } catch (Throwable t ) {
494+ onError (t );
479495 }
480496 }
481497
@@ -501,8 +517,8 @@ public void onCompleted() {
501517 extProcClientCallRequestObserver .setOnReadyHandler (this ::onExtProcStreamReady );
502518 }
503519
504- wrappedListener . setStream ( extProcClientCallRequestObserver );
505-
520+ // Send initial request headers. This is safe here because stub.process()
521+ // has started the call.
506522 synchronized (streamLock ) {
507523 extProcClientCallRequestObserver .onNext (ProcessingRequest .newBuilder ()
508524 .setRequestHeaders (io .envoyproxy .envoy .service .ext_proc .v3 .HttpHeaders .newBuilder ()
@@ -577,6 +593,7 @@ public void sendMessage(InputStream message) {
577593
578594 @ Override
579595 public void halfClose () {
596+ halfClosed .set (true );
580597 if (extProcStreamCompleted .get ()) {
581598 super .halfClose ();
582599 return ;
@@ -609,10 +626,14 @@ private void handleRequestBodyResponse(io.envoyproxy.envoy.service.ext_proc.v3.B
609626 if (bodyResponse .hasResponse () && bodyResponse .getResponse ().hasBodyMutation ()) {
610627 io .envoyproxy .envoy .service .ext_proc .v3 .BodyMutation mutation = bodyResponse .getResponse ().getBodyMutation ();
611628 if (mutation .hasBody () && !mutation .getBody ().isEmpty ()) {
612- byte [] mutatedBody = mutation .getBody ().toByteArray ();
613- super .sendMessage (new ByteArrayInputStream (mutatedBody ));
629+ if (!halfClosed .get ()) {
630+ byte [] mutatedBody = mutation .getBody ().toByteArray ();
631+ super .sendMessage (new ByteArrayInputStream (mutatedBody ));
632+ }
614633 } else if (mutation .getClearBody ()) {
615- super .sendMessage (new ByteArrayInputStream (new byte [0 ]));
634+ if (!halfClosed .get ()) {
635+ super .sendMessage (new ByteArrayInputStream (new byte [0 ]));
636+ }
616637 }
617638 }
618639 }
@@ -648,7 +669,6 @@ private void handleFailOpen(ExtProcListener listener) {
648669 private static class ExtProcListener extends ForwardingClientCallListener .SimpleForwardingClientCallListener <InputStream > {
649670 private final ClientCall <?, ?> rawCall ;
650671 private final ExtProcClientCall extProcClientCall ;
651- private ClientCallStreamObserver <ProcessingRequest > stream ;
652672 private Metadata savedHeaders ;
653673 private Metadata savedTrailers ;
654674 private io .grpc .Status savedStatus ;
@@ -660,8 +680,6 @@ protected ExtProcListener(ClientCall.Listener<InputStream> delegate, ClientCall<
660680 this .extProcClientCall = extProcClientCall ;
661681 }
662682
663- void setStream (ClientCallStreamObserver <ProcessingRequest > stream ) { this .stream = stream ; }
664-
665683 @ Override
666684 public void onReady () {
667685 if (extProcClientCall .drainingExtProcStream .get ()) {
0 commit comments