diff --git a/api/envoy/extensions/filters/http/mcp_json_rest_bridge/v3/mcp_json_rest_bridge.proto b/api/envoy/extensions/filters/http/mcp_json_rest_bridge/v3/mcp_json_rest_bridge.proto index a70e4cb4b4f8d..b6fe4fb5813cd 100644 --- a/api/envoy/extensions/filters/http/mcp_json_rest_bridge/v3/mcp_json_rest_bridge.proto +++ b/api/envoy/extensions/filters/http/mcp_json_rest_bridge/v3/mcp_json_rest_bridge.proto @@ -168,6 +168,21 @@ message ServerToolConfig { // - If not provided: The ``tools/list`` request is passed through. This allows subsequent // extension or the backend itself to handle the tools/list request if they support it. HttpRule tool_list_http_rule = 3; + + // Enables streaming transcoding for unstructured text responses (``content`` field of a result). + // + // When enabled, the response body is streamed directly to the client without buffering. Each + // chunk is JSON escaped as it arrives and wrapped with a pre-built JSON-RPC prefix and suffix. + // + // Streaming flow: + // + // .. code-block:: text + // + // input: [chunk1] → [chunk2] → [chunk3] + // output: [prefix+escaped_chunk1] → [escaped_chunk2] → [escaped_chunk3+suffix] + // + // Disabled by default. + bool text_content_streaming_enabled = 4; } // Configuration for a specific MCP tool. diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc index 4d7c03d3f5139..c12487e0c0210 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc +++ b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.cc @@ -6,6 +6,8 @@ #include "envoy/http/filter.h" #include "envoy/http/header_map.h" +#include "source/common/buffer/buffer_impl.h" +#include "source/common/common/json_escape_string.h" #include "source/common/http/headers.h" #include "source/common/protobuf/utility.h" #include "source/extensions/filters/common/mcp/constants.h" @@ -61,7 +63,7 @@ json translateJsonRestResponseToJsonRpc(absl::string_view tool_call_response, {McpConstants::RESULT_FIELD, { {McpConstants::CONTENT_FIELD, - json::array({{{McpConstants::TYPE_FIELD, "text"}, + json::array({{{McpConstants::TYPE_FIELD, McpConstants::TEXT_FIELD}, {McpConstants::TEXT_FIELD, tool_call_response}}})}, {McpConstants::IS_ERROR_FIELD, is_error}, }}, @@ -165,6 +167,10 @@ McpJsonRestBridgeFilterConfig::getToolsListHttpRule() const { return proto_config_.tool_config().tool_list_http_rule(); } +bool McpJsonRestBridgeFilterConfig::textContentStreamingEnabled() const { + return proto_config_.tool_config().text_content_streaming_enabled(); +} + Http::FilterHeadersStatus McpJsonRestBridgeFilter::decodeHeaders(Http::RequestHeaderMap& request_headers, bool) { absl::string_view path = request_headers.getPathValue(); @@ -248,8 +254,8 @@ Http::FilterDataStatus McpJsonRestBridgeFilter::decodeData(Buffer::Instance& dat return Http::FilterDataStatus::Continue; } -Http::FilterHeadersStatus McpJsonRestBridgeFilter::encodeHeaders(Http::ResponseHeaderMap&, - bool end_stream) { +Http::FilterHeadersStatus +McpJsonRestBridgeFilter::encodeHeaders(Http::ResponseHeaderMap& response_headers, bool end_stream) { switch (mcp_operation_) { case McpOperation::Unspecified: case McpOperation::Undecided: @@ -262,6 +268,17 @@ Http::FilterHeadersStatus McpJsonRestBridgeFilter::encodeHeaders(Http::ResponseH break; } + // Streaming mode: pre-build the JSON-RPC prefix/suffix, strip Content-Length + // (final size is unknown), and let the headers flow through immediately so + // the client can start receiving data without waiting for the full body. + if (mcp_operation_ == McpOperation::ToolsCall && config_->textContentStreamingEnabled()) { + buildStreamingPrefixAndSuffix(getResponseCode(response_headers) >= + static_cast(Http::Code::BadRequest)); + response_headers.removeContentLength(); + response_headers.setContentType(Http::Headers::get().ContentTypeValues.Json); + return Http::FilterHeadersStatus::Continue; + } + // TODO(guoyilin42): Handle headers-only upstream responses (e.g., 204 No Content). // Currently, these cases bypass transcoding, which can cause MCP SDKs to timeout // or throw exceptions because they expect a valid JSON-RPC response with a @@ -280,6 +297,36 @@ Http::FilterDataStatus McpJsonRestBridgeFilter::encodeData(Buffer::Instance& dat return Http::FilterDataStatus::Continue; } + // Streaming fast-path for tools/call: JSON-escape each chunk on-the-fly without + // buffering the full response body. + if (!streaming_json_prefix_.empty()) { + uint64_t len = data.length(); + if (len == 0 && !end_stream) { + ENVOY_STREAM_LOG(debug, "Streaming: skipping empty intermediate chunk.", *encoder_callbacks_); + return Http::FilterDataStatus::Continue; + } + absl::string_view chunk(static_cast(data.linearize(len)), len); + std::string escaped_chunk = JsonEscaper::escapeString(chunk, JsonEscaper::extraSpace(chunk)); + + data.drain(len); + if (is_first_streaming_chunk_) { + ENVOY_STREAM_LOG(debug, + "Streaming: emitting prefix + first chunk ({} raw bytes, {} escaped bytes).", + *encoder_callbacks_, len, escaped_chunk.size()); + data.add(streaming_json_prefix_); + is_first_streaming_chunk_ = false; + } else { + ENVOY_STREAM_LOG(debug, "Streaming: forwarding chunk ({} raw bytes, {} escaped bytes).", + *encoder_callbacks_, len, escaped_chunk.size()); + } + data.add(escaped_chunk); + if (end_stream) { + ENVOY_STREAM_LOG(debug, "Streaming: appending suffix, stream complete.", *encoder_callbacks_); + data.add(streaming_json_suffix_); + } + return Http::FilterDataStatus::Continue; + } + const uint32_t max_response_body_size = config_->maxResponseBodySize(); if (max_response_body_size > 0 && (response_body_.length() + data.length()) > max_response_body_size) { @@ -321,6 +368,28 @@ Http::FilterTrailersStatus McpJsonRestBridgeFilter::encodeTrailers(Http::Respons return Http::FilterTrailersStatus::Continue; } +void McpJsonRestBridgeFilter::buildStreamingPrefixAndSuffix(bool is_error) { + // Build a reference JSON-RPC envelope with an empty text placeholder. + json ref = { + {McpConstants::JSONRPC_FIELD, McpConstants::JSONRPC_VERSION}, + {McpConstants::ID_FIELD, *session_id_}, + {McpConstants::RESULT_FIELD, + { + {McpConstants::CONTENT_FIELD, + json::array({{{McpConstants::TYPE_FIELD, McpConstants::TEXT_FIELD}, + {McpConstants::TEXT_FIELD, ""}}})}, + {McpConstants::IS_ERROR_FIELD, is_error}, + }}, + }; + std::string ref_json = ref.dump(); + + // Locate the empty-string placeholder for the text value: `"text":""`. + std::string marker = absl::StrCat("\"", McpConstants::TEXT_FIELD, "\":\"\""); + size_t pos = ref_json.find(marker); + streaming_json_prefix_ = ref_json.substr(0, pos + marker.size() - 1); + streaming_json_suffix_ = ref_json.substr(pos + marker.size() - 1); +} + void McpJsonRestBridgeFilter::handleMcpMethod(const nlohmann::json& json_rpc, Http::RequestHeaderMapOptRef request_headers) { ENVOY_STREAM_LOG(debug, "Handling MCP JSON-RPC: {}", *decoder_callbacks_, json_rpc.dump()); diff --git a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h index 3a7efeb8bc1f1..093a4259daffa 100644 --- a/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h +++ b/source/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_filter.h @@ -42,6 +42,7 @@ class McpJsonRestBridgeFilterConfig : public Logger::Loggable(proto_config); + filter_ = std::make_unique(config_); + filter_->setDecoderFilterCallbacks(decoder_callbacks_); + filter_->setEncoderFilterCallbacks(encoder_callbacks_); + EXPECT_CALL(decoder_callbacks_, requestHeaders()) + .WillRepeatedly(Return(Http::RequestHeaderMapOptRef(request_headers_))); + EXPECT_CALL(encoder_callbacks_, responseHeaders()) + .WillRepeatedly(Return(Http::ResponseHeaderMapOptRef(response_headers_))); + } + + void sendToolsCallRequest() { + request_headers_ = {{":method", "POST"}, {":path", "/mcp"}}; + ASSERT_EQ(filter_->decodeHeaders(request_headers_, /*end_stream=*/false), + Http::FilterHeadersStatus::StopIteration); + EXPECT_CALL(decoder_callbacks_.downstream_callbacks_, clearRouteCache()); + Buffer::OwnedImpl req( + R"json({"jsonrpc":"2.0","id":123,"method":"tools/call","params":{"name":"get_api_key"}})json"); + ASSERT_EQ(filter_->decodeData(req, /*end_stream=*/true), Http::FilterDataStatus::Continue); + } + + McpJsonRestBridgeFilterConfigSharedPtr config_; + std::unique_ptr filter_; + NiceMock decoder_callbacks_; + NiceMock encoder_callbacks_; + Http::TestRequestHeaderMapImpl request_headers_; + Http::TestResponseHeaderMapImpl response_headers_; +}; + +TEST_F(McpJsonRestBridgeStreamingFilterTest, SingleChunkReturnsFullJsonRpcResponse) { + sendToolsCallRequest(); + + response_headers_ = { + {":status", "200"}, {"content-type", "text/plain"}, {"content-length", "11"}}; + EXPECT_EQ(filter_->encodeHeaders(response_headers_, /*end_stream=*/false), + Http::FilterHeadersStatus::Continue); + EXPECT_THAT(response_headers_.getContentTypeValue(), StrEq("application/json")); + EXPECT_FALSE(response_headers_.has(Http::Headers::get().ContentLength)); + + Buffer::OwnedImpl chunk("hello world"); + EXPECT_EQ(filter_->encodeData(chunk, /*end_stream=*/true), Http::FilterDataStatus::Continue); + EXPECT_EQ( + nlohmann::json::parse(chunk.toString()), + nlohmann::json::parse( + R"json({"id":123,"jsonrpc":"2.0","result":{"content":[{"text":"hello world","type":"text"}],"isError":false}})json")); +} + +TEST_F(McpJsonRestBridgeStreamingFilterTest, MultipleChunksAreStreamedCorrectly) { + sendToolsCallRequest(); + + response_headers_ = {{":status", "200"}, {"content-length", "100"}}; + EXPECT_EQ(filter_->encodeHeaders(response_headers_, /*end_stream=*/false), + Http::FilterHeadersStatus::Continue); + + Buffer::OwnedImpl chunk1("part1"); + EXPECT_EQ(filter_->encodeData(chunk1, /*end_stream=*/false), Http::FilterDataStatus::Continue); + // First chunk contains the prefix + escaped "part1". + EXPECT_THAT(chunk1.toString(), testing::StartsWith("{\"id\":")); + + Buffer::OwnedImpl chunk2("part2"); + EXPECT_EQ(filter_->encodeData(chunk2, /*end_stream=*/false), Http::FilterDataStatus::Continue); + // Middle chunk is just "part2" (no JSON wrapper). + EXPECT_THAT(chunk2.toString(), StrEq("part2")); + + Buffer::OwnedImpl chunk3("part3"); + EXPECT_EQ(filter_->encodeData(chunk3, /*end_stream=*/true), Http::FilterDataStatus::Continue); + // Last chunk contains "part3" + the closing suffix. + EXPECT_THAT(chunk3.toString(), testing::EndsWith("}}")); + + // Reassemble and verify the full JSON-RPC response. + const std::string full = chunk1.toString() + chunk2.toString() + chunk3.toString(); + EXPECT_EQ( + nlohmann::json::parse(full), + nlohmann::json::parse( + R"json({"id":123,"jsonrpc":"2.0","result":{"content":[{"text":"part1part2part3","type":"text"}],"isError":false}})json")); +} + +TEST_F(McpJsonRestBridgeStreamingFilterTest, SpecialCharactersAreEscaped) { + sendToolsCallRequest(); + + response_headers_ = {{":status", "200"}}; + EXPECT_EQ(filter_->encodeHeaders(response_headers_, /*end_stream=*/false), + Http::FilterHeadersStatus::Continue); + + // Content contains double-quotes, a backslash, a newline, and a tab. + Buffer::OwnedImpl chunk("{\"key\":\"val\\path\"\n\t}"); + EXPECT_EQ(filter_->encodeData(chunk, /*end_stream=*/true), Http::FilterDataStatus::Continue); + + const nlohmann::json response = nlohmann::json::parse(chunk.toString()); + EXPECT_EQ(response["id"], 123); + EXPECT_EQ(response["result"]["isError"], false); + EXPECT_EQ(response["result"]["content"][0]["text"].get(), + "{\"key\":\"val\\path\"\n\t}"); +} + +TEST_F(McpJsonRestBridgeStreamingFilterTest, ErrorResponseSetsIsErrorTrue) { + sendToolsCallRequest(); + + response_headers_ = {{":status", "500"}, {"content-length", "21"}}; + EXPECT_EQ(filter_->encodeHeaders(response_headers_, /*end_stream=*/false), + Http::FilterHeadersStatus::Continue); + EXPECT_THAT(response_headers_.getContentTypeValue(), StrEq("application/json")); + EXPECT_FALSE(response_headers_.has(Http::Headers::get().ContentLength)); + + Buffer::OwnedImpl chunk("Internal Server Error"); + EXPECT_EQ(filter_->encodeData(chunk, /*end_stream=*/true), Http::FilterDataStatus::Continue); + EXPECT_EQ( + nlohmann::json::parse(chunk.toString()), + nlohmann::json::parse( + R"json({"id":123,"jsonrpc":"2.0","result":{"content":[{"text":"Internal Server Error","type":"text"}],"isError":true}})json")); +} + +TEST_F(McpJsonRestBridgeStreamingFilterTest, EmptyIntermediateChunkIsSkipped) { + sendToolsCallRequest(); + + response_headers_ = {{":status", "200"}}; + EXPECT_EQ(filter_->encodeHeaders(response_headers_, /*end_stream=*/false), + Http::FilterHeadersStatus::Continue); + + Buffer::OwnedImpl empty_chunk; + EXPECT_EQ(filter_->encodeData(empty_chunk, /*end_stream=*/false), + Http::FilterDataStatus::Continue); + EXPECT_TRUE(empty_chunk.toString().empty()); + + // The subsequent real chunk should still get the prefix. + Buffer::OwnedImpl final_chunk("content"); + EXPECT_EQ(filter_->encodeData(final_chunk, /*end_stream=*/true), + Http::FilterDataStatus::Continue); + EXPECT_EQ( + nlohmann::json::parse(final_chunk.toString()), + nlohmann::json::parse( + R"json({"id":123,"jsonrpc":"2.0","result":{"content":[{"text":"content","type":"text"}],"isError":false}})json")); +} + class McpHttpMethodFilterTest : public testing::TestWithParam { public: void SetUp() override { diff --git a/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_integration_test.cc b/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_integration_test.cc index f79be0c43454b..dd7ae496ccb3d 100644 --- a/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_integration_test.cc +++ b/test/extensions/filters/http/mcp_json_rest_bridge/mcp_json_rest_bridge_integration_test.cc @@ -548,5 +548,162 @@ TEST_P(McpJsonRestBridgeIntegrationTest, InitializeUnsupportedProtocolVersionFal EXPECT_EQ(nlohmann::json::parse(response->body()), nlohmann::json::parse(expected_response)); } +TEST_P(McpJsonRestBridgeIntegrationTest, ToolsCallStreamingTranscoding) { + const std::string config = R"EOF( + name: envoy.filters.http.mcp_json_rest_bridge + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.mcp_json_rest_bridge.v3.McpJsonRestBridge + tool_config: + tools: + - name: "create_api_key" + http_rule: + post: "/v1/{parent=projects/*}/keys" + body: "key" + text_content_streaming_enabled: true + )EOF"; + + initializeFilter(config); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + const std::string request_body = R"({ + "jsonrpc": "2.0", + "id": 321, + "method": "tools/call", + "params": { + "name": "create_api_key", + "arguments": { + "parent": "projects/foo", + "key": { + "displayName": "bar" + } + } + } + })"; + + auto response = codec_client_->makeRequestWithBody( + Http::TestRequestHeaderMapImpl{{":method", "POST"}, + {":path", "/mcp"}, + {":scheme", "http"}, + {":authority", "host"}, + {"content-type", "application/json"}}, + request_body); + + waitForNextUpstreamRequest(); + EXPECT_THAT(upstream_request_->headers().getMethodValue(), StrEq("POST")); + EXPECT_THAT(upstream_request_->headers().getPathValue(), StrEq("/v1/projects/foo/keys")); + + Http::TestResponseHeaderMapImpl response_headers; + response_headers.setStatus(200); + response_headers.setContentType(Http::Headers::get().ContentTypeValues.Json); + response_headers.setContentLength(21); + upstream_request_->encodeHeaders(response_headers, false); + + Buffer::OwnedImpl chunk1; + chunk1.add(R"({"displayName":)"); + upstream_request_->encodeData(chunk1, false); + + Buffer::OwnedImpl chunk2; + chunk2.add(R"("bar"})"); + upstream_request_->encodeData(chunk2, true); + + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_TRUE(upstream_request_->complete()); + + EXPECT_THAT(response->headers().getStatusValue(), StrEq("200")); + EXPECT_THAT(response->headers().getContentTypeValue(), StrEq("application/json")); + EXPECT_THAT(response->headers().getContentLengthValue(), IsEmpty()); + + const std::string expected_rpc_response = R"({ + "jsonrpc": "2.0", + "id": 321, + "result": { + "content": [ + { + "type": "text", + "text": "{\"displayName\":\"bar\"}" + } + ], + "isError": false + } + })"; + EXPECT_EQ(nlohmann::json::parse(response->body()), nlohmann::json::parse(expected_rpc_response)); +} + +TEST_P(McpJsonRestBridgeIntegrationTest, ToolsCallStreamingErrorResponse) { + const std::string config = R"EOF( + name: envoy.filters.http.mcp_json_rest_bridge + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.mcp_json_rest_bridge.v3.McpJsonRestBridge + tool_config: + tools: + - name: "create_api_key" + http_rule: + post: "/v1/{parent=projects/*}/keys" + body: "key" + text_content_streaming_enabled: true + )EOF"; + + initializeFilter(config); + + codec_client_ = makeHttpConnection(lookupPort("http")); + + const std::string request_body = R"({ + "jsonrpc": "2.0", + "id": 42, + "method": "tools/call", + "params": { + "name": "create_api_key", + "arguments": { + "parent": "projects/foo", + "key": { + "displayName": "bar" + } + } + } + })"; + + auto response = codec_client_->makeRequestWithBody( + Http::TestRequestHeaderMapImpl{{":method", "POST"}, + {":path", "/mcp"}, + {":scheme", "http"}, + {":authority", "host"}, + {"content-type", "application/json"}}, + request_body); + + waitForNextUpstreamRequest(); + + Http::TestResponseHeaderMapImpl response_headers; + response_headers.setStatus(500); + response_headers.setContentType(Http::Headers::get().ContentTypeValues.Json); + upstream_request_->encodeHeaders(response_headers, false); + + Buffer::OwnedImpl response_data; + response_data.add("Internal Server Error"); + upstream_request_->encodeData(response_data, true); + + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_TRUE(upstream_request_->complete()); + + EXPECT_THAT(response->headers().getStatusValue(), StrEq("500")); + EXPECT_THAT(response->headers().getContentTypeValue(), StrEq("application/json")); + EXPECT_THAT(response->headers().getContentLengthValue(), IsEmpty()); + + const std::string expected_rpc_response = R"({ + "jsonrpc": "2.0", + "id": 42, + "result": { + "content": [ + { + "type": "text", + "text": "Internal Server Error" + } + ], + "isError": true + } + })"; + EXPECT_EQ(nlohmann::json::parse(response->body()), nlohmann::json::parse(expected_rpc_response)); +} + } // namespace } // namespace Envoy