3030import io .grpc .xds .internal .grpcservice .GrpcServiceConfigParser ;
3131import io .grpc .xds .internal .grpcservice .GrpcServiceParseException ;
3232import io .grpc .xds .internal .grpcservice .HeaderValue ;
33+ import io .grpc .xds .internal .headermutations .HeaderMutationDisallowedException ;
34+ import io .grpc .xds .internal .headermutations .HeaderMutationFilter ;
35+ import io .grpc .xds .internal .headermutations .HeaderMutationRulesConfig ;
36+ import io .grpc .xds .internal .headermutations .HeaderMutationRulesParseException ;
37+ import io .grpc .xds .internal .headermutations .HeaderMutationRulesParser ;
38+ import io .grpc .xds .internal .headermutations .HeaderMutations ;
39+ import io .grpc .xds .internal .headermutations .HeaderMutator ;
40+ import io .grpc .xds .internal .headermutations .HeaderValueOption ;
3341import java .io .ByteArrayInputStream ;
3442import java .io .IOException ;
3543import java .io .InputStream ;
3644import java .util .Locale ;
45+ import java .util .Optional ;
3746import java .util .concurrent .Executor ;
3847import java .util .concurrent .ScheduledExecutorService ;
3948import java .util .concurrent .TimeUnit ;
@@ -86,10 +95,20 @@ public ConfigOrError<ExternalProcessorFilterConfig> parseFilterConfig(
8695 return ConfigOrError .fromError ("Invalid response_body_mode: " + mode .getResponseBodyMode () + ". Only GRPC is supported." );
8796 }
8897
98+ HeaderMutationRulesConfig mutationRulesConfig = null ;
99+ if (externalProcessor .hasMutationRules ()) {
100+ try {
101+ mutationRulesConfig = HeaderMutationRulesParser .parse (externalProcessor .getMutationRules ());
102+ } catch (HeaderMutationRulesParseException e ) {
103+ return ConfigOrError .fromError ("Error parsing HeaderMutationRules: " + e .getMessage ());
104+ }
105+ }
106+
89107 try {
90108 GrpcServiceConfig grpcServiceConfig = GrpcServiceConfigParser .parse (
91109 externalProcessor .getGrpcService (), context .bootstrapInfo (), context .serverInfo ());
92- return ConfigOrError .fromConfig (new ExternalProcessorFilterConfig (externalProcessor , grpcServiceConfig ));
110+ return ConfigOrError .fromConfig (new ExternalProcessorFilterConfig (
111+ externalProcessor , grpcServiceConfig , Optional .ofNullable (mutationRulesConfig )));
93112 } catch (GrpcServiceParseException e ) {
94113 return ConfigOrError .fromError ("Error parsing GrpcService config: " + e .getMessage ());
95114 }
@@ -113,10 +132,13 @@ static final class ExternalProcessorFilterConfig implements FilterConfig {
113132
114133 private final ExternalProcessor externalProcessor ;
115134 private final GrpcServiceConfig grpcServiceConfig ;
135+ private final Optional <HeaderMutationRulesConfig > mutationRulesConfig ;
116136
117- ExternalProcessorFilterConfig (ExternalProcessor externalProcessor , GrpcServiceConfig grpcServiceConfig ) {
137+ ExternalProcessorFilterConfig (ExternalProcessor externalProcessor ,
138+ GrpcServiceConfig grpcServiceConfig , Optional <HeaderMutationRulesConfig > mutationRulesConfig ) {
118139 this .externalProcessor = externalProcessor ;
119140 this .grpcServiceConfig = grpcServiceConfig ;
141+ this .mutationRulesConfig = mutationRulesConfig ;
120142 }
121143
122144 @ Override
@@ -208,7 +230,8 @@ public void start(Listener<ExtRespT> responseListener, Metadata headers) {
208230 new ExtProcDelayedCall <>(
209231 callOptions .getExecutor (), scheduler , callOptions .getDeadline ());
210232
211- ExtProcClientCall extProcCall = new ExtProcClientCall (delayedCall , rawCall , stub , config );
233+ ExtProcClientCall extProcCall = new ExtProcClientCall (
234+ delayedCall , rawCall , stub , config , filterConfig .mutationRulesConfig );
212235
213236 return new ClientCall <ReqT , RespT >() {
214237 @ Override
@@ -312,31 +335,6 @@ private static io.envoyproxy.envoy.config.core.v3.HeaderMap toHeaderMap(Metadata
312335 return builder .build ();
313336 }
314337
315- private static void applyHeaderMutations (Metadata metadata , io .envoyproxy .envoy .service .ext_proc .v3 .HeaderMutation mutation ) {
316- for (io .envoyproxy .envoy .config .core .v3 .HeaderValueOption setHeader : mutation .getSetHeadersList ()) {
317- String key = setHeader .getHeader ().getKey ();
318- String value = setHeader .getHeader ().getValue ();
319- try {
320- Metadata .Key <String > metadataKey = Metadata .Key .of (key , Metadata .ASCII_STRING_MARSHALLER );
321- if (setHeader .getAppendAction () == io .envoyproxy .envoy .config .core .v3 .HeaderValueOption .HeaderAppendAction .APPEND_IF_EXISTS_OR_ADD
322- || setHeader .getAppendAction () == io .envoyproxy .envoy .config .core .v3 .HeaderValueOption .HeaderAppendAction .OVERWRITE_IF_EXISTS_OR_ADD ) {
323- metadata .removeAll (metadataKey );
324- }
325- metadata .put (metadataKey , value );
326- } catch (IllegalArgumentException e ) {
327- // Skip
328- }
329- }
330- for (String removeHeader : mutation .getRemoveHeadersList ()) {
331- try {
332- Metadata .Key <String > metadataKey = Metadata .Key .of (removeHeader , Metadata .ASCII_STRING_MARSHALLER );
333- metadata .removeAll (metadataKey );
334- } catch (IllegalArgumentException e ) {
335- // Skip
336- }
337- }
338- }
339-
340338 /**
341339 * A local subclass to expose the protected constructor of DelayedClientCall.
342340 */
@@ -358,6 +356,8 @@ private static class ExtProcClientCall extends SimpleForwardingClientCall<InputS
358356 private final Object streamLock = new Object ();
359357 private io .grpc .stub .ClientCallStreamObserver <ProcessingRequest > extProcClientCallRequestObserver ;
360358 private ExtProcListener wrappedListener ;
359+ private final HeaderMutationFilter mutationFilter ;
360+ private final HeaderMutator mutator = HeaderMutator .create ();
361361
362362 private Metadata requestHeaders ;
363363 final AtomicBoolean extProcStreamFailed = new AtomicBoolean (false );
@@ -369,12 +369,14 @@ protected ExtProcClientCall(
369369 ExtProcDelayedCall <InputStream , InputStream > delayedCall ,
370370 ClientCall <InputStream , InputStream > rawCall ,
371371 ExternalProcessorGrpc .ExternalProcessorStub stub ,
372- ExternalProcessor config ) {
372+ ExternalProcessor config ,
373+ Optional <HeaderMutationRulesConfig > mutationRulesConfig ) {
373374 super (delayedCall );
374375 this .delayedCall = delayedCall ;
375376 this .rawCall = rawCall ;
376377 this .stub = stub ;
377378 this .config = config ;
379+ this .mutationFilter = new HeaderMutationFilter (mutationRulesConfig );
378380 }
379381
380382 private void activateCall () {
@@ -384,6 +386,34 @@ private void activateCall() {
384386 }
385387 }
386388
389+ private void applyHeaderMutations (Metadata metadata ,
390+ io .envoyproxy .envoy .service .ext_proc .v3 .HeaderMutation mutation )
391+ throws HeaderMutationDisallowedException {
392+ ImmutableList .Builder <HeaderValueOption > headersToModify = ImmutableList .builder ();
393+ for (io .envoyproxy .envoy .config .core .v3 .HeaderValueOption protoOption : mutation .getSetHeadersList ()) {
394+ io .envoyproxy .envoy .config .core .v3 .HeaderValue protoHeader = protoOption .getHeader ();
395+ HeaderValue headerValue ;
396+ if (protoHeader .getKey ().endsWith (Metadata .BINARY_HEADER_SUFFIX )) {
397+ headerValue = HeaderValue .create (protoHeader .getKey (),
398+ com .google .protobuf .ByteString .copyFrom (
399+ com .google .common .io .BaseEncoding .base64 ().decode (protoHeader .getValue ())));
400+ } else {
401+ headerValue = HeaderValue .create (protoHeader .getKey (), protoHeader .getValue ());
402+ }
403+ headersToModify .add (HeaderValueOption .create (
404+ headerValue ,
405+ HeaderValueOption .HeaderAppendAction .valueOf (protoOption .getAppendAction ().name ()),
406+ protoOption .getKeepEmptyValue ()));
407+ }
408+
409+ HeaderMutations mutations = HeaderMutations .create (
410+ headersToModify .build (),
411+ ImmutableList .copyOf (mutation .getRemoveHeadersList ()));
412+
413+ HeaderMutations filteredMutations = mutationFilter .filter (mutations );
414+ mutator .applyMutations (filteredMutations , metadata );
415+ }
416+
387417 @ Override
388418 public void start (Listener <InputStream > responseListener , Metadata headers ) {
389419 this .requestHeaders = headers ;
@@ -485,9 +515,9 @@ else if (response.hasResponseBody()) {
485515 extProcClientCallRequestObserver .onCompleted ();
486516 }
487517 }
488- // For robustness. For any internal processing failure make sure the internal state
489- // machine is notified and the dataplane call is properly cancelled (or failed-open if
490- // configured)
518+ // For robustness. For any internal processing failure, including
519+ // HeaderMutationDisallowedException, make sure the internal state machine is notified
520+ // and the dataplane call is properly cancelled (or failed-open if configured)
491521 } catch (Throwable t ) {
492522 onError (t );
493523 }
0 commit comments