Skip to content

Commit cc49944

Browse files
committed
Fixes for correctly using only the request body response as received from the external processor.
1 parent 9247f4a commit cc49944

File tree

1 file changed

+24
-8
lines changed

1 file changed

+24
-8
lines changed

xds/src/main/java/io/grpc/xds/ExternalProcessorFilter.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.grpc.ManagedChannel;
2222
import io.grpc.Metadata;
2323
import io.grpc.MethodDescriptor;
24+
import java.io.ByteArrayInputStream;
2425
import java.io.IOException;
2526
import java.io.InputStream;
2627
import javax.annotation.Nullable;
@@ -236,7 +237,6 @@ public void onNext(io.envoyproxy.envoy.service.ext_proc.v3.ProcessingResponse re
236237
// 2. Client Message (Request Body)
237238
else if (response.hasRequestBody()) {
238239
handleRequestBodyResponse(response.getRequestBody());
239-
drainQueue();
240240
}
241241
// 3. We don't send request trailers in gRPC for half close.
242242
// 4. Server Headers
@@ -294,21 +294,37 @@ public void sendMessage(ReqT message) {
294294
.setBody(com.google.protobuf.ByteString.copyFrom(bodyBytes))
295295
.build())
296296
.build());
297-
298-
// 3. Queue the ACTUAL delegate call.
299-
// We use super.sendMessage to bypass this interceptor's logic and move to the next call in the chain.
300-
pendingActions.add(() -> super.sendMessage(message));
297+
// The external processor is now responsible for the message. We don't send it from here.
301298
} catch (IOException e) {
302299
delegate().cancel("Failed to serialize message for External Processor", e);
303300
}
304301
}
305302

306303
private void handleRequestBodyResponse(io.envoyproxy.envoy.service.ext_proc.v3.BodyResponse bodyResponse) {
307-
// If ExtProc modified the body, you would deserialize it here.
308-
// For simplicity, we assume the original message is sent if no BodyMutation exists.
309304
if (bodyResponse.hasResponse() && bodyResponse.getResponse().hasBodyMutation()) {
310-
// Logic to deserialize modified bytes back to ReqT would go here
305+
io.envoyproxy.envoy.service.ext_proc.v3.BodyMutation mutation = bodyResponse.getResponse().getBodyMutation();
306+
if (mutation.hasBody()) {
307+
byte[] mutatedBody = mutation.getBody().toByteArray();
308+
try (InputStream is = new ByteArrayInputStream(mutatedBody)) {
309+
ReqT mutatedMessage = method.parseRequest(is);
310+
super.sendMessage(mutatedMessage);
311+
} catch (IOException e) {
312+
delegate().cancel("Failed to parse mutated message from External Processor", e);
313+
}
314+
} else if (mutation.getClearBody()) {
315+
// "clear_body" means we should send an empty message.
316+
try (InputStream is = new ByteArrayInputStream(new byte[0])) {
317+
ReqT emptyMessage = method.parseRequest(is);
318+
super.sendMessage(emptyMessage);
319+
} catch (IOException e) {
320+
// This should not happen with an empty stream.
321+
delegate().cancel("Failed to create empty message", e);
322+
}
323+
}
324+
// If body mutation is present but has no body and clear_body is false, do nothing.
325+
// This means the processor chose to drop the message.
311326
}
327+
// If no response is present, the processor chose to drop the message.
312328
}
313329

314330
private void handleResponseBodyResponse(io.envoyproxy.envoy.service.ext_proc.v3.BodyResponse bodyResponse, ExternalProcessorInterceptor.ExtProcListener<RespT> listener) {

0 commit comments

Comments
 (0)