diff --git a/examples/ChunkedResponse.cpp b/examples/ChunkedResponse.cpp new file mode 100644 index 000000000..4e4d38027 --- /dev/null +++ b/examples/ChunkedResponse.cpp @@ -0,0 +1,57 @@ +#include "App.h" + +/* This example demonstrates a large chunked response streamed with tryWrite. */ + +#include +#include +#include + +namespace { + +const std::string payload(16 * 1024 * 1024, 'x'); + +struct ResponseState { + bool aborted = false; +}; + +template +bool tryWriteLoop(uWS::HttpResponse *res, ResponseState *state) { + if (state->aborted) { + return true; + } + + uintmax_t sent = res->getWriteOffset(); + std::string_view remaining = payload; + remaining.remove_prefix((size_t) sent); + return res->tryWrite(remaining, true); +} + +} + +int main() { + + uWS::SSLApp({ + .key_file_name = "misc/key.pem", + .cert_file_name = "misc/cert.pem", + .passphrase = "1234" + }).get("/*", [](auto *res, auto */*req*/) { + auto state = std::make_shared(); + + res->writeHeader("Content-Type", "application/octet-stream"); + res->onAborted([state]() { + state->aborted = true; + }); + + if (!tryWriteLoop(res, state.get())) { + res->onWritable([res, state](uintmax_t) { + return tryWriteLoop(res, state.get()); + }); + } + }).listen(3000, [](auto *listen_socket) { + if (listen_socket) { + std::cout << "Listening on port " << 3000 << std::endl; + } + }).run(); + + std::cout << "Failed to listen on port 3000" << std::endl; +} diff --git a/src/AsyncSocket.h b/src/AsyncSocket.h index 4231f26f1..5b3d3f56d 100644 --- a/src/AsyncSocket.h +++ b/src/AsyncSocket.h @@ -333,6 +333,55 @@ struct AsyncSocket { return {length, false}; } + /* Same semantics as write, but for two buffers. */ + std::pair write2(const char *header, int headerLength, const char *payload, int payloadLength, bool optionally = false) { + int length = headerLength + payloadLength; + + /* Fake success if closed, simple fix to allow uncork of closed socket to succeed */ + if (us_socket_is_closed(SSL, (us_socket_t *) this)) { + return {length, false}; + } + + if (!headerLength) { + return write(payload, payloadLength, optionally); + } + if (!payloadLength) { + return write(header, headerLength, optionally); + } + + if constexpr (!SSL) { + AsyncSocketData *asyncSocketData = getAsyncSocketData(); + if (!asyncSocketData->buffer.length() && getLoopData()->corkedSocket != this) { + int written = us_socket_write2(0, (us_socket_t *) this, header, headerLength, payload, payloadLength); + if (written == length || optionally) { + return {written, written != length}; + } + + if (written > headerLength) { + asyncSocketData->buffer.append(payload + written - headerLength, (size_t) (length - written)); + } else { + asyncSocketData->buffer.append(header + written, (size_t) (headerLength - written)); + asyncSocketData->buffer.append(payload, (size_t) payloadLength); + } + + return {length, true}; + } + } + + auto [headerWritten, failed] = write(header, headerLength, optionally, payloadLength); + if (failed) { + if (!optionally) { + getAsyncSocketData()->buffer.append(payload, (size_t) payloadLength); + return {length, true}; + } + + return {headerWritten, true}; + } + + auto [payloadWritten, payloadFailed] = write(payload, payloadLength, optionally); + return {headerLength + payloadWritten, payloadFailed}; + } + /* Uncork this socket and flush or buffer any corked and/or passed data. It is essential to remember doing this. */ /* It does NOT count bytes written from cork buffer (they are already accounted for in the write call responsible for its corking)! */ std::pair uncork(const char *src = nullptr, int length = 0, bool optionally = false) { diff --git a/src/HttpResponse.h b/src/HttpResponse.h index 4e0a0b3f9..59945e4e6 100644 --- a/src/HttpResponse.h +++ b/src/HttpResponse.h @@ -33,8 +33,6 @@ #include "MoveOnlyFunction.h" -/* todo: tryWrite is missing currently, only send smaller segments with write */ - namespace uWS { /* Some pre-defined status constants to use with writeStatus */ @@ -73,6 +71,15 @@ struct HttpResponse : public AsyncSocket { Super::write(buf, length); } + unsigned int formatCRLFAndChunkHeader(unsigned int value, char *dst) { + dst[0] = '\r'; + dst[1] = '\n'; + int hexLength = utils::u32toaHex(value, dst + 2); + dst[hexLength + 2] = '\r'; + dst[hexLength + 3] = '\n'; + return (unsigned int) hexLength + 4; + } + /* Called only once per request */ void writeMark() { /* Date is always written */ @@ -87,6 +94,96 @@ struct HttpResponse : public AsyncSocket { #endif } + /* Chunked writes can only be resumed by continuing the same body suffix. */ + std::pair internalWriteChunk(std::string_view data, bool optional, bool terminate = false) { + HttpResponseData *httpResponseData = getHttpResponseData(); + constexpr const char *terminatingChunk = "\r\n0\r\n\r\n"; + bool insideChunk = httpResponseData->state & HttpResponseData::HTTP_WRITE_CONTINUATION_PENDING; + bool needsUncork = !Super::isCorked() && Super::canCork(); + if (needsUncork) { + Super::cork(); + } + + if (!(httpResponseData->state & HttpResponseData::HTTP_WRITE_CALLED)) { + writeMark(); + writeHeader("Transfer-Encoding", "chunked"); + httpResponseData->state |= HttpResponseData::HTTP_WRITE_CALLED; + } + + bool completed = !insideChunk || data.length(); + bool callerFailed = false; + bool hadBackpressure = false; + if (data.length()) { + if (!insideChunk) { + char chunkHeader[12]; + unsigned int chunkHeaderLength = formatCRLFAndChunkHeader((unsigned int) data.length(), chunkHeader); + /* A chunk header must never be optional, or getWriteOffset/onWritable semantics would break. */ + hadBackpressure = Super::write(chunkHeader, (int) chunkHeaderLength, false).second; + } + + auto writtenFailed = terminate ? + Super::write2(data.data(), (int) data.length(), terminatingChunk, 7, optional) : + Super::write(data.data(), (int) data.length(), optional); + int writtenBody = std::min(writtenFailed.first, (int) data.length()); + /* Offset tracks body bytes only, matching getWriteOffset and the offset passed to onWritable. */ + httpResponseData->offset += (uintmax_t) writtenBody; + hadBackpressure = hadBackpressure || writtenFailed.second; + + if (optional && writtenBody != (int) data.length()) { + httpResponseData->state |= HttpResponseData::HTTP_WRITE_CONTINUATION_PENDING; + completed = false; + callerFailed = true; + } else { + httpResponseData->state &= ~HttpResponseData::HTTP_WRITE_CONTINUATION_PENDING; + + if (terminate && writtenFailed.first != (int) data.length() + 7) { + int writtenTrailer = writtenFailed.first - writtenBody; + hadBackpressure = hadBackpressure || Super::write(terminatingChunk + writtenTrailer, 7 - writtenTrailer, false).second; + } else if (optional && writtenFailed.second) { + callerFailed = true; + } + } + } + + if (terminate && completed && !data.length()) { + auto writtenFailed = Super::write(terminatingChunk, 7, optional); + hadBackpressure = hadBackpressure || writtenFailed.second; + + if (optional && writtenFailed.first != 7) { + hadBackpressure = hadBackpressure || Super::write(terminatingChunk + writtenFailed.first, 7 - writtenFailed.first, false).second; + } + } + + if (needsUncork) { + bool uncorkFailed = Super::uncork().second; + hadBackpressure = hadBackpressure || uncorkFailed; + if (!terminate && uncorkFailed) { + callerFailed = true; + } + } + + if (hadBackpressure || terminate || !completed) { + Super::timeout(HTTP_TIMEOUT_S); + } + + if (terminate && completed) { + httpResponseData->markDone(); + + if (!Super::isCorked()) { + if (httpResponseData->state & HttpResponseData::HTTP_CONNECTION_CLOSE) { + if ((httpResponseData->state & HttpResponseData::HTTP_RESPONSE_PENDING) == 0) { + if (((AsyncSocket *) this)->getBufferedAmount() == 0) { + ((AsyncSocket *) this)->shutdown(); + ((AsyncSocket *) this)->close(); + } + } + } + } + } + + return {completed, callerFailed}; + } + /* Returns true on success, indicating that it might be feasible to write more data. * Will start timeout if stream reaches totalSize or write failure. */ bool internalEnd(std::string_view data, uintmax_t totalSize, bool optional, bool allowContentLength = true, bool closeConnection = false) { @@ -116,42 +213,7 @@ struct HttpResponse : public AsyncSocket { } if (httpResponseData->state & HttpResponseData::HTTP_WRITE_CALLED) { - - /* We do not have tryWrite-like functionalities, so ignore optional in this path */ - - /* Do not allow sending 0 chunk here */ - if (data.length()) { - Super::write("\r\n", 2); - writeUnsignedHex((unsigned int) data.length()); - Super::write("\r\n", 2); - - /* Ignoring optional for now */ - Super::write(data.data(), (int) data.length()); - } - - /* Terminating 0 chunk */ - Super::write("\r\n0\r\n\r\n", 7); - - httpResponseData->markDone(); - - /* We need to check if we should close this socket here now */ - if (!Super::isCorked()) { - if (httpResponseData->state & HttpResponseData::HTTP_CONNECTION_CLOSE) { - if ((httpResponseData->state & HttpResponseData::HTTP_RESPONSE_PENDING) == 0) { - if (((AsyncSocket *) this)->getBufferedAmount() == 0) { - ((AsyncSocket *) this)->shutdown(); - /* We need to force close after sending FIN since we want to hinder - * clients from keeping to send their huge data */ - ((AsyncSocket *) this)->close(); - return true; - } - } - } - } - - /* tryEnd can never fail when in chunked mode, since we do not have tryWrite (yet), only write */ - Super::timeout(HTTP_TIMEOUT_S); - return true; + return internalWriteChunk(data, false, true).first; } else { /* Write content-length on first call */ if (!(httpResponseData->state & HttpResponseData::HTTP_END_CALLED)) { @@ -452,33 +514,24 @@ struct HttpResponse : public AsyncSocket { bool write(std::string_view data) { writeStatus(HTTP_200_OK); - /* Do not allow sending 0 chunks, they mark end of response */ if (!data.length()) { - /* If you called us, then according to you it was fine to call us so it's fine to still call us */ - return true; + return !(getHttpResponseData()->state & HttpResponseData::HTTP_WRITE_CONTINUATION_PENDING); } - HttpResponseData *httpResponseData = getHttpResponseData(); - - if (!(httpResponseData->state & HttpResponseData::HTTP_WRITE_CALLED)) { - /* Write mark on first call to write */ - writeMark(); - - writeHeader("Transfer-Encoding", "chunked"); - httpResponseData->state |= HttpResponseData::HTTP_WRITE_CALLED; - } + return !internalWriteChunk(data, false).second; + } - Super::write("\r\n", 2); - writeUnsignedHex((unsigned int) data.length()); - Super::write("\r\n", 2); + /* Try and write one chunk. Continue with the remaining body suffix on onWritable. + * Set finalChunk to also send the terminating 0-chunk on completion. */ + bool tryWrite(std::string_view data, bool finalChunk = false) { + writeStatus(HTTP_200_OK); - auto [written, failed] = Super::write(data.data(), (int) data.length()); - if (failed) { - Super::timeout(HTTP_TIMEOUT_S); + if (!data.length() && !finalChunk) { + return !(getHttpResponseData()->state & HttpResponseData::HTTP_WRITE_CONTINUATION_PENDING); } - /* If we did not fail the write, accept more */ - return !failed; + auto [completed, failed] = internalWriteChunk(data, true, finalChunk); + return completed && !failed; } /* Get the current byte write offset for this Http response */ diff --git a/src/HttpResponseData.h b/src/HttpResponseData.h index 556039a52..737837605 100644 --- a/src/HttpResponseData.h +++ b/src/HttpResponseData.h @@ -69,7 +69,8 @@ struct HttpResponseData : AsyncSocketData, HttpParser { HTTP_WRITE_CALLED = 2, // used HTTP_END_CALLED = 4, // used HTTP_RESPONSE_PENDING = 8, // used - HTTP_CONNECTION_CLOSE = 16 // used + HTTP_CONNECTION_CLOSE = 16, // used + HTTP_WRITE_CONTINUATION_PENDING = 32 // used }; /* Per socket event handlers */