Skip to content

Commit f1ea3ec

Browse files
committed
Request header mutation unit test.
1 parent fe2c7cf commit f1ea3ec

File tree

2 files changed

+245
-0
lines changed

2 files changed

+245
-0
lines changed

xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.google.protobuf.Any;
88
import com.google.protobuf.InvalidProtocolBufferException;
99
import com.google.protobuf.Message;
10+
import io.envoyproxy.envoy.config.core.v3.GrpcService;
1011
import io.envoyproxy.envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor;
1112
import io.envoyproxy.envoy.extensions.filters.http.ext_proc.v3.ProcessingMode;
1213
import io.envoyproxy.envoy.service.ext_proc.v3.ExternalProcessorGrpc;
@@ -32,6 +33,7 @@
3233
import java.io.ByteArrayInputStream;
3334
import java.io.IOException;
3435
import java.io.InputStream;
36+
import java.util.List;
3537
import java.util.concurrent.ScheduledExecutorService;
3638
import java.util.concurrent.TimeUnit;
3739
import java.util.concurrent.atomic.AtomicBoolean;
Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
package io.grpc.xds;
2+
3+
import static com.google.common.truth.Truth.assertThat;
4+
5+
import com.google.protobuf.Any;
6+
import io.envoyproxy.envoy.config.core.v3.GrpcService;
7+
import io.envoyproxy.envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor;
8+
import io.envoyproxy.envoy.extensions.filters.http.ext_proc.v3.ProcessingMode;
9+
import io.envoyproxy.envoy.service.ext_proc.v3.BodyResponse;
10+
import io.envoyproxy.envoy.service.ext_proc.v3.CommonResponse;
11+
import io.envoyproxy.envoy.service.ext_proc.v3.ExternalProcessorGrpc;
12+
import io.envoyproxy.envoy.service.ext_proc.v3.HeaderMutation;
13+
import io.envoyproxy.envoy.service.ext_proc.v3.HeadersResponse;
14+
import io.envoyproxy.envoy.service.ext_proc.v3.ProcessingRequest;
15+
import io.envoyproxy.envoy.service.ext_proc.v3.ProcessingResponse;
16+
import io.grpc.CallOptions;
17+
import io.grpc.Channel;
18+
import io.grpc.ClientInterceptor;
19+
import io.grpc.ClientInterceptors;
20+
import io.grpc.InsecureChannelCredentials;
21+
import io.grpc.Metadata;
22+
import io.grpc.MethodDescriptor;
23+
import io.grpc.ServerCall;
24+
import io.grpc.ServerCallHandler;
25+
import io.grpc.ServerInterceptor;
26+
import io.grpc.ServerInterceptors;
27+
import io.grpc.ServerServiceDefinition;
28+
import io.grpc.inprocess.InProcessChannelBuilder;
29+
import io.grpc.inprocess.InProcessServerBuilder;
30+
import io.grpc.stub.ClientCalls;
31+
import io.grpc.stub.ServerCalls;
32+
import io.grpc.stub.StreamObserver;
33+
import io.grpc.testing.GrpcCleanupRule;
34+
import io.grpc.util.MutableHandlerRegistry;
35+
import io.grpc.xds.internal.grpcservice.ChannelCredsConfig;
36+
import io.grpc.xds.internal.grpcservice.ConfiguredChannelCredentials;
37+
import io.grpc.xds.internal.grpcservice.GrpcServiceXdsContext;
38+
import io.grpc.xds.internal.grpcservice.GrpcServiceXdsContextProvider;
39+
import java.io.ByteArrayInputStream;
40+
import java.io.InputStream;
41+
import java.util.ArrayList;
42+
import java.util.List;
43+
import java.util.Optional;
44+
import java.util.concurrent.atomic.AtomicReference;
45+
import org.junit.Before;
46+
import org.junit.Rule;
47+
import org.junit.Test;
48+
import org.junit.runner.RunWith;
49+
import org.junit.runners.JUnit4;
50+
51+
/**
52+
* Unit tests for {@link ExternalProcessorFilter}.
53+
*/
54+
@RunWith(JUnit4.class)
55+
public class ExternalProcessorFilterTest {
56+
@Rule
57+
public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
58+
59+
private final MutableHandlerRegistry dataPlaneServiceRegistry = new MutableHandlerRegistry();
60+
private final MutableHandlerRegistry extProcServiceRegistry = new MutableHandlerRegistry();
61+
62+
private Channel dataPlaneChannel;
63+
private String extProcServerName;
64+
private ExternalProcessorFilter filter;
65+
66+
// Define a simple test service
67+
private static final MethodDescriptor<String, String> METHOD_SAY_HELLO =
68+
MethodDescriptor.<String, String>newBuilder()
69+
.setType(MethodDescriptor.MethodType.UNARY)
70+
.setFullMethodName("test.TestService/SayHello")
71+
.setRequestMarshaller(new StringMarshaller())
72+
.setResponseMarshaller(new StringMarshaller())
73+
.build();
74+
75+
private static class StringMarshaller implements MethodDescriptor.Marshaller<String> {
76+
@Override
77+
public InputStream stream(String value) {
78+
return new ByteArrayInputStream(value.getBytes());
79+
}
80+
81+
@Override
82+
public String parse(InputStream stream) {
83+
try {
84+
java.io.ByteArrayOutputStream buffer = new java.io.ByteArrayOutputStream();
85+
int nRead;
86+
byte[] data = new byte[1024];
87+
while ((nRead = stream.read(data, 0, data.length)) != -1) {
88+
buffer.write(data, 0, nRead);
89+
}
90+
buffer.flush();
91+
return new String(buffer.toByteArray());
92+
} catch (java.io.IOException e) {
93+
throw new RuntimeException(e);
94+
}
95+
}
96+
}
97+
98+
private static class InProcessChannelCredsConfig implements ChannelCredsConfig {
99+
@Override
100+
public String type() {
101+
return "inprocess";
102+
}
103+
}
104+
105+
@Before
106+
public void setUp() throws Exception {
107+
String dataPlaneServerName = InProcessServerBuilder.generateName();
108+
grpcCleanup.register(InProcessServerBuilder.forName(dataPlaneServerName)
109+
.fallbackHandlerRegistry(dataPlaneServiceRegistry).directExecutor().build().start());
110+
111+
extProcServerName = InProcessServerBuilder.generateName();
112+
grpcCleanup.register(InProcessServerBuilder.forName(extProcServerName)
113+
.fallbackHandlerRegistry(extProcServiceRegistry).directExecutor().build().start());
114+
115+
dataPlaneChannel = grpcCleanup.register(
116+
InProcessChannelBuilder.forName(dataPlaneServerName).directExecutor().build());
117+
}
118+
119+
private ExternalProcessorFilter.ExternalProcessorFilterConfig createFilterConfig() {
120+
GrpcService grpcService = GrpcService.newBuilder()
121+
.setGoogleGrpc(GrpcService.GoogleGrpc.newBuilder()
122+
// Important: Use "in-process:" scheme so Grpc.newChannelBuilder resolves it correctly
123+
.setTargetUri("in-process:" + extProcServerName)
124+
.setStatPrefix("ext_proc")
125+
.build())
126+
.build();
127+
128+
ExternalProcessor externalProcessor = ExternalProcessor.newBuilder()
129+
.setGrpcService(grpcService)
130+
.setProcessingMode(ProcessingMode.newBuilder()
131+
.setRequestBodyMode(ProcessingMode.BodySendMode.GRPC)
132+
.setResponseBodyMode(ProcessingMode.BodySendMode.GRPC)
133+
.build())
134+
.build();
135+
136+
ExternalProcessorFilter.Provider provider = new ExternalProcessorFilter.Provider();
137+
138+
// Provide a context that supplies Insecure credentials for testing
139+
GrpcServiceXdsContextProvider contextProvider = targetUri -> {
140+
ConfiguredChannelCredentials credentials = ConfiguredChannelCredentials.create(
141+
InsecureChannelCredentials.create(),
142+
new InProcessChannelCredsConfig());
143+
144+
GrpcServiceXdsContext.AllowedGrpcService allowedGrpcService =
145+
GrpcServiceXdsContext.AllowedGrpcService.builder()
146+
.configuredChannelCredentials(credentials)
147+
.build();
148+
return GrpcServiceXdsContext.create(false, Optional.of(allowedGrpcService), true);
149+
};
150+
151+
// 1. Create the filter instance via the provider
152+
this.filter = provider.newInstance("ext-proc", contextProvider);
153+
154+
// 2. Parse the config using the provider
155+
ConfigOrError<ExternalProcessorFilter.ExternalProcessorFilterConfig> configOrError =
156+
provider.parseFilterConfig(Any.pack(externalProcessor));
157+
158+
assertThat(configOrError.errorDetail).isNull();
159+
return configOrError.config;
160+
}
161+
162+
@Test
163+
public void requestHeadersMutated() throws Exception {
164+
ExternalProcessorFilter.ExternalProcessorFilterConfig filterConfig = createFilterConfig();
165+
166+
// Use the filter instance created in createFilterConfig()
167+
ClientInterceptor interceptor = filter.buildClientInterceptor(filterConfig, null, null);
168+
Channel interceptedChannel = ClientInterceptors.intercept(dataPlaneChannel, interceptor);
169+
170+
// Data Plane Server
171+
AtomicReference<Metadata> receivedHeaders = new AtomicReference<>();
172+
173+
ServerServiceDefinition serviceDef = ServerServiceDefinition.builder("test.TestService")
174+
.addMethod(METHOD_SAY_HELLO, ServerCalls.asyncUnaryCall(
175+
(request, responseObserver) -> {
176+
responseObserver.onNext("Hello " + request);
177+
responseObserver.onCompleted();
178+
}))
179+
.build();
180+
181+
ServerServiceDefinition interceptedServiceDef = ServerInterceptors.intercept(
182+
serviceDef,
183+
new ServerInterceptor() {
184+
@Override
185+
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
186+
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
187+
receivedHeaders.set(headers);
188+
return next.startCall(call, headers);
189+
}
190+
});
191+
192+
dataPlaneServiceRegistry.addService(interceptedServiceDef);
193+
194+
// Ext-Proc Server
195+
List<ProcessingRequest> receivedRequests = new ArrayList<>();
196+
ExternalProcessorGrpc.ExternalProcessorImplBase extProcImpl = new ExternalProcessorGrpc.ExternalProcessorImplBase() {
197+
@Override
198+
public StreamObserver<ProcessingRequest> process(StreamObserver<ProcessingResponse> responseObserver) {
199+
return new StreamObserver<ProcessingRequest>() {
200+
@Override
201+
public void onNext(ProcessingRequest request) {
202+
receivedRequests.add(request);
203+
if (request.hasRequestHeaders()) {
204+
responseObserver.onNext(ProcessingResponse.newBuilder()
205+
.setRequestHeaders(HeadersResponse.newBuilder()
206+
.setResponse(CommonResponse.newBuilder()
207+
.setHeaderMutation(HeaderMutation.newBuilder()
208+
.addSetHeaders(io.envoyproxy.envoy.config.core.v3.HeaderValueOption.newBuilder()
209+
.setHeader(io.envoyproxy.envoy.config.core.v3.HeaderValue.newBuilder()
210+
.setKey("x-custom-header")
211+
.setValue("custom-value")
212+
.build())
213+
.build())
214+
.build())
215+
.build())
216+
.build())
217+
.build());
218+
} else if (request.hasRequestBody()) {
219+
responseObserver.onNext(ProcessingResponse.newBuilder()
220+
.setRequestBody(BodyResponse.newBuilder().build())
221+
.build());
222+
}
223+
}
224+
225+
@Override
226+
public void onError(Throwable t) {}
227+
228+
@Override
229+
public void onCompleted() {
230+
responseObserver.onCompleted();
231+
}
232+
};
233+
}
234+
};
235+
extProcServiceRegistry.addService(extProcImpl);
236+
237+
String reply = ClientCalls.blockingUnaryCall(interceptedChannel, METHOD_SAY_HELLO, CallOptions.DEFAULT, "World");
238+
239+
assertThat(reply).isEqualTo("Hello World");
240+
Metadata.Key<String> customHeaderKey = Metadata.Key.of("x-custom-header", Metadata.ASCII_STRING_MARSHALLER);
241+
assertThat(receivedHeaders.get().get(customHeaderKey)).isEqualTo("custom-value");
242+
}
243+
}

0 commit comments

Comments
 (0)