@@ -189,10 +189,9 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
189189 .withExecutor (callOptions .getExecutor ());
190190
191191 if (filterConfig .grpcServiceConfig .timeout () != null && filterConfig .grpcServiceConfig .timeout ().isPresent ()) {
192- long timeoutSeconds = filterConfig .grpcServiceConfig .timeout ().get ().getSeconds ();
193- int timeoutNanos = filterConfig .grpcServiceConfig .timeout ().get ().getNano ();
194- if (timeoutSeconds > 0 || timeoutNanos > 0 ) {
195- stub = stub .withDeadlineAfter (timeoutSeconds * 1_000_000_000L + timeoutNanos , TimeUnit .NANOSECONDS );
192+ long timeoutNanos = filterConfig .grpcServiceConfig .timeout ().get ().toNanos ();
193+ if (timeoutNanos > 0 ) {
194+ stub = stub .withDeadlineAfter (timeoutNanos , TimeUnit .NANOSECONDS );
196195 }
197196 }
198197
@@ -452,7 +451,7 @@ public void onNext(ProcessingResponse response) {
452451
453452 if (response .getRequestDrain ()) {
454453 drainingExtProcStream .set (true );
455- closeExtProcStream ();
454+ halfCloseExtProcStream ();
456455 return ;
457456 }
458457
@@ -589,9 +588,7 @@ private void sendToExtProc(ProcessingRequest request) {
589588
590589 private void onExtProcStreamReady () {
591590 drainPendingRequests ();
592- if (isReady ()) {
593- wrappedListener .onReadyNotify ();
594- }
591+ onReadyNotify ();
595592 }
596593
597594 private void drainPendingRequests () {
@@ -617,6 +614,24 @@ private void closeExtProcStream() {
617614 }
618615 }
619616
617+ private void halfCloseExtProcStream () {
618+ synchronized (streamLock ) {
619+ if (!extProcStreamCompleted .get () && extProcClientCallRequestObserver != null ) {
620+ try {
621+ extProcClientCallRequestObserver .onCompleted ();
622+ } catch (IllegalStateException | io .grpc .StatusRuntimeException e ) {
623+ // Ignore
624+ }
625+ }
626+ }
627+ }
628+
629+ private void onReadyNotify () {
630+ if (isReady ()) {
631+ wrappedListener .onReadyNotify ();
632+ }
633+ }
634+
620635 @ Override
621636 public boolean isReady () {
622637 if (extProcStreamCompleted .get ()) {
@@ -625,9 +640,7 @@ public boolean isReady() {
625640 if (drainingExtProcStream .get ()) {
626641 return false ;
627642 }
628- boolean sendingBody = config .getProcessingMode ().getRequestBodyMode ()
629- == ProcessingMode .BodySendMode .GRPC ;
630- if (config .getObservabilityMode () || sendingBody ) {
643+ if (config .getObservabilityMode ()) {
631644 synchronized (streamLock ) {
632645 return super .isReady () && extProcClientCallRequestObserver != null
633646 && extProcClientCallRequestObserver .isReady ();
@@ -638,14 +651,26 @@ public boolean isReady() {
638651
639652 @ Override
640653 public void request (int numMessages ) {
654+ if (extProcStreamCompleted .get ()) {
655+ super .request (numMessages );
656+ return ;
657+ }
641658 // If the external processor is backed up with flow control, we need to stop requesting
642659 // messages from the remote side.
643- synchronized ( streamLock ) {
644- if (! isReady () ) {
660+ if ( drainingExtProcStream . get () ) {
661+ synchronized ( streamLock ) {
645662 pendingRequests += numMessages ;
646663 return ;
647664 }
648665 }
666+ if (config .getObservabilityMode ()) {
667+ synchronized (streamLock ) {
668+ if (!isReady ()) {
669+ pendingRequests += numMessages ;
670+ return ;
671+ }
672+ }
673+ }
649674 super .request (numMessages );
650675 }
651676
@@ -779,9 +804,6 @@ public void onReady() {
779804 }
780805
781806 void onReadyNotify () {
782- if (extProcClientCall .drainingExtProcStream .get ()) {
783- return ;
784- }
785807 if (extProcClientCall .isReady ()) {
786808 super .onReady ();
787809 }
@@ -901,6 +923,7 @@ void onExternalBody(com.google.protobuf.ByteString body) {
901923
902924 void unblockAfterStreamComplete () {
903925 proceedWithHeaders ();
926+ onReadyNotify ();
904927 proceedWithClose ();
905928 }
906929 }
0 commit comments