From 396b26d8599e33dffad913379b01d61000e97d63 Mon Sep 17 00:00:00 2001 From: Yilin Guo Date: Wed, 13 May 2026 23:45:34 +0000 Subject: [PATCH 1/6] mcp_transcoder: add response streaming option. Signed-off-by: Yilin Guo --- .../v3/mcp_json_rest_bridge.proto | 15 ++ .../mcp_json_rest_bridge_filter.cc | 75 ++++++++- .../mcp_json_rest_bridge_filter.h | 11 ++ .../mcp_json_rest_bridge_filter_test.cc | 146 ++++++++++++++++ .../mcp_json_rest_bridge_integration_test.cc | 157 ++++++++++++++++++ 5 files changed, 401 insertions(+), 3 deletions(-) diff --git a/api/envoy/extensions/filters/http/mcp_json_rest_bridge/v3/mcp_json_rest_bridge.proto b/api/envoy/extensions/filters/http/mcp_json_rest_bridge/v3/mcp_json_rest_bridge.proto index a70e4cb4b4f8d..b6fe4fb5813cd 100644 --- a/api/envoy/extensions/filters/http/mcp_json_rest_bridge/v3/mcp_json_rest_bridge.proto +++ b/api/envoy/extensions/filters/http/mcp_json_rest_bridge/v3/mcp_json_rest_bridge.proto @@ -168,6 +168,21 @@ message ServerToolConfig { // - If not provided: The ``tools/list`` request is passed through. This allows subsequent // extension or the backend itself to handle the tools/list request if they support it. HttpRule tool_list_http_rule = 3; + + // Enables streaming transcoding for unstructured text responses (``content`` field of a result). + // + // When enabled, the response body is streamed directly to the client without buffering. Each + // chunk is JSON escaped as it arrives and wrapped with a pre-built JSON-RPC prefix and suffix. + // + // Streaming flow: + // + // .. code-block:: text + // + // input: [chunk1] → [chunk2] → [chunk3] + // output: [prefix+escaped_chunk1] → [escaped_chunk2] → [escaped_chunk3+suffix] + // + // Disabled by default. + bool text_content_streaming_enabled = 4; } // Configuration for a specific MCP tool. diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc index 4d7c03d3f5139..c12487e0c0210 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc +++ b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc @@ -6,6 +6,8 @@ #include "envoy/http/filter.h" #include "envoy/http/header_map.h" +#include "source/common/buffer/buffer_impl.h" +#include "source/common/common/json_escape_string.h" #include "source/common/http/headers.h" #include "source/common/protobuf/utility.h" #include "source/extensions/filters/common/mcp/constants.h" @@ -61,7 +63,7 @@ json translateJsonRestResponseToJsonRpc(absl::string_view tool_call_response, {McpConstants::RESULT_FIELD, { {McpConstants::CONTENT_FIELD, - json::array({{{McpConstants::TYPE_FIELD, "text"}, + json::array({{{McpConstants::TYPE_FIELD, McpConstants::TEXT_FIELD}, {McpConstants::TEXT_FIELD, tool_call_response}}})}, {McpConstants::IS_ERROR_FIELD, is_error}, }}, @@ -165,6 +167,10 @@ McpJsonRestBridgeFilterConfig::getToolsListHttpRule() const { return proto_config_.tool_config().tool_list_http_rule(); } +bool McpJsonRestBridgeFilterConfig::textContentStreamingEnabled() const { + return proto_config_.tool_config().text_content_streaming_enabled(); +} + Http::FilterHeadersStatus McpJsonRestBridgeFilter::decodeHeaders(Http::RequestHeaderMap& request_headers, bool) { absl::string_view path = request_headers.getPathValue(); @@ -248,8 +254,8 @@ Http::FilterDataStatus McpJsonRestBridgeFilter::decodeData(Buffer::Instance& dat return Http::FilterDataStatus::Continue; } -Http::FilterHeadersStatus McpJsonRestBridgeFilter::encodeHeaders(Http::ResponseHeaderMap&, - bool end_stream) { +Http::FilterHeadersStatus +McpJsonRestBridgeFilter::encodeHeaders(Http::ResponseHeaderMap& response_headers, bool end_stream) { switch (mcp_operation_) { case McpOperation::Unspecified: case McpOperation::Undecided: @@ -262,6 +268,17 @@ Http::FilterHeadersStatus McpJsonRestBridgeFilter::encodeHeaders(Http::ResponseH break; } + // Streaming mode: pre-build the JSON-RPC prefix/suffix, strip Content-Length + // (final size is unknown), and let the headers flow through immediately so + // the client can start receiving data without waiting for the full body. + if (mcp_operation_ == McpOperation::ToolsCall && config_->textContentStreamingEnabled()) { + buildStreamingPrefixAndSuffix(getResponseCode(response_headers) >= + static_cast(Http::Code::BadRequest)); + response_headers.removeContentLength(); + response_headers.setContentType(Http::Headers::get().ContentTypeValues.Json); + return Http::FilterHeadersStatus::Continue; + } + // TODO(guoyilin42): Handle headers-only upstream responses (e.g., 204 No Content). // Currently, these cases bypass transcoding, which can cause MCP SDKs to timeout // or throw exceptions because they expect a valid JSON-RPC response with a @@ -280,6 +297,36 @@ Http::FilterDataStatus McpJsonRestBridgeFilter::encodeData(Buffer::Instance& dat return Http::FilterDataStatus::Continue; } + // Streaming fast-path for tools/call: JSON-escape each chunk on-the-fly without + // buffering the full response body. + if (!streaming_json_prefix_.empty()) { + uint64_t len = data.length(); + if (len == 0 && !end_stream) { + ENVOY_STREAM_LOG(debug, "Streaming: skipping empty intermediate chunk.", *encoder_callbacks_); + return Http::FilterDataStatus::Continue; + } + absl::string_view chunk(static_cast(data.linearize(len)), len); + std::string escaped_chunk = JsonEscaper::escapeString(chunk, JsonEscaper::extraSpace(chunk)); + + data.drain(len); + if (is_first_streaming_chunk_) { + ENVOY_STREAM_LOG(debug, + "Streaming: emitting prefix + first chunk ({} raw bytes, {} escaped bytes).", + *encoder_callbacks_, len, escaped_chunk.size()); + data.add(streaming_json_prefix_); + is_first_streaming_chunk_ = false; + } else { + ENVOY_STREAM_LOG(debug, "Streaming: forwarding chunk ({} raw bytes, {} escaped bytes).", + *encoder_callbacks_, len, escaped_chunk.size()); + } + data.add(escaped_chunk); + if (end_stream) { + ENVOY_STREAM_LOG(debug, "Streaming: appending suffix, stream complete.", *encoder_callbacks_); + data.add(streaming_json_suffix_); + } + return Http::FilterDataStatus::Continue; + } + const uint32_t max_response_body_size = config_->maxResponseBodySize(); if (max_response_body_size > 0 && (response_body_.length() + data.length()) > max_response_body_size) { @@ -321,6 +368,28 @@ Http::FilterTrailersStatus McpJsonRestBridgeFilter::encodeTrailers(Http::Respons return Http::FilterTrailersStatus::Continue; } +void McpJsonRestBridgeFilter::buildStreamingPrefixAndSuffix(bool is_error) { + // Build a reference JSON-RPC envelope with an empty text placeholder. + json ref = { + {McpConstants::JSONRPC_FIELD, McpConstants::JSONRPC_VERSION}, + {McpConstants::ID_FIELD, *session_id_}, + {McpConstants::RESULT_FIELD, + { + {McpConstants::CONTENT_FIELD, + json::array({{{McpConstants::TYPE_FIELD, McpConstants::TEXT_FIELD}, + {McpConstants::TEXT_FIELD, ""}}})}, + {McpConstants::IS_ERROR_FIELD, is_error}, + }}, + }; + std::string ref_json = ref.dump(); + + // Locate the empty-string placeholder for the text value: `"text":""`. + std::string marker = absl::StrCat("\"", McpConstants::TEXT_FIELD, "\":\"\""); + size_t pos = ref_json.find(marker); + streaming_json_prefix_ = ref_json.substr(0, pos + marker.size() - 1); + streaming_json_suffix_ = ref_json.substr(pos + marker.size() - 1); +} + void McpJsonRestBridgeFilter::handleMcpMethod(const nlohmann::json& json_rpc, Http::RequestHeaderMapOptRef request_headers) { ENVOY_STREAM_LOG(debug, "Handling MCP JSON-RPC: {}", *decoder_callbacks_, json_rpc.dump()); diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h index 3a7efeb8bc1f1..093a4259daffa 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h +++ b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h @@ -42,6 +42,7 @@ class McpJsonRestBridgeFilterConfig : public Logger::Loggable(proto_config); + filter_ = std::make_unique(config_); + filter_->setDecoderFilterCallbacks(decoder_callbacks_); + filter_->setEncoderFilterCallbacks(encoder_callbacks_); + EXPECT_CALL(decoder_callbacks_, requestHeaders()) + .WillRepeatedly(Return(Http::RequestHeaderMapOptRef(request_headers_))); + EXPECT_CALL(encoder_callbacks_, responseHeaders()) + .WillRepeatedly(Return(Http::ResponseHeaderMapOptRef(response_headers_))); + } + + void sendToolsCallRequest() { + request_headers_ = {{":method", "POST"}, {":path", "/mcp"}}; + ASSERT_EQ(filter_->decodeHeaders(request_headers_, /*end_stream=*/false), + Http::FilterHeadersStatus::StopIteration); + EXPECT_CALL(decoder_callbacks_.downstream_callbacks_, clearRouteCache()); + Buffer::OwnedImpl req( + R"json({"jsonrpc":"2.0","id":123,"method":"tools/call","params":{"name":"get_api_key"}})json"); + ASSERT_EQ(filter_->decodeData(req, /*end_stream=*/true), Http::FilterDataStatus::Continue); + } + + McpJsonRestBridgeFilterConfigSharedPtr config_; + std::unique_ptr filter_; + NiceMock decoder_callbacks_; + NiceMock encoder_callbacks_; + Http::TestRequestHeaderMapImpl request_headers_; + Http::TestResponseHeaderMapImpl response_headers_; +}; + +TEST_F(McpJsonRestBridgeStreamingFilterTest, SingleChunkReturnsFullJsonRpcResponse) { + sendToolsCallRequest(); + + response_headers_ = { + {":status", "200"}, {"content-type", "text/plain"}, {"content-length", "11"}}; + EXPECT_EQ(filter_->encodeHeaders(response_headers_, /*end_stream=*/false), + Http::FilterHeadersStatus::Continue); + EXPECT_THAT(response_headers_.getContentTypeValue(), StrEq("application/json")); + EXPECT_FALSE(response_headers_.has(Http::Headers::get().ContentLength)); + + Buffer::OwnedImpl chunk("hello world"); + EXPECT_EQ(filter_->encodeData(chunk, /*end_stream=*/true), Http::FilterDataStatus::Continue); + EXPECT_EQ( + nlohmann::json::parse(chunk.toString()), + nlohmann::json::parse( + R"json({"id":123,"jsonrpc":"2.0","result":{"content":[{"text":"hello world","type":"text"}],"isError":false}})json")); +} + +TEST_F(McpJsonRestBridgeStreamingFilterTest, MultipleChunksAreStreamedCorrectly) { + sendToolsCallRequest(); + + response_headers_ = {{":status", "200"}, {"content-length", "100"}}; + EXPECT_EQ(filter_->encodeHeaders(response_headers_, /*end_stream=*/false), + Http::FilterHeadersStatus::Continue); + + Buffer::OwnedImpl chunk1("part1"); + EXPECT_EQ(filter_->encodeData(chunk1, /*end_stream=*/false), Http::FilterDataStatus::Continue); + // First chunk contains the prefix + escaped "part1". + EXPECT_THAT(chunk1.toString(), testing::StartsWith("{\"id\":")); + + Buffer::OwnedImpl chunk2("part2"); + EXPECT_EQ(filter_->encodeData(chunk2, /*end_stream=*/false), Http::FilterDataStatus::Continue); + // Middle chunk is just "part2" (no JSON wrapper). + EXPECT_THAT(chunk2.toString(), StrEq("part2")); + + Buffer::OwnedImpl chunk3("part3"); + EXPECT_EQ(filter_->encodeData(chunk3, /*end_stream=*/true), Http::FilterDataStatus::Continue); + // Last chunk contains "part3" + the closing suffix. + EXPECT_THAT(chunk3.toString(), testing::EndsWith("}}")); + + // Reassemble and verify the full JSON-RPC response. + const std::string full = chunk1.toString() + chunk2.toString() + chunk3.toString(); + EXPECT_EQ( + nlohmann::json::parse(full), + nlohmann::json::parse( + R"json({"id":123,"jsonrpc":"2.0","result":{"content":[{"text":"part1part2part3","type":"text"}],"isError":false}})json")); +} + +TEST_F(McpJsonRestBridgeStreamingFilterTest, SpecialCharactersAreEscaped) { + sendToolsCallRequest(); + + response_headers_ = {{":status", "200"}}; + EXPECT_EQ(filter_->encodeHeaders(response_headers_, /*end_stream=*/false), + Http::FilterHeadersStatus::Continue); + + // Content contains double-quotes, a backslash, a newline, and a tab. + Buffer::OwnedImpl chunk("{\"key\":\"val\\path\"\n\t}"); + EXPECT_EQ(filter_->encodeData(chunk, /*end_stream=*/true), Http::FilterDataStatus::Continue); + + const nlohmann::json response = nlohmann::json::parse(chunk.toString()); + EXPECT_EQ(response["id"], 123); + EXPECT_EQ(response["result"]["isError"], false); + EXPECT_EQ(response["result"]["content"][0]["text"].get(), + "{\"key\":\"val\\path\"\n\t}"); +} + +TEST_F(McpJsonRestBridgeStreamingFilterTest, ErrorResponseSetsIsErrorTrue) { + sendToolsCallRequest(); + + response_headers_ = {{":status", "500"}, {"content-length", "21"}}; + EXPECT_EQ(filter_->encodeHeaders(response_headers_, /*end_stream=*/false), + Http::FilterHeadersStatus::Continue); + EXPECT_THAT(response_headers_.getContentTypeValue(), StrEq("application/json")); + EXPECT_FALSE(response_headers_.has(Http::Headers::get().ContentLength)); + + Buffer::OwnedImpl chunk("Internal Server Error"); + EXPECT_EQ(filter_->encodeData(chunk, /*end_stream=*/true), Http::FilterDataStatus::Continue); + EXPECT_EQ( + nlohmann::json::parse(chunk.toString()), + nlohmann::json::parse( + R"json({"id":123,"jsonrpc":"2.0","result":{"content":[{"text":"Internal Server Error","type":"text"}],"isError":true}})json")); +} + +TEST_F(McpJsonRestBridgeStreamingFilterTest, EmptyIntermediateChunkIsSkipped) { + sendToolsCallRequest(); + + response_headers_ = {{":status", "200"}}; + EXPECT_EQ(filter_->encodeHeaders(response_headers_, /*end_stream=*/false), + Http::FilterHeadersStatus::Continue); + + Buffer::OwnedImpl empty_chunk; + EXPECT_EQ(filter_->encodeData(empty_chunk, /*end_stream=*/false), + Http::FilterDataStatus::Continue); + EXPECT_TRUE(empty_chunk.toString().empty()); + + // The subsequent real chunk should still get the prefix. + Buffer::OwnedImpl final_chunk("content"); + EXPECT_EQ(filter_->encodeData(final_chunk, /*end_stream=*/true), + Http::FilterDataStatus::Continue); + EXPECT_EQ( + nlohmann::json::parse(final_chunk.toString()), + nlohmann::json::parse( + R"json({"id":123,"jsonrpc":"2.0","result":{"content":[{"text":"content","type":"text"}],"isError":false}})json")); +} + class McpHttpMethodFilterTest : public testing::TestWithParam { public: void SetUp() override { diff --git a/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_integration_test.cc b/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_integration_test.cc index f79be0c43454b..dd7ae496ccb3d 100644 --- a/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_integration_test.cc +++ b/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_integration_test.cc @@ -548,5 +548,162 @@ TEST_P(McpJsonRestBridgeIntegrationTest, InitializeUnsupportedProtocolVersionFal EXPECT_EQ(nlohmann::json::parse(response->body()), nlohmann::json::parse(expected_response)); } +TEST_P(McpJsonRestBridgeIntegrationTest, ToolsCallStreamingTranscoding) { + const std::string config = R"EOF( + name: envoy.filters.http.mcp_json_rest_bridge + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.mcp_json_rest_bridge.v3.McpJsonRestBridge + tool_config: + tools: + - name: "create_api_key" + http_rule: + post: "/v1/{parent=projects/*}/keys" + body: "key" + text_content_streaming_enabled: true + )EOF"; + + initializeFilter(config); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + const std::string request_body = R"({ + "jsonrpc": "2.0", + "id": 321, + "method": "tools/call", + "params": { + "name": "create_api_key", + "arguments": { + "parent": "projects/foo", + "key": { + "displayName": "bar" + } + } + } + })"; + + auto response = codec_client_->makeRequestWithBody( + Http::TestRequestHeaderMapImpl{{":method", "POST"}, + {":path", "/mcp"}, + {":scheme", "http"}, + {":authority", "host"}, + {"content-type", "application/json"}}, + request_body); + + waitForNextUpstreamRequest(); + EXPECT_THAT(upstream_request_->headers().getMethodValue(), StrEq("POST")); + EXPECT_THAT(upstream_request_->headers().getPathValue(), StrEq("/v1/projects/foo/keys")); + + Http::TestResponseHeaderMapImpl response_headers; + response_headers.setStatus(200); + response_headers.setContentType(Http::Headers::get().ContentTypeValues.Json); + response_headers.setContentLength(21); + upstream_request_->encodeHeaders(response_headers, false); + + Buffer::OwnedImpl chunk1; + chunk1.add(R"({"displayName":)"); + upstream_request_->encodeData(chunk1, false); + + Buffer::OwnedImpl chunk2; + chunk2.add(R"("bar"})"); + upstream_request_->encodeData(chunk2, true); + + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_TRUE(upstream_request_->complete()); + + EXPECT_THAT(response->headers().getStatusValue(), StrEq("200")); + EXPECT_THAT(response->headers().getContentTypeValue(), StrEq("application/json")); + EXPECT_THAT(response->headers().getContentLengthValue(), IsEmpty()); + + const std::string expected_rpc_response = R"({ + "jsonrpc": "2.0", + "id": 321, + "result": { + "content": [ + { + "type": "text", + "text": "{\"displayName\":\"bar\"}" + } + ], + "isError": false + } + })"; + EXPECT_EQ(nlohmann::json::parse(response->body()), nlohmann::json::parse(expected_rpc_response)); +} + +TEST_P(McpJsonRestBridgeIntegrationTest, ToolsCallStreamingErrorResponse) { + const std::string config = R"EOF( + name: envoy.filters.http.mcp_json_rest_bridge + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.mcp_json_rest_bridge.v3.McpJsonRestBridge + tool_config: + tools: + - name: "create_api_key" + http_rule: + post: "/v1/{parent=projects/*}/keys" + body: "key" + text_content_streaming_enabled: true + )EOF"; + + initializeFilter(config); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + const std::string request_body = R"({ + "jsonrpc": "2.0", + "id": 42, + "method": "tools/call", + "params": { + "name": "create_api_key", + "arguments": { + "parent": "projects/foo", + "key": { + "displayName": "bar" + } + } + } + })"; + + auto response = codec_client_->makeRequestWithBody( + Http::TestRequestHeaderMapImpl{{":method", "POST"}, + {":path", "/mcp"}, + {":scheme", "http"}, + {":authority", "host"}, + {"content-type", "application/json"}}, + request_body); + + waitForNextUpstreamRequest(); + + Http::TestResponseHeaderMapImpl response_headers; + response_headers.setStatus(500); + response_headers.setContentType(Http::Headers::get().ContentTypeValues.Json); + upstream_request_->encodeHeaders(response_headers, false); + + Buffer::OwnedImpl response_data; + response_data.add("Internal Server Error"); + upstream_request_->encodeData(response_data, true); + + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_TRUE(upstream_request_->complete()); + + EXPECT_THAT(response->headers().getStatusValue(), StrEq("500")); + EXPECT_THAT(response->headers().getContentTypeValue(), StrEq("application/json")); + EXPECT_THAT(response->headers().getContentLengthValue(), IsEmpty()); + + const std::string expected_rpc_response = R"({ + "jsonrpc": "2.0", + "id": 42, + "result": { + "content": [ + { + "type": "text", + "text": "Internal Server Error" + } + ], + "isError": true + } + })"; + EXPECT_EQ(nlohmann::json::parse(response->body()), nlohmann::json::parse(expected_rpc_response)); +} + } // namespace } // namespace Envoy From 2d044306ea5e7ec71a8b6b73acad6bb9abc541ec Mon Sep 17 00:00:00 2001 From: Yilin Guo Date: Mon, 18 May 2026 22:23:51 +0000 Subject: [PATCH 2/6] Address comments. Signed-off-by: Yilin Guo --- .../mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc | 6 ++---- .../http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h | 5 ++++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc index 49b413036b65c..ce9953e8db622 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc +++ b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc @@ -167,10 +167,6 @@ McpJsonRestBridgeFilterConfig::getToolsListHttpRule() const { return proto_config_.tool_config().tool_list_http_rule(); } -bool McpJsonRestBridgeFilterConfig::textContentStreamingEnabled() const { - return proto_config_.tool_config().text_content_streaming_enabled(); -} - Http::FilterHeadersStatus McpJsonRestBridgeFilter::decodeHeaders(Http::RequestHeaderMap& request_headers, bool) { absl::string_view path = request_headers.getPathValue(); @@ -306,6 +302,8 @@ Http::FilterDataStatus McpJsonRestBridgeFilter::encodeData(Buffer::Instance& dat return Http::FilterDataStatus::Continue; } absl::string_view chunk(static_cast(data.linearize(len)), len); + // TODO(guoyilin42): Consider adding text/event-stream backend response support and explore if + // it needs buffering. std::string escaped_chunk = JsonEscaper::escapeString(chunk, JsonEscaper::extraSpace(chunk)); data.drain(len); diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h index 9cf00fbd01250..e6f16641d6fca 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h +++ b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h @@ -44,13 +44,16 @@ class McpJsonRestBridgeFilterConfig : public Logger::Loggable From bd8455cdd08bea05422fca40c5a7546d75e2c9aa Mon Sep 17 00:00:00 2001 From: Yilin Guo Date: Wed, 20 May 2026 21:35:40 +0000 Subject: [PATCH 3/6] Address comments. Signed-off-by: Yilin Guo --- .../v3/mcp_json_rest_bridge.proto | 20 +++++++++--------- .../mcp_json_rest_bridge_filter.cc | 21 ++++++++++++++----- .../mcp_json_rest_bridge_filter.h | 15 +++++++------ .../mcp_json_rest_bridge_filter_test.cc | 2 +- .../mcp_json_rest_bridge_integration_test.cc | 4 ++-- 5 files changed, 38 insertions(+), 24 deletions(-) diff --git a/api/envoy/extensions/filters/http/mcp_json_rest_bridge/v3/mcp_json_rest_bridge.proto b/api/envoy/extensions/filters/http/mcp_json_rest_bridge/v3/mcp_json_rest_bridge.proto index ea69a151866e7..68990c2004bb2 100644 --- a/api/envoy/extensions/filters/http/mcp_json_rest_bridge/v3/mcp_json_rest_bridge.proto +++ b/api/envoy/extensions/filters/http/mcp_json_rest_bridge/v3/mcp_json_rest_bridge.proto @@ -188,6 +188,15 @@ message ServerToolConfig { // - If not provided: The ``tools/list`` request is passed through. This allows subsequent // extension or the backend itself to handle the tools/list request if they support it. HttpRule tool_list_http_rule = 3; +} + +// Configuration for a specific MCP tool. +message ToolConfig { + // Name of the tool. + string name = 1 [(validate.rules).string = {min_len: 1}]; + + // The HTTP configuration rules that apply to the normal backend. + HttpRule http_rule = 2; // Enables streaming transcoding for unstructured text responses (``content`` field of a result). // @@ -202,16 +211,7 @@ message ServerToolConfig { // output: [prefix+escaped_chunk1] → [escaped_chunk2] → [escaped_chunk3+suffix] // // Disabled by default. - bool text_content_streaming_enabled = 4; -} - -// Configuration for a specific MCP tool. -message ToolConfig { - // Name of the tool. - string name = 1 [(validate.rules).string = {min_len: 1}]; - - // The HTTP configuration rules that apply to the normal backend. - HttpRule http_rule = 2; + bool text_content_streaming_enabled = 3; } // Defines the schema of the JSON-RPC to REST mapping. It specifies how the "arguments" diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc index ce9953e8db622..b8897cd8b8c0e 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc +++ b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc @@ -144,19 +144,27 @@ McpJsonRestBridgeFilterConfig::McpJsonRestBridgeFilterConfig( max_response_body_size_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config_, max_response_body_size, DEFAULT_MAX_RESPONSE_BODY_SIZE)) { for (const auto& tool : proto_config.tool_config().tools()) { - tool_to_http_rule_[tool.name()] = tool.http_rule(); + tool_entries_[tool.name()] = {tool.http_rule(), tool.text_content_streaming_enabled()}; } ENVOY_LOG(debug, "Received MCP JSON REST Bridge config: {}", proto_config_.DebugString()); } absl::StatusOr McpJsonRestBridgeFilterConfig::getHttpRule(absl::string_view tool_name) const { - auto it = tool_to_http_rule_.find(tool_name); - if (it == tool_to_http_rule_.end()) { + auto it = tool_entries_.find(tool_name); + if (it == tool_entries_.end()) { return absl::InvalidArgumentError( fmt::format("Failed to find http rule for tool_name: {}", tool_name)); } - return it->second; + return it->second.http_rule; +} + +bool McpJsonRestBridgeFilterConfig::textContentStreamingEnabled(absl::string_view tool_name) const { + auto it = tool_entries_.find(tool_name); + if (it == tool_entries_.end()) { + return false; + } + return it->second.text_content_streaming_enabled; } absl::StatusOr @@ -267,7 +275,7 @@ McpJsonRestBridgeFilter::encodeHeaders(Http::ResponseHeaderMap& response_headers // Streaming mode: pre-build the JSON-RPC prefix/suffix, strip Content-Length // (final size is unknown), and let the headers flow through immediately so // the client can start receiving data without waiting for the full body. - if (mcp_operation_ == McpOperation::ToolsCall && config_->textContentStreamingEnabled()) { + if (mcp_operation_ == McpOperation::ToolsCall && text_content_streaming_enabled_) { buildStreamingPrefixAndSuffix(getResponseCode(response_headers) >= static_cast(Http::Code::BadRequest)); response_headers.removeContentLength(); @@ -593,6 +601,9 @@ void McpJsonRestBridgeFilter::mapMcpToolToApiBackend(const nlohmann::json& json_ return; } + // Set the per-request streaming flag based on the tool's config. + text_content_streaming_enabled_ = config_->textContentStreamingEnabled(tool_name); + const auto arguments_it = params.find(McpConstants::ARGUMENTS_FIELD); if (arguments_it != params.end() && !arguments_it->is_object()) { ENVOY_STREAM_LOG(error, "The arguments of the tool call request must be an object.", diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h index e6f16641d6fca..ff7aee1420293 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h +++ b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h @@ -50,14 +50,14 @@ class McpJsonRestBridgeFilterConfig : public Logger::Loggable - tool_to_http_rule_; + struct ToolEntry { + envoy::extensions::filters::http::mcp_json_rest_bridge::v3::HttpRule http_rule; + bool text_content_streaming_enabled; + }; + absl::flat_hash_map tool_entries_; envoy::extensions::filters::http::mcp_json_rest_bridge::v3::McpJsonRestBridge proto_config_; std::string fallback_protocol_version_; uint32_t max_request_body_size_; @@ -133,6 +133,9 @@ class McpJsonRestBridgeFilter : public Http::PassThroughFilter, Buffer::OwnedImpl response_body_; std::string response_body_str_; + // Per-request streaming flag, set during tool lookup in mapMcpToolToApiBackend. + bool text_content_streaming_enabled_ = false; + // Streaming state for text_content_streaming_enabled. // prefix/suffix are pre-built once in encodeHeaders; an empty prefix signals // that the non-streaming (buffered) path is active. diff --git a/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter_test.cc b/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter_test.cc index abfac3997ee2b..11bc7114625c2 100644 --- a/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter_test.cc +++ b/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter_test.cc @@ -1320,8 +1320,8 @@ class McpJsonRestBridgeStreamingFilterTest : public testing::Test { tools { name: "get_api_key" http_rule: { get: "/v1/apiKeys" } + text_content_streaming_enabled: true } - text_content_streaming_enabled: true } )pb"); config_ = std::make_shared(proto_config); diff --git a/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_integration_test.cc b/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_integration_test.cc index dd7ae496ccb3d..788c52e1b32b5 100644 --- a/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_integration_test.cc +++ b/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_integration_test.cc @@ -559,7 +559,7 @@ TEST_P(McpJsonRestBridgeIntegrationTest, ToolsCallStreamingTranscoding) { http_rule: post: "/v1/{parent=projects/*}/keys" body: "key" - text_content_streaming_enabled: true + text_content_streaming_enabled: true )EOF"; initializeFilter(config); @@ -641,7 +641,7 @@ TEST_P(McpJsonRestBridgeIntegrationTest, ToolsCallStreamingErrorResponse) { http_rule: post: "/v1/{parent=projects/*}/keys" body: "key" - text_content_streaming_enabled: true + text_content_streaming_enabled: true )EOF"; initializeFilter(config); From d57c623bf392b9aa421556a995646fc741ccc1ee Mon Sep 17 00:00:00 2001 From: Yilin Guo Date: Tue, 26 May 2026 23:53:20 +0000 Subject: [PATCH 4/6] Address comments. Signed-off-by: Yilin Guo --- .../http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc index b8897cd8b8c0e..41c5ba0948749 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc +++ b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc @@ -315,6 +315,11 @@ Http::FilterDataStatus McpJsonRestBridgeFilter::encodeData(Buffer::Instance& dat std::string escaped_chunk = JsonEscaper::escapeString(chunk, JsonEscaper::extraSpace(chunk)); data.drain(len); + // Note: UTF-8 structural validation (i.e., utf8_range::IsStructurallyValid) is omitted + // in the streaming fast-path due to the stateless nature of chunk processing (which lacks + // a stateful UTF-8 validator to track multi-byte character boundaries across chunk limits). + // If the upstream backend returns invalid UTF-8, it will be streamed to the client as-is, + // which may cause the client to fail parsing the final JSON. if (is_first_streaming_chunk_) { ENVOY_STREAM_LOG(debug, "Streaming: emitting prefix + first chunk ({} raw bytes, {} escaped bytes).", From 4fc06bf40617d5ced5a17ddbf2e19ac17bda9ce2 Mon Sep 17 00:00:00 2001 From: Yilin Guo Date: Wed, 27 May 2026 18:31:52 +0000 Subject: [PATCH 5/6] Address comments. Signed-off-by: Yilin Guo --- .../mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc index 41c5ba0948749..56c663e8b53bc 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc +++ b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc @@ -331,6 +331,9 @@ Http::FilterDataStatus McpJsonRestBridgeFilter::encodeData(Buffer::Instance& dat *encoder_callbacks_, len, escaped_chunk.size()); } data.add(escaped_chunk); + // TODO(guoyilin42): There will be a case that end_stream is not set in the encodeData call. + // This is body + trailer case where encodeTrailer call represents the end of response body. + // In that case, we should add the streaming_json_suffix at encodeTrailer call. if (end_stream) { ENVOY_STREAM_LOG(debug, "Streaming: appending suffix, stream complete.", *encoder_callbacks_); data.add(streaming_json_suffix_); @@ -397,6 +400,10 @@ void McpJsonRestBridgeFilter::buildStreamingPrefixAndSuffix(bool is_error) { // Locate the empty-string placeholder for the text value: `"text":""`. std::string marker = absl::StrCat("\"", McpConstants::TEXT_FIELD, "\":\"\""); size_t pos = ref_json.find(marker); + if (pos == std::string::npos) { + IS_ENVOY_BUG("JSON-RPC streaming marker not found in serialized envelope"); + return; + } streaming_json_prefix_ = ref_json.substr(0, pos + marker.size() - 1); streaming_json_suffix_ = ref_json.substr(pos + marker.size() - 1); } From 4038b237cf0a53d6ef7705bf62969fff858d001f Mon Sep 17 00:00:00 2001 From: Yilin Guo Date: Wed, 27 May 2026 21:50:28 +0000 Subject: [PATCH 6/6] Address comments. Signed-off-by: Yilin Guo --- .../mcp_json_rest_bridge_filter.cc | 7 +++--- .../mcp_json_rest_bridge_filter_test.cc | 22 ------------------- 2 files changed, 3 insertions(+), 26 deletions(-) diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc index 56c663e8b53bc..3f1234f3cd579 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc +++ b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc @@ -305,10 +305,9 @@ Http::FilterDataStatus McpJsonRestBridgeFilter::encodeData(Buffer::Instance& dat // buffering the full response body. if (!streaming_json_prefix_.empty()) { uint64_t len = data.length(); - if (len == 0 && !end_stream) { - ENVOY_STREAM_LOG(debug, "Streaming: skipping empty intermediate chunk.", *encoder_callbacks_); - return Http::FilterDataStatus::Continue; - } + // Note: An empty chunk can arrive when the upstream uses the body + trailer pattern (end_stream + // is false on the last data frame). It is a no-op here; the suffix will be appended in + // encodeTrailers. absl::string_view chunk(static_cast(data.linearize(len)), len); // TODO(guoyilin42): Consider adding text/event-stream backend response support and explore if // it needs buffering. diff --git a/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter_test.cc b/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter_test.cc index 11bc7114625c2..c01d42a8350df 100644 --- a/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter_test.cc +++ b/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter_test.cc @@ -1435,28 +1435,6 @@ TEST_F(McpJsonRestBridgeStreamingFilterTest, ErrorResponseSetsIsErrorTrue) { R"json({"id":123,"jsonrpc":"2.0","result":{"content":[{"text":"Internal Server Error","type":"text"}],"isError":true}})json")); } -TEST_F(McpJsonRestBridgeStreamingFilterTest, EmptyIntermediateChunkIsSkipped) { - sendToolsCallRequest(); - - response_headers_ = {{":status", "200"}}; - EXPECT_EQ(filter_->encodeHeaders(response_headers_, /*end_stream=*/false), - Http::FilterHeadersStatus::Continue); - - Buffer::OwnedImpl empty_chunk; - EXPECT_EQ(filter_->encodeData(empty_chunk, /*end_stream=*/false), - Http::FilterDataStatus::Continue); - EXPECT_TRUE(empty_chunk.toString().empty()); - - // The subsequent real chunk should still get the prefix. - Buffer::OwnedImpl final_chunk("content"); - EXPECT_EQ(filter_->encodeData(final_chunk, /*end_stream=*/true), - Http::FilterDataStatus::Continue); - EXPECT_EQ( - nlohmann::json::parse(final_chunk.toString()), - nlohmann::json::parse( - R"json({"id":123,"jsonrpc":"2.0","result":{"content":[{"text":"content","type":"text"}],"isError":false}})json")); -} - class McpHttpMethodFilterTest : public testing::TestWithParam { public: void SetUp() override {