mcp_transcoder: add response streaming option. #45076
Conversation
Signed-off-by: Yilin Guo <guoyilin@google.com>
CC @envoyproxy/api-shepherds: Your approval is needed for changes made to

/gemini review cc @paulhong01
Code Review
This pull request implements streaming transcoding for unstructured text responses in the MCP JSON-REST bridge filter, allowing responses to be delivered in JSON-escaped chunks without full buffering. The implementation includes updates to the filter's header and data encoding logic, along with comprehensive unit and integration tests. Review feedback suggests adding error handling when locating the JSON-RPC envelope split point, enforcing the maximum response body size limit during streaming to prevent resource exhaustion, and optimizing the escaping process to minimize data copies.
```cpp
size_t pos = ref_json.find(marker);
streaming_json_prefix_ = ref_json.substr(0, pos + marker.size() - 1);
streaming_json_suffix_ = ref_json.substr(pos + marker.size() - 1);
```
Relying on the exact string representation of a JSON dump to find a split point is fragile. While nlohmann::json is generally deterministic, changes in default formatting (e.g., whitespace or key ordering) could cause find to fail. At a minimum, you should check if the marker was found to avoid a crash or incorrect behavior.
```diff
 size_t pos = ref_json.find(marker);
+if (pos == std::string::npos) {
+  IS_ENVOY_BUG("Failed to find marker in JSON-RPC envelope");
+  return;
+}
 streaming_json_prefix_ = ref_json.substr(0, pos + marker.size() - 1);
 streaming_json_suffix_ = ref_json.substr(pos + marker.size() - 1);
```
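For context, the prefix/suffix mechanism can be sketched end-to-end as a standalone program. The marker string and envelope shape below are illustrative assumptions, not the filter's exact output, and the guard mirrors the check suggested above:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Sketch: pre-build a JSON-RPC envelope by splitting a reference dump at a
// placeholder marker, then stream escaped chunks between the two halves.
struct Envelope {
  std::string prefix;
  std::string suffix;
};

Envelope buildEnvelope() {
  const std::string marker = "__STREAM_MARKER__";
  const std::string ref_json =
      R"({"jsonrpc":"2.0","id":1,"result":{"content":[{"type":"text","text":"__STREAM_MARKER__"}]}})";
  const size_t pos = ref_json.find(marker);
  assert(pos != std::string::npos); // guard suggested in the review above
  return {ref_json.substr(0, pos), ref_json.substr(pos + marker.size())};
}

// Each already-escaped chunk is emitted between prefix and suffix, so the
// client sees one well-formed JSON-RPC response without full buffering.
std::string streamChunks(const Envelope& env, const std::vector<std::string>& chunks) {
  std::string out = env.prefix;
  for (const auto& chunk : chunks) {
    out += chunk;
  }
  out += env.suffix;
  return out;
}
```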
```cpp
// Streaming fast-path for tools/call: JSON-escape each chunk on-the-fly without
// buffering the full response body.
if (!streaming_json_prefix_.empty()) {
```
The streaming path currently bypasses the max_response_body_size check. While streaming avoids the "unbounded buffering" issue mentioned in the proto documentation, it is still good practice to enforce a total response size limit to prevent resource exhaustion or abuse from a malicious upstream. Consider tracking the total bytes processed and enforcing the limit even in streaming mode.
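The suggested byte-tracking can be sketched as a small standalone helper; the class and member names below are illustrative assumptions, not the filter's real API:

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Minimal sketch: track total bytes seen across streamed chunks and report
// failure once a configured cap is exceeded. A real filter would send a local
// reply and stop iteration instead of continuing to stream.
class StreamingSizeGuard {
public:
  explicit StreamingSizeGuard(size_t max_bytes) : max_bytes_(max_bytes) {}

  // Returns false when the running total exceeds the cap.
  bool onChunk(const std::string& chunk) {
    total_bytes_ += chunk.size();
    return total_bytes_ <= max_bytes_;
  }

  size_t totalBytes() const { return total_bytes_; }

private:
  const size_t max_bytes_;
  size_t total_bytes_ = 0;
};
```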
```cpp
absl::string_view chunk(static_cast<const char*>(data.linearize(len)), len);
std::string escaped_chunk = JsonEscaper::escapeString(chunk, JsonEscaper::extraSpace(chunk));
```
Calling data.linearize(len) followed by JsonEscaper::escapeString (which performs a 2-pass scan and returns a new std::string) results in multiple copies and full scans of the data. For large chunks, this can be inefficient. If performance becomes an issue, consider processing the buffer slice-by-slice using data.getRawSlices() and modifying the escaper to append directly to a target buffer.
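The single-pass, append-to-target escaper suggested here could look roughly like the following standalone sketch (function name and escape coverage are assumptions; it handles only the escapes required for JSON string content, so each raw slice can be appended without a per-chunk allocation or a separate sizing pass):

```cpp
#include <cassert>
#include <string>
#include <string_view>

// Appends the JSON-escaped form of `slice` directly onto `out`, so callers can
// feed buffer slices one at a time without linearizing the whole buffer.
void appendJsonEscaped(std::string_view slice, std::string& out) {
  static const char* hex = "0123456789abcdef";
  for (unsigned char c : slice) {
    switch (c) {
    case '"': out += "\\\""; break;
    case '\\': out += "\\\\"; break;
    case '\b': out += "\\b"; break;
    case '\f': out += "\\f"; break;
    case '\n': out += "\\n"; break;
    case '\r': out += "\\r"; break;
    case '\t': out += "\\t"; break;
    default:
      if (c < 0x20) {
        // Remaining control characters need the \u00XX form.
        out += "\\u00";
        out += hex[c >> 4];
        out += hex[c & 0xf];
      } else {
        out += static_cast<char>(c);
      }
    }
  }
}
```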
```proto
// extension or the backend itself to handle the tools/list request if they support it.
HttpRule tool_list_http_rule = 3;

// Enables streaming transcoding for unstructured text responses (``content`` field of a result).
```
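For illustration, enabling the flag discussed in this PR might look like the following filter configuration; the `"@type"` URL and field placement are assumptions, since the extension's exact proto path is not shown here:

```yaml
http_filters:
- name: envoy.filters.http.mcp_json_rest_bridge
  typed_config:
    # The "@type" URL below is an assumed path, not confirmed by this PR.
    "@type": type.googleapis.com/envoy.extensions.filters.http.mcp_json_rest_bridge.v3.McpJsonRestBridge
    text_content_streaming_enabled: true
```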
I'm thinking through the implications of this change and would like to hear some feedback from @tyxia and @botengyao as well. With this added, it seems the only times we'd still need to buffer the response are for structured content or non-text content types.
To handle this moving forward, I'm wondering whether we should
- make this new improvement the default behavior, and add a TODO to the original buffering approach to improve its handling of structured content responses
- use the output_schema field (assuming we add it in a subsequent change) to decide whether the response is structured content (instead of a separate flag). When the user configures the "output_schema" field, it implies that the tools/call response is structured content and will be buffered.
```cpp
  ENVOY_STREAM_LOG(debug, "Streaming: skipping empty intermediate chunk.", *encoder_callbacks_);
  return Http::FilterDataStatus::Continue;
}
absl::string_view chunk(static_cast<const char*>(data.linearize(len)), len);
```
Consider adding a TODO to handle the edge case where the backend server returns text/event-stream format?
```cpp
    }
  }
})";
```
some nits:
- consider using more complicated test cases (e.g. the same setup as in ToolsCallTranscoding); we could then verify that these 2 tests produce the same response at the client in the non-structured-content case.
- consider avoiding "streaming" in the test name. At first glance, I thought streaming was simulating the case where the backend server returns a streaming response (i.e. text/event-stream).
Commit Message: mcp_transcoder: add response streaming option.
Additional Description:
This PR adds a `text_content_streaming_enabled` configuration option to the `mcp_json_rest_bridge` HTTP filter. Currently, the transcoder buffers the entire response before sending it to the client. With this option enabled, the filter supports streaming unstructured text responses directly to the client chunk-by-chunk.
Implementation details:
- In `encodeHeaders`, the filter strips the `Content-Length` header and pre-builds a JSON-RPC envelope (prefix and suffix).
- In `encodeData`, it JSON-escapes each arriving text chunk on-the-fly and streams it to the client wrapped inside the pre-built JSON-RPC envelope.

Risk Level: Low
Testing: Unit and Integration tests.
Docs Changes: N/A
Release Notes: N/A
Platform Specific Features: N/A