|
33 | 33 | import java.io.IOException; |
34 | 34 | import java.io.InputStream; |
35 | 35 | import java.util.concurrent.ScheduledExecutorService; |
| 36 | +import java.util.concurrent.TimeUnit; |
36 | 37 | import java.util.concurrent.atomic.AtomicBoolean; |
37 | 38 | import javax.annotation.Nullable; |
38 | 39 |
|
39 | 40 | public class ExternalProcessorFilter implements Filter { |
40 | 41 | static final String TYPE_URL = "type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor"; |
41 | 42 |
|
42 | 43 | final String filterInstanceName; |
43 | | - ManagedChannel grpcServiceChannel; |
44 | | - ExternalProcessorGrpc.ExternalProcessorStub externalProcessorStub; |
45 | 44 | private final Object lock = new Object(); |
46 | 45 |
|
47 | 46 | public ExternalProcessorFilter(String name) { |
@@ -154,6 +153,15 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( |
154 | 153 | Channel next) { |
155 | 154 | ExternalProcessorGrpc.ExternalProcessorStub stub = ExternalProcessorGrpc.newStub( |
156 | 155 | cachedChannelManager.getChannel(filterConfig.grpcServiceConfig)); |
| 156 | + |
| 157 | + if (filterConfig.grpcServiceConfig.timeout() != null && filterConfig.grpcServiceConfig.timeout().isPresent()) { |
| 158 | + long timeoutNanos = filterConfig.grpcServiceConfig.timeout().get().getSeconds() * 1_000_000_000L |
| 159 | + + filterConfig.grpcServiceConfig.timeout().get().getNano(); |
| 160 | + if (timeoutNanos > 0) { |
| 161 | + stub = stub.withDeadlineAfter(timeoutNanos, TimeUnit.NANOSECONDS); |
| 162 | + } |
| 163 | + } |
| 164 | + |
157 | 165 | ExternalProcessor config = filterConfig.externalProcessor; |
158 | 166 |
|
159 | 167 | MethodDescriptor<InputStream, InputStream> rawMethod = method.toBuilder(RAW_MARSHALLER, RAW_MARSHALLER).build(); |
@@ -372,7 +380,7 @@ else if (response.hasRequestBody()) { |
372 | 380 | // 3. We don't send request trailers in gRPC for half close. |
373 | 381 | // 4. Server Headers |
374 | 382 | else if (response.hasResponseHeaders()) { |
375 | | - if (response.getResponseHeaders().hasResponse()) { |
| 383 | + if (response.hasResponseHeaders() && response.getResponseHeaders().hasResponse()) { |
376 | 384 | applyHeaderMutations(wrappedListener.savedHeaders, response.getResponseHeaders().getResponse().getHeaderMutation()); |
377 | 385 | } |
378 | 386 | wrappedListener.proceedWithHeaders(); |
|
0 commit comments