Skip to content

Commit 27fdbb0

Browse files
committed
end_of_stream field setting for both request and response message handling using one message buffered approach.
1 parent cc49944 commit 27fdbb0

File tree

1 file changed

+45
-22
lines changed

1 file changed

+45
-22
lines changed

xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java

Lines changed: 45 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
public class ExternalProcessorFilter implements Filter {
3131
static final String TYPE_URL = "type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor";
3232

33-
ManagedChannel extProcChannel;
3433
final String filterInstanceName;
3534
public ExternalProcessorFilter(String name) {
3635
filterInstanceName = checkNotNull(name, "name");
@@ -201,6 +200,7 @@ private static class ExtProcClientCall<ReqT, RespT> extends SimpleForwardingClie
201200
private boolean headersSent = false;
202201
private Metadata requestHeaders;
203202
private final java.util.Queue<Runnable> pendingActions = new java.util.concurrent.ConcurrentLinkedQueue<>();
203+
private ReqT lastRequestMessage;
204204

205205
protected ExtProcClientCall(ClientCall<ReqT, RespT> delegate,
206206
ExternalProcessorGrpc.ExternalProcessorStub stub,
@@ -285,16 +285,21 @@ public void sendMessage(ReqT message) {
285285
return;
286286
}
287287

288+
if (lastRequestMessage != null) {
289+
sendRequestBodyToExtProc(lastRequestMessage, false);
290+
}
291+
lastRequestMessage = message;
292+
}
293+
294+
private void sendRequestBodyToExtProc(ReqT message, boolean endOfStream) {
288295
try (InputStream is = method.streamRequest(message)) {
289-
// Correctly convert InputStream to byte array using Guava
290296
byte[] bodyBytes = ByteStreams.toByteArray(is);
291-
292297
requestObserver.onNext(io.envoyproxy.envoy.service.ext_proc.v3.ProcessingRequest.newBuilder()
293298
.setRequestBody(io.envoyproxy.envoy.service.ext_proc.v3.HttpBody.newBuilder()
294299
.setBody(com.google.protobuf.ByteString.copyFrom(bodyBytes))
300+
.setEndOfStream(endOfStream)
295301
.build())
296302
.build());
297-
// The external processor is now responsible for the message. We don't send it from here.
298303
} catch (IOException e) {
299304
delegate().cancel("Failed to serialize message for External Processor", e);
300305
}
@@ -346,6 +351,11 @@ private void handleImmediateResponse(io.envoyproxy.envoy.service.ext_proc.v3.Imm
346351

347352
@Override
348353
public void halfClose() {
354+
if (lastRequestMessage != null) {
355+
sendRequestBodyToExtProc(lastRequestMessage, true);
356+
lastRequestMessage = null;
357+
}
358+
349359
// Event: Client Half-Close
350360
requestObserver.onNext(io.envoyproxy.envoy.service.ext_proc.v3.ProcessingRequest.newBuilder()
351361
.setRequestTrailers(io.envoyproxy.envoy.service.ext_proc.v3.HttpTrailers.newBuilder().build())
@@ -358,10 +368,11 @@ private static class ExtProcListener<RespT> extends io.grpc.ForwardingClientCall
358368
private final MethodDescriptor<?, RespT> method;
359369
private final ClientCall<?, RespT> callDelegate; // The actual RPC call
360370
private io.grpc.stub.StreamObserver<io.envoyproxy.envoy.service.ext_proc.v3.ProcessingRequest> stream;
361-
Metadata savedHeaders;
362-
Metadata savedTrailers;
363-
io.grpc.Status savedStatus;
371+
private Metadata savedHeaders;
372+
private Metadata savedTrailers;
373+
private io.grpc.Status savedStatus;
364374
private final java.util.Queue<RespT> messageQueue = new java.util.concurrent.ConcurrentLinkedQueue<>();
375+
private RespT lastMessage;
365376

366377
protected ExtProcListener(ClientCall.Listener<RespT> delegate, ClientCall<?, RespT> callDelegate, MethodDescriptor<?, RespT> method) {
367378
super(delegate);
@@ -385,16 +396,41 @@ public void onHeaders(Metadata headers) {
385396

386397
@Override
387398
public void onMessage(RespT message) {
399+
if (lastMessage != null) {
400+
sendResponseBodyToExtProc(lastMessage, false);
401+
}
402+
lastMessage = message;
403+
messageQueue.add(message);
404+
}
405+
406+
@Override
407+
public void onClose(io.grpc.Status status, Metadata trailers) {
408+
this.savedStatus = status;
409+
this.savedTrailers = trailers;
410+
411+
if (lastMessage != null) {
412+
sendResponseBodyToExtProc(lastMessage, true);
413+
lastMessage = null;
414+
}
415+
416+
// Event 6: Server Trailers with ACTUAL data
417+
stream.onNext(io.envoyproxy.envoy.service.ext_proc.v3.ProcessingRequest.newBuilder()
418+
.setResponseTrailers(io.envoyproxy.envoy.service.ext_proc.v3.HttpTrailers.newBuilder()
419+
.setTrailers(toHeaderMap(savedTrailers)) // Map the captured trailers here
420+
.build())
421+
.build());
422+
}
423+
424+
private void sendResponseBodyToExtProc(RespT message, boolean endOfStream) {
388425
try (java.io.InputStream is = method.streamResponse(message)) {
389426
// Use Guava to convert the server's response message to bytes
390427
byte[] bodyBytes = ByteStreams.toByteArray(is);
391428

392-
messageQueue.add(message);
393-
394429
// Event 5: Server Message (Response Body) sent to Ext Proc
395430
stream.onNext(io.envoyproxy.envoy.service.ext_proc.v3.ProcessingRequest.newBuilder()
396431
.setResponseBody(io.envoyproxy.envoy.service.ext_proc.v3.HttpBody.newBuilder()
397432
.setBody(com.google.protobuf.ByteString.copyFrom(bodyBytes))
433+
.setEndOfStream(endOfStream)
398434
.build())
399435
.build());
400436

@@ -415,19 +451,6 @@ public void onMessage(RespT message) {
415451
}
416452
}
417453

418-
@Override
419-
public void onClose(io.grpc.Status status, Metadata trailers) {
420-
this.savedStatus = status;
421-
this.savedTrailers = trailers;
422-
423-
// Event 6: Server Trailers with ACTUAL data
424-
stream.onNext(io.envoyproxy.envoy.service.ext_proc.v3.ProcessingRequest.newBuilder()
425-
.setResponseTrailers(io.envoyproxy.envoy.service.ext_proc.v3.HttpTrailers.newBuilder()
426-
.setTrailers(toHeaderMap(savedTrailers)) // Map the captured trailers here
427-
.build())
428-
.build());
429-
}
430-
431454
/**
432455
* Called when ExtProc gives the final "OK" for the trailers phase.
433456
*/

0 commit comments

Comments
 (0)