Skip to content

Commit 9708c0b

Browse files
committed
The refactoring to improve concurrency in the External Processor filter is complete. The implementation now employs a more granular three-lock strategy:
1. streamLock: Guards all interactions with the extProcClientCallRequestObserver. This ensures the gRPC StreamObserver to the external processor is never accessed concurrently, protecting its internal state. 2. requestLock: Manages the outbound flow control. It guards the headersSent flag and the pendingActions queue, coordinating the transition from the initial buffering phase to active delivery to the backend server. 3. responseLock: Serializes all callbacks to the application's Listener (onHeaders, onMessage, onClose, onReady). It also guards shared response state like savedHeaders and savedStatus. This ensures strict compliance with the gRPC contract while fixing a potential race condition in onExternalBody. By decoupling the request and response data planes, the filter now supports full-duplex concurrency where outbound messages do not block inbound server responses. All lock acquisitions were carefully refactored to be sequential, maintaining a consistent order and guaranteeing deadlock-free execution.
1 parent 61db033 commit 9708c0b

File tree

1 file changed

+59
-38
lines changed

1 file changed

+59
-38
lines changed

xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java

Lines changed: 59 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,9 @@ private static void applyHeaderMutations(Metadata headers, io.envoyproxy.envoy.s
341341
private static class ExtProcClientCall extends SimpleForwardingClientCall<InputStream, InputStream> {
342342
private final ExternalProcessorGrpc.ExternalProcessorStub stub;
343343
private final ExternalProcessor config;
344-
private final Object lock = new Object();
344+
private final Object requestLock = new Object();
345+
private final Object responseLock = new Object();
346+
private final Object streamLock = new Object();
345347
private ClientCallStreamObserver<ProcessingRequest> extProcClientCallRequestObserver;
346348
private ExtProcListener wrappedListener;
347349

@@ -361,7 +363,7 @@ protected ExtProcClientCall(ClientCall<InputStream, InputStream> delegate,
361363
}
362364

363365
private void sendToDataPlane(Runnable action) {
364-
synchronized (lock) {
366+
synchronized (requestLock) {
365367
if (headersSent) {
366368
action.run();
367369
} else {
@@ -389,7 +391,7 @@ public void onNext(io.envoyproxy.envoy.service.ext_proc.v3.ProcessingResponse re
389391

390392
if (response.getRequestDrain()) {
391393
drainingExtProcStream.set(true);
392-
synchronized (lock) {
394+
synchronized (streamLock) {
393395
extProcClientCallRequestObserver.onCompleted(); // Sends half-close to ext_proc
394396
}
395397
return;
@@ -402,7 +404,7 @@ public void onNext(io.envoyproxy.envoy.service.ext_proc.v3.ProcessingResponse re
402404
if (response.getRequestHeaders().hasResponse()) {
403405
applyHeaderMutations(requestHeaders, response.getRequestHeaders().getResponse().getHeaderMutation());
404406
}
405-
synchronized (lock) {
407+
synchronized (requestLock) {
406408
headersSent = true;
407409
delegate().start(wrappedListener, requestHeaders);
408410
drainQueue();
@@ -417,7 +419,7 @@ else if (response.hasRequestBody()) {
417419
io.grpc.StatusRuntimeException ex = io.grpc.Status.INTERNAL
418420
.withDescription("gRPC message compression not supported in ext_proc")
419421
.asRuntimeException();
420-
synchronized (lock) {
422+
synchronized (streamLock) {
421423
extProcClientCallRequestObserver.onError(ex);
422424
}
423425
onError(ex);
@@ -442,7 +444,7 @@ else if (response.hasResponseBody()) {
442444
io.grpc.StatusRuntimeException ex = io.grpc.Status.INTERNAL
443445
.withDescription("gRPC message compression not supported in ext_proc")
444446
.asRuntimeException();
445-
synchronized (lock) {
447+
synchronized (streamLock) {
446448
extProcClientCallRequestObserver.onError(ex);
447449
}
448450
onError(ex);
@@ -461,7 +463,7 @@ else if (response.hasResponseBody()) {
461463
}
462464
// Finally notify the local app of the completion
463465
wrappedListener.proceedWithClose();
464-
synchronized (lock) {
466+
synchronized (streamLock) {
465467
extProcClientCallRequestObserver.onCompleted();
466468
}
467469
}
@@ -491,7 +493,7 @@ public void onCompleted() {
491493

492494
wrappedListener.setStream(extProcClientCallRequestObserver);
493495

494-
synchronized (lock) {
496+
synchronized (streamLock) {
495497
extProcClientCallRequestObserver.onNext(io.envoyproxy.envoy.service.ext_proc.v3.ProcessingRequest.newBuilder()
496498
.setRequestHeaders(io.envoyproxy.envoy.service.ext_proc.v3.HttpHeaders.newBuilder()
497499
.setHeaders(toHeaderMap(headers))
@@ -500,7 +502,7 @@ public void onCompleted() {
500502
}
501503

502504
if (config.getObservabilityMode()) {
503-
synchronized (lock) {
505+
synchronized (requestLock) {
504506
headersSent = true;
505507
delegate().start(wrappedListener, headers);
506508
}
@@ -522,7 +524,7 @@ public boolean isReady() {
522524
return false;
523525
}
524526
if (config.getObservabilityMode()) {
525-
synchronized (lock) {
527+
synchronized (streamLock) {
526528
return super.isReady() && extProcClientCallRequestObserver != null
527529
&& extProcClientCallRequestObserver.isReady();
528530
}
@@ -547,7 +549,7 @@ public void sendMessage(InputStream message) {
547549

548550
try {
549551
byte[] bodyBytes = ByteStreams.toByteArray(message);
550-
synchronized (lock) {
552+
synchronized (streamLock) {
551553
if (!extProcStreamCompleted.get()) {
552554
extProcClientCallRequestObserver.onNext(io.envoyproxy.envoy.service.ext_proc.v3.ProcessingRequest.newBuilder()
553555
.setRequestBody(io.envoyproxy.envoy.service.ext_proc.v3.HttpBody.newBuilder()
@@ -574,7 +576,7 @@ public void halfClose() {
574576
}
575577

576578
// Signal end of request body stream to the external processor.
577-
synchronized (lock) {
579+
synchronized (streamLock) {
578580
if (!extProcStreamCompleted.get()) {
579581
extProcClientCallRequestObserver.onNext(io.envoyproxy.envoy.service.ext_proc.v3.ProcessingRequest.newBuilder()
580582
.setRequestBody(io.envoyproxy.envoy.service.ext_proc.v3.HttpBody.newBuilder()
@@ -589,7 +591,7 @@ public void halfClose() {
589591

590592
@Override
591593
public void cancel(@Nullable String message, @Nullable Throwable cause) {
592-
synchronized (lock) {
594+
synchronized (streamLock) {
593595
if (extProcClientCallRequestObserver != null) {
594596
extProcClientCallRequestObserver.onError(Status.CANCELLED.withDescription(message).withCause(cause).asRuntimeException());
595597
}
@@ -628,8 +630,10 @@ private void drainQueue() {
628630
private void handleImmediateResponse(io.envoyproxy.envoy.service.ext_proc.v3.ImmediateResponse immediate, Listener<InputStream> listener) {
629631
io.grpc.Status status = io.grpc.Status.fromCodeValue(immediate.getGrpcStatus().getStatus());
630632
delegate().cancel("Rejected by ExtProc", null);
631-
listener.onClose(status, new Metadata());
632-
synchronized (lock) {
633+
synchronized (responseLock) {
634+
listener.onClose(status, new Metadata());
635+
}
636+
synchronized (streamLock) {
633637
extProcClientCallRequestObserver.onCompleted();
634638
}
635639
}
@@ -638,7 +642,7 @@ private void handleFailOpen(ExtProcListener listener) {
638642
if (extProcStreamCompleted.compareAndSet(false, true)) {
639643
// The ext_proc stream is gone. "Fail open" means we proceed with the RPC
640644
// without any more processing.
641-
synchronized (lock) {
645+
synchronized (requestLock) {
642646
if (!headersSent) {
643647
headersSent = true;
644648
delegate().start(listener, requestHeaders);
@@ -665,11 +669,6 @@ protected ExtProcListener(ClientCall.Listener<InputStream> delegate, ClientCall<
665669
this.extProcClientCall = extProcClientCall;
666670
}
667671

668-
private void sendToApp(Runnable action) {
669-
// Response messages are delivered to the app listener, which gRPC handles via serialization.
670-
action.run();
671-
}
672-
673672
void setStream(ClientCallStreamObserver<io.envoyproxy.envoy.service.ext_proc.v3.ProcessingRequest> stream) { this.stream = stream; }
674673

675674
@Override
@@ -678,18 +677,24 @@ public void onReady() {
678677
return;
679678
}
680679
if (extProcClientCall.isReady()) {
681-
super.onReady();
680+
synchronized (extProcClientCall.responseLock) {
681+
super.onReady();
682+
}
682683
}
683684
}
684685

685686
@Override
686687
public void onHeaders(Metadata headers) {
687688
if (extProcClientCall.extProcStreamCompleted.get()) {
688-
super.onHeaders(headers);
689+
synchronized (extProcClientCall.responseLock) {
690+
super.onHeaders(headers);
691+
}
689692
return;
690693
}
691-
synchronized (extProcClientCall.lock) {
694+
synchronized (extProcClientCall.responseLock) {
692695
this.savedHeaders = headers;
696+
}
697+
synchronized (extProcClientCall.streamLock) {
693698
extProcClientCall.extProcClientCallRequestObserver.onNext(ProcessingRequest.newBuilder()
694699
.setResponseHeaders(io.envoyproxy.envoy.service.ext_proc.v3.HttpHeaders.newBuilder()
695700
.setHeaders(toHeaderMap(headers))
@@ -698,12 +703,14 @@ public void onHeaders(Metadata headers) {
698703
}
699704

700705
if (extProcClientCall.config.getObservabilityMode()) {
701-
super.onHeaders(headers);
706+
synchronized (extProcClientCall.responseLock) {
707+
super.onHeaders(headers);
708+
}
702709
}
703710
}
704711

705712
void proceedWithHeaders() {
706-
synchronized (extProcClientCall.lock) {
713+
synchronized (extProcClientCall.responseLock) {
707714
if (savedHeaders != null) {
708715
super.onHeaders(savedHeaders);
709716
savedHeaders = null;
@@ -714,7 +721,9 @@ void proceedWithHeaders() {
714721
@Override
715722
public void onMessage(InputStream message) {
716723
if (extProcClientCall.extProcStreamCompleted.get()) {
717-
super.onMessage(message);
724+
synchronized (extProcClientCall.responseLock) {
725+
super.onMessage(message);
726+
}
718727
return;
719728
}
720729

@@ -723,7 +732,9 @@ public void onMessage(InputStream message) {
723732
sendResponseBodyToExtProc(bodyBytes, false);
724733

725734
if (extProcClientCall.config.getObservabilityMode()) {
726-
sendToApp(() -> super.onMessage(new ByteArrayInputStream(bodyBytes)));
735+
synchronized (extProcClientCall.responseLock) {
736+
super.onMessage(new ByteArrayInputStream(bodyBytes));
737+
}
727738
}
728739
} catch (IOException e) {
729740
callDelegate.cancel("Failed to read server response", e);
@@ -737,21 +748,27 @@ public void onClose(io.grpc.Status status, Metadata trailers) {
737748
// The incoming status will be CANCELLED. We must not attempt to forward the server's
738749
// response trailers to the now-dead ext_proc stream. Instead, we close the
739750
// application's call with UNAVAILABLE as per the gRFC.
740-
super.onClose(Status.UNAVAILABLE.withDescription("External processor stream failed").withCause(status.getCause()), new Metadata());
751+
synchronized (extProcClientCall.responseLock) {
752+
super.onClose(Status.UNAVAILABLE.withDescription("External processor stream failed").withCause(status.getCause()), new Metadata());
753+
}
741754
return;
742755
}
743756
if (extProcClientCall.extProcStreamCompleted.get()) {
744-
super.onClose(status, trailers);
757+
synchronized (extProcClientCall.responseLock) {
758+
super.onClose(status, trailers);
759+
}
745760
return;
746761
}
747762

748-
synchronized (extProcClientCall.lock) {
763+
synchronized (extProcClientCall.responseLock) {
749764
this.savedStatus = status;
750765
this.savedTrailers = trailers;
766+
}
751767

752-
// Signal end of response body stream to the external processor.
753-
sendResponseBodyToExtProc(null, true);
768+
// Signal end of response body stream to the external processor.
769+
sendResponseBodyToExtProc(null, true);
754770

771+
synchronized (extProcClientCall.streamLock) {
755772
// Event 6: Server Trailers with ACTUAL data
756773
extProcClientCall.extProcClientCallRequestObserver.onNext(ProcessingRequest.newBuilder()
757774
.setResponseTrailers(io.envoyproxy.envoy.service.ext_proc.v3.HttpTrailers.newBuilder()
@@ -761,8 +778,10 @@ public void onClose(io.grpc.Status status, Metadata trailers) {
761778
}
762779

763780
if (extProcClientCall.config.getObservabilityMode()) {
764-
super.onClose(status, trailers);
765-
synchronized (extProcClientCall.lock) {
781+
synchronized (extProcClientCall.responseLock) {
782+
super.onClose(status, trailers);
783+
}
784+
synchronized (extProcClientCall.streamLock) {
766785
extProcClientCall.extProcClientCallRequestObserver.onCompleted();
767786
}
768787
}
@@ -780,7 +799,7 @@ private void sendResponseBodyToExtProc(@Nullable byte[] bodyBytes, boolean endOf
780799
}
781800
bodyBuilder.setEndOfStream(endOfStream);
782801

783-
synchronized (extProcClientCall.lock) {
802+
synchronized (extProcClientCall.streamLock) {
784803
extProcClientCall.extProcClientCallRequestObserver.onNext(ProcessingRequest.newBuilder()
785804
.setResponseBody(bodyBuilder.build())
786805
.build());
@@ -791,7 +810,7 @@ private void sendResponseBodyToExtProc(@Nullable byte[] bodyBytes, boolean endOf
791810
* Called when ExtProc gives the final "OK" for the trailers phase.
792811
*/
793812
void proceedWithClose() {
794-
synchronized (extProcClientCall.lock) {
813+
synchronized (extProcClientCall.responseLock) {
795814
if (savedStatus != null) {
796815
super.onClose(savedStatus, savedTrailers);
797816
savedStatus = null;
@@ -801,7 +820,9 @@ void proceedWithClose() {
801820
}
802821

803822
void onExternalBody(com.google.protobuf.ByteString body) {
804-
sendToApp(() -> super.onMessage(body.newInput()));
823+
synchronized (extProcClientCall.responseLock) {
824+
super.onMessage(body.newInput());
825+
}
805826
}
806827

807828
void unblockAfterStreamComplete() {

0 commit comments

Comments
 (0)