From ce79debd00138e0c7701467a79c05a56998759e7 Mon Sep 17 00:00:00 2001 From: shartung Date: Thu, 16 Apr 2026 10:42:14 +0200 Subject: [PATCH 01/10] Optimize chunked write path and add tryWrite --- examples/Crc32.cpp | 117 ++++++++++++++++++++++++++++++++- src/HttpResponse.h | 144 ++++++++++++++++++++++++----------------- src/HttpResponseData.h | 3 +- tests/smoke.mjs | 61 ++++++++++++++++- 4 files changed, 264 insertions(+), 61 deletions(-) diff --git a/examples/Crc32.cpp b/examples/Crc32.cpp index d27350c2e..8793f4fa8 100644 --- a/examples/Crc32.cpp +++ b/examples/Crc32.cpp @@ -14,6 +14,75 @@ #include #include #include +#include +#include +#include + +#include "../uSockets/src/libusockets.h" + +namespace { + +const std::string writeChunk(1024, 'a'); +const std::string tryWritePayload(128 * 1024, 'x'); +const std::string tryWriteEndPayload(256 * 1024, 'y'); + +struct WriteState { + int remaining = 128; + bool aborted = false; +}; + +struct TryWriteState { + const std::string *payload = nullptr; + uintmax_t baseOffset = 0; + bool aborted = false; +}; + +template +void setSmallSendBuffer(uWS::HttpResponse *res) { +#ifdef LIBUS_NO_SSL + int fd = (int) (uintptr_t) us_socket_get_native_handle(SSL, (us_socket_t *) res); + int sendBuffer = 4096; + setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sendBuffer, sizeof(sendBuffer)); +#else + (void) res; +#endif +} + +template +void writeLoop(uWS::HttpResponse *res, const std::shared_ptr &state) { + if (state->aborted) { + return; + } + + if (!state->remaining) { + res->end(); + return; + } + + state->remaining--; + res->write(writeChunk); + uWS::Loop::get()->defer([res, state]() { + writeLoop(res, state); + }); +} + +template +bool tryWriteLoop(uWS::HttpResponse *res, const std::shared_ptr &state) { + if (state->aborted) { + return true; + } + + uintmax_t sent = res->getWriteOffset() - state->baseOffset; + std::string_view remaining(state->payload->data() + sent, state->payload->size() - (size_t) sent); + if (res->tryWrite(remaining)) { + res->end(); + return true; + } + + return false; +} + +} uint32_t crc32(const char *s, size_t n, uint32_t crc = 0xFFFFFFFF) { @@ -31,11 +100,57 @@ uint32_t crc32(const char *s, size_t n, uint32_t crc = 0xFFFFFFFF) { } int main() { - uWS::SSLApp({ .key_file_name = "misc/key.pem", .cert_file_name = "misc/cert.pem", .passphrase = "1234" + }).get("/write", [](auto *res, auto */*req*/) { + auto state = std::make_shared(); + + res->onAborted([state]() { + state->aborted = true; + }); + + uWS::Loop::get()->defer([res, state]() { + writeLoop(res, state); + }); + }).get("/trywrite", [](auto *res, auto */*req*/) { + setSmallSendBuffer(res); + + auto state = std::make_shared(); + state->payload = &tryWritePayload; + state->baseOffset = res->getWriteOffset(); + + res->onAborted([state]() { + state->aborted = true; + }); + + if (!tryWriteLoop(res, state)) { + res->onWritable([res, state](uintmax_t) { + return tryWriteLoop(res, state); + }); + } + }).get("/trywrite-end", [](auto *res, auto */*req*/) { + setSmallSendBuffer(res); + + auto state = std::make_shared(); + state->payload = &tryWriteEndPayload; + state->baseOffset = res->getWriteOffset(); + + res->onAborted([state]() { + state->aborted = true; + }); + + if (!res->tryWrite(*state->payload)) { + res->onWritable([res, state](uintmax_t offset) { + uintmax_t sent = offset - state->baseOffset; + std::string_view remaining(state->payload->data() + sent, state->payload->size() - (size_t) sent); + res->end(remaining); + return true; + }); + } else { + res->end(); + } }).post("/*", [](auto *res, auto *req) { /* Display the headers */ diff --git a/src/HttpResponse.h b/src/HttpResponse.h index 4e0a0b3f9..1a6d5ebe2 100644 --- a/src/HttpResponse.h +++ b/src/HttpResponse.h @@ -33,8 +33,6 @@ #include "MoveOnlyFunction.h" -/* todo: tryWrite is missing currently, only send smaller segments with write */ - namespace uWS { /* Some pre-defined status constants to use with writeStatus */ @@ -73,6 +71,15 @@ struct HttpResponse : public AsyncSocket { Super::write(buf, length); } + unsigned int formatChunkHeader(unsigned int value, char *dst) { + dst[0] = '\r'; + dst[1] = '\n'; + int hexLength = utils::u32toaHex(value, dst + 2); + dst[hexLength + 2] = '\r'; + dst[hexLength + 3] = '\n'; + return (unsigned int) hexLength + 4; + } + /* Called only once per request */ void writeMark() { /* Date is always written */ @@ -87,6 +94,72 @@ struct HttpResponse : public AsyncSocket { #endif } + /* Chunked writes can only be resumed by continuing the same body suffix. */ + std::pair internalWriteChunk(std::string_view data, bool optional, bool terminate = false) { + HttpResponseData *httpResponseData = getHttpResponseData(); + bool continuingChunk = httpResponseData->state & HttpResponseData::HTTP_WRITE_CONTINUATION_PENDING; + bool needsUncork = !Super::isCorked() && Super::canCork(); + if (needsUncork) { + Super::cork(); + } + + if (!(httpResponseData->state & HttpResponseData::HTTP_WRITE_CALLED)) { + writeMark(); + writeHeader("Transfer-Encoding", "chunked"); + httpResponseData->state |= HttpResponseData::HTTP_WRITE_CALLED; + } + + bool completed = !continuingChunk || data.length(); + bool failed = false; + if (data.length()) { + if (!continuingChunk) { + char chunkHeader[34]; + unsigned int chunkHeaderLength = formatChunkHeader((unsigned int) data.length(), chunkHeader); + failed = Super::write(chunkHeader, (int) chunkHeaderLength).second; + } + + auto writtenFailed = Super::write(data.data(), (int) data.length(), optional); + httpResponseData->offset += (uintmax_t) writtenFailed.first; + failed = failed || writtenFailed.second; + + if (optional && (writtenFailed.first != (int) data.length() || failed)) { + httpResponseData->state |= HttpResponseData::HTTP_WRITE_CONTINUATION_PENDING; + completed = false; + } else { + httpResponseData->state &= ~HttpResponseData::HTTP_WRITE_CONTINUATION_PENDING; + } + } + + if (terminate && completed) { + Super::write("\r\n0\r\n\r\n", 7); + } + + if (needsUncork) { + failed = failed || Super::uncork().second; + } + + if (failed || terminate || !completed) { + Super::timeout(HTTP_TIMEOUT_S); + } + + if (terminate && completed) { + httpResponseData->markDone(); + + if (!Super::isCorked()) { + if (httpResponseData->state & HttpResponseData::HTTP_CONNECTION_CLOSE) { + if ((httpResponseData->state & HttpResponseData::HTTP_RESPONSE_PENDING) == 0) { + if (((AsyncSocket *) this)->getBufferedAmount() == 0) { + ((AsyncSocket *) this)->shutdown(); + ((AsyncSocket *) this)->close(); + } + } + } + } + } + + return {completed, failed}; + } + /* Returns true on success, indicating that it might be feasible to write more data. * Will start timeout if stream reaches totalSize or write failure. */ bool internalEnd(std::string_view data, uintmax_t totalSize, bool optional, bool allowContentLength = true, bool closeConnection = false) { @@ -116,42 +189,7 @@ struct HttpResponse : public AsyncSocket { } if (httpResponseData->state & HttpResponseData::HTTP_WRITE_CALLED) { - - /* We do not have tryWrite-like functionalities, so ignore optional in this path */ - - /* Do not allow sending 0 chunk here */ - if (data.length()) { - Super::write("\r\n", 2); - writeUnsignedHex((unsigned int) data.length()); - Super::write("\r\n", 2); - - /* Ignoring optional for now */ - Super::write(data.data(), (int) data.length()); - } - - /* Terminating 0 chunk */ - Super::write("\r\n0\r\n\r\n", 7); - - httpResponseData->markDone(); - - /* We need to check if we should close this socket here now */ - if (!Super::isCorked()) { - if (httpResponseData->state & HttpResponseData::HTTP_CONNECTION_CLOSE) { - if ((httpResponseData->state & HttpResponseData::HTTP_RESPONSE_PENDING) == 0) { - if (((AsyncSocket *) this)->getBufferedAmount() == 0) { - ((AsyncSocket *) this)->shutdown(); - /* We need to force close after sending FIN since we want to hinder - * clients from keeping to send their huge data */ - ((AsyncSocket *) this)->close(); - return true; - } - } - } - } - - /* tryEnd can never fail when in chunked mode, since we do not have tryWrite (yet), only write */ - Super::timeout(HTTP_TIMEOUT_S); - return true; + return internalWriteChunk(data, false, true).first; } else { /* Write content-length on first call */ if (!(httpResponseData->state & HttpResponseData::HTTP_END_CALLED)) { @@ -452,33 +490,23 @@ struct HttpResponse : public AsyncSocket { bool write(std::string_view data) { writeStatus(HTTP_200_OK); - /* Do not allow sending 0 chunks, they mark end of response */ if (!data.length()) { - /* If you called us, then according to you it was fine to call us so it's fine to still call us */ - return true; + return !(getHttpResponseData()->state & HttpResponseData::HTTP_WRITE_CONTINUATION_PENDING); } - HttpResponseData *httpResponseData = getHttpResponseData(); - - if (!(httpResponseData->state & HttpResponseData::HTTP_WRITE_CALLED)) { - /* Write mark on first call to write */ - writeMark(); - - writeHeader("Transfer-Encoding", "chunked"); - httpResponseData->state |= HttpResponseData::HTTP_WRITE_CALLED; - } + return !internalWriteChunk(data, false).second; + } - Super::write("\r\n", 2); - writeUnsignedHex((unsigned int) data.length()); - Super::write("\r\n", 2); + /* Try and write one chunk. Continue with the remaining body suffix on onWritable. */ + bool tryWrite(std::string_view data) { + writeStatus(HTTP_200_OK); - auto [written, failed] = Super::write(data.data(), (int) data.length()); - if (failed) { - Super::timeout(HTTP_TIMEOUT_S); + if (!data.length()) { + return !(getHttpResponseData()->state & HttpResponseData::HTTP_WRITE_CONTINUATION_PENDING); } - /* If we did not fail the write, accept more */ - return !failed; + auto [completed, failed] = internalWriteChunk(data, true); + return completed && !failed; } /* Get the current byte write offset for this Http response */ diff --git a/src/HttpResponseData.h b/src/HttpResponseData.h index 556039a52..737837605 100644 --- a/src/HttpResponseData.h +++ b/src/HttpResponseData.h @@ -69,7 +69,8 @@ struct HttpResponseData : AsyncSocketData, HttpParser { HTTP_WRITE_CALLED = 2, // used HTTP_END_CALLED = 4, // used HTTP_RESPONSE_PENDING = 8, // used - HTTP_CONNECTION_CLOSE = 16 // used + HTTP_CONNECTION_CLOSE = 16, // used + HTTP_WRITE_CONTINUATION_PENDING = 32 // used }; /* Per socket event handlers */ diff --git a/tests/smoke.mjs b/tests/smoke.mjs index a4b4f6953..a20505fea 100644 --- a/tests/smoke.mjs +++ b/tests/smoke.mjs @@ -66,6 +66,61 @@ async function fixedCrc32Test(array) { } } +async function readBodySlowly(response) { + const reader = response.body.getReader(); + const chunks = []; + let total = 0; + + while (true) { + const {done, value} = await reader.read(); + if (done) { + break; + } + + chunks.push(value); + total += value.length; + await new Promise((resolve) => setTimeout(resolve, 5)); + } + + const body = new Uint8Array(total); + let offset = 0; + for (const chunk of chunks) { + body.set(chunk, offset); + offset += chunk.length; + } + return body; +} + +function expectFilled(body, size, value, label) { + if (body.length !== size) { + throw new Error(label + " failed: expected body size " + size + ", got " + body.length); + } + + for (let i = 0; i < body.length; i++) { + if (body[i] !== value) { + throw new Error(label + " failed: unexpected byte at offset " + i); + } + } +} + +async function streamingWriteTest() { + console.log("Making streaming write request"); + const res = await fetch("http://localhost:3000/write"); + expectFilled(await readBodySlowly(res), 128 * 1024, "a".charCodeAt(0), "write"); +} + +async function streamingTryWriteTest() { + console.log("Making tryWrite request"); + const res = await fetch("http://localhost:3000/trywrite"); + expectFilled(await readBodySlowly(res), 128 * 1024, "x".charCodeAt(0), "tryWrite"); +} + +async function streamingTryWriteEndTest() { + console.log("Making tryWrite-end request"); + const res = await fetch("http://localhost:3000/trywrite-end"); + expectFilled(await readBodySlowly(res), 256 * 1024, "y".charCodeAt(0), "tryWrite-end"); +} + /* Maximum chunk size is less than 256mb */ const sizes = [0, 0, 32, 32, 128, 256, 1024, 65536, 1024 * 1024, 1024 * 1024 * 128, 0, 0, 32, 32]; for (let i = 0; i < sizes.length; i++) { @@ -83,4 +138,8 @@ for (let i = 0; i < sizes.length; i++) { await chunkedCrc32Test(array); } -console.log("Done!"); \ No newline at end of file +await streamingWriteTest(); +await streamingTryWriteTest(); +await streamingTryWriteEndTest(); + +console.log("Done!"); From cabf5aa54657c0fe6b778ee59ae99326e7051cbe Mon Sep 17 00:00:00 2001 From: shartung Date: Fri, 17 Apr 2026 22:26:24 +0200 Subject: [PATCH 02/10] Move tryWrite smoke coverage into dedicated SmokeTest --- .github/workflows/cpp.yml | 4 +- build.c | 4 +- examples/Crc32.cpp | 116 ------------------------- examples/SmokeTest.cpp | 174 ++++++++++++++++++++++++++++++-------- tests/Makefile | 4 +- tests/smoke.mjs | 6 +- 6 files changed, 147 insertions(+), 161 deletions(-) diff --git a/.github/workflows/cpp.yml b/.github/workflows/cpp.yml index 3dc24d93e..bdae4fd4a 100644 --- a/.github/workflows/cpp.yml +++ b/.github/workflows/cpp.yml @@ -48,10 +48,10 @@ jobs: run: | cd uWebSockets iwr https://deno.land/x/install/install.ps1 -useb | iex - Start-Process -NoNewWindow .\Crc32 + Start-Process -NoNewWindow .\SmokeTest sleep 1 deno run --allow-net tests\smoke.mjs - Stop-Process -Name Crc32 + Stop-Process -Name SmokeTest build_linux: diff --git a/build.c b/build.c index 879609ac8..702ccad17 100644 --- a/build.c +++ b/build.c @@ -9,8 +9,8 @@ int main(int argc, char **argv) { char *CXX = strncpy(calloc(1024, 1), or_else(getenv("CXX"), "g++"), 1024); char *EXEC_SUFFIX = strncpy(calloc(1024, 1), maybe(getenv("EXEC_SUFFIX")), 1024); - char *EXAMPLE_FILES[] = {"SecureGzipFileServer", "Precompress", "EchoBody", "HelloWorldThreaded", "Http3Server", "Broadcast", "HelloWorld", "Crc32", "ServerName", - "EchoServer", "BroadcastingEchoServer", "UpgradeSync", "UpgradeAsync", "ParameterRoutes"}; + char *EXAMPLE_FILES[] = {"SecureGzipFileServer", "Precompress", "EchoBody", "HelloWorldThreaded", "Http3Server", "Broadcast", "HelloWorld", "Crc32", "SmokeTest", + "ServerName", "EchoServer", "BroadcastingEchoServer", "UpgradeSync", "UpgradeAsync", "ParameterRoutes"}; strcat(CXXFLAGS, " -march=native -O3 -Wpedantic -Wall -Wextra -Wsign-conversion -Wconversion -std=c++2b -Isrc -IuSockets/src"); strcat(LDFLAGS, " uSockets/*.o"); diff --git a/examples/Crc32.cpp b/examples/Crc32.cpp index 8793f4fa8..2ddb37bea 100644 --- a/examples/Crc32.cpp +++ b/examples/Crc32.cpp @@ -14,75 +14,6 @@ #include #include #include -#include -#include -#include - -#include "../uSockets/src/libusockets.h" - -namespace { - -const std::string writeChunk(1024, 'a'); -const std::string tryWritePayload(128 * 1024, 'x'); -const std::string tryWriteEndPayload(256 * 1024, 'y'); - -struct WriteState { - int remaining = 128; - bool aborted = false; -}; - -struct TryWriteState { - const std::string *payload = nullptr; - uintmax_t baseOffset = 0; - bool aborted = false; -}; - -template -void setSmallSendBuffer(uWS::HttpResponse *res) { -#ifdef LIBUS_NO_SSL - int fd = (int) (uintptr_t) us_socket_get_native_handle(SSL, (us_socket_t *) res); - int sendBuffer = 4096; - setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sendBuffer, sizeof(sendBuffer)); -#else - (void) res; -#endif -} - -template -void writeLoop(uWS::HttpResponse *res, const std::shared_ptr &state) { - if (state->aborted) { - return; - } - - if (!state->remaining) { - res->end(); - return; - } - - state->remaining--; - res->write(writeChunk); - uWS::Loop::get()->defer([res, state]() { - writeLoop(res, state); - }); -} - -template -bool tryWriteLoop(uWS::HttpResponse *res, const std::shared_ptr &state) { - if (state->aborted) { - return true; - } - - uintmax_t sent = res->getWriteOffset() - state->baseOffset; - std::string_view remaining(state->payload->data() + sent, state->payload->size() - (size_t) sent); - if (res->tryWrite(remaining)) { - res->end(); - return true; - } - - return false; -} - -} uint32_t crc32(const char *s, size_t n, uint32_t crc = 0xFFFFFFFF) { @@ -104,53 +35,6 @@ int main() { .key_file_name = "misc/key.pem", .cert_file_name = "misc/cert.pem", .passphrase = "1234" - }).get("/write", [](auto *res, auto */*req*/) { - auto state = std::make_shared(); - - res->onAborted([state]() { - state->aborted = true; - }); - - uWS::Loop::get()->defer([res, state]() { - writeLoop(res, state); - }); - }).get("/trywrite", [](auto *res, auto */*req*/) { - setSmallSendBuffer(res); - - auto state = std::make_shared(); - state->payload = &tryWritePayload; - state->baseOffset = res->getWriteOffset(); - - res->onAborted([state]() { - state->aborted = true; - }); - - if (!tryWriteLoop(res, state)) { - res->onWritable([res, state](uintmax_t) { - return tryWriteLoop(res, state); - }); - } - }).get("/trywrite-end", [](auto *res, auto */*req*/) { - setSmallSendBuffer(res); - - auto state = std::make_shared(); - state->payload = &tryWriteEndPayload; - state->baseOffset = res->getWriteOffset(); - - res->onAborted([state]() { - state->aborted = true; - }); - - if (!res->tryWrite(*state->payload)) { - res->onWritable([res, state](uintmax_t offset) { - uintmax_t sent = offset - state->baseOffset; - std::string_view remaining(state->payload->data() + sent, state->payload->size() - (size_t) sent); - res->end(remaining); - return true; - }); - } else { - res->end(); - } }).post("/*", [](auto *res, auto *req) { /* Display the headers */ diff --git a/examples/SmokeTest.cpp b/examples/SmokeTest.cpp index dc635fadf..8233e8ced 100644 --- a/examples/SmokeTest.cpp +++ b/examples/SmokeTest.cpp @@ -2,55 +2,157 @@ /* This is not an example; it is a smoke test used in CI testing */ -struct Stream { - int offset; - bool aborted; +#include +#include +#include +#include +#include + +namespace { + +const std::string writeChunk(1024, 'a'); +const std::string tryWritePayload(16 * 1024 * 1024, 'x'); +const std::string tryWriteEndPayload(32 * 1024 * 1024, 'y'); + +struct WriteState { + int remaining = 128; + bool aborted = false; }; -std::string constantChunk; - -void streamData(auto *res, auto stream, int chunk) { - - if (stream->aborted) { - return; - } - - if (chunk < 1600) { - res->cork([res, stream, chunk]() { - auto ok = res->write(constantChunk); - if (ok) { - streamData(res, stream, chunk + 1); - return; - } - - uWS::Loop::get()->defer([res, stream, chunk]() { - streamData(res, stream, chunk + 1); - }); - }); - } else { - res->cork([res]() { - res->end(); - }); - } +struct TryWriteState { + const std::string *payload = nullptr; + uintmax_t baseOffset = 0; + bool aborted = false; +}; + +uint32_t crc32(const char *s, size_t n, uint32_t crc = 0xFFFFFFFF) { + + for (size_t i = 0; i < n; i++) { + unsigned char ch = static_cast(s[i]); + for (size_t j = 0; j < 8; j++) { + uint32_t b = (ch ^ crc) & 1; + crc >>= 1; + if (b) crc = crc ^ 0xEDB88320; + ch >>= 1; + } + } + + return crc; } -int main() { +template +bool writeLoop(uWS::HttpResponse *res, const std::shared_ptr &state) { + while (!state->aborted && state->remaining) { + state->remaining--; + if (!res->write(writeChunk)) { + return false; + } + } + + if (!state->aborted) { + res->end(); + } + + return true; +} - for (int i = 0; i < 65536; i++) { - constantChunk.append("a", 1); +template +bool tryWriteLoop(uWS::HttpResponse *res, const std::shared_ptr &state) { + if (state->aborted) { + return true; } + uintmax_t sent = res->getWriteOffset() - state->baseOffset; + std::string_view remaining(state->payload->data() + sent, state->payload->size() - (size_t) sent); + if (res->tryWrite(remaining)) { + res->end(); + return true; + } + + return false; +} + +} + +int main() { uWS::SSLApp({ .key_file_name = "misc/key.pem", .cert_file_name = "misc/cert.pem", .passphrase = "1234" - }).get("/*", [](auto *res, auto */*req*/) { + }).get("/write", [](auto *res, auto */*req*/) { + auto state = std::make_shared(); + + res->onAborted([state]() { + state->aborted = true; + }); + + if (!writeLoop(res, state)) { + res->onWritable([res, state](uintmax_t) { + return writeLoop(res, state); + }); + } + }).get("/trywrite", [](auto *res, auto */*req*/) { + auto state = std::make_shared(); + state->payload = &tryWritePayload; + state->baseOffset = res->getWriteOffset(); + + res->onAborted([state]() { + state->aborted = true; + }); + + if (!tryWriteLoop(res, state)) { + res->onWritable([res, state](uintmax_t) { + return tryWriteLoop(res, state); + }); + } + }).get("/trywrite-end", [](auto *res, auto */*req*/) { + auto state = std::make_shared(); + state->payload = &tryWriteEndPayload; + state->baseOffset = res->getWriteOffset(); - auto stream = std::make_shared(0, false); - streamData(res, stream, 0); + res->onAborted([state]() { + state->aborted = true; + }); + + if (!res->tryWrite(*state->payload)) { + res->onWritable([res, state](uintmax_t offset) { + if (state->aborted) { + return true; + } + + uintmax_t sent = offset - state->baseOffset; + std::string_view remaining(state->payload->data() + sent, state->payload->size() - (size_t) sent); + res->end(remaining); + return true; + }); + } else { + res->end(); + } + }).post("/*", [](auto *res, auto *req) { + + auto isAborted = std::make_shared(false); + uint32_t crc = 0xFFFFFFFF; + + /* Display the headers */ + std::cout << " --- " << req->getUrl() << " --- " << std::endl; + for (auto [key, value] : *req) { + std::cout << key << ": " << value << std::endl; + } + + res->onData([res, isAborted, crc](std::string_view chunk, bool isFin) mutable { + if (chunk.length()) { + crc = crc32(chunk.data(), chunk.length(), crc); + } + + if (isFin && !*isAborted) { + std::stringstream s; + s << std::hex << (~crc) << std::endl; + res->end(s.str()); + } + }); - res->onAborted([stream]() { - stream->aborted = true; + res->onAborted([isAborted]() { + *isAborted = true; }); }).listen(3000, [](auto *listen_socket) { if (listen_socket) { diff --git a/tests/Makefile b/tests/Makefile index 86dc82b9b..836474628 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -19,11 +19,11 @@ performance: ./HttpRouter smoke: - ../Crc32 & + ../SmokeTest & sleep 1 ~/.deno/bin/deno run --allow-net smoke.mjs node smoke.mjs - pkill Crc32 + pkill SmokeTest compliance: ../EchoBody & diff --git a/tests/smoke.mjs b/tests/smoke.mjs index a20505fea..a2dd89f9c 100644 --- a/tests/smoke.mjs +++ b/tests/smoke.mjs @@ -1,4 +1,4 @@ -/* This smoke test runs against the Crc32 example program for now, but this example will be extended for more tests */ +/* This smoke test runs against the dedicated SmokeTest program */ var crc32 = (function () { var table = new Uint32Array(256); @@ -112,13 +112,13 @@ async function streamingWriteTest() { async function streamingTryWriteTest() { console.log("Making tryWrite request"); const res = await fetch("http://localhost:3000/trywrite"); - expectFilled(await readBodySlowly(res), 128 * 1024, "x".charCodeAt(0), "tryWrite"); + expectFilled(await readBodySlowly(res), 16 * 1024 * 1024, "x".charCodeAt(0), "tryWrite"); } async function streamingTryWriteEndTest() { console.log("Making tryWrite-end request"); const res = await fetch("http://localhost:3000/trywrite-end"); - expectFilled(await readBodySlowly(res), 256 * 1024, "y".charCodeAt(0), "tryWrite-end"); + expectFilled(await readBodySlowly(res), 32 * 1024 * 1024, "y".charCodeAt(0), "tryWrite-end"); } /* Maximum chunk size is less than 256mb */ From b9d52dc1acdeffb7b5af647193415b9739dd1ef2 Mon Sep 17 00:00:00 2001 From: GetThatCookie Date: Fri, 17 Apr 2026 23:43:41 +0200 Subject: [PATCH 03/10] suffix for Windows SmokeTest process --- .github/workflows/cpp.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/cpp.yml b/.github/workflows/cpp.yml index bdae4fd4a..d50089b64 100644 --- a/.github/workflows/cpp.yml +++ b/.github/workflows/cpp.yml @@ -48,7 +48,7 @@ jobs: run: | cd uWebSockets iwr https://deno.land/x/install/install.ps1 -useb | iex - Start-Process -NoNewWindow .\SmokeTest + Start-Process -NoNewWindow .\SmokeTest.exe sleep 1 deno run --allow-net tests\smoke.mjs Stop-Process -Name SmokeTest From 8635150bd7d00f304d1209c3a918509ba19e662b Mon Sep 17 00:00:00 2001 From: shartung Date: Sat, 18 Apr 2026 21:20:25 +0200 Subject: [PATCH 04/10] Align tryWrite follow-up with existing example-based repo structure --- .github/workflows/cpp.yml | 4 +- build.c | 4 +- examples/ChunkedResponse.cpp | 64 +++++++++++++ examples/Crc32.cpp | 1 + examples/SmokeTest.cpp | 174 ++++++++--------------------------- tests/Makefile | 4 +- tests/smoke.mjs | 63 +------------ 7 files changed, 109 insertions(+), 205 deletions(-) create mode 100644 examples/ChunkedResponse.cpp diff --git a/.github/workflows/cpp.yml b/.github/workflows/cpp.yml index d50089b64..3dc24d93e 100644 --- a/.github/workflows/cpp.yml +++ b/.github/workflows/cpp.yml @@ -48,10 +48,10 @@ jobs: run: | cd uWebSockets iwr https://deno.land/x/install/install.ps1 -useb | iex - Start-Process -NoNewWindow .\SmokeTest.exe + Start-Process -NoNewWindow .\Crc32 sleep 1 deno run --allow-net tests\smoke.mjs - Stop-Process -Name SmokeTest + Stop-Process -Name Crc32 build_linux: diff --git a/build.c b/build.c index 702ccad17..879609ac8 100644 --- a/build.c +++ b/build.c @@ -9,8 +9,8 @@ int main(int argc, char **argv) { char *CXX = strncpy(calloc(1024, 1), or_else(getenv("CXX"), "g++"), 1024); char *EXEC_SUFFIX = strncpy(calloc(1024, 1), maybe(getenv("EXEC_SUFFIX")), 1024); - char *EXAMPLE_FILES[] = {"SecureGzipFileServer", "Precompress", "EchoBody", "HelloWorldThreaded", "Http3Server", "Broadcast", "HelloWorld", "Crc32", "SmokeTest", - "ServerName", "EchoServer", "BroadcastingEchoServer", "UpgradeSync", "UpgradeAsync", "ParameterRoutes"}; + char *EXAMPLE_FILES[] = {"SecureGzipFileServer", "Precompress", "EchoBody", "HelloWorldThreaded", "Http3Server", "Broadcast", "HelloWorld", "Crc32", "ServerName", + "EchoServer", "BroadcastingEchoServer", "UpgradeSync", "UpgradeAsync", "ParameterRoutes"}; strcat(CXXFLAGS, " -march=native -O3 -Wpedantic -Wall -Wextra -Wsign-conversion -Wconversion -std=c++2b -Isrc -IuSockets/src"); strcat(LDFLAGS, " uSockets/*.o"); diff --git a/examples/ChunkedResponse.cpp b/examples/ChunkedResponse.cpp new file mode 100644 index 000000000..78a373783 --- /dev/null +++ b/examples/ChunkedResponse.cpp @@ -0,0 +1,64 @@ +#include "App.h" + +/* This example demonstrates a large chunked response streamed with tryWrite. */ + +#include +#include +#include + +namespace { + +const std::string payload(16 * 1024 * 1024, 'x'); + +struct ResponseState { + uintmax_t baseOffset = 0; + bool aborted = false; +}; + +template +bool tryWriteLoop(uWS::HttpResponse *res, ResponseState *state) { + if (state->aborted) { + return true; + } + + uintmax_t sent = res->getWriteOffset() - state->baseOffset; + std::string_view remaining = payload; + remaining.remove_prefix((size_t) sent); + if (res->tryWrite(remaining)) { + res->end(); + return true; + } + + return false; +} + +} + +int main() { + + uWS::SSLApp({ + .key_file_name = "misc/key.pem", + .cert_file_name = "misc/cert.pem", + .passphrase = "1234" + }).get("/*", [](auto *res, auto */*req*/) { + auto state = std::make_shared(); + state->baseOffset = res->getWriteOffset(); + + res->writeHeader("Content-Type", "application/octet-stream"); + res->onAborted([state]() { + state->aborted = true; + }); + + if (!tryWriteLoop(res, state.get())) { + res->onWritable([res, state](uintmax_t) { + return tryWriteLoop(res, state.get()); + }); + } + }).listen(3000, [](auto *listen_socket) { + if (listen_socket) { + std::cout << "Listening on port " << 3000 << std::endl; + } + }).run(); + + std::cout << "Failed to listen on port 3000" << std::endl; +} diff --git a/examples/Crc32.cpp b/examples/Crc32.cpp index 2ddb37bea..d27350c2e 100644 --- a/examples/Crc32.cpp +++ b/examples/Crc32.cpp @@ -31,6 +31,7 @@ uint32_t crc32(const char *s, size_t n, uint32_t crc = 0xFFFFFFFF) { } int main() { + uWS::SSLApp({ .key_file_name = "misc/key.pem", .cert_file_name = "misc/cert.pem", diff --git a/examples/SmokeTest.cpp b/examples/SmokeTest.cpp index 8233e8ced..dc635fadf 100644 --- a/examples/SmokeTest.cpp +++ b/examples/SmokeTest.cpp @@ -2,157 +2,55 @@ /* This is not an example; it is a smoke test used in CI testing */ -#include -#include -#include -#include -#include - -namespace { - -const std::string writeChunk(1024, 'a'); -const std::string tryWritePayload(16 * 1024 * 1024, 'x'); -const std::string tryWriteEndPayload(32 * 1024 * 1024, 'y'); - -struct WriteState { - int remaining = 128; - bool aborted = false; +struct Stream { + int offset; + bool aborted; }; -struct TryWriteState { - const std::string *payload = nullptr; - uintmax_t baseOffset = 0; - bool aborted = false; -}; - -uint32_t crc32(const char *s, size_t n, uint32_t crc = 0xFFFFFFFF) { - - for (size_t i = 0; i < n; i++) { - unsigned char ch = static_cast(s[i]); - for (size_t j = 0; j < 8; j++) { - uint32_t b = (ch ^ crc) & 1; - crc >>= 1; - if (b) crc = crc ^ 0xEDB88320; - ch >>= 1; - } - } - - return crc; +std::string constantChunk; + +void streamData(auto *res, auto stream, int chunk) { + + if (stream->aborted) { + return; + } + + if (chunk < 1600) { + res->cork([res, stream, chunk]() { + auto ok = res->write(constantChunk); + if (ok) { + streamData(res, stream, chunk + 1); + return; + } + + uWS::Loop::get()->defer([res, stream, chunk]() { + streamData(res, stream, chunk + 1); + }); + }); + } else { + res->cork([res]() { + res->end(); + }); + } } -template -bool writeLoop(uWS::HttpResponse *res, const std::shared_ptr &state) { - while (!state->aborted && state->remaining) { - state->remaining--; - if (!res->write(writeChunk)) { - return false; - } - } - - if (!state->aborted) { - res->end(); - } - - return true; -} - -template -bool tryWriteLoop(uWS::HttpResponse *res, const std::shared_ptr &state) { - if (state->aborted) { - return true; - } +int main() { - uintmax_t sent = res->getWriteOffset() - state->baseOffset; - std::string_view remaining(state->payload->data() + sent, state->payload->size() - (size_t) sent); - if (res->tryWrite(remaining)) { - res->end(); - return true; + for (int i = 0; i < 65536; i++) { + constantChunk.append("a", 1); } - return false; -} - -} - -int main() { uWS::SSLApp({ .key_file_name = "misc/key.pem", .cert_file_name = "misc/cert.pem", .passphrase = "1234" - }).get("/write", [](auto *res, auto */*req*/) { - auto state = std::make_shared(); - - res->onAborted([state]() { - state->aborted = true; - }); - - if (!writeLoop(res, state)) { - res->onWritable([res, state](uintmax_t) { - return writeLoop(res, state); - }); - } - }).get("/trywrite", [](auto *res, auto */*req*/) { - auto state = std::make_shared(); - state->payload = &tryWritePayload; - state->baseOffset = res->getWriteOffset(); - - res->onAborted([state]() { - state->aborted = true; - }); - - if (!tryWriteLoop(res, state)) { - res->onWritable([res, state](uintmax_t) { - return tryWriteLoop(res, state); - }); - } - }).get("/trywrite-end", [](auto *res, auto */*req*/) { - auto state = std::make_shared(); - state->payload = &tryWriteEndPayload; - state->baseOffset = res->getWriteOffset(); + }).get("/*", [](auto *res, auto */*req*/) { - res->onAborted([state]() { - state->aborted = true; - }); - - if (!res->tryWrite(*state->payload)) { - res->onWritable([res, state](uintmax_t offset) { - if (state->aborted) { - return true; - } - - uintmax_t sent = offset - state->baseOffset; - std::string_view remaining(state->payload->data() + sent, state->payload->size() - (size_t) sent); - res->end(remaining); - return true; - }); - } else { - res->end(); - } - }).post("/*", [](auto *res, auto *req) { - - auto isAborted = std::make_shared(false); - uint32_t crc = 0xFFFFFFFF; - - /* Display the headers */ - std::cout << " --- " << req->getUrl() << " --- " << std::endl; - for (auto [key, value] : *req) { - std::cout << key << ": " << value << std::endl; - } - - res->onData([res, isAborted, crc](std::string_view chunk, bool isFin) mutable { - if (chunk.length()) { - crc = crc32(chunk.data(), chunk.length(), crc); - } - - if (isFin && !*isAborted) { - std::stringstream s; - s << std::hex << (~crc) << std::endl; - res->end(s.str()); - } - }); + auto stream = std::make_shared(0, false); + streamData(res, stream, 0); - res->onAborted([isAborted]() { - *isAborted = true; + res->onAborted([stream]() { + stream->aborted = true; }); }).listen(3000, [](auto *listen_socket) { if (listen_socket) { diff --git a/tests/Makefile b/tests/Makefile index 836474628..86dc82b9b 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -19,11 +19,11 @@ performance: ./HttpRouter smoke: - ../SmokeTest & + ../Crc32 & sleep 1 ~/.deno/bin/deno run --allow-net smoke.mjs node smoke.mjs - pkill SmokeTest + pkill Crc32 compliance: ../EchoBody & diff --git a/tests/smoke.mjs b/tests/smoke.mjs index a2dd89f9c..a4b4f6953 100644 --- a/tests/smoke.mjs +++ b/tests/smoke.mjs @@ -1,4 +1,4 @@ -/* This smoke test runs against the dedicated SmokeTest program */ +/* This smoke test runs against the Crc32 example program for now, but this example will be extended for more tests */ var crc32 = (function () { var table = new Uint32Array(256); @@ -66,61 +66,6 @@ async function fixedCrc32Test(array) { } } -async function readBodySlowly(response) { - const reader = response.body.getReader(); - const chunks = []; - let total = 0; - - while (true) { - const {done, value} = await reader.read(); - if (done) { - break; - } - - chunks.push(value); - total += value.length; - await new Promise((resolve) => setTimeout(resolve, 5)); - } - - const body = new Uint8Array(total); - let offset = 0; - for (const chunk of chunks) { - body.set(chunk, offset); - offset += chunk.length; - } - return body; -} - -function expectFilled(body, size, value, label) { - if (body.length !== size) { - throw new Error(label + " failed: expected body size " + size + ", got " + body.length); - } - - for (let i = 0; i < body.length; i++) { - if (body[i] !== value) { - throw new Error(label + " failed: unexpected byte at offset " + i); - } - } -} - -async function streamingWriteTest() { - console.log("Making streaming write request"); - const res = await fetch("http://localhost:3000/write"); - expectFilled(await readBodySlowly(res), 128 * 1024, "a".charCodeAt(0), "write"); -} - -async function streamingTryWriteTest() { - console.log("Making tryWrite request"); - const res = await fetch("http://localhost:3000/trywrite"); - expectFilled(await readBodySlowly(res), 16 * 1024 * 1024, "x".charCodeAt(0), "tryWrite"); -} - -async function streamingTryWriteEndTest() { - console.log("Making tryWrite-end request"); - const res = await fetch("http://localhost:3000/trywrite-end"); - expectFilled(await readBodySlowly(res), 32 * 1024 * 1024, "y".charCodeAt(0), "tryWrite-end"); -} - /* Maximum chunk size is less than 256mb */ const sizes = [0, 0, 32, 32, 128, 256, 1024, 65536, 1024 * 1024, 1024 * 1024 * 128, 0, 0, 32, 32]; for (let i = 0; i < sizes.length; i++) { @@ -138,8 +83,4 @@ for (let i = 0; i < sizes.length; i++) { await chunkedCrc32Test(array); } -await streamingWriteTest(); -await streamingTryWriteTest(); -await streamingTryWriteEndTest(); - -console.log("Done!"); +console.log("Done!"); \ No newline at end of file From a5c168cd8d53ad739208becd59f509766164228c Mon Sep 17 00:00:00 2001 From: shartung Date: Sat, 18 Apr 2026 21:43:41 +0200 Subject: [PATCH 05/10] Clarify tryWrite chunk continuation semantics --- src/HttpResponse.h | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/HttpResponse.h b/src/HttpResponse.h index 1a6d5ebe2..7155f8c77 100644 --- a/src/HttpResponse.h +++ b/src/HttpResponse.h @@ -97,7 +97,7 @@ struct HttpResponse : public AsyncSocket { /* Chunked writes can only be resumed by continuing the same body suffix. */ std::pair internalWriteChunk(std::string_view data, bool optional, bool terminate = false) { HttpResponseData *httpResponseData = getHttpResponseData(); - bool continuingChunk = httpResponseData->state & HttpResponseData::HTTP_WRITE_CONTINUATION_PENDING; + bool insideChunk = httpResponseData->state & HttpResponseData::HTTP_WRITE_CONTINUATION_PENDING; bool needsUncork = !Super::isCorked() && Super::canCork(); if (needsUncork) { Super::cork(); @@ -109,16 +109,18 @@ struct HttpResponse : public AsyncSocket { httpResponseData->state |= HttpResponseData::HTTP_WRITE_CALLED; } - bool completed = !continuingChunk || data.length(); + bool completed = !insideChunk || data.length(); bool failed = false; if (data.length()) { - if (!continuingChunk) { + if (!insideChunk) { char chunkHeader[34]; unsigned int chunkHeaderLength = formatChunkHeader((unsigned int) data.length(), chunkHeader); - failed = Super::write(chunkHeader, (int) chunkHeaderLength).second; + /* A chunk header must never be optional, or getWriteOffset/onWritable semantics would break. */ + failed = Super::write(chunkHeader, (int) chunkHeaderLength, false).second; } auto writtenFailed = Super::write(data.data(), (int) data.length(), optional); + /* Offset tracks body bytes only, matching getWriteOffset and the offset passed to onWritable. */ httpResponseData->offset += (uintmax_t) writtenFailed.first; failed = failed || writtenFailed.second; From a16c0688eef2a3437550998a4200cb5af5c42630 Mon Sep 17 00:00:00 2001 From: shartung Date: Sat, 18 Apr 2026 21:50:32 +0200 Subject: [PATCH 06/10] Tighten tryWrite example and chunk header handling --- examples/ChunkedResponse.cpp | 4 +--- src/HttpResponse.h | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/examples/ChunkedResponse.cpp b/examples/ChunkedResponse.cpp index 78a373783..00e189056 100644 --- a/examples/ChunkedResponse.cpp +++ b/examples/ChunkedResponse.cpp @@ -11,7 +11,6 @@ namespace { const std::string payload(16 * 1024 * 1024, 'x'); struct ResponseState { - uintmax_t baseOffset = 0; bool aborted = false; }; @@ -21,7 +20,7 @@ bool tryWriteLoop(uWS::HttpResponse *res, ResponseState *state) { return true; } - uintmax_t sent = res->getWriteOffset() - state->baseOffset; + uintmax_t sent = res->getWriteOffset(); std::string_view remaining = payload; remaining.remove_prefix((size_t) sent); if (res->tryWrite(remaining)) { @@ -42,7 +41,6 @@ int main() { .passphrase = "1234" }).get("/*", [](auto *res, auto */*req*/) { auto state = std::make_shared(); - state->baseOffset = res->getWriteOffset(); res->writeHeader("Content-Type", "application/octet-stream"); res->onAborted([state]() { diff --git a/src/HttpResponse.h b/src/HttpResponse.h index 7155f8c77..22890fe68 100644 --- a/src/HttpResponse.h +++ b/src/HttpResponse.h @@ -113,7 +113,7 @@ struct HttpResponse : public AsyncSocket { bool failed = false; if (data.length()) { if (!insideChunk) { - char chunkHeader[34]; + char chunkHeader[12]; unsigned int chunkHeaderLength = formatChunkHeader((unsigned int) data.length(), chunkHeader); /* A chunk header must never be optional, or getWriteOffset/onWritable semantics would break. */ failed = Super::write(chunkHeader, (int) chunkHeaderLength, false).second; From 8c1da316f05999383c169417a9143d4e1c91cd25 Mon Sep 17 00:00:00 2001 From: shartung Date: Sat, 18 Apr 2026 21:56:02 +0200 Subject: [PATCH 07/10] Rename chunk formatter to reflect emitted CRLF framing --- src/HttpResponse.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/HttpResponse.h b/src/HttpResponse.h index 22890fe68..3df7b80aa 100644 --- a/src/HttpResponse.h +++ b/src/HttpResponse.h @@ -71,7 +71,7 @@ struct HttpResponse : public AsyncSocket { Super::write(buf, length); } - unsigned int formatChunkHeader(unsigned int value, char *dst) { + unsigned int formatCRLFAndChunkHeader(unsigned int value, char *dst) { dst[0] = '\r'; dst[1] = '\n'; int hexLength = utils::u32toaHex(value, dst + 2); @@ -114,7 +114,7 @@ struct HttpResponse : public AsyncSocket { if (data.length()) { if (!insideChunk) { char chunkHeader[12]; - unsigned int chunkHeaderLength = formatChunkHeader((unsigned int) data.length(), chunkHeader); + unsigned int chunkHeaderLength = formatCRLFAndChunkHeader((unsigned int) data.length(), chunkHeader); /* A chunk header must never be optional, or getWriteOffset/onWritable semantics would break. */ failed = Super::write(chunkHeader, (int) chunkHeaderLength, false).second; } From 11212b7c4873d651581c2a080d8a2c02c1e6dc9b Mon Sep 17 00:00:00 2001 From: shartung Date: Sat, 18 Apr 2026 23:01:44 +0200 Subject: [PATCH 08/10] Optimize BackPressure compaction and trim policy --- src/AsyncSocketData.h | 40 ++++++++++++++++++++++++++++++++++------ 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/src/AsyncSocketData.h b/src/AsyncSocketData.h index 1259ea68b..6f9c67777 100644 --- a/src/AsyncSocketData.h +++ b/src/AsyncSocketData.h @@ -18,13 +18,27 @@ #ifndef UWS_ASYNCSOCKETDATA_H #define UWS_ASYNCSOCKETDATA_H +#include #include namespace uWS { struct BackPressure { + static constexpr size_t TRIM_THRESHOLD = 64 * 1024; + static constexpr size_t RETAIN_CAPACITY = 32 * 1024; + std::string buffer; unsigned int pendingRemoval = 0; + + void normalize() { + size_t length = buffer.length() - pendingRemoval; + if (length) { + memmove(buffer.data(), buffer.data() + pendingRemoval, length); + } + buffer.resize(length); + pendingRemoval = 0; + } + BackPressure(BackPressure &&other) { buffer = std::move(other.buffer); pendingRemoval = other.pendingRemoval; @@ -34,21 +48,32 @@ struct BackPressure { buffer.append(data, length); } void erase(unsigned int length) { + size_t logicalLength = this->length(); + if (length >= logicalLength) { + clear(); + return; + } + pendingRemoval += length; /* Always erase a minimum of 1/32th the current backpressure */ if (pendingRemoval > (buffer.length() >> 5)) { - std::string(buffer.begin() + pendingRemoval, buffer.end()).swap(buffer); - pendingRemoval = 0; + normalize(); } } size_t length() { return buffer.length() - pendingRemoval; } - /* Only used in AsyncSocket::write - what about replacing it with the other functions like erase(length())? */ + /* Only used in AsyncSocket::write when buffered backpressure fully drains */ void clear() { pendingRemoval = 0; - buffer.clear(); - buffer.shrink_to_fit(); + if (buffer.capacity() > TRIM_THRESHOLD) { + /* Trim pathological spikes but keep a warm buffer for normal bursts */ + std::string retained; + retained.reserve(RETAIN_CAPACITY); + buffer.swap(retained); + } else { + buffer.clear(); + } } /* Only used by AsyncSocket::write (optionally) before append */ void reserve(size_t length) { @@ -56,7 +81,10 @@ struct BackPressure { } /* Only used by getSendBuffer as last resort */ void resize(size_t length) { - buffer.resize(length + pendingRemoval); + if (pendingRemoval) { + normalize(); + } + buffer.resize(length); } const char *data() { return buffer.data() + pendingRemoval; From c5c47eaa8dfdc4ebb4ebad268bc38950ce0ce845 Mon Sep 17 00:00:00 2001 From: shartung Date: Sun, 19 Apr 2026 10:03:05 +0200 Subject: [PATCH 09/10] Drop AsyncSocketData changes from tryWrite follow-up --- src/AsyncSocketData.h | 40 ++++++---------------------------------- 1 file changed, 6 insertions(+), 34 deletions(-) diff --git a/src/AsyncSocketData.h b/src/AsyncSocketData.h index 6f9c67777..1259ea68b 100644 --- a/src/AsyncSocketData.h +++ b/src/AsyncSocketData.h @@ -18,27 +18,13 @@ #ifndef UWS_ASYNCSOCKETDATA_H #define UWS_ASYNCSOCKETDATA_H -#include #include namespace uWS { struct BackPressure { - static constexpr size_t TRIM_THRESHOLD = 64 * 1024; - static constexpr size_t RETAIN_CAPACITY = 32 * 1024; - std::string buffer; unsigned int pendingRemoval = 0; - - void normalize() { - size_t length = buffer.length() - pendingRemoval; - if (length) { - memmove(buffer.data(), buffer.data() + pendingRemoval, length); - } - buffer.resize(length); - pendingRemoval = 0; - } - BackPressure(BackPressure &&other) { buffer = std::move(other.buffer); pendingRemoval = other.pendingRemoval; @@ -48,32 +34,21 @@ struct BackPressure { buffer.append(data, length); } void erase(unsigned int length) { - size_t logicalLength = this->length(); - if (length >= logicalLength) { - clear(); - return; - } - pendingRemoval += length; /* Always erase a minimum of 1/32th the current backpressure */ if (pendingRemoval > (buffer.length() >> 5)) { - normalize(); + std::string(buffer.begin() + pendingRemoval, buffer.end()).swap(buffer); + pendingRemoval = 0; } } size_t length() { return buffer.length() - pendingRemoval; } - /* Only used in AsyncSocket::write when buffered backpressure fully drains */ + /* Only used in AsyncSocket::write - what about replacing it with the other functions like erase(length())? */ void clear() { pendingRemoval = 0; - if (buffer.capacity() > TRIM_THRESHOLD) { - /* Trim pathological spikes but keep a warm buffer for normal bursts */ - std::string retained; - retained.reserve(RETAIN_CAPACITY); - buffer.swap(retained); - } else { - buffer.clear(); - } + buffer.clear(); + buffer.shrink_to_fit(); } /* Only used by AsyncSocket::write (optionally) before append */ void reserve(size_t length) { @@ -81,10 +56,7 @@ struct BackPressure { } /* Only used by getSendBuffer as last resort */ void resize(size_t length) { - if (pendingRemoval) { - normalize(); - } - buffer.resize(length); + buffer.resize(length + pendingRemoval); } const char *data() { return buffer.data() + pendingRemoval; From b14a54d1cc423114c5f1aa76a6a54558769135b5 Mon Sep 17 00:00:00 2001 From: shartung Date: Sun, 19 Apr 2026 11:13:02 +0200 Subject: [PATCH 10/10] Optimize final chunked tryWrite via AsyncSocket::write2 --- examples/ChunkedResponse.cpp | 7 +---- src/AsyncSocket.h | 49 +++++++++++++++++++++++++++++++++ src/HttpResponse.h | 53 ++++++++++++++++++++++++++---------- 3 files changed, 88 insertions(+), 21 deletions(-) diff --git a/examples/ChunkedResponse.cpp b/examples/ChunkedResponse.cpp index 00e189056..4e4d38027 100644 --- a/examples/ChunkedResponse.cpp +++ b/examples/ChunkedResponse.cpp @@ -23,12 +23,7 @@ bool tryWriteLoop(uWS::HttpResponse *res, ResponseState *state) { uintmax_t sent = res->getWriteOffset(); std::string_view remaining = payload; remaining.remove_prefix((size_t) sent); - if (res->tryWrite(remaining)) { - res->end(); - return true; - } - - return false; + return res->tryWrite(remaining, true); } } diff --git a/src/AsyncSocket.h b/src/AsyncSocket.h index 4231f26f1..5b3d3f56d 100644 --- a/src/AsyncSocket.h +++ b/src/AsyncSocket.h @@ -333,6 +333,55 @@ struct AsyncSocket { return {length, false}; } + /* Same semantics as write, but for two buffers. */ + std::pair write2(const char *header, int headerLength, const char *payload, int payloadLength, bool optionally = false) { + int length = headerLength + payloadLength; + + /* Fake success if closed, simple fix to allow uncork of closed socket to succeed */ + if (us_socket_is_closed(SSL, (us_socket_t *) this)) { + return {length, false}; + } + + if (!headerLength) { + return write(payload, payloadLength, optionally); + } + if (!payloadLength) { + return write(header, headerLength, optionally); + } + + if constexpr (!SSL) { + AsyncSocketData *asyncSocketData = getAsyncSocketData(); + if (!asyncSocketData->buffer.length() && getLoopData()->corkedSocket != this) { + int written = us_socket_write2(0, (us_socket_t *) this, header, headerLength, payload, payloadLength); + if (written == length || optionally) { + return {written, written != length}; + } + + if (written > headerLength) { + asyncSocketData->buffer.append(payload + written - headerLength, (size_t) (length - written)); + } else { + asyncSocketData->buffer.append(header + written, (size_t) (headerLength - written)); + asyncSocketData->buffer.append(payload, (size_t) payloadLength); + } + + return {length, true}; + } + } + + auto [headerWritten, failed] = write(header, headerLength, optionally, payloadLength); + if (failed) { + if (!optionally) { + getAsyncSocketData()->buffer.append(payload, (size_t) payloadLength); + return {length, true}; + } + + return {headerWritten, true}; + } + + auto [payloadWritten, payloadFailed] = write(payload, payloadLength, optionally); + return {headerLength + payloadWritten, payloadFailed}; + } + /* Uncork this socket and flush or buffer any corked and/or passed data. It is essential to remember doing this. */ /* It does NOT count bytes written from cork buffer (they are already accounted for in the write call responsible for its corking)! */ std::pair uncork(const char *src = nullptr, int length = 0, bool optionally = false) { diff --git a/src/HttpResponse.h b/src/HttpResponse.h index 3df7b80aa..59945e4e6 100644 --- a/src/HttpResponse.h +++ b/src/HttpResponse.h @@ -97,6 +97,7 @@ struct HttpResponse : public AsyncSocket { /* Chunked writes can only be resumed by continuing the same body suffix. */ std::pair internalWriteChunk(std::string_view data, bool optional, bool terminate = false) { HttpResponseData *httpResponseData = getHttpResponseData(); + constexpr const char *terminatingChunk = "\r\n0\r\n\r\n"; bool insideChunk = httpResponseData->state & HttpResponseData::HTTP_WRITE_CONTINUATION_PENDING; bool needsUncork = !Super::isCorked() && Super::canCork(); if (needsUncork) { @@ -110,37 +111,58 @@ struct HttpResponse : public AsyncSocket { } bool completed = !insideChunk || data.length(); - bool failed = false; + bool callerFailed = false; + bool hadBackpressure = false; if (data.length()) { if (!insideChunk) { char chunkHeader[12]; unsigned int chunkHeaderLength = formatCRLFAndChunkHeader((unsigned int) data.length(), chunkHeader); /* A chunk header must never be optional, or getWriteOffset/onWritable semantics would break. */ - failed = Super::write(chunkHeader, (int) chunkHeaderLength, false).second; + hadBackpressure = Super::write(chunkHeader, (int) chunkHeaderLength, false).second; } - auto writtenFailed = Super::write(data.data(), (int) data.length(), optional); + auto writtenFailed = terminate ? + Super::write2(data.data(), (int) data.length(), terminatingChunk, 7, optional) : + Super::write(data.data(), (int) data.length(), optional); + int writtenBody = std::min(writtenFailed.first, (int) data.length()); /* Offset tracks body bytes only, matching getWriteOffset and the offset passed to onWritable. */ - httpResponseData->offset += (uintmax_t) writtenFailed.first; - failed = failed || writtenFailed.second; + httpResponseData->offset += (uintmax_t) writtenBody; + hadBackpressure = hadBackpressure || writtenFailed.second; - if (optional && (writtenFailed.first != (int) data.length() || failed)) { + if (optional && writtenBody != (int) data.length()) { httpResponseData->state |= HttpResponseData::HTTP_WRITE_CONTINUATION_PENDING; completed = false; + callerFailed = true; } else { httpResponseData->state &= ~HttpResponseData::HTTP_WRITE_CONTINUATION_PENDING; + + if (terminate && writtenFailed.first != (int) data.length() + 7) { + int writtenTrailer = writtenFailed.first - writtenBody; + hadBackpressure = hadBackpressure || Super::write(terminatingChunk + writtenTrailer, 7 - writtenTrailer, false).second; + } else if (optional && writtenFailed.second) { + callerFailed = true; + } } } - if (terminate && completed) { - Super::write("\r\n0\r\n\r\n", 7); + if (terminate && completed && !data.length()) { + auto writtenFailed = Super::write(terminatingChunk, 7, optional); + hadBackpressure = hadBackpressure || writtenFailed.second; + + if (optional && writtenFailed.first != 7) { + hadBackpressure = hadBackpressure || Super::write(terminatingChunk + writtenFailed.first, 7 - writtenFailed.first, false).second; + } } if (needsUncork) { - failed = failed || Super::uncork().second; + bool uncorkFailed = Super::uncork().second; + hadBackpressure = hadBackpressure || uncorkFailed; + if (!terminate && uncorkFailed) { + callerFailed = true; + } } - if (failed || terminate || !completed) { + if (hadBackpressure || terminate || !completed) { Super::timeout(HTTP_TIMEOUT_S); } @@ -159,7 +181,7 @@ struct HttpResponse : public AsyncSocket { } } - return {completed, failed}; + return {completed, callerFailed}; } /* Returns true on success, indicating that it might be feasible to write more data. @@ -499,15 +521,16 @@ struct HttpResponse : public AsyncSocket { return !internalWriteChunk(data, false).second; } - /* Try and write one chunk. Continue with the remaining body suffix on onWritable. */ - bool tryWrite(std::string_view data) { + /* Try and write one chunk. Continue with the remaining body suffix on onWritable. + * Set finalChunk to also send the terminating 0-chunk on completion. */ + bool tryWrite(std::string_view data, bool finalChunk = false) { writeStatus(HTTP_200_OK); - if (!data.length()) { + if (!data.length() && !finalChunk) { return !(getHttpResponseData()->state & HttpResponseData::HTTP_WRITE_CONTINUATION_PENDING); } - auto [completed, failed] = internalWriteChunk(data, true); + auto [completed, failed] = internalWriteChunk(data, true, finalChunk); return completed && !failed; }