-
Notifications
You must be signed in to change notification settings - Fork 5.4k
mcp_transcoder: add response streaming option. #45076
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -6,6 +6,8 @@ | |||||||||||||||||||||
| #include "envoy/http/filter.h" | ||||||||||||||||||||||
| #include "envoy/http/header_map.h" | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| #include "source/common/buffer/buffer_impl.h" | ||||||||||||||||||||||
| #include "source/common/common/json_escape_string.h" | ||||||||||||||||||||||
| #include "source/common/http/headers.h" | ||||||||||||||||||||||
| #include "source/common/protobuf/utility.h" | ||||||||||||||||||||||
| #include "source/extensions/filters/common/mcp/constants.h" | ||||||||||||||||||||||
|
|
@@ -61,7 +63,7 @@ json translateJsonRestResponseToJsonRpc(absl::string_view tool_call_response, | |||||||||||||||||||||
| {McpConstants::RESULT_FIELD, | ||||||||||||||||||||||
| { | ||||||||||||||||||||||
| {McpConstants::CONTENT_FIELD, | ||||||||||||||||||||||
| json::array({{{McpConstants::TYPE_FIELD, "text"}, | ||||||||||||||||||||||
| json::array({{{McpConstants::TYPE_FIELD, McpConstants::TEXT_FIELD}, | ||||||||||||||||||||||
| {McpConstants::TEXT_FIELD, tool_call_response}}})}, | ||||||||||||||||||||||
| {McpConstants::IS_ERROR_FIELD, is_error}, | ||||||||||||||||||||||
| }}, | ||||||||||||||||||||||
|
|
@@ -165,6 +167,10 @@ McpJsonRestBridgeFilterConfig::getToolsListHttpRule() const { | |||||||||||||||||||||
| return proto_config_.tool_config().tool_list_http_rule(); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| bool McpJsonRestBridgeFilterConfig::textContentStreamingEnabled() const { | ||||||||||||||||||||||
| return proto_config_.tool_config().text_content_streaming_enabled(); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| Http::FilterHeadersStatus | ||||||||||||||||||||||
| McpJsonRestBridgeFilter::decodeHeaders(Http::RequestHeaderMap& request_headers, bool) { | ||||||||||||||||||||||
| absl::string_view path = request_headers.getPathValue(); | ||||||||||||||||||||||
|
|
@@ -248,8 +254,8 @@ Http::FilterDataStatus McpJsonRestBridgeFilter::decodeData(Buffer::Instance& dat | |||||||||||||||||||||
| return Http::FilterDataStatus::Continue; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| Http::FilterHeadersStatus McpJsonRestBridgeFilter::encodeHeaders(Http::ResponseHeaderMap&, | ||||||||||||||||||||||
| bool end_stream) { | ||||||||||||||||||||||
| Http::FilterHeadersStatus | ||||||||||||||||||||||
| McpJsonRestBridgeFilter::encodeHeaders(Http::ResponseHeaderMap& response_headers, bool end_stream) { | ||||||||||||||||||||||
| switch (mcp_operation_) { | ||||||||||||||||||||||
| case McpOperation::Unspecified: | ||||||||||||||||||||||
| case McpOperation::Undecided: | ||||||||||||||||||||||
|
|
@@ -262,6 +268,17 @@ Http::FilterHeadersStatus McpJsonRestBridgeFilter::encodeHeaders(Http::ResponseH | |||||||||||||||||||||
| break; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Streaming mode: pre-build the JSON-RPC prefix/suffix, strip Content-Length | ||||||||||||||||||||||
| // (final size is unknown), and let the headers flow through immediately so | ||||||||||||||||||||||
| // the client can start receiving data without waiting for the full body. | ||||||||||||||||||||||
| if (mcp_operation_ == McpOperation::ToolsCall && config_->textContentStreamingEnabled()) { | ||||||||||||||||||||||
| buildStreamingPrefixAndSuffix(getResponseCode(response_headers) >= | ||||||||||||||||||||||
| static_cast<int>(Http::Code::BadRequest)); | ||||||||||||||||||||||
| response_headers.removeContentLength(); | ||||||||||||||||||||||
| response_headers.setContentType(Http::Headers::get().ContentTypeValues.Json); | ||||||||||||||||||||||
| return Http::FilterHeadersStatus::Continue; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // TODO(guoyilin42): Handle headers-only upstream responses (e.g., 204 No Content). | ||||||||||||||||||||||
| // Currently, these cases bypass transcoding, which can cause MCP SDKs to timeout | ||||||||||||||||||||||
| // or throw exceptions because they expect a valid JSON-RPC response with a | ||||||||||||||||||||||
|
|
@@ -280,6 +297,36 @@ Http::FilterDataStatus McpJsonRestBridgeFilter::encodeData(Buffer::Instance& dat | |||||||||||||||||||||
| return Http::FilterDataStatus::Continue; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Streaming fast-path for tools/call: JSON-escape each chunk on-the-fly without | ||||||||||||||||||||||
| // buffering the full response body. | ||||||||||||||||||||||
| if (!streaming_json_prefix_.empty()) { | ||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The streaming path currently bypasses the |
||||||||||||||||||||||
| uint64_t len = data.length(); | ||||||||||||||||||||||
| if (len == 0 && !end_stream) { | ||||||||||||||||||||||
| ENVOY_STREAM_LOG(debug, "Streaming: skipping empty intermediate chunk.", *encoder_callbacks_); | ||||||||||||||||||||||
| return Http::FilterDataStatus::Continue; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| absl::string_view chunk(static_cast<const char*>(data.linearize(len)), len); | ||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider adding a TODO to handle the edge case where the backend server returns text/event-stream format? |
||||||||||||||||||||||
| std::string escaped_chunk = JsonEscaper::escapeString(chunk, JsonEscaper::extraSpace(chunk)); | ||||||||||||||||||||||
|
Comment on lines
+308
to
+309
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Calling |
||||||||||||||||||||||
|
|
||||||||||||||||||||||
| data.drain(len); | ||||||||||||||||||||||
| if (is_first_streaming_chunk_) { | ||||||||||||||||||||||
| ENVOY_STREAM_LOG(debug, | ||||||||||||||||||||||
| "Streaming: emitting prefix + first chunk ({} raw bytes, {} escaped bytes).", | ||||||||||||||||||||||
| *encoder_callbacks_, len, escaped_chunk.size()); | ||||||||||||||||||||||
| data.add(streaming_json_prefix_); | ||||||||||||||||||||||
| is_first_streaming_chunk_ = false; | ||||||||||||||||||||||
| } else { | ||||||||||||||||||||||
| ENVOY_STREAM_LOG(debug, "Streaming: forwarding chunk ({} raw bytes, {} escaped bytes).", | ||||||||||||||||||||||
| *encoder_callbacks_, len, escaped_chunk.size()); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| data.add(escaped_chunk); | ||||||||||||||||||||||
| if (end_stream) { | ||||||||||||||||||||||
| ENVOY_STREAM_LOG(debug, "Streaming: appending suffix, stream complete.", *encoder_callbacks_); | ||||||||||||||||||||||
| data.add(streaming_json_suffix_); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| return Http::FilterDataStatus::Continue; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| const uint32_t max_response_body_size = config_->maxResponseBodySize(); | ||||||||||||||||||||||
| if (max_response_body_size > 0 && | ||||||||||||||||||||||
| (response_body_.length() + data.length()) > max_response_body_size) { | ||||||||||||||||||||||
|
|
@@ -321,6 +368,28 @@ Http::FilterTrailersStatus McpJsonRestBridgeFilter::encodeTrailers(Http::Respons | |||||||||||||||||||||
| return Http::FilterTrailersStatus::Continue; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| void McpJsonRestBridgeFilter::buildStreamingPrefixAndSuffix(bool is_error) { | ||||||||||||||||||||||
| // Build a reference JSON-RPC envelope with an empty text placeholder. | ||||||||||||||||||||||
| json ref = { | ||||||||||||||||||||||
| {McpConstants::JSONRPC_FIELD, McpConstants::JSONRPC_VERSION}, | ||||||||||||||||||||||
| {McpConstants::ID_FIELD, *session_id_}, | ||||||||||||||||||||||
| {McpConstants::RESULT_FIELD, | ||||||||||||||||||||||
| { | ||||||||||||||||||||||
| {McpConstants::CONTENT_FIELD, | ||||||||||||||||||||||
| json::array({{{McpConstants::TYPE_FIELD, McpConstants::TEXT_FIELD}, | ||||||||||||||||||||||
| {McpConstants::TEXT_FIELD, ""}}})}, | ||||||||||||||||||||||
| {McpConstants::IS_ERROR_FIELD, is_error}, | ||||||||||||||||||||||
| }}, | ||||||||||||||||||||||
| }; | ||||||||||||||||||||||
| std::string ref_json = ref.dump(); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Locate the empty-string placeholder for the text value: `"text":""`. | ||||||||||||||||||||||
| std::string marker = absl::StrCat("\"", McpConstants::TEXT_FIELD, "\":\"\""); | ||||||||||||||||||||||
| size_t pos = ref_json.find(marker); | ||||||||||||||||||||||
| streaming_json_prefix_ = ref_json.substr(0, pos + marker.size() - 1); | ||||||||||||||||||||||
| streaming_json_suffix_ = ref_json.substr(pos + marker.size() - 1); | ||||||||||||||||||||||
|
Comment on lines
+388
to
+390
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Relying on the exact string representation of a JSON dump to find a split point is fragile. While
Suggested change
|
||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| void McpJsonRestBridgeFilter::handleMcpMethod(const nlohmann::json& json_rpc, | ||||||||||||||||||||||
| Http::RequestHeaderMapOptRef request_headers) { | ||||||||||||||||||||||
| ENVOY_STREAM_LOG(debug, "Handling MCP JSON-RPC: {}", *decoder_callbacks_, json_rpc.dump()); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm thinking through the implications of this change and would like to heard some feedbacks from @tyxia and @botengyao as well. With this added, it seems the only times we'd still need to buffer the response are for structured content or non-text content types.
To handle this moving forward, I'm thinking whether we should