Skip to content

Commit b1a3a64

Browse files
committed
nit
1 parent 23f840b commit b1a3a64

File tree

1 file changed

+31
-11
lines changed

1 file changed

+31
-11
lines changed

xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,17 @@
1515
import io.grpc.Channel;
1616
import io.grpc.ClientCall;
1717
import io.grpc.ClientInterceptor;
18-
1918
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
19+
import io.grpc.ForwardingClientCallListener;
2020
import io.grpc.Metadata;
2121
import io.grpc.MethodDescriptor;
22+
import io.grpc.Status;
2223
import java.io.ByteArrayInputStream;
2324
import java.io.IOException;
2425
import java.io.InputStream;
25-
import javax.annotation.Nullable;
2626
import java.util.concurrent.ScheduledExecutorService;
27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
import javax.annotation.Nullable;
2729

2830
public class ExternalProcessorFilter implements Filter {
2931
static final String TYPE_URL = "type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor";
@@ -72,7 +74,7 @@ public ConfigOrError<? extends FilterConfig> parseFilterConfigOverride(Message r
7274
@Nullable
7375
@Override
7476
public ClientInterceptor buildClientInterceptor(FilterConfig filterConfig,
75-
@Nullable FilterConfig overrideConfig, ScheduledExecutorService scheduler) {
77+
@Nullable FilterConfig overrideConfig, ScheduledExecutorService scheduler) {
7678
return new ExternalProcessorInterceptor((ExternalProcessorFilterConfig) filterConfig, overrideConfig, scheduler);
7779
}
7880

@@ -96,7 +98,7 @@ static final class ExternalProcessorInterceptor implements ClientInterceptor {
9698
private final ScheduledExecutorService scheduler;
9799

98100
ExternalProcessorInterceptor(ExternalProcessorFilterConfig filterConfig,
99-
@Nullable FilterConfig overrideConfig, ScheduledExecutorService scheduler) {
101+
@Nullable FilterConfig overrideConfig, ScheduledExecutorService scheduler) {
100102
this.filterConfig = filterConfig;
101103
this.overrideConfig = overrideConfig;
102104
this.scheduler = scheduler;
@@ -199,10 +201,11 @@ private static class ExtProcClientCall<ReqT, RespT> extends SimpleForwardingClie
199201
private Metadata requestHeaders;
200202
private final java.util.Queue<Runnable> pendingActions = new java.util.concurrent.ConcurrentLinkedQueue<>();
201203
private ReqT lastRequestMessage;
204+
final AtomicBoolean extProcStreamFailed = new AtomicBoolean(false);
202205

203206
protected ExtProcClientCall(ClientCall<ReqT, RespT> delegate,
204-
ExternalProcessorGrpc.ExternalProcessorStub stub,
205-
MethodDescriptor<ReqT, RespT> method) {
207+
ExternalProcessorGrpc.ExternalProcessorStub stub,
208+
MethodDescriptor<ReqT, RespT> method) {
206209
super(delegate);
207210
this.stub = stub;
208211
this.method = method;
@@ -211,7 +214,7 @@ protected ExtProcClientCall(ClientCall<ReqT, RespT> delegate,
211214
@Override
212215
public void start(Listener<RespT> responseListener, Metadata headers) {
213216
this.requestHeaders = headers;
214-
ExternalProcessorInterceptor.ExtProcListener<RespT> wrappedListener = new ExternalProcessorInterceptor.ExtProcListener<>(responseListener, delegate(), method);
217+
ExternalProcessorInterceptor.ExtProcListener<ReqT, RespT> wrappedListener = new ExternalProcessorInterceptor.ExtProcListener<>(responseListener, delegate(), method, this);
215218

216219
requestObserver = stub.process(new io.grpc.stub.StreamObserver<io.envoyproxy.envoy.service.ext_proc.v3.ProcessingResponse>() {
217220
@Override
@@ -262,7 +265,13 @@ else if (response.hasResponseBody()) {
262265
}
263266
}
264267

265-
@Override public void onError(Throwable t) { delegate().cancel("ExtProc failed", t); }
268+
@Override
269+
public void onError(Throwable t) {
270+
if (extProcStreamFailed.compareAndSet(false, true)) {
271+
delegate().cancel("External processor stream failed", t);
272+
}
273+
}
274+
266275
@Override public void onCompleted() {}
267276
});
268277

@@ -330,7 +339,7 @@ private void handleRequestBodyResponse(io.envoyproxy.envoy.service.ext_proc.v3.B
330339
// If no response is present, the processor chose to drop the message.
331340
}
332341

333-
private void handleResponseBodyResponse(io.envoyproxy.envoy.service.ext_proc.v3.BodyResponse bodyResponse, ExternalProcessorInterceptor.ExtProcListener<RespT> listener) {
342+
private void handleResponseBodyResponse(io.envoyproxy.envoy.service.ext_proc.v3.BodyResponse bodyResponse, ExternalProcessorInterceptor.ExtProcListener<ReqT, RespT> listener) {
334343
// Pass the (potentially modified) message to the real listener
335344
listener.proceedWithNextMessage();
336345
}
@@ -362,20 +371,23 @@ public void halfClose() {
362371
}
363372
}
364373

365-
private static class ExtProcListener<RespT> extends io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT> {
374+
private static class ExtProcListener<ReqT, RespT> extends ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT> {
366375
private final MethodDescriptor<?, RespT> method;
367376
private final ClientCall<?, RespT> callDelegate; // The actual RPC call
377+
private final ExtProcClientCall<ReqT, RespT> call;
368378
private io.grpc.stub.StreamObserver<io.envoyproxy.envoy.service.ext_proc.v3.ProcessingRequest> stream;
369379
private Metadata savedHeaders;
370380
private Metadata savedTrailers;
371381
private io.grpc.Status savedStatus;
372382
private final java.util.Queue<RespT> messageQueue = new java.util.concurrent.ConcurrentLinkedQueue<>();
373383
private RespT lastMessage;
374384

375-
protected ExtProcListener(ClientCall.Listener<RespT> delegate, ClientCall<?, RespT> callDelegate, MethodDescriptor<?, RespT> method) {
385+
protected ExtProcListener(ClientCall.Listener<RespT> delegate, ClientCall<?, RespT> callDelegate,
386+
MethodDescriptor<?, RespT> method, ExtProcClientCall<ReqT, RespT> call) {
376387
super(delegate);
377388
this.method = method;
378389
this.callDelegate = callDelegate;
390+
this.call = call;
379391
}
380392

381393
void setStream(io.grpc.stub.StreamObserver<io.envoyproxy.envoy.service.ext_proc.v3.ProcessingRequest> stream) { this.stream = stream; }
@@ -403,6 +415,14 @@ public void onMessage(RespT message) {
403415

404416
@Override
405417
public void onClose(io.grpc.Status status, Metadata trailers) {
418+
if (call.extProcStreamFailed.get()) {
419+
// The ext_proc stream died, which caused delegate().cancel() to be called, leading here.
420+
// The incoming status will be CANCELLED. We must not attempt to forward the server's
421+
// response trailers to the now-dead ext_proc stream. Instead, we close the
422+
// application's call with UNAVAILABLE as per the gRFC.
423+
super.onClose(Status.UNAVAILABLE.withDescription("External processor stream failed").withCause(status.getCause()), new Metadata());
424+
return;
425+
}
406426
this.savedStatus = status;
407427
this.savedTrailers = trailers;
408428

0 commit comments

Comments
 (0)