Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,21 @@ message ServerToolConfig {
// - If not provided: The ``tools/list`` request is passed through. This allows subsequent
// extension or the backend itself to handle the tools/list request if they support it.
HttpRule tool_list_http_rule = 3;

// Enables streaming transcoding for unstructured text responses (``content`` field of a result).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking through the implications of this change and would like to hear some feedback from @tyxia and @botengyao as well. With this added, it seems the only cases where we'd still need to buffer the response are structured content or non-text content types.

To handle this moving forward, I'm wondering whether we should:

  1. Make this new improvement the default behavior. For the original buffering approach, we add a TODO to improve it so that it handles structured content responses.
  2. Use the output_schema field (assuming we add it in a subsequent change) to decide whether the response is structured content or not (instead of a separate flag). When the user configures the "output_schema" field, it implies that the tools/call response is structured content and will be buffered.

//
// When enabled, the response body is streamed directly to the client without buffering. Each
// chunk is JSON escaped as it arrives and wrapped with a pre-built JSON-RPC prefix and suffix.
//
// Streaming flow:
//
// .. code-block:: text
//
// input: [chunk1] → [chunk2] → [chunk3]
// output: [prefix+escaped_chunk1] → [escaped_chunk2] → [escaped_chunk3+suffix]
//
// Disabled by default.
bool text_content_streaming_enabled = 4;
}

// Configuration for a specific MCP tool.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include "envoy/http/filter.h"
#include "envoy/http/header_map.h"

#include "source/common/buffer/buffer_impl.h"
#include "source/common/common/json_escape_string.h"
#include "source/common/http/headers.h"
#include "source/common/protobuf/utility.h"
#include "source/extensions/filters/common/mcp/constants.h"
Expand Down Expand Up @@ -61,7 +63,7 @@ json translateJsonRestResponseToJsonRpc(absl::string_view tool_call_response,
{McpConstants::RESULT_FIELD,
{
{McpConstants::CONTENT_FIELD,
json::array({{{McpConstants::TYPE_FIELD, "text"},
json::array({{{McpConstants::TYPE_FIELD, McpConstants::TEXT_FIELD},
{McpConstants::TEXT_FIELD, tool_call_response}}})},
{McpConstants::IS_ERROR_FIELD, is_error},
}},
Expand Down Expand Up @@ -165,6 +167,10 @@ McpJsonRestBridgeFilterConfig::getToolsListHttpRule() const {
return proto_config_.tool_config().tool_list_http_rule();
}

bool McpJsonRestBridgeFilterConfig::textContentStreamingEnabled() const {
return proto_config_.tool_config().text_content_streaming_enabled();
}

Http::FilterHeadersStatus
McpJsonRestBridgeFilter::decodeHeaders(Http::RequestHeaderMap& request_headers, bool) {
absl::string_view path = request_headers.getPathValue();
Expand Down Expand Up @@ -248,8 +254,8 @@ Http::FilterDataStatus McpJsonRestBridgeFilter::decodeData(Buffer::Instance& dat
return Http::FilterDataStatus::Continue;
}

Http::FilterHeadersStatus McpJsonRestBridgeFilter::encodeHeaders(Http::ResponseHeaderMap&,
bool end_stream) {
Http::FilterHeadersStatus
McpJsonRestBridgeFilter::encodeHeaders(Http::ResponseHeaderMap& response_headers, bool end_stream) {
switch (mcp_operation_) {
case McpOperation::Unspecified:
case McpOperation::Undecided:
Expand All @@ -262,6 +268,17 @@ Http::FilterHeadersStatus McpJsonRestBridgeFilter::encodeHeaders(Http::ResponseH
break;
}

// Streaming mode: pre-build the JSON-RPC prefix/suffix, strip Content-Length
// (final size is unknown), and let the headers flow through immediately so
// the client can start receiving data without waiting for the full body.
if (mcp_operation_ == McpOperation::ToolsCall && config_->textContentStreamingEnabled()) {
buildStreamingPrefixAndSuffix(getResponseCode(response_headers) >=
static_cast<int>(Http::Code::BadRequest));
response_headers.removeContentLength();
response_headers.setContentType(Http::Headers::get().ContentTypeValues.Json);
return Http::FilterHeadersStatus::Continue;
}

// TODO(guoyilin42): Handle headers-only upstream responses (e.g., 204 No Content).
// Currently, these cases bypass transcoding, which can cause MCP SDKs to timeout
// or throw exceptions because they expect a valid JSON-RPC response with a
Expand All @@ -280,6 +297,36 @@ Http::FilterDataStatus McpJsonRestBridgeFilter::encodeData(Buffer::Instance& dat
return Http::FilterDataStatus::Continue;
}

// Streaming fast-path for tools/call: JSON-escape each chunk on-the-fly without
// buffering the full response body.
if (!streaming_json_prefix_.empty()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The streaming path currently bypasses the max_response_body_size check. While streaming avoids the "unbounded buffering" issue mentioned in the proto documentation, it is still good practice to enforce a total response size limit to prevent resource exhaustion or abuse from a malicious upstream. Consider tracking the total bytes processed and enforcing the limit even in streaming mode.

uint64_t len = data.length();
if (len == 0 && !end_stream) {
ENVOY_STREAM_LOG(debug, "Streaming: skipping empty intermediate chunk.", *encoder_callbacks_);
return Http::FilterDataStatus::Continue;
}
absl::string_view chunk(static_cast<const char*>(data.linearize(len)), len);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding a TODO to handle the edge case where the backend server returns text/event-stream format?

std::string escaped_chunk = JsonEscaper::escapeString(chunk, JsonEscaper::extraSpace(chunk));
Comment on lines +308 to +309
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Calling data.linearize(len) followed by JsonEscaper::escapeString (which performs a 2-pass scan and returns a new std::string) results in multiple copies and full scans of the data. For large chunks, this can be inefficient. If performance becomes an issue, consider processing the buffer slice-by-slice using data.getRawSlices() and modifying the escaper to append directly to a target buffer.


data.drain(len);
if (is_first_streaming_chunk_) {
ENVOY_STREAM_LOG(debug,
"Streaming: emitting prefix + first chunk ({} raw bytes, {} escaped bytes).",
*encoder_callbacks_, len, escaped_chunk.size());
data.add(streaming_json_prefix_);
is_first_streaming_chunk_ = false;
} else {
ENVOY_STREAM_LOG(debug, "Streaming: forwarding chunk ({} raw bytes, {} escaped bytes).",
*encoder_callbacks_, len, escaped_chunk.size());
}
data.add(escaped_chunk);
if (end_stream) {
ENVOY_STREAM_LOG(debug, "Streaming: appending suffix, stream complete.", *encoder_callbacks_);
data.add(streaming_json_suffix_);
}
return Http::FilterDataStatus::Continue;
}

const uint32_t max_response_body_size = config_->maxResponseBodySize();
if (max_response_body_size > 0 &&
(response_body_.length() + data.length()) > max_response_body_size) {
Expand Down Expand Up @@ -321,6 +368,28 @@ Http::FilterTrailersStatus McpJsonRestBridgeFilter::encodeTrailers(Http::Respons
return Http::FilterTrailersStatus::Continue;
}

void McpJsonRestBridgeFilter::buildStreamingPrefixAndSuffix(bool is_error) {
// Build a reference JSON-RPC envelope with an empty text placeholder.
json ref = {
{McpConstants::JSONRPC_FIELD, McpConstants::JSONRPC_VERSION},
{McpConstants::ID_FIELD, *session_id_},
{McpConstants::RESULT_FIELD,
{
{McpConstants::CONTENT_FIELD,
json::array({{{McpConstants::TYPE_FIELD, McpConstants::TEXT_FIELD},
{McpConstants::TEXT_FIELD, ""}}})},
{McpConstants::IS_ERROR_FIELD, is_error},
}},
};
std::string ref_json = ref.dump();

// Locate the empty-string placeholder for the text value: `"text":""`.
std::string marker = absl::StrCat("\"", McpConstants::TEXT_FIELD, "\":\"\"");
size_t pos = ref_json.find(marker);
streaming_json_prefix_ = ref_json.substr(0, pos + marker.size() - 1);
streaming_json_suffix_ = ref_json.substr(pos + marker.size() - 1);
Comment on lines +388 to +390
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Relying on the exact string representation of a JSON dump to find a split point is fragile. While nlohmann::json is generally deterministic, changes in default formatting (e.g., whitespace or key ordering) could cause find to fail. At a minimum, you should check if the marker was found to avoid a crash or incorrect behavior.

Suggested change
size_t pos = ref_json.find(marker);
streaming_json_prefix_ = ref_json.substr(0, pos + marker.size() - 1);
streaming_json_suffix_ = ref_json.substr(pos + marker.size() - 1);
size_t pos = ref_json.find(marker);
if (pos == std::string::npos) {
IS_ENVOY_BUG("Failed to find marker in JSON-RPC envelope");
return;
}
streaming_json_prefix_ = ref_json.substr(0, pos + marker.size() - 1);
streaming_json_suffix_ = ref_json.substr(pos + marker.size() - 1);

}

void McpJsonRestBridgeFilter::handleMcpMethod(const nlohmann::json& json_rpc,
Http::RequestHeaderMapOptRef request_headers) {
ENVOY_STREAM_LOG(debug, "Handling MCP JSON-RPC: {}", *decoder_callbacks_, json_rpc.dump());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class McpJsonRestBridgeFilterConfig : public Logger::Loggable<Logger::Id::config

uint32_t maxRequestBodySize() const { return max_request_body_size_; }
uint32_t maxResponseBodySize() const { return max_response_body_size_; }
bool textContentStreamingEnabled() const;

private:
absl::flat_hash_map<std::string,
Expand Down Expand Up @@ -93,6 +94,9 @@ class McpJsonRestBridgeFilter : public Http::PassThroughFilter,
// fails. Otherwise, it returns OK status.
absl::Status validateJsonRpcIdAndMethod(const nlohmann::json& json_rpc);

// Builds streaming_json_prefix_ and streaming_json_suffix_ for the tools/call streaming path.
void buildStreamingPrefixAndSuffix(bool is_error);

enum class McpOperation {
Unspecified = 0,
// Received the "/mcp" URL but has not parsed the request body yet.
Expand All @@ -116,6 +120,13 @@ class McpJsonRestBridgeFilter : public Http::PassThroughFilter,
Buffer::OwnedImpl response_body_;
std::string response_body_str_;

// Streaming state for text_content_streaming_enabled.
// prefix/suffix are pre-built once in encodeHeaders; an empty prefix signals
// that the non-streaming (buffered) path is active.
std::string streaming_json_prefix_;
std::string streaming_json_suffix_;
bool is_first_streaming_chunk_ = true;

McpJsonRestBridgeFilterConfigSharedPtr config_;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,152 @@ TEST_F(McpJsonRestBridgeFilterTest, ResponseBodyExceedsLimitReturnsError) {
Http::FilterDataStatus::StopIterationNoBuffer);
}

class McpJsonRestBridgeStreamingFilterTest : public testing::Test {
public:
void SetUp() override {
envoy::extensions::filters::http::mcp_json_rest_bridge::v3::McpJsonRestBridge proto_config =
ParseTextProtoOrDie(R"pb(
tool_config {
tools {
name: "get_api_key"
http_rule: { get: "/v1/apiKeys" }
}
text_content_streaming_enabled: true
}
)pb");
config_ = std::make_shared<McpJsonRestBridgeFilterConfig>(proto_config);
filter_ = std::make_unique<McpJsonRestBridgeFilter>(config_);
filter_->setDecoderFilterCallbacks(decoder_callbacks_);
filter_->setEncoderFilterCallbacks(encoder_callbacks_);
EXPECT_CALL(decoder_callbacks_, requestHeaders())
.WillRepeatedly(Return(Http::RequestHeaderMapOptRef(request_headers_)));
EXPECT_CALL(encoder_callbacks_, responseHeaders())
.WillRepeatedly(Return(Http::ResponseHeaderMapOptRef(response_headers_)));
}

void sendToolsCallRequest() {
request_headers_ = {{":method", "POST"}, {":path", "/mcp"}};
ASSERT_EQ(filter_->decodeHeaders(request_headers_, /*end_stream=*/false),
Http::FilterHeadersStatus::StopIteration);
EXPECT_CALL(decoder_callbacks_.downstream_callbacks_, clearRouteCache());
Buffer::OwnedImpl req(
R"json({"jsonrpc":"2.0","id":123,"method":"tools/call","params":{"name":"get_api_key"}})json");
ASSERT_EQ(filter_->decodeData(req, /*end_stream=*/true), Http::FilterDataStatus::Continue);
}

McpJsonRestBridgeFilterConfigSharedPtr config_;
std::unique_ptr<McpJsonRestBridgeFilter> filter_;
NiceMock<Http::MockStreamDecoderFilterCallbacks> decoder_callbacks_;
NiceMock<Http::MockStreamEncoderFilterCallbacks> encoder_callbacks_;
Http::TestRequestHeaderMapImpl request_headers_;
Http::TestResponseHeaderMapImpl response_headers_;
};

// A single body chunk that also ends the stream must come out as one complete JSON-RPC response.
TEST_F(McpJsonRestBridgeStreamingFilterTest, SingleChunkReturnsFullJsonRpcResponse) {
  sendToolsCallRequest();

  response_headers_ = {
      {":status", "200"}, {"content-type", "text/plain"}, {"content-length", "11"}};
  const Http::FilterHeadersStatus headers_status =
      filter_->encodeHeaders(response_headers_, /*end_stream=*/false);
  EXPECT_EQ(headers_status, Http::FilterHeadersStatus::Continue);

  // The filter is expected to advertise JSON and to drop the upstream Content-Length.
  EXPECT_THAT(response_headers_.getContentTypeValue(), StrEq("application/json"));
  EXPECT_FALSE(response_headers_.has(Http::Headers::get().ContentLength));

  Buffer::OwnedImpl body("hello world");
  const Http::FilterDataStatus data_status = filter_->encodeData(body, /*end_stream=*/true);
  EXPECT_EQ(data_status, Http::FilterDataStatus::Continue);

  // Compare parsed documents so that key order and whitespace do not matter.
  const nlohmann::json actual = nlohmann::json::parse(body.toString());
  const nlohmann::json expected = nlohmann::json::parse(
      R"json({"id":123,"jsonrpc":"2.0","result":{"content":[{"text":"hello world","type":"text"}],"isError":false}})json");
  EXPECT_EQ(actual, expected);
}

// Three body chunks: the first carries the prefix, the middle one is forwarded as-is, and the
// last one carries the suffix. Concatenated, they must form one valid JSON-RPC response.
TEST_F(McpJsonRestBridgeStreamingFilterTest, MultipleChunksAreStreamedCorrectly) {
  sendToolsCallRequest();

  response_headers_ = {{":status", "200"}, {"content-length", "100"}};
  const Http::FilterHeadersStatus headers_status =
      filter_->encodeHeaders(response_headers_, /*end_stream=*/false);
  EXPECT_EQ(headers_status, Http::FilterHeadersStatus::Continue);

  // First chunk: JSON prefix followed by the escaped "part1".
  Buffer::OwnedImpl first("part1");
  const Http::FilterDataStatus first_status = filter_->encodeData(first, /*end_stream=*/false);
  EXPECT_EQ(first_status, Http::FilterDataStatus::Continue);
  EXPECT_THAT(first.toString(), testing::StartsWith("{\"id\":"));

  // Middle chunk: only "part2", with no JSON wrapper around it.
  Buffer::OwnedImpl middle("part2");
  const Http::FilterDataStatus middle_status = filter_->encodeData(middle, /*end_stream=*/false);
  EXPECT_EQ(middle_status, Http::FilterDataStatus::Continue);
  EXPECT_THAT(middle.toString(), StrEq("part2"));

  // Last chunk: "part3" followed by the closing suffix.
  Buffer::OwnedImpl last("part3");
  const Http::FilterDataStatus last_status = filter_->encodeData(last, /*end_stream=*/true);
  EXPECT_EQ(last_status, Http::FilterDataStatus::Continue);
  EXPECT_THAT(last.toString(), testing::EndsWith("}}"));

  // Stitch the three pieces back together and check the whole document.
  std::string reassembled = first.toString();
  reassembled += middle.toString();
  reassembled += last.toString();
  const nlohmann::json expected = nlohmann::json::parse(
      R"json({"id":123,"jsonrpc":"2.0","result":{"content":[{"text":"part1part2part3","type":"text"}],"isError":false}})json");
  EXPECT_EQ(nlohmann::json::parse(reassembled), expected);
}

// Text that needs JSON escaping must round-trip: after parsing the emitted response, the text
// field has to equal the raw upstream body.
TEST_F(McpJsonRestBridgeStreamingFilterTest, SpecialCharactersAreEscaped) {
  sendToolsCallRequest();

  response_headers_ = {{":status", "200"}};
  const Http::FilterHeadersStatus headers_status =
      filter_->encodeHeaders(response_headers_, /*end_stream=*/false);
  EXPECT_EQ(headers_status, Http::FilterHeadersStatus::Continue);

  // Upstream body with double-quotes, a backslash, a newline, and a tab.
  const std::string raw_text = "{\"key\":\"val\\path\"\n\t}";
  Buffer::OwnedImpl body(raw_text);
  const Http::FilterDataStatus data_status = filter_->encodeData(body, /*end_stream=*/true);
  EXPECT_EQ(data_status, Http::FilterDataStatus::Continue);

  const nlohmann::json parsed = nlohmann::json::parse(body.toString());
  EXPECT_EQ(parsed["id"], 123);
  EXPECT_EQ(parsed["result"]["isError"], false);
  EXPECT_EQ(parsed["result"]["content"][0]["text"].get<std::string>(), raw_text);
}

// An upstream 500 must still be wrapped as a JSON-RPC result, with isError set to true.
TEST_F(McpJsonRestBridgeStreamingFilterTest, ErrorResponseSetsIsErrorTrue) {
  sendToolsCallRequest();

  response_headers_ = {{":status", "500"}, {"content-length", "21"}};
  const Http::FilterHeadersStatus headers_status =
      filter_->encodeHeaders(response_headers_, /*end_stream=*/false);
  EXPECT_EQ(headers_status, Http::FilterHeadersStatus::Continue);

  // Same header rewriting as the success case: JSON content type, no Content-Length.
  EXPECT_THAT(response_headers_.getContentTypeValue(), StrEq("application/json"));
  EXPECT_FALSE(response_headers_.has(Http::Headers::get().ContentLength));

  Buffer::OwnedImpl body("Internal Server Error");
  const Http::FilterDataStatus data_status = filter_->encodeData(body, /*end_stream=*/true);
  EXPECT_EQ(data_status, Http::FilterDataStatus::Continue);

  const nlohmann::json actual = nlohmann::json::parse(body.toString());
  const nlohmann::json expected = nlohmann::json::parse(
      R"json({"id":123,"jsonrpc":"2.0","result":{"content":[{"text":"Internal Server Error","type":"text"}],"isError":true}})json");
  EXPECT_EQ(actual, expected);
}

// An empty chunk that does not end the stream must pass through untouched, and must not consume
// the "first chunk" state: the next non-empty chunk still receives the JSON prefix.
TEST_F(McpJsonRestBridgeStreamingFilterTest, EmptyIntermediateChunkIsSkipped) {
  sendToolsCallRequest();

  response_headers_ = {{":status", "200"}};
  const Http::FilterHeadersStatus headers_status =
      filter_->encodeHeaders(response_headers_, /*end_stream=*/false);
  EXPECT_EQ(headers_status, Http::FilterHeadersStatus::Continue);

  // Nothing may be added to the empty buffer.
  Buffer::OwnedImpl empty_chunk;
  const Http::FilterDataStatus empty_status =
      filter_->encodeData(empty_chunk, /*end_stream=*/false);
  EXPECT_EQ(empty_status, Http::FilterDataStatus::Continue);
  EXPECT_TRUE(empty_chunk.toString().empty());

  // The subsequent real chunk should still get the prefix (and, as the last chunk, the suffix).
  Buffer::OwnedImpl final_chunk("content");
  const Http::FilterDataStatus final_status =
      filter_->encodeData(final_chunk, /*end_stream=*/true);
  EXPECT_EQ(final_status, Http::FilterDataStatus::Continue);

  const nlohmann::json actual = nlohmann::json::parse(final_chunk.toString());
  const nlohmann::json expected = nlohmann::json::parse(
      R"json({"id":123,"jsonrpc":"2.0","result":{"content":[{"text":"content","type":"text"}],"isError":false}})json");
  EXPECT_EQ(actual, expected);
}

class McpHttpMethodFilterTest : public testing::TestWithParam<std::string> {
public:
void SetUp() override {
Expand Down
Loading
Loading