Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 173 additions & 0 deletions include/crow/http_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdio>
#include <memory>
#include <vector>

Expand Down Expand Up @@ -263,12 +264,33 @@ namespace crow
}
#endif

if (res.skip_body && res.is_streamed_type() && res.streamed_body_info_.has_content_length)
{
// HEAD responses should not send a body, but should preserve the same known Content-Length.
res.set_header("Content-Length", std::to_string(res.streamed_body_info_.content_length));
}

if (res.is_streamed_type() && res.streamed_body_info_.chunked && !supports_chunked_transfer_encoding())
{
// HTTP/1.0 doesn't support Transfer-Encoding: chunked.
// Fall back to connection-close delimited streaming.
res.headers.erase("Transfer-Encoding");
res.streamed_body_info_.chunked = false;
res.set_header("Connection", "close");
add_keep_alive_ = false;
close_connection_ = true;
}

prepare_buffers();

if (res.is_static_type())
{
do_write_static();
}
else if (res.is_streamed_type())
{
do_write_streamed();
}
else
{
do_write_general();
Expand Down Expand Up @@ -325,6 +347,157 @@ namespace crow
parser_.clear();
}

void do_write_streamed()
{
error_code ec;
asio::write(adaptor_.socket(), buffers_, ec); // Write the response start / headers
if (ec)
{
CROW_LOG_ERROR << ec << "- buffer write error happened while sending response start / headers. Writing stopped premature.";
}
cancel_deadline_timer();

if (!ec && res.skip_body)
{
// HEAD: only headers are sent.
}
else if (!ec && res.streamed_body_info_.reader)
{
std::vector<char> chunk_buffer(res.streamed_body_info_.chunk_size);
std::vector<asio::const_buffer> buffers;
buffers.reserve(3);

if (res.streamed_body_info_.chunked)
{
char chunk_header[32];
static constexpr char chunk_suffix[] = "\r\n";
bool chunk_format_failed = false;

while (true)
{
size_t produced = res.streamed_body_info_.reader(chunk_buffer.data(), chunk_buffer.size());
if (produced == 0)
{
break;
}
if (produced > chunk_buffer.size())
{
produced = chunk_buffer.size();
}

const int chunk_header_size = std::snprintf(chunk_header, sizeof(chunk_header), "%zx\r\n", produced);
if (chunk_header_size <= 0)
{
CROW_LOG_ERROR << "Failed to format chunk-size header while sending streamed response.";
chunk_format_failed = true;
close_connection_ = true;
break;
}

buffers.clear();
buffers.emplace_back(asio::const_buffer(chunk_header, static_cast<size_t>(chunk_header_size)));
buffers.emplace_back(asio::const_buffer(chunk_buffer.data(), produced));
buffers.emplace_back(asio::const_buffer(chunk_suffix, sizeof(chunk_suffix) - 1));
asio::write(adaptor_.socket(), buffers, ec);
if (ec)
{
CROW_LOG_ERROR << ec << " - buffer write error happened while sending chunked streamed response. Writing stopped premature.";
break;
}
}

if (!ec && !chunk_format_failed)
{
static constexpr char chunk_terminator[] = "0\r\n\r\n";
buffers.clear();
buffers.emplace_back(asio::const_buffer(chunk_terminator, sizeof(chunk_terminator) - 1));
asio::write(adaptor_.socket(), buffers, ec);
if (ec)
{
CROW_LOG_ERROR << ec << " - buffer write error happened while sending terminating chunk. Writing stopped premature.";
}
}
}
else if (res.streamed_body_info_.has_content_length)
{
buffers.resize(1);
size_t remaining = res.streamed_body_info_.content_length;

while (remaining > 0)
{
const size_t to_produce = std::min(chunk_buffer.size(), remaining);
size_t produced = res.streamed_body_info_.reader(chunk_buffer.data(), to_produce);
if (produced == 0)
{
CROW_LOG_WARNING << "Streamed response terminated before reaching Content-Length.";
break;
}
if (produced > to_produce)
{
produced = to_produce;
}

buffers[0] = asio::const_buffer(chunk_buffer.data(), produced);
asio::write(adaptor_.socket(), buffers, ec);
if (ec)
{
CROW_LOG_ERROR << ec << " - buffer write error happened while sending streamed response. Writing stopped premature.";
break;
}
remaining -= produced;
}

if (remaining != 0)
{
close_connection_ = true;
}
}
else
{
// Unknown total length without chunking (HTTP/1.0 fallback): stream until reader returns 0.
buffers.resize(1);
while (true)
{
size_t produced = res.streamed_body_info_.reader(chunk_buffer.data(), chunk_buffer.size());
if (produced == 0)
{
break;
}
if (produced > chunk_buffer.size())
{
produced = chunk_buffer.size();
}

buffers[0] = asio::const_buffer(chunk_buffer.data(), produced);
asio::write(adaptor_.socket(), buffers, ec);
if (ec)
{
CROW_LOG_ERROR << ec << " - buffer write error happened while sending streamed response (connection-close mode). Writing stopped premature.";
break;
}
}
close_connection_ = true;
}
}

if (close_connection_)
{
adaptor_.shutdown_readwrite();
adaptor_.close();
CROW_LOG_DEBUG << this << " from write (streamed_res)";
}

res.end();
res.clear();
buffers_.clear();
parser_.clear();
}

bool supports_chunked_transfer_encoding() const
{
return req_.http_ver_major > 1 || (req_.http_ver_major == 1 && req_.http_ver_minor >= 1);
}

void do_write_general()
{
error_code ec;
Expand Down
74 changes: 74 additions & 0 deletions include/crow/http_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ namespace crow
headers = std::move(r.headers);
completed_ = r.completed_;
file_info = std::move(r.file_info);
streamed_body_info_ = std::move(r.streamed_body_info_);
return *this;
}

Expand All @@ -195,6 +196,7 @@ namespace crow
headers.clear();
completed_ = false;
file_info = static_file_info{};
streamed_body_info_ = streamed_body_info{};
}

/// Return a "Temporary Redirect" response.
Expand Down Expand Up @@ -282,6 +284,14 @@ namespace crow
return file_info.path.size();
}

/// Check whether the response has a streamed body defined.
bool is_streamed_type()
{
return static_cast<bool>(streamed_body_info_.reader);
}

static constexpr size_t default_stream_chunk_size = 16 * 1024;

/// This constains metadata (coming from the `stat` command) related to any static files associated with this response.

///
Expand All @@ -293,6 +303,69 @@ namespace crow
int statResult;
};

struct streamed_body_info
{
std::function<size_t(void*, size_t)> reader;
size_t content_length = 0;
bool has_content_length = false;
bool chunked = false;
size_t chunk_size = default_stream_chunk_size;
};

/// Stream a response body using a callback that fills an output buffer.
///
/// The callback should write at most `max_size` bytes into `buffer` and return the number of written bytes.
/// Returning `0` indicates end-of-stream.
///
/// `chunk_size` controls how many bytes Crow asks from the callback at a time.
void set_streamed_body(std::function<size_t(void*, size_t)> reader, size_t content_length, std::string content_type = "", size_t chunk_size = default_stream_chunk_size)
{
body.clear();
file_info = static_file_info{};
streamed_body_info_.reader = std::move(reader);
streamed_body_info_.content_length = content_length;
streamed_body_info_.has_content_length = true;
streamed_body_info_.chunked = false;
streamed_body_info_.chunk_size = (chunk_size == 0 ? 1 : chunk_size);
this->headers.erase("Transfer-Encoding");
this->set_header("Content-Length", std::to_string(content_length));
manual_length_header = false;
if (!content_type.empty())
{
this->set_header("Content-Type", get_mime_type(content_type));
}
#ifdef CROW_ENABLE_COMPRESSION
compressed = false;
#endif
}

/// Stream a response body with unknown total size.
///
/// For HTTP/1.1+, Crow uses `Transfer-Encoding: chunked`.
/// The callback should return `0` to indicate end-of-stream.
///
/// `chunk_size` controls how many bytes Crow asks from the callback at a time.
void set_streamed_body(std::function<size_t(void*, size_t)> reader, std::string content_type = "", size_t chunk_size = default_stream_chunk_size)
{
body.clear();
file_info = static_file_info{};
streamed_body_info_.reader = std::move(reader);
streamed_body_info_.content_length = 0;
streamed_body_info_.has_content_length = false;
streamed_body_info_.chunked = true;
streamed_body_info_.chunk_size = (chunk_size == 0 ? 1 : chunk_size);
this->headers.erase("Content-Length");
this->set_header("Transfer-Encoding", "chunked");
manual_length_header = true;
if (!content_type.empty())
{
this->set_header("Content-Type", get_mime_type(content_type));
}
#ifdef CROW_ENABLE_COMPRESSION
compressed = false;
#endif
}

/// Return a static file as the response body, the content_type may be specified explicitly.
void set_static_file_info(std::string path, std::string content_type = "")
{
Expand Down Expand Up @@ -456,5 +529,6 @@ namespace crow
std::function<void()> complete_request_handler_;
std::function<bool()> is_alive_helper_;
static_file_info file_info;
streamed_body_info streamed_body_info_;
};
} // namespace crow
Loading
Loading