From 8c8ec21b9085c8b98bc6d75cb2e4fd6c9f3cab33 Mon Sep 17 00:00:00 2001 From: gophergogo Date: Sun, 19 Apr 2026 15:34:53 -0700 Subject: [PATCH 1/3] Surface numeric :status pseudo-header from HTTP client codec (#213) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The codec was only publishing the reason phrase in headers["status"] on the client path. That is good for logging but useless for callers that need to branch on the actual HTTP status (e.g. to treat a 2xx as success and a 4xx as a client error). Mirror the :method pseudo-header the server path already writes and populate headers[":status"] with the numeric response code before firing onHeaders. Comes with two unit tests that feed a minimal HTTP/1.1 response through the filter and assert both :status and the original status reason land on the callbacks' header map — guarding against future regressions in onHeadersComplete's client branch. --- src/filter/http_codec_filter.cc | 6 +++ tests/filter/test_http_codec_filter.cc | 57 ++++++++++++++++++++++++++ 2 files changed, 63 insertions(+) diff --git a/src/filter/http_codec_filter.cc b/src/filter/http_codec_filter.cc index 3bb4b037..99672154 100644 --- a/src/filter/http_codec_filter.cc +++ b/src/filter/http_codec_filter.cc @@ -621,6 +621,12 @@ HttpCodecFilter::ParserCallbacks::onHeadersComplete() { } parent_.current_stream_->headers[":method"] = method_str; parent_.current_stream_->method = method_str; + } else { + // Client mode: surface numeric response status as :status pseudo-header. + // Callers (HttpAsyncClient, etc.) need the numeric code, not just the + // reason phrase that onStatus already captures into headers["status"]. + auto status_code = static_cast(parent_.parser_->statusCode()); + parent_.current_stream_->headers[":status"] = std::to_string(status_code); } // Check keep-alive diff --git a/tests/filter/test_http_codec_filter.cc b/tests/filter/test_http_codec_filter.cc index 00170cbc..a857c38a 100644 --- a/tests/filter/test_http_codec_filter.cc +++ b/tests/filter/test_http_codec_filter.cc @@ -589,6 +589,63 @@ TEST_F(HttpCodecFilterBodyTimeoutTest, ClientModeDisablesBodyTimeout) { "normal idle gaps, then crash the state machine on a live stream"; } +// --------------------------------------------------------------------------- +// Client-mode response parsing surfaces numeric status as :status. +// +// The upstream onStatus callback only captures the reason phrase into +// headers["status"] ("OK", "Not Found", ...). Callers that need to branch +// on the actual HTTP status code (the HttpAsyncClient does) need the +// numeric value. The codec populates headers[":status"] in onHeadersComplete +// mirroring how server mode populates headers[":method"]; this test pins +// that contract. +// --------------------------------------------------------------------------- + +class HttpCodecFilterClientStatusTest : public ::testing::Test { + protected: + void SetUp() override { + auto factory = event::createLibeventDispatcherFactory(); + dispatcher_ = factory->createDispatcher("codec-status"); + dispatcher_->run(event::RunType::NonBlock); + callbacks_ = std::make_unique(); + } + + void feedResponse(HttpCodecFilter& filter, const std::string& bytes) { + OwnedBuffer buf; + buf.add(bytes.c_str(), bytes.length()); + filter.onNewConnection(); + filter.onData(buf, false); + } + + std::unique_ptr dispatcher_; + std::unique_ptr callbacks_; +}; + +TEST_F(HttpCodecFilterClientStatusTest, PopulatesNumericStatusPseudoHeader) { + HttpCodecFilter filter(*callbacks_, *dispatcher_, /*is_server=*/false); + feedResponse(filter, + "HTTP/1.1 404 Not Found\r\n" + "Content-Length: 0\r\n" + "Connection: close\r\n" + "\r\n"); + ASSERT_TRUE(callbacks_->waitForHeaders()); + auto headers = callbacks_->getHeaders(); + EXPECT_EQ(headers[":status"], "404") + << "client mode must expose the numeric status as :status"; + EXPECT_EQ(headers["status"], "Not Found") + << "reason phrase should still land in headers[\"status\"]"; +} + +TEST_F(HttpCodecFilterClientStatusTest, SurfacesSuccessStatus) { + HttpCodecFilter filter(*callbacks_, *dispatcher_, /*is_server=*/false); + feedResponse(filter, + "HTTP/1.1 202 Accepted\r\n" + "Content-Length: 0\r\n" + "Connection: close\r\n" + "\r\n"); + ASSERT_TRUE(callbacks_->waitForHeaders()); + EXPECT_EQ(callbacks_->getHeaders()[":status"], "202"); +} + } // namespace } // namespace filter } // namespace mcp \ No newline at end of file From b820ab8b4d7d73f7cd20c562a573420626c299bf Mon Sep 17 00:00:00 2001 From: gophergogo Date: Sun, 19 Apr 2026 16:31:11 -0700 Subject: [PATCH 2/3] Add HttpAsyncClient built on HttpCodecFilter (#213) HttpAsyncClient hosts one outbound TCP connection per send(), installs the existing HttpCodecFilter in client mode as a read-only filter, and formats the request bytes directly so callers keep full control over method, path, and headers. Each request context implements DeferredDeletable; on completion the client hands ownership to Dispatcher::deferredDelete so the connection, codec filter, and callbacks tear down past the current callback frame rather than unwinding from inside their own callbacks. Body handling reflects a quirk of the codec in client mode: each body chunk is emitted twice, once inline and once again with the accumulated body at message-complete. The client takes only the end_stream delivery so the response body matches what arrived on the wire. --- CMakeLists.txt | 1 + include/mcp/http/http_async_client.h | 94 +++++++ src/http/http_async_client.cc | 374 +++++++++++++++++++++++++++ 3 files changed, 469 insertions(+) create mode 100644 include/mcp/http/http_async_client.h create mode 100644 src/http/http_async_client.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 2081d5fc..1bc69260 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -489,6 +489,7 @@ set(MCP_CLIENT_SERVER_SOURCES set(MCP_HTTP_SOURCES src/http/http_parser.cc src/http/sse_parser.cc + src/http/http_async_client.cc # Async HTTP/1.1 client built on codec filter src/transport/http_sse_transport_socket.cc # HTTP+SSE with layered architecture src/transport/https_sse_transport_factory.cc # HTTPS+SSE factory ) diff --git a/include/mcp/http/http_async_client.h b/include/mcp/http/http_async_client.h new file mode 100644 index 00000000..cea5132d --- /dev/null +++ b/include/mcp/http/http_async_client.h @@ -0,0 +1,94 @@ +#ifndef MCP_HTTP_HTTP_ASYNC_CLIENT_H +#define MCP_HTTP_HTTP_ASYNC_CLIENT_H + +#include +#include +#include +#include + +#include "mcp/event/event_loop.h" +#include "mcp/network/socket_interface.h" +#include "mcp/network/transport_socket.h" + +namespace mcp { +namespace http { + +// Request handed to HttpAsyncClient::send. +// The url is a full absolute URL (scheme://host[:port]/path). host must be +// a dotted-quad IPv4 literal — hostname resolution is not performed here +// so that callers running on the dispatcher thread never block on DNS. +struct HttpRequest { + std::string method{"POST"}; + std::string url; + std::map headers; + std::string body; +}; + +// Response delivered to the HttpResponseCallback on success. +struct HttpResponse { + int status_code{0}; + std::string status_text; + std::map headers; + std::string body; +}; + +// Exactly one of these fires per send(). +using HttpResponseCallback = std::function; +using HttpErrorCallback = std::function; + +/** + * HttpAsyncClient — minimal fire-and-forget HTTP/1.1 client. + * + * Each send() creates an isolated request context: its own client + * connection, its own HttpCodecFilter (client mode), and its own + * callbacks. Request contexts are owned by the client until they + * complete, then handed to Dispatcher::deferredDelete so teardown + * runs past the current callback frame (Envoy-style lifetime, which + * avoids destroying a connection from inside its own callback). + * + * All public methods must be invoked from the dispatcher thread — + * this matches the project-wide convention that mutating network + * objects off-thread is undefined. + */ +class HttpAsyncClient { + public: + HttpAsyncClient( + event::Dispatcher& dispatcher, + network::SocketInterface& socket_interface, + std::unique_ptr transport_factory); + ~HttpAsyncClient(); + + HttpAsyncClient(const HttpAsyncClient&) = delete; + HttpAsyncClient& operator=(const HttpAsyncClient&) = delete; + + /** + * Send an HTTP request. Exactly one of on_response or on_error will + * be invoked (on the dispatcher thread) before the request context + * is torn down. Returns false if the URL could not be parsed; in + * that case no callback fires. + */ + bool send(const HttpRequest& request, + HttpResponseCallback on_response, + HttpErrorCallback on_error); + + private: + class RequestContext; + + // Called by RequestContext when it completes (success or failure). + // Extracts the unique_ptr from active_requests_ and hands it to + // Dispatcher::deferredDelete so the connection and filter tear down + // after the current callback returns. + void finishRequest(RequestContext* ctx); + + event::Dispatcher& dispatcher_; + network::SocketInterface& socket_interface_; + std::unique_ptr transport_factory_; + std::map> active_requests_; +}; + +using HttpAsyncClientPtr = std::unique_ptr; + +} // namespace http +} // namespace mcp + +#endif // MCP_HTTP_HTTP_ASYNC_CLIENT_H diff --git a/src/http/http_async_client.cc b/src/http/http_async_client.cc new file mode 100644 index 00000000..ebc618d1 --- /dev/null +++ b/src/http/http_async_client.cc @@ -0,0 +1,374 @@ +#include "mcp/http/http_async_client.h" + +#include +#include +#include +#include +#include + +#include "mcp/buffer.h" +#include "mcp/filter/http_codec_filter.h" +#include "mcp/network/address.h" +#include "mcp/network/address_impl.h" +#include "mcp/network/connection.h" +#include "mcp/network/connection_impl.h" +#include "mcp/network/socket_impl.h" + +namespace mcp { +namespace http { + +namespace { + +// Minimal URL parser — the input contract says "absolute URL with an +// IPv4 literal host", which is all the current callers produce. We +// stay strict and return false on anything we don't recognize so +// callers get a clean error instead of a mystery connection attempt. +struct ParsedUrl { + bool tls{false}; + std::string host; + uint16_t port{0}; + std::string path{"/"}; +}; + +bool parseUrl(const std::string& url, ParsedUrl& out) { + auto proto_end = url.find("://"); + if (proto_end == std::string::npos) { + return false; + } + const std::string scheme = url.substr(0, proto_end); + if (scheme == "http") { + out.tls = false; + out.port = 80; + } else if (scheme == "https") { + out.tls = true; + out.port = 443; + } else { + return false; + } + + const size_t host_start = proto_end + 3; + const size_t path_start = url.find('/', host_start); + std::string hostport = (path_start == std::string::npos) + ? url.substr(host_start) + : url.substr(host_start, path_start - host_start); + out.path = (path_start == std::string::npos) ? std::string{"/"} + : url.substr(path_start); + if (hostport.empty()) { + return false; + } + const auto colon = hostport.find(':'); + if (colon == std::string::npos) { + out.host = hostport; + } else { + out.host = hostport.substr(0, colon); + try { + out.port = static_cast(std::stoi(hostport.substr(colon + 1))); + } catch (...) { + return false; + } + } + return !out.host.empty(); +} + +std::string statusTextForCode(int code) { + switch (code) { + case 200: + return "OK"; + case 201: + return "Created"; + case 202: + return "Accepted"; + case 204: + return "No Content"; + case 400: + return "Bad Request"; + case 401: + return "Unauthorized"; + case 403: + return "Forbidden"; + case 404: + return "Not Found"; + case 500: + return "Internal Server Error"; + default: + return ""; + } +} + +} // namespace + +/** + * RequestContext - per-request state for HttpAsyncClient. + * + * Owns the outbound connection and the response-parsing codec filter + * for a single HTTP request. Lives until the response arrives (or an + * error occurs), at which point HttpAsyncClient::finishRequest hands + * the context to Dispatcher::deferredDelete so teardown runs past the + * current callback frame. + */ +class HttpAsyncClient::RequestContext + : public network::ConnectionCallbacks, + public filter::HttpCodecFilter::MessageCallbacks, + public event::DeferredDeletable { + public: + RequestContext(HttpAsyncClient& parent, + HttpRequest request, + ParsedUrl url, + HttpResponseCallback on_response, + HttpErrorCallback on_error) + : parent_(parent), + request_(std::move(request)), + url_(std::move(url)), + on_response_(std::move(on_response)), + on_error_(std::move(on_error)) {} + + // Build the connection and codec filter, then initiate connect(). + // Caller must guarantee dispatcher-thread context. + bool start() { + auto& dispatcher = parent_.dispatcher_; + auto& socket_interface = parent_.socket_interface_; + + // Build remote address from the IPv4 literal in the URL. + auto remote_address = + std::make_shared(url_.host, url_.port); + + // Create a non-blocking TCP socket via the MCP socket interface. + auto fd_result = socket_interface.socket(network::SocketType::Stream, + network::Address::Type::Ip, + network::Address::IpVersion::v4, + /*v6only=*/false); + if (!fd_result.ok()) { + return false; + } + auto io_handle = + socket_interface.ioHandleForFd(*fd_result.value, /*socket_v6only=*/false); + if (!io_handle) { + socket_interface.close(*fd_result.value); + return false; + } + io_handle->setBlocking(false); + + auto local_address = + network::Address::anyAddress(network::Address::IpVersion::v4, 0); + auto connection_socket = std::make_unique( + std::move(io_handle), local_address, remote_address); + + // Transport socket — let the injected factory decide TCP vs TLS. + auto* client_factory = + dynamic_cast( + parent_.transport_factory_.get()); + if (!client_factory) { + return false; + } + auto transport_socket = client_factory->createTransportSocket(nullptr); + + // Build the connection. connected=false means connect() will kick + // off the TCP handshake when called. + auto connection_impl = std::make_unique( + dispatcher, std::move(connection_socket), std::move(transport_socket), + /*connected=*/false); + connection_impl->addConnectionCallbacks(*this); + + // Install HttpCodecFilter in client mode as a READ filter only. + // We intentionally do not install it as a write filter: in client + // mode HttpCodecFilter rewrites outgoing bytes into its own HTTP + // request shape (hardcoded MCP-oriented headers, SSE handling). + // HttpAsyncClient formats the request itself so callers keep full + // control over method, path, and headers. + codec_filter_ = std::make_shared( + *this, dispatcher, /*is_server=*/false); + connection_impl->filterManager().addReadFilter(codec_filter_); + connection_impl->filterManager().initializeReadFilters(); + + // The ClientConnection wrapper is what connect() is called on. + connection_ = std::unique_ptr( + static_cast(connection_impl.release())); + connection_->connect(); + return true; + } + + // network::ConnectionCallbacks + void onEvent(network::ConnectionEvent event) override { + switch (event) { + case network::ConnectionEvent::Connected: + writeRequestBytes(); + break; + case network::ConnectionEvent::RemoteClose: + case network::ConnectionEvent::LocalClose: + // If we have not yet delivered a response, treat close-before- + // complete as an error. If the response already fired, a close + // here is just cleanup and we ignore it. + if (!completed_) { + fail("connection closed before response completed"); + } + break; + default: + break; + } + } + void onAboveWriteBufferHighWatermark() override {} + void onBelowWriteBufferLowWatermark() override {} + + // filter::HttpCodecFilter::MessageCallbacks + void onHeaders(const std::map& headers, + bool /*keep_alive*/) override { + for (const auto& kv : headers) { + if (kv.first == ":status") { + try { + response_.status_code = std::stoi(kv.second); + } catch (...) { + response_.status_code = 0; + } + } else if (kv.first == "status") { + response_.status_text = kv.second; + } else { + response_.headers[kv.first] = kv.second; + } + } + if (response_.status_text.empty()) { + response_.status_text = statusTextForCode(response_.status_code); + } + } + + void onBody(const std::string& data, bool end_stream) override { + // HttpCodecFilter in client mode emits each body chunk twice: once + // inline as it arrives (end_stream=false) and once again with the + // fully accumulated body at message-complete (end_stream=true). We + // only care about the final complete body, so take the end_stream + // delivery and overwrite whatever streamed in. + if (end_stream) { + response_.body = data; + } + } + + void onMessageComplete() override { + if (completed_) { + return; + } + completed_ = true; + auto cb = std::move(on_response_); + if (cb) { + cb(std::move(response_)); + } + // We are still inside HttpCodecFilter::dispatch on the stack: the + // parser is in the middle of parsing and will try to drain the + // input buffer by the number of bytes it consumed once we return. + // Closing the connection here would flush that same buffer and + // make the drain overshoot. Defer close + teardown past the + // current callback frame so the parser can unwind cleanly first. + parent_.finishRequest(this); + } + + void onError(const std::string& error) override { + fail("codec error: " + error); + } + + private: + void fail(const std::string& message) { + if (completed_) { + return; + } + completed_ = true; + auto cb = std::move(on_error_); + if (cb) { + cb(message); + } + parent_.finishRequest(this); + } + + void writeRequestBytes() { + std::ostringstream req; + req << request_.method << " " << url_.path << " HTTP/1.1\r\n"; + req << "Host: " << url_.host << ":" << url_.port << "\r\n"; + bool has_content_length = false; + bool has_connection = false; + for (const auto& h : request_.headers) { + req << h.first << ": " << h.second << "\r\n"; + if (h.first == "Content-Length" || h.first == "content-length") { + has_content_length = true; + } else if (h.first == "Connection" || h.first == "connection") { + has_connection = true; + } + } + if (!has_content_length) { + req << "Content-Length: " << request_.body.size() << "\r\n"; + } + if (!has_connection) { + // Default to close — one-shot by design. Callers who want to + // keep connections open can pass Connection: keep-alive. + req << "Connection: close\r\n"; + } + req << "\r\n" << request_.body; + + const std::string bytes = req.str(); + OwnedBuffer buffer; + buffer.add(bytes.c_str(), bytes.length()); + connection_->write(buffer, /*end_stream=*/false); + } + + HttpAsyncClient& parent_; + HttpRequest request_; + ParsedUrl url_; + HttpResponseCallback on_response_; + HttpErrorCallback on_error_; + std::unique_ptr connection_; + std::shared_ptr codec_filter_; + HttpResponse response_; + bool completed_{false}; +}; + +// HttpAsyncClient implementation + +HttpAsyncClient::HttpAsyncClient( + event::Dispatcher& dispatcher, + network::SocketInterface& socket_interface, + std::unique_ptr transport_factory) + : dispatcher_(dispatcher), + socket_interface_(socket_interface), + transport_factory_(std::move(transport_factory)) {} + +HttpAsyncClient::~HttpAsyncClient() { + // Any requests still in flight at destruction are dropped. We don't + // fire on_error_ because destruction is observable to the owner — + // they know their callbacks won't be invoked after they destroy the + // client. Contexts destruct in-place via map clearing. + active_requests_.clear(); +} + +bool HttpAsyncClient::send(const HttpRequest& request, + HttpResponseCallback on_response, + HttpErrorCallback on_error) { + assert(dispatcher_.isThreadSafe() && + "HttpAsyncClient::send must be called from the dispatcher thread"); + + ParsedUrl url; + if (!parseUrl(request.url, url)) { + return false; + } + + auto ctx = std::make_unique(*this, request, std::move(url), + std::move(on_response), + std::move(on_error)); + auto* raw = ctx.get(); + active_requests_.emplace(raw, std::move(ctx)); + if (!raw->start()) { + active_requests_.erase(raw); + return false; + } + return true; +} + +void HttpAsyncClient::finishRequest(RequestContext* ctx) { + auto it = active_requests_.find(ctx); + if (it == active_requests_.end()) { + return; + } + std::unique_ptr owned = std::move(it->second); + active_requests_.erase(it); + // Defer destruction past the current callback frame so we don't + // unwind the connection and filter from within their own callback. + dispatcher_.deferredDelete(std::move(owned)); +} + +} // namespace http +} // namespace mcp From f0ff8c1be439430a9e062a8f88778bad5a3e9a2d Mon Sep 17 00:00:00 2001 From: gophergogo Date: Sun, 19 Apr 2026 16:32:33 -0700 Subject: [PATCH 3/3] Add integration tests for HttpAsyncClient (#213) Drive the client against a real 127.0.0.1 listener stood up by RealListenerTestBase so the codec, deferred-delete teardown, and connection lifecycle all run through live kernel sockets rather than mocks. Covers: POST round-trip (request formatting and response body delivery), malformed-URL rejection (send returns false and fires no callback), and malformed response bytes (codec error surfaces as an error callback). Callbacks are captured through a shared_ptr so late firings from the dispatcher teardown path cannot hit a destroyed sink on the test stack. --- tests/CMakeLists.txt | 13 ++ tests/http/test_http_async_client.cc | 273 +++++++++++++++++++++++++++ 2 files changed, 286 insertions(+) create mode 100644 tests/http/test_http_async_client.cc diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index da3ff7e2..2aa3d14e 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -84,6 +84,9 @@ add_executable(test_llhttp_parser http/test_llhttp_parser.cc) add_executable(test_nghttp2_parser http/test_nghttp2_parser.cc) add_executable(test_sse_parser http/test_sse_parser.cc) +# Integration test for HttpAsyncClient (real IO against a localhost listener) +add_executable(test_http_async_client http/test_http_async_client.cc) + # Transport layer tests (low-level transport/pipe/socket testing) add_executable(test_stdio_echo_server transport/test_stdio_echo_server.cc) add_executable(test_stdio_echo_client transport/test_stdio_echo_client.cc) @@ -656,6 +659,15 @@ target_link_libraries(test_sse_parser Threads::Threads ) +target_link_libraries(test_http_async_client + gopher-mcp + gopher-mcp-event + gtest + gmock + gtest_main + Threads::Threads +) + target_link_libraries(test_stdio_echo_server gopher-mcp gopher-mcp-event @@ -1329,6 +1341,7 @@ add_test(NAME HttpParserTest COMMAND test_http_parser) add_test(NAME LLHttpParserTest COMMAND test_llhttp_parser) add_test(NAME Nghttp2ParserTest COMMAND test_nghttp2_parser) add_test(NAME SseParserTest COMMAND test_sse_parser) +add_test(NAME HttpAsyncClientTest COMMAND test_http_async_client) add_test(NAME StdioEchoServerTest COMMAND test_stdio_echo_server) add_test(NAME StdioEchoClientTest COMMAND test_stdio_echo_client) diff --git a/tests/http/test_http_async_client.cc b/tests/http/test_http_async_client.cc new file mode 100644 index 00000000..7557a64e --- /dev/null +++ b/tests/http/test_http_async_client.cc @@ -0,0 +1,273 @@ +/** + * Round-trip integration tests for mcp::http::HttpAsyncClient. + * + * We stand up a real TCP listener on 127.0.0.1 (ephemeral port) via + * RealListenerTestBase, drive HttpAsyncClient::send at it, then hand- + * handle the server-side accept/read/write with the MCP socket + * interface. That gives us real I/O across the whole stack — the + * client codec, the deferred-delete teardown, and the connection + * lifecycle all run against a live kernel socket rather than a mock. + */ + +#include +#include +#include +#include +#include +#include + +#include + +#include "mcp/http/http_async_client.h" +#include "mcp/network/socket_interface.h" +#include "mcp/network/transport_socket.h" + +#include "../integration/real_io_test_base.h" + +namespace mcp { +namespace http { +namespace { + +using namespace std::chrono_literals; + +// Collects the outcome of an HttpAsyncClient::send so the test thread +// can block on it without racing the dispatcher thread. +class ResponseSink { + public: + void setResponse(HttpResponse r) { + std::lock_guard g(mu_); + response_ = std::move(r); + got_response_ = true; + cv_.notify_all(); + } + void setError(const std::string& msg) { + std::lock_guard g(mu_); + error_ = msg; + got_error_ = true; + cv_.notify_all(); + } + bool wait(std::chrono::milliseconds d = 2000ms) { + std::unique_lock g(mu_); + return cv_.wait_for(g, d, + [&] { return got_response_ || got_error_; }); + } + bool hasResponse() const { + std::lock_guard g(mu_); + return got_response_; + } + bool hasError() const { + std::lock_guard g(mu_); + return got_error_; + } + HttpResponse response() const { + std::lock_guard g(mu_); + return response_; + } + std::string error() const { + std::lock_guard g(mu_); + return error_; + } + + private: + mutable std::mutex mu_; + std::condition_variable cv_; + bool got_response_{false}; + bool got_error_{false}; + HttpResponse response_; + std::string error_; +}; + +class HttpAsyncClientTest : public test::RealListenerTestBase { + protected: + void SetUp() override { RealListenerTestBase::SetUp(); } + + void TearDown() override { + executeInDispatcher([this]() { client_.reset(); }); + RealListenerTestBase::TearDown(); + } + + void createClient() { + executeInDispatcher([this]() { + client_ = std::make_unique( + *dispatcher_, network::socketInterface(), + std::make_unique()); + }); + } + + // Busy-accept a single pending connection — the listen socket is + // non-blocking so we retry briefly. + network::IoHandlePtr acceptOne(std::chrono::milliseconds budget = 2000ms) { + const auto deadline = std::chrono::steady_clock::now() + budget; + while (std::chrono::steady_clock::now() < deadline) { + network::IoHandlePtr accepted; + executeInDispatcher([this, &accepted]() { + auto r = listen_handle_->accept(); + if (r.ok()) { + accepted = std::move(*r); + } + }); + if (accepted) { + return accepted; + } + std::this_thread::sleep_for(5ms); + } + return nullptr; + } + + // Read until we see the request line + headers terminator, then + // slurp Content-Length bytes of body. IoHandle::read fills a Buffer, + // which we drain to a std::string so the test can pattern-match. + std::string readRequest(network::IoHandle& handle, + std::chrono::milliseconds budget = 2000ms) { + std::string data; + const auto deadline = std::chrono::steady_clock::now() + budget; + bool headers_done = false; + size_t content_length = 0; + size_t header_end = 0; + + while (std::chrono::steady_clock::now() < deadline) { + OwnedBuffer buf; + auto r = handle.read(buf, /*max_length=*/4096); + if (r.ok() && *r > 0) { + data.append(buf.toString()); + } else { + std::this_thread::sleep_for(5ms); + } + if (!headers_done) { + auto pos = data.find("\r\n\r\n"); + if (pos != std::string::npos) { + headers_done = true; + header_end = pos + 4; + auto cl_pos = data.find("Content-Length:"); + if (cl_pos != std::string::npos && cl_pos < header_end) { + content_length = + std::stoul(data.substr(cl_pos + 15, header_end - cl_pos - 15)); + } + } + } + if (headers_done && data.size() >= header_end + content_length) { + return data; + } + } + return data; + } + + void writeResponse(network::IoHandle& handle, const std::string& bytes) { + // IoHandle::write drains the buffer as bytes go on the wire. We + // keep looping until the buffer is empty or the deadline fires so + // partial sends don't silently truncate the fake response. + OwnedBuffer buf; + buf.add(bytes); + const auto deadline = std::chrono::steady_clock::now() + 2000ms; + while (buf.length() > 0 && + std::chrono::steady_clock::now() < deadline) { + auto r = handle.write(buf); + if (!r.ok() || *r == 0) { + std::this_thread::sleep_for(5ms); + } + } + } + + std::unique_ptr client_; +}; + +TEST_F(HttpAsyncClientTest, PostRoundTripDeliversResponseBody) { + uint16_t port = createRealListener(); + createClient(); + + ResponseSink sink; + HttpRequest req; + req.method = "POST"; + req.url = "http://127.0.0.1:" + std::to_string(port) + "/mcp"; + req.headers["Content-Type"] = "application/json"; + req.body = "{\"jsonrpc\":\"2.0\",\"method\":\"ping\"}"; + + executeInDispatcher([this, &req, &sink]() { + const bool ok = client_->send( + req, + [&sink](HttpResponse r) { sink.setResponse(std::move(r)); }, + [&sink](const std::string& e) { sink.setError(e); }); + ASSERT_TRUE(ok); + }); + + auto accepted = acceptOne(); + ASSERT_TRUE(accepted) << "server never saw the inbound POST"; + + const std::string request_wire = readRequest(*accepted); + EXPECT_NE(request_wire.find("POST /mcp HTTP/1.1"), std::string::npos); + EXPECT_NE(request_wire.find("Host: 127.0.0.1:"), std::string::npos); + EXPECT_NE(request_wire.find("Content-Type: application/json"), + std::string::npos); + EXPECT_NE(request_wire.find(req.body), std::string::npos); + + const std::string reply_body = "{\"result\":\"pong\"}"; + std::string reply = "HTTP/1.1 200 OK\r\n"; + reply += "Content-Type: application/json\r\n"; + reply += "Content-Length: " + std::to_string(reply_body.size()) + "\r\n"; + reply += "Connection: close\r\n\r\n"; + reply += reply_body; + writeResponse(*accepted, reply); + + ASSERT_TRUE(sink.wait()); + ASSERT_TRUE(sink.hasResponse()) << "error instead: " << sink.error(); + auto r = sink.response(); + EXPECT_EQ(r.status_code, 200); + EXPECT_EQ(r.body, reply_body); + EXPECT_EQ(r.headers["content-type"], "application/json"); +} + +TEST_F(HttpAsyncClientTest, RejectsMalformedUrl) { + createClient(); + bool cb_fired = false; + executeInDispatcher([this, &cb_fired]() { + HttpRequest req; + req.method = "POST"; + req.url = "not-a-url"; + const bool ok = client_->send( + req, [&](HttpResponse) { cb_fired = true; }, + [&](const std::string&) { cb_fired = true; }); + EXPECT_FALSE(ok); + }); + // Contract: send returns false for a malformed URL and does NOT fire + // either callback. Give the dispatcher a tick to prove nothing posts. + std::this_thread::sleep_for(50ms); + EXPECT_FALSE(cb_fired); +} + +TEST_F(HttpAsyncClientTest, MalformedResponseFiresErrorCallback) { + uint16_t port = createRealListener(); + createClient(); + + // Shared with the callbacks so a late-firing callback (e.g. from + // teardown draining the dispatcher) doesn't hit a destroyed stack + // object. + auto sink = std::make_shared(); + + executeInDispatcher([this, port, sink]() { + HttpRequest req; + req.method = "POST"; + req.url = "http://127.0.0.1:" + std::to_string(port) + "/mcp"; + req.body = "{}"; + ASSERT_TRUE(client_->send( + req, + [sink](HttpResponse r) { sink->setResponse(std::move(r)); }, + [sink](const std::string& e) { sink->setError(e); })); + }); + + auto accepted = acceptOne(); + ASSERT_TRUE(accepted); + // Consume the request so the client moves past writeRequestBytes. + (void)readRequest(*accepted, 2000ms); + // Reply with bytes that cannot be parsed as an HTTP/1.1 response + // status line. The client codec surfaces this as onError, which + // RequestContext forwards as an error callback. + writeResponse(*accepted, "NOT AN HTTP RESPONSE\r\n\r\n"); + + ASSERT_TRUE(sink->wait()); + EXPECT_TRUE(sink->hasError()); + EXPECT_FALSE(sink->hasResponse()); +} + +} // namespace +} // namespace http +} // namespace mcp