Skip to content
57 changes: 57 additions & 0 deletions examples/ChunkedResponse.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#include "App.h"

/* This example demonstrates a large chunked response streamed with tryWrite. */

#include <cstdint>
#include <memory>
#include <string>

namespace {

const std::string payload(16 * 1024 * 1024, 'x');

struct ResponseState {
bool aborted = false;
};

template <bool SSL>
bool tryWriteLoop(uWS::HttpResponse<SSL> *res, ResponseState *state) {
if (state->aborted) {
return true;
}

uintmax_t sent = res->getWriteOffset();
std::string_view remaining = payload;
remaining.remove_prefix((size_t) sent);
return res->tryWrite(remaining, true);
}

}

int main() {

uWS::SSLApp({
.key_file_name = "misc/key.pem",
.cert_file_name = "misc/cert.pem",
.passphrase = "1234"
}).get("/*", [](auto *res, auto */*req*/) {
auto state = std::make_shared<ResponseState>();

res->writeHeader("Content-Type", "application/octet-stream");
res->onAborted([state]() {
state->aborted = true;
});

if (!tryWriteLoop(res, state.get())) {
res->onWritable([res, state](uintmax_t) {
return tryWriteLoop(res, state.get());
});
}
}).listen(3000, [](auto *listen_socket) {
if (listen_socket) {
std::cout << "Listening on port " << 3000 << std::endl;
}
}).run();

std::cout << "Failed to listen on port 3000" << std::endl;
}
49 changes: 49 additions & 0 deletions src/AsyncSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,55 @@ struct AsyncSocket {
return {length, false};
}

/* Same semantics as write, but for two buffers. */
std::pair<int, bool> write2(const char *header, int headerLength, const char *payload, int payloadLength, bool optionally = false) {
int length = headerLength + payloadLength;

/* Fake success if closed, simple fix to allow uncork of closed socket to succeed */
if (us_socket_is_closed(SSL, (us_socket_t *) this)) {
return {length, false};
}

if (!headerLength) {
return write(payload, payloadLength, optionally);
}
if (!payloadLength) {
return write(header, headerLength, optionally);
}

if constexpr (!SSL) {
AsyncSocketData<SSL> *asyncSocketData = getAsyncSocketData();
if (!asyncSocketData->buffer.length() && getLoopData()->corkedSocket != this) {
int written = us_socket_write2(0, (us_socket_t *) this, header, headerLength, payload, payloadLength);
if (written == length || optionally) {
return {written, written != length};
}

if (written > headerLength) {
asyncSocketData->buffer.append(payload + written - headerLength, (size_t) (length - written));
} else {
asyncSocketData->buffer.append(header + written, (size_t) (headerLength - written));
asyncSocketData->buffer.append(payload, (size_t) payloadLength);
}

return {length, true};
}
}

auto [headerWritten, failed] = write(header, headerLength, optionally, payloadLength);
if (failed) {
if (!optionally) {
getAsyncSocketData()->buffer.append(payload, (size_t) payloadLength);
return {length, true};
}

return {headerWritten, true};
}

auto [payloadWritten, payloadFailed] = write(payload, payloadLength, optionally);
return {headerLength + payloadWritten, payloadFailed};
}

/* Uncork this socket and flush or buffer any corked and/or passed data. It is essential to remember doing this. */
/* It does NOT count bytes written from cork buffer (they are already accounted for in the write call responsible for its corking)! */
std::pair<int, bool> uncork(const char *src = nullptr, int length = 0, bool optionally = false) {
Expand Down
169 changes: 111 additions & 58 deletions src/HttpResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@

#include "MoveOnlyFunction.h"

/* todo: tryWrite is missing currently, only send smaller segments with write */

namespace uWS {

/* Some pre-defined status constants to use with writeStatus */
Expand Down Expand Up @@ -73,6 +71,15 @@ struct HttpResponse : public AsyncSocket<SSL> {
Super::write(buf, length);
}

unsigned int formatCRLFAndChunkHeader(unsigned int value, char *dst) {
dst[0] = '\r';
dst[1] = '\n';
int hexLength = utils::u32toaHex(value, dst + 2);
dst[hexLength + 2] = '\r';
dst[hexLength + 3] = '\n';
return (unsigned int) hexLength + 4;
}

/* Called only once per request */
void writeMark() {
/* Date is always written */
Expand All @@ -87,6 +94,96 @@ struct HttpResponse : public AsyncSocket<SSL> {
#endif
}

/* Chunked writes can only be resumed by continuing the same body suffix. */
std::pair<bool, bool> internalWriteChunk(std::string_view data, bool optional, bool terminate = false) {
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
constexpr const char *terminatingChunk = "\r\n0\r\n\r\n";
bool insideChunk = httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CONTINUATION_PENDING;
bool needsUncork = !Super::isCorked() && Super::canCork();
if (needsUncork) {
Super::cork();
}

if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED)) {
writeMark();
writeHeader("Transfer-Encoding", "chunked");
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CALLED;
}

bool completed = !insideChunk || data.length();
bool callerFailed = false;
bool hadBackpressure = false;
if (data.length()) {
if (!insideChunk) {
char chunkHeader[12];
unsigned int chunkHeaderLength = formatCRLFAndChunkHeader((unsigned int) data.length(), chunkHeader);
/* A chunk header must never be optional, or getWriteOffset/onWritable semantics would break. */
hadBackpressure = Super::write(chunkHeader, (int) chunkHeaderLength, false).second;
}

auto writtenFailed = terminate ?
Super::write2(data.data(), (int) data.length(), terminatingChunk, 7, optional) :
Super::write(data.data(), (int) data.length(), optional);
int writtenBody = std::min(writtenFailed.first, (int) data.length());
/* Offset tracks body bytes only, matching getWriteOffset and the offset passed to onWritable. */
httpResponseData->offset += (uintmax_t) writtenBody;
hadBackpressure = hadBackpressure || writtenFailed.second;

if (optional && writtenBody != (int) data.length()) {
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CONTINUATION_PENDING;
completed = false;
callerFailed = true;
} else {
httpResponseData->state &= ~HttpResponseData<SSL>::HTTP_WRITE_CONTINUATION_PENDING;

if (terminate && writtenFailed.first != (int) data.length() + 7) {
int writtenTrailer = writtenFailed.first - writtenBody;
hadBackpressure = hadBackpressure || Super::write(terminatingChunk + writtenTrailer, 7 - writtenTrailer, false).second;
} else if (optional && writtenFailed.second) {
callerFailed = true;
}
}
}

if (terminate && completed && !data.length()) {
auto writtenFailed = Super::write(terminatingChunk, 7, optional);
hadBackpressure = hadBackpressure || writtenFailed.second;

if (optional && writtenFailed.first != 7) {
hadBackpressure = hadBackpressure || Super::write(terminatingChunk + writtenFailed.first, 7 - writtenFailed.first, false).second;
}
}

if (needsUncork) {
bool uncorkFailed = Super::uncork().second;
hadBackpressure = hadBackpressure || uncorkFailed;
if (!terminate && uncorkFailed) {
callerFailed = true;
}
}

if (hadBackpressure || terminate || !completed) {
Super::timeout(HTTP_TIMEOUT_S);
}

if (terminate && completed) {
httpResponseData->markDone();

if (!Super::isCorked()) {
if (httpResponseData->state & HttpResponseData<SSL>::HTTP_CONNECTION_CLOSE) {
if ((httpResponseData->state & HttpResponseData<SSL>::HTTP_RESPONSE_PENDING) == 0) {
if (((AsyncSocket<SSL> *) this)->getBufferedAmount() == 0) {
((AsyncSocket<SSL> *) this)->shutdown();
((AsyncSocket<SSL> *) this)->close();
}
}
}
}
}

return {completed, callerFailed};
}

/* Returns true on success, indicating that it might be feasible to write more data.
* Will start timeout if stream reaches totalSize or write failure. */
bool internalEnd(std::string_view data, uintmax_t totalSize, bool optional, bool allowContentLength = true, bool closeConnection = false) {
Expand Down Expand Up @@ -116,42 +213,7 @@ struct HttpResponse : public AsyncSocket<SSL> {
}

if (httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED) {

/* We do not have tryWrite-like functionalities, so ignore optional in this path */

/* Do not allow sending 0 chunk here */
if (data.length()) {
Super::write("\r\n", 2);
writeUnsignedHex((unsigned int) data.length());
Super::write("\r\n", 2);

/* Ignoring optional for now */
Super::write(data.data(), (int) data.length());
}

/* Terminating 0 chunk */
Super::write("\r\n0\r\n\r\n", 7);

httpResponseData->markDone();

/* We need to check if we should close this socket here now */
if (!Super::isCorked()) {
if (httpResponseData->state & HttpResponseData<SSL>::HTTP_CONNECTION_CLOSE) {
if ((httpResponseData->state & HttpResponseData<SSL>::HTTP_RESPONSE_PENDING) == 0) {
if (((AsyncSocket<SSL> *) this)->getBufferedAmount() == 0) {
((AsyncSocket<SSL> *) this)->shutdown();
/* We need to force close after sending FIN since we want to hinder
* clients from keeping to send their huge data */
((AsyncSocket<SSL> *) this)->close();
return true;
}
}
}
}

/* tryEnd can never fail when in chunked mode, since we do not have tryWrite (yet), only write */
Super::timeout(HTTP_TIMEOUT_S);
return true;
return internalWriteChunk(data, false, true).first;
} else {
/* Write content-length on first call */
if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_END_CALLED)) {
Expand Down Expand Up @@ -452,33 +514,24 @@ struct HttpResponse : public AsyncSocket<SSL> {
bool write(std::string_view data) {
writeStatus(HTTP_200_OK);

/* Do not allow sending 0 chunks, they mark end of response */
if (!data.length()) {
/* If you called us, then according to you it was fine to call us so it's fine to still call us */
return true;
return !(getHttpResponseData()->state & HttpResponseData<SSL>::HTTP_WRITE_CONTINUATION_PENDING);
}

HttpResponseData<SSL> *httpResponseData = getHttpResponseData();

if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED)) {
/* Write mark on first call to write */
writeMark();

writeHeader("Transfer-Encoding", "chunked");
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CALLED;
}
return !internalWriteChunk(data, false).second;
}

Super::write("\r\n", 2);
writeUnsignedHex((unsigned int) data.length());
Super::write("\r\n", 2);
/* Try and write one chunk. Continue with the remaining body suffix on onWritable.
* Set finalChunk to also send the terminating 0-chunk on completion. */
bool tryWrite(std::string_view data, bool finalChunk = false) {
writeStatus(HTTP_200_OK);

auto [written, failed] = Super::write(data.data(), (int) data.length());
if (failed) {
Super::timeout(HTTP_TIMEOUT_S);
if (!data.length() && !finalChunk) {
return !(getHttpResponseData()->state & HttpResponseData<SSL>::HTTP_WRITE_CONTINUATION_PENDING);
}

/* If we did not fail the write, accept more */
return !failed;
auto [completed, failed] = internalWriteChunk(data, true, finalChunk);
return completed && !failed;
}

/* Get the current byte write offset for this Http response */
Expand Down
3 changes: 2 additions & 1 deletion src/HttpResponseData.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ struct HttpResponseData : AsyncSocketData<SSL>, HttpParser {
HTTP_WRITE_CALLED = 2, // used
HTTP_END_CALLED = 4, // used
HTTP_RESPONSE_PENDING = 8, // used
HTTP_CONNECTION_CLOSE = 16 // used
HTTP_CONNECTION_CLOSE = 16, // used
HTTP_WRITE_CONTINUATION_PENDING = 32 // used
};

/* Per socket event handlers */
Expand Down
Loading