22
33import static com .google .common .base .Preconditions .checkNotNull ;
44
5+ import com .google .common .collect .ImmutableList ;
56import com .google .common .io .ByteStreams ;
67import com .google .protobuf .Any ;
78import com .google .protobuf .InvalidProtocolBufferException ;
89import com .google .protobuf .Message ;
9- import io .envoyproxy .envoy .config .core .v3 .GrpcService ;
1010import io .envoyproxy .envoy .extensions .filters .http .ext_proc .v3 .ExternalProcessor ;
1111import io .envoyproxy .envoy .extensions .filters .http .ext_proc .v3 .ProcessingMode ;
1212import io .envoyproxy .envoy .service .ext_proc .v3 .ExternalProcessorGrpc ;
1919import io .grpc .ClientInterceptor ;
2020import io .grpc .ForwardingClientCall .SimpleForwardingClientCall ;
2121import io .grpc .ForwardingClientCallListener ;
22- import io .grpc .ManagedChannel ;
2322import io .grpc .Metadata ;
2423import io .grpc .MethodDescriptor ;
2524import io .grpc .Status ;
2928import io .grpc .xds .internal .grpcservice .GrpcServiceConfigParser ;
3029import io .grpc .xds .internal .grpcservice .GrpcServiceParseException ;
3130import io .grpc .xds .internal .grpcservice .GrpcServiceXdsContextProvider ;
31+ import io .grpc .xds .internal .grpcservice .HeaderValue ;
3232import java .io .ByteArrayInputStream ;
3333import java .io .IOException ;
3434import java .io .InputStream ;
@@ -41,7 +41,6 @@ public class ExternalProcessorFilter implements Filter {
4141 static final String TYPE_URL = "type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor" ;
4242
4343 final String filterInstanceName ;
44- private final Object lock = new Object ();
4544
4645 public ExternalProcessorFilter (String name ) {
4746 filterInstanceName = checkNotNull (name , "name" );
@@ -162,6 +161,36 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
162161 }
163162 }
164163
164+ ImmutableList <HeaderValue > initialMetadata = filterConfig .grpcServiceConfig .initialMetadata ();
165+ if (initialMetadata != null && !initialMetadata .isEmpty ()) {
166+ stub = stub .withInterceptors (new ClientInterceptor () {
167+ @ Override
168+ public <ExtReqT , ExtRespT > ClientCall <ExtReqT , ExtRespT > interceptCall (
169+ MethodDescriptor <ExtReqT , ExtRespT > extMethod , CallOptions extCallOptions , Channel extNext ) {
170+ return new SimpleForwardingClientCall <ExtReqT , ExtRespT >(extNext .newCall (extMethod , extCallOptions )) {
171+ @ Override
172+ public void start (Listener <ExtRespT > responseListener , Metadata headers ) {
173+ for (HeaderValue headerValue : initialMetadata ) {
174+ String key = headerValue .key ();
175+ if (key .endsWith (Metadata .BINARY_HEADER_SUFFIX )) {
176+ if (headerValue .rawValue ().isPresent ()) {
177+ Metadata .Key <byte []> metadataKey = Metadata .Key .of (key , Metadata .BINARY_BYTE_MARSHALLER );
178+ headers .put (metadataKey , headerValue .rawValue ().get ().toByteArray ());
179+ }
180+ } else {
181+ if (headerValue .value ().isPresent ()) {
182+ Metadata .Key <String > metadataKey = Metadata .Key .of (key , Metadata .ASCII_STRING_MARSHALLER );
183+ headers .put (metadataKey , headerValue .value ().get ());
184+ }
185+ }
186+ }
187+ super .start (responseListener , headers );
188+ }
189+ };
190+ }
191+ });
192+ }
193+
165194 ExternalProcessor config = filterConfig .externalProcessor ;
166195
167196 MethodDescriptor <InputStream , InputStream > rawMethod = method .toBuilder (RAW_MARSHALLER , RAW_MARSHALLER ).build ();
0 commit comments