diff --git a/include/crow/http_connection.h b/include/crow/http_connection.h index fe748ec937..168f45f47d 100644 --- a/include/crow/http_connection.h +++ b/include/crow/http_connection.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -263,12 +264,33 @@ namespace crow } #endif + if (res.skip_body && res.is_streamed_type() && res.streamed_body_info_.has_content_length) + { + // HEAD responses should not send a body, but should preserve the same known Content-Length. + res.set_header("Content-Length", std::to_string(res.streamed_body_info_.content_length)); + } + + if (res.is_streamed_type() && res.streamed_body_info_.chunked && !supports_chunked_transfer_encoding()) + { + // HTTP/1.0 doesn't support Transfer-Encoding: chunked. + // Fall back to connection-close delimited streaming. + res.headers.erase("Transfer-Encoding"); + res.streamed_body_info_.chunked = false; + res.set_header("Connection", "close"); + add_keep_alive_ = false; + close_connection_ = true; + } + prepare_buffers(); if (res.is_static_type()) { do_write_static(); } + else if (res.is_streamed_type()) + { + do_write_streamed(); + } else { do_write_general(); @@ -325,6 +347,157 @@ namespace crow parser_.clear(); } + void do_write_streamed() + { + error_code ec; + asio::write(adaptor_.socket(), buffers_, ec); // Write the response start / headers + if (ec) + { + CROW_LOG_ERROR << ec << "- buffer write error happened while sending response start / headers. Writing stopped premature."; + } + cancel_deadline_timer(); + + if (!ec && res.skip_body) + { + // HEAD: only headers are sent. + } + else if (!ec && res.streamed_body_info_.reader) + { + std::vector chunk_buffer(res.streamed_body_info_.chunk_size); + std::vector buffers; + buffers.reserve(3); + + if (res.streamed_body_info_.chunked) + { + char chunk_header[32]; + static constexpr char chunk_suffix[] = "\r\n"; + bool chunk_format_failed = false; + + while (true) + { + size_t produced = res.streamed_body_info_.reader(chunk_buffer.data(), chunk_buffer.size()); + if (produced == 0) + { + break; + } + if (produced > chunk_buffer.size()) + { + produced = chunk_buffer.size(); + } + + const int chunk_header_size = std::snprintf(chunk_header, sizeof(chunk_header), "%zx\r\n", produced); + if (chunk_header_size <= 0) + { + CROW_LOG_ERROR << "Failed to format chunk-size header while sending streamed response."; + chunk_format_failed = true; + close_connection_ = true; + break; + } + + buffers.clear(); + buffers.emplace_back(asio::const_buffer(chunk_header, static_cast(chunk_header_size))); + buffers.emplace_back(asio::const_buffer(chunk_buffer.data(), produced)); + buffers.emplace_back(asio::const_buffer(chunk_suffix, sizeof(chunk_suffix) - 1)); + asio::write(adaptor_.socket(), buffers, ec); + if (ec) + { + CROW_LOG_ERROR << ec << " - buffer write error happened while sending chunked streamed response. Writing stopped premature."; + break; + } + } + + if (!ec && !chunk_format_failed) + { + static constexpr char chunk_terminator[] = "0\r\n\r\n"; + buffers.clear(); + buffers.emplace_back(asio::const_buffer(chunk_terminator, sizeof(chunk_terminator) - 1)); + asio::write(adaptor_.socket(), buffers, ec); + if (ec) + { + CROW_LOG_ERROR << ec << " - buffer write error happened while sending terminating chunk. Writing stopped premature."; + } + } + } + else if (res.streamed_body_info_.has_content_length) + { + buffers.resize(1); + size_t remaining = res.streamed_body_info_.content_length; + + while (remaining > 0) + { + const size_t to_produce = std::min(chunk_buffer.size(), remaining); + size_t produced = res.streamed_body_info_.reader(chunk_buffer.data(), to_produce); + if (produced == 0) + { + CROW_LOG_WARNING << "Streamed response terminated before reaching Content-Length."; + break; + } + if (produced > to_produce) + { + produced = to_produce; + } + + buffers[0] = asio::const_buffer(chunk_buffer.data(), produced); + asio::write(adaptor_.socket(), buffers, ec); + if (ec) + { + CROW_LOG_ERROR << ec << " - buffer write error happened while sending streamed response. Writing stopped premature."; + break; + } + remaining -= produced; + } + + if (remaining != 0) + { + close_connection_ = true; + } + } + else + { + // Unknown total length without chunking (HTTP/1.0 fallback): stream until reader returns 0. + buffers.resize(1); + while (true) + { + size_t produced = res.streamed_body_info_.reader(chunk_buffer.data(), chunk_buffer.size()); + if (produced == 0) + { + break; + } + if (produced > chunk_buffer.size()) + { + produced = chunk_buffer.size(); + } + + buffers[0] = asio::const_buffer(chunk_buffer.data(), produced); + asio::write(adaptor_.socket(), buffers, ec); + if (ec) + { + CROW_LOG_ERROR << ec << " - buffer write error happened while sending streamed response (connection-close mode). Writing stopped premature."; + break; + } + } + close_connection_ = true; + } + } + + if (close_connection_) + { + adaptor_.shutdown_readwrite(); + adaptor_.close(); + CROW_LOG_DEBUG << this << " from write (streamed_res)"; + } + + res.end(); + res.clear(); + buffers_.clear(); + parser_.clear(); + } + + bool supports_chunked_transfer_encoding() const + { + return req_.http_ver_major > 1 || (req_.http_ver_major == 1 && req_.http_ver_minor >= 1); + } + void do_write_general() { error_code ec; diff --git a/include/crow/http_response.h b/include/crow/http_response.h index ac5ced4def..e23773d992 100644 --- a/include/crow/http_response.h +++ b/include/crow/http_response.h @@ -179,6 +179,7 @@ namespace crow headers = std::move(r.headers); completed_ = r.completed_; file_info = std::move(r.file_info); + streamed_body_info_ = std::move(r.streamed_body_info_); return *this; } @@ -195,6 +196,7 @@ namespace crow headers.clear(); completed_ = false; file_info = static_file_info{}; + streamed_body_info_ = streamed_body_info{}; } /// Return a "Temporary Redirect" response. @@ -282,6 +284,14 @@ namespace crow return file_info.path.size(); } + /// Check whether the response has a streamed body defined. + bool is_streamed_type() + { + return static_cast(streamed_body_info_.reader); + } + + static constexpr size_t default_stream_chunk_size = 16 * 1024; + /// This constains metadata (coming from the `stat` command) related to any static files associated with this response. /// @@ -293,6 +303,69 @@ namespace crow int statResult; }; + struct streamed_body_info + { + std::function reader; + size_t content_length = 0; + bool has_content_length = false; + bool chunked = false; + size_t chunk_size = default_stream_chunk_size; + }; + + /// Stream a response body using a callback that fills an output buffer. + /// + /// The callback should write at most `max_size` bytes into `buffer` and return the number of written bytes. + /// Returning `0` indicates end-of-stream. + /// + /// `chunk_size` controls how many bytes Crow asks from the callback at a time. + void set_streamed_body(std::function reader, size_t content_length, std::string content_type = "", size_t chunk_size = default_stream_chunk_size) + { + body.clear(); + file_info = static_file_info{}; + streamed_body_info_.reader = std::move(reader); + streamed_body_info_.content_length = content_length; + streamed_body_info_.has_content_length = true; + streamed_body_info_.chunked = false; + streamed_body_info_.chunk_size = (chunk_size == 0 ? 1 : chunk_size); + this->headers.erase("Transfer-Encoding"); + this->set_header("Content-Length", std::to_string(content_length)); + manual_length_header = false; + if (!content_type.empty()) + { + this->set_header("Content-Type", get_mime_type(content_type)); + } +#ifdef CROW_ENABLE_COMPRESSION + compressed = false; +#endif + } + + /// Stream a response body with unknown total size. + /// + /// For HTTP/1.1+, Crow uses `Transfer-Encoding: chunked`. + /// The callback should return `0` to indicate end-of-stream. + /// + /// `chunk_size` controls how many bytes Crow asks from the callback at a time. + void set_streamed_body(std::function reader, std::string content_type = "", size_t chunk_size = default_stream_chunk_size) + { + body.clear(); + file_info = static_file_info{}; + streamed_body_info_.reader = std::move(reader); + streamed_body_info_.content_length = 0; + streamed_body_info_.has_content_length = false; + streamed_body_info_.chunked = true; + streamed_body_info_.chunk_size = (chunk_size == 0 ? 1 : chunk_size); + this->headers.erase("Content-Length"); + this->set_header("Transfer-Encoding", "chunked"); + manual_length_header = true; + if (!content_type.empty()) + { + this->set_header("Content-Type", get_mime_type(content_type)); + } +#ifdef CROW_ENABLE_COMPRESSION + compressed = false; +#endif + } + /// Return a static file as the response body, the content_type may be specified explicitly. void set_static_file_info(std::string path, std::string content_type = "") { @@ -456,5 +529,6 @@ namespace crow std::function complete_request_handler_; std::function is_alive_helper_; static_file_info file_info; + streamed_body_info streamed_body_info_; }; } // namespace crow diff --git a/tests/unittest.cpp b/tests/unittest.cpp index 88f730894f..60cbe121c1 100644 --- a/tests/unittest.cpp +++ b/tests/unittest.cpp @@ -2012,6 +2012,270 @@ TEST_CASE("stream_response") runTest.join(); } // stream_response +TEST_CASE("streamed_response") +{ + SimpleApp app; + + const std::string keyword_ = "hello"; + const size_t repetitions = 250000; + const size_t key_response_size = keyword_.length() * repetitions; + + std::string key_response; + key_response.reserve(key_response_size); + for (size_t i = 0; i < repetitions; i++) + key_response += keyword_; + + CROW_ROUTE(app, "/test") + ([&key_response](const crow::request&, crow::response& res) { + res.set_streamed_body([&key_response, offset = size_t{0}](void* buffer, size_t max_size) mutable -> size_t { + const size_t remaining = key_response.size() - offset; + if (remaining == 0) + return 0; + const size_t to_copy = std::min(max_size, remaining); + std::copy_n(key_response.data() + offset, to_copy, static_cast(buffer)); + offset += to_copy; + return to_copy; + }, + key_response.size(), "text/plain"); + res.end(); + }); + + app.validate(); + + std::thread runTest([&app, &key_response, key_response_size, keyword_]() { + auto _ = app.bindaddr(LOCALHOST_ADDRESS).port(45451).run_async(); + app.wait_for_server_start(); + asio::io_context io_context; + std::string sendmsg; + + //Total bytes received + unsigned int received = 0; + sendmsg = "GET /test HTTP/1.0\r\n\r\n"; + { + asio::streambuf b; + + asio::ip::tcp::socket c(io_context); + c.connect(asio::ip::tcp::endpoint( + asio::ip::make_address(LOCALHOST_ADDRESS), 45451)); + c.send(asio::buffer(sendmsg)); + + // consuming the headers, since we don't need those for the test + static char buf[2048]; + size_t received_headers_bytes = 0; + + // read some response bytes, then find where body starts + const size_t headers_bytes_and_some = 102 * 2; + while (received_headers_bytes < headers_bytes_and_some) + received_headers_bytes += c.receive(asio::buffer(buf + received_headers_bytes, + sizeof(buf) / sizeof(buf[0]) - received_headers_bytes)); + + const std::string::size_type header_end_pos = std::string(buf, received_headers_bytes).find(keyword_); + received += received_headers_bytes - header_end_pos; + + while (received < key_response_size) + { + asio::streambuf::mutable_buffers_type bufs = b.prepare(16384); + + size_t n(0); + n = c.receive(bufs); + b.commit(n); + received += n; + + std::istream istream(&b); + std::string s; + istream >> s; + + CHECK(key_response.substr(received - n, n) == s); + } + } + app.stop(); + }); + runTest.join(); +} // streamed_response + +TEST_CASE("streamed_response_unknown_size_chunked") +{ + SimpleApp app; + const std::string payload = "hello world"; + + CROW_ROUTE(app, "/test") + ([payload](const crow::request&, crow::response& res) { + res.set_streamed_body([payload, offset = size_t{0}](void* buffer, size_t max_size) mutable -> size_t { + const size_t remaining = payload.size() - offset; + if (remaining == 0) + return 0; + + const size_t to_copy = std::min(max_size, remaining); + std::copy_n(payload.data() + offset, to_copy, static_cast(buffer)); + offset += to_copy; + return to_copy; + }, + "text/plain", 4); + res.end(); + }); + + app.validate(); + + std::thread runTest([&app]() { + auto _ = app.bindaddr(LOCALHOST_ADDRESS).port(45452).run_async(); + app.wait_for_server_start(); + + asio::io_context io_context; + asio::ip::tcp::socket c(io_context); + c.connect(asio::ip::tcp::endpoint(asio::ip::make_address(LOCALHOST_ADDRESS), 45452)); + + const std::string sendmsg = "GET /test HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"; + c.send(asio::buffer(sendmsg)); + + std::string raw_response; + std::array buf{}; + asio_error_code ec; + while (true) + { + const size_t n = c.receive(asio::buffer(buf), 0, ec); + if (ec == asio::error::eof) + break; + + REQUIRE_FALSE(ec); + raw_response.append(buf.data(), n); + } + + const std::string::size_type header_end_pos = raw_response.find("\r\n\r\n"); + REQUIRE(header_end_pos != std::string::npos); + const std::string headers = raw_response.substr(0, header_end_pos); + const std::string body = raw_response.substr(header_end_pos + 4); + + CHECK(headers.find("Transfer-Encoding: chunked") != std::string::npos); + CHECK(headers.find("Content-Length") == std::string::npos); + CHECK(body == "4\r\nhell\r\n4\r\no wo\r\n3\r\nrld\r\n0\r\n\r\n"); + + app.stop(); + }); + runTest.join(); +} // streamed_response_unknown_size_chunked + +TEST_CASE("streamed_response_unknown_size_http10_fallback") +{ + SimpleApp app; + const std::string payload = "hello world"; + + CROW_ROUTE(app, "/test") + ([payload](const crow::request&, crow::response& res) { + res.set_streamed_body([payload, offset = size_t{0}](void* buffer, size_t max_size) mutable -> size_t { + const size_t remaining = payload.size() - offset; + if (remaining == 0) + return 0; + + const size_t to_copy = std::min(max_size, remaining); + std::copy_n(payload.data() + offset, to_copy, static_cast(buffer)); + offset += to_copy; + return to_copy; + }, + "text/plain", 4); + res.end(); + }); + + app.validate(); + + std::thread runTest([&app, payload]() { + auto _ = app.bindaddr(LOCALHOST_ADDRESS).port(45453).run_async(); + app.wait_for_server_start(); + + asio::io_context io_context; + asio::ip::tcp::socket c(io_context); + c.connect(asio::ip::tcp::endpoint(asio::ip::make_address(LOCALHOST_ADDRESS), 45453)); + + const std::string sendmsg = "GET /test HTTP/1.0\r\nConnection: close\r\n\r\n"; + c.send(asio::buffer(sendmsg)); + + std::string raw_response; + std::array buf{}; + asio_error_code ec; + while (true) + { + const size_t n = c.receive(asio::buffer(buf), 0, ec); + if (ec == asio::error::eof) + break; + + REQUIRE_FALSE(ec); + raw_response.append(buf.data(), n); + } + + const std::string::size_type header_end_pos = raw_response.find("\r\n\r\n"); + REQUIRE(header_end_pos != std::string::npos); + const std::string headers = raw_response.substr(0, header_end_pos); + const std::string body = raw_response.substr(header_end_pos + 4); + + CHECK(headers.find("Transfer-Encoding: chunked") == std::string::npos); + CHECK(headers.find("Connection: close") != std::string::npos); + CHECK(body == payload); + + app.stop(); + }); + runTest.join(); +} // streamed_response_unknown_size_http10_fallback + +TEST_CASE("streamed_response_head_known_length_no_body") +{ + SimpleApp app; + const std::string payload = "hello world"; + + CROW_ROUTE(app, "/test") + .methods(crow::HTTPMethod::Get, crow::HTTPMethod::Head) + ([payload](const crow::request&, crow::response& res) { + res.set_streamed_body([payload, offset = size_t{0}](void* buffer, size_t max_size) mutable -> size_t { + const size_t remaining = payload.size() - offset; + if (remaining == 0) + return 0; + + const size_t to_copy = std::min(max_size, remaining); + std::copy_n(payload.data() + offset, to_copy, static_cast(buffer)); + offset += to_copy; + return to_copy; + }, + payload.size(), "text/plain", 4); + res.end(); + }); + + app.validate(); + + std::thread runTest([&app]() { + auto _ = app.bindaddr(LOCALHOST_ADDRESS).port(45454).run_async(); + app.wait_for_server_start(); + + asio::io_context io_context; + asio::ip::tcp::socket c(io_context); + c.connect(asio::ip::tcp::endpoint(asio::ip::make_address(LOCALHOST_ADDRESS), 45454)); + + const std::string sendmsg = "HEAD /test HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"; + c.send(asio::buffer(sendmsg)); + + std::string raw_response; + std::array buf{}; + asio_error_code ec; + while (true) + { + const size_t n = c.receive(asio::buffer(buf), 0, ec); + if (ec == asio::error::eof) + break; + + REQUIRE_FALSE(ec); + raw_response.append(buf.data(), n); + } + + const std::string::size_type header_end_pos = raw_response.find("\r\n\r\n"); + REQUIRE(header_end_pos != std::string::npos); + const std::string headers = raw_response.substr(0, header_end_pos); + const std::string body = raw_response.substr(header_end_pos + 4); + + CHECK(headers.find("Content-Length: 11") != std::string::npos); + CHECK(body.empty()); + + app.stop(); + }); + runTest.join(); +} // streamed_response_head_known_length_no_body + #ifdef CROW_ENABLE_COMPRESSION TEST_CASE("zlib_compression") {