2424import io .grpc .Metadata ;
2525import io .grpc .MethodDescriptor ;
2626import io .grpc .Status ;
27+ import io .grpc .stub .ClientCallStreamObserver ;
2728import io .grpc .xds .internal .grpcservice .GrpcServiceChannelCreator ;
2829import io .grpc .xds .internal .grpcservice .GrpcServiceChannelCreatorImpl ;
2930import java .io .ByteArrayInputStream ;
@@ -240,7 +241,9 @@ private static class ExtProcClientCall<ReqT, RespT> extends SimpleForwardingClie
240241 private final ExternalProcessorGrpc .ExternalProcessorStub stub ;
241242 private final MethodDescriptor <ReqT , RespT > method ;
242243 private final ExternalProcessor config ;
243- private io .grpc .stub .StreamObserver <io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest > requestObserver ;
244+ private ClientCallStreamObserver <ProcessingRequest > clientCallRequestObserver ;
245+ private final Object extProcLock = new Object ();
246+ private boolean extProcStreamReady ;
244247
245248 private boolean headersSent = false ;
246249 private Metadata requestHeaders ;
@@ -263,17 +266,21 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
263266 this .requestHeaders = headers ;
264267 ExternalProcessorInterceptor .ExtProcListener <ReqT , RespT > wrappedListener = new ExternalProcessorInterceptor .ExtProcListener <>(responseListener , delegate (), method , this );
265268
266- requestObserver = stub .process (new io .grpc .stub .StreamObserver <ProcessingResponse >() {
269+ clientCallRequestObserver = ( ClientCallStreamObserver < ProcessingRequest >) stub .process (new io .grpc .stub .StreamObserver <ProcessingResponse >() {
267270 @ Override
268271 public void onNext (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingResponse response ) {
269272 if (response .hasImmediateResponse ()) {
270273 handleImmediateResponse (response .getImmediateResponse (), responseListener );
271274 return ;
272275 }
273276
277+ if (config .getObservabilityMode ()) {
278+ return ;
279+ }
280+
274281 if (response .getRequestDrain ()) {
275282 handleFailOpen (wrappedListener );
276- requestObserver .onCompleted ();
283+ clientCallRequestObserver .onCompleted ();
277284 return ;
278285 }
279286
@@ -297,7 +304,7 @@ else if (response.hasRequestBody()) {
297304 io .grpc .StatusRuntimeException ex = io .grpc .Status .INTERNAL
298305 .withDescription ("gRPC message compression not supported in ext_proc" )
299306 .asRuntimeException ();
300- requestObserver .onError (ex );
307+ clientCallRequestObserver .onError (ex );
301308 onError (ex );
302309 return ;
303310 }
@@ -320,7 +327,7 @@ else if (response.hasResponseBody()) {
320327 io .grpc .StatusRuntimeException ex = io .grpc .Status .INTERNAL
321328 .withDescription ("gRPC message compression not supported in ext_proc" )
322329 .asRuntimeException ();
323- requestObserver .onError (ex );
330+ clientCallRequestObserver .onError (ex );
324331 onError (ex );
325332 return ;
326333 }
@@ -357,13 +364,51 @@ public void onCompleted() {
357364 }
358365 });
359366
360- wrappedListener .setStream (requestObserver );
367+ if (config .getObservabilityMode ()) {
368+ this .extProcStreamReady = clientCallRequestObserver .isReady ();
369+ clientCallRequestObserver .setOnReadyHandler (this ::onExtProcStreamReady );
370+ }
371+
372+ wrappedListener .setStream (clientCallRequestObserver );
361373
362- requestObserver . onNext (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
374+ sendToExtProc (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
363375 .setRequestHeaders (io .envoyproxy .envoy .service .ext_proc .v3 .HttpHeaders .newBuilder ()
364376 .setHeaders (toHeaderMap (headers ))
365377 .build ())
366378 .build ());
379+
380+ if (config .getObservabilityMode ()) {
381+ headersSent = true ;
382+ delegate ().start (wrappedListener , headers );
383+ }
384+ }
385+
386+ private void onExtProcStreamReady () {
387+ synchronized (extProcLock ) {
388+ extProcStreamReady = true ;
389+ extProcLock .notifyAll ();
390+ }
391+ }
392+
393+ private void sendToExtProc (ProcessingRequest request ) {
394+ if (!config .getObservabilityMode ()) {
395+ clientCallRequestObserver .onNext (request );
396+ return ;
397+ }
398+
399+ synchronized (extProcLock ) {
400+ while (!extProcStreamReady ) {
401+ try {
402+ extProcLock .wait ();
403+ } catch (InterruptedException e ) {
404+ Thread .currentThread ().interrupt ();
405+ delegate ().cancel ("Interrupted while waiting for ext_proc stream" , e );
406+ return ;
407+ }
408+ }
409+ clientCallRequestObserver .onNext (request );
410+ extProcStreamReady = clientCallRequestObserver .isReady ();
411+ }
367412 }
368413
369414 @ Override
@@ -373,15 +418,15 @@ public void sendMessage(ReqT message) {
373418 return ;
374419 }
375420
376- if (!headersSent ) {
421+ if (!headersSent && ! config . getObservabilityMode () ) {
377422 // If headers haven't been cleared by ext_proc yet, buffer the whole action
378423 pendingActions .add (() -> sendMessage (message ));
379424 return ;
380425 }
381426
382427 try (InputStream is = method .streamRequest (message )) {
383428 byte [] bodyBytes = ByteStreams .toByteArray (is );
384- requestObserver . onNext (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
429+ sendToExtProc (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
385430 .setRequestBody (io .envoyproxy .envoy .service .ext_proc .v3 .HttpBody .newBuilder ()
386431 .setBody (com .google .protobuf .ByteString .copyFrom (bodyBytes ))
387432 .setEndOfStream (false )
@@ -390,6 +435,10 @@ public void sendMessage(ReqT message) {
390435 } catch (IOException e ) {
391436 delegate ().cancel ("Failed to serialize message for External Processor" , e );
392437 }
438+
439+ if (config .getObservabilityMode ()) {
440+ super .sendMessage (message );
441+ }
393442 }
394443
395444 @ Override
@@ -400,7 +449,7 @@ public void halfClose() {
400449 }
401450
402451 // Signal end of request body stream to the external processor.
403- requestObserver . onNext (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
452+ sendToExtProc (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
404453 .setRequestBody (io .envoyproxy .envoy .service .ext_proc .v3 .HttpBody .newBuilder ()
405454 .setEndOfStream (true )
406455 .build ())
@@ -449,7 +498,7 @@ private void handleImmediateResponse(io.envoyproxy.envoy.service.ext_proc.v3.Imm
449498 io .grpc .Status status = io .grpc .Status .fromCodeValue (immediate .getGrpcStatus ().getStatus ());
450499 delegate ().cancel ("Rejected by ExtProc" , null );
451500 listener .onClose (status , new Metadata ());
452- requestObserver .onCompleted ();
501+ clientCallRequestObserver .onCompleted ();
453502 }
454503
455504 private void handleFailOpen (ExtProcListener <ReqT , RespT > listener ) {
@@ -470,7 +519,7 @@ private static class ExtProcListener<ReqT, RespT> extends ForwardingClientCallLi
470519 private final MethodDescriptor <?, RespT > method ;
471520 private final ClientCall <?, RespT > callDelegate ; // The actual RPC call
472521 private final ExtProcClientCall <ReqT , RespT > call ;
473- private io . grpc . stub . StreamObserver <io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest > stream ;
522+ private ClientCallStreamObserver <io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest > stream ;
474523 private Metadata savedHeaders ;
475524 private Metadata savedTrailers ;
476525 private io .grpc .Status savedStatus ;
@@ -484,7 +533,7 @@ protected ExtProcListener(ClientCall.Listener<RespT> delegate, ClientCall<?, Res
484533 this .call = call ;
485534 }
486535
487- void setStream (io . grpc . stub . StreamObserver <io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest > stream ) { this .stream = stream ; }
536+ void setStream (ClientCallStreamObserver <io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest > stream ) { this .stream = stream ; }
488537
489538 @ Override
490539 public void onHeaders (Metadata headers ) {
@@ -493,11 +542,15 @@ public void onHeaders(Metadata headers) {
493542 return ;
494543 }
495544 this .savedHeaders = headers ;
496- stream . onNext (ProcessingRequest .newBuilder ()
545+ call . sendToExtProc (ProcessingRequest .newBuilder ()
497546 .setResponseHeaders (io .envoyproxy .envoy .service .ext_proc .v3 .HttpHeaders .newBuilder ()
498547 .setHeaders (toHeaderMap (headers ))
499548 .build ())
500549 .build ());
550+
551+ if (call .config .getObservabilityMode ()) {
552+ super .onHeaders (headers );
553+ }
501554 }
502555
503556 void proceedWithHeaders () { super .onHeaders (savedHeaders ); }
@@ -509,7 +562,12 @@ public void onMessage(RespT message) {
509562 return ;
510563 }
511564 sendResponseBodyToExtProc (message , false );
512- messageQueue .add (message );
565+
566+ if (call .config .getObservabilityMode ()) {
567+ super .onMessage (message );
568+ } else {
569+ messageQueue .add (message );
570+ }
513571 }
514572
515573 @ Override
@@ -534,11 +592,15 @@ public void onClose(io.grpc.Status status, Metadata trailers) {
534592 sendResponseBodyToExtProc (null , true );
535593
536594 // Event 6: Server Trailers with ACTUAL data
537- stream . onNext (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
595+ call . sendToExtProc (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
538596 .setResponseTrailers (io .envoyproxy .envoy .service .ext_proc .v3 .HttpTrailers .newBuilder ()
539597 .setTrailers (toHeaderMap (savedTrailers )) // Map the captured trailers here
540598 .build ())
541599 .build ());
600+
601+ if (call .config .getObservabilityMode ()) {
602+ super .onClose (status , trailers );
603+ }
542604 }
543605
544606 private void sendResponseBodyToExtProc (@ Nullable RespT message , boolean endOfStream ) {
@@ -556,7 +618,7 @@ private void sendResponseBodyToExtProc(@Nullable RespT message, boolean endOfStr
556618 }
557619 bodyBuilder .setEndOfStream (endOfStream );
558620
559- stream . onNext (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
621+ call . sendToExtProc (io .envoyproxy .envoy .service .ext_proc .v3 .ProcessingRequest .newBuilder ()
560622 .setResponseBody (bodyBuilder .build ())
561623 .build ());
562624
0 commit comments