From 0311b677b6d8125ce51465c12342574c0680b654 Mon Sep 17 00:00:00 2001 From: Daniel Huigens Date: Mon, 1 Sep 2025 15:01:34 +0200 Subject: [PATCH 1/4] Eliminate buffering in `transform` A `TransformStream` currently always requires an internal queue (until https://github.com/whatwg/streams/issues/1158 is resolved). Therefore, don't use `TransformStream` in `transform` anymore, but create a new `ReadableStream` with the transformed chunks directly, and with a `highWaterMark` of 0, such that the internal queue is always empty. --- lib/streams.js | 47 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/lib/streams.js b/lib/streams.js index 20019a2..c8953d3 100644 --- a/lib/streams.js +++ b/lib/streams.js @@ -248,24 +248,45 @@ function transform(input, process = () => undefined, finish = () => undefined) { return output; } if (isStream(input)) { - return transformRaw(input, { - async transform(value, controller) { - try { - const result = await process(value); - if (result !== undefined) controller.enqueue(result); - } catch(e) { - controller.error(e); - } + let reader; + let allDone = false; + return new ReadableStream({ + start() { + reader = input.getReader(); }, - async flush(controller) { + async pull(controller) { + if (allDone) { + controller.close(); + return; + } try { - const result = await finish(); - if (result !== undefined) controller.enqueue(result); - } catch(e) { + // Read repeatedly until we have a chunk to enqueue or until + // we can close the stream, as `pull` won't get called again + // until we call `enqueue` or `close`. + while (true) { + const { value, done } = await reader.read(); + allDone = done; + const result = await (done ? finish : process)(value); + if (result !== undefined) { + controller.enqueue(result); + return; // `pull` will get called again + } + if (done) { + // If `finish` didn't return a chunk to enqueue, call + // `close` here. Otherwise, it will get called in the + // next call to `pull`, above (since `allDone == true`). + controller.close(); + return; + } + } + } catch (e) { controller.error(e); } + }, + async cancel(reason) { + await reader.cancel(reason); } - }); + }, { highWaterMark: 0 }); } const result1 = process(input); const result2 = finish(); From 00c01f2a4f14b51e9de5d16ea44b85618547633c Mon Sep 17 00:00:00 2001 From: Daniel Huigens Date: Mon, 1 Sep 2025 18:05:06 +0200 Subject: [PATCH 2/4] Unlock input reader after transforming stream --- lib/streams.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/streams.js b/lib/streams.js index c8953d3..19e8430 100644 --- a/lib/streams.js +++ b/lib/streams.js @@ -257,6 +257,7 @@ function transform(input, process = () => undefined, finish = () => undefined) { async pull(controller) { if (allDone) { controller.close(); + input.releaseLock(); return; } try { @@ -276,6 +277,7 @@ function transform(input, process = () => undefined, finish = () => undefined) { // `close` here. Otherwise, it will get called in the // next call to `pull`, above (since `allDone == true`). controller.close(); + input.releaseLock(); return; } } From c7325f8c69136440330bb4593c814c6b538400f2 Mon Sep 17 00:00:00 2001 From: Daniel Huigens Date: Wed, 3 Sep 2025 15:27:12 +0200 Subject: [PATCH 3/4] Add optional `queuingStrategy` parameter to `transform` --- lib/streams.js | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/streams.js b/lib/streams.js index 19e8430..deda1e3 100644 --- a/lib/streams.js +++ b/lib/streams.js @@ -222,12 +222,19 @@ function transformWithCancel(customCancel) { /** * Transform a stream using helper functions which are called on each chunk, and on stream close, respectively. + * Takes an optional queuing strategy for the resulting readable stream; + * see https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream#queuingstrategy. + * By default, the queueing strategy is non-buffering. When the `process` + * function is asynchronous, it may be useful to pass a buffering + * queuing strategy to enable multiple chunks to be processed in parallel; + * e.g. pass `{ highWaterMark: 4 }` to process up to 4 chunks in parallel. * @param {ReadableStream|Uint8array|String} input * @param {Function} process * @param {Function} finish + * @param {Object} queuingStrategy * @returns {ReadableStream|Uint8array|String} */ -function transform(input, process = () => undefined, finish = () => undefined) { +function transform(input, process = () => undefined, finish = () => undefined, queuingStrategy = { highWaterMark: 0 }) { if (isArrayStream(input)) { const output = new ArrayStream(); (async () => { @@ -288,7 +295,7 @@ function transform(input, process = () => undefined, finish = () => undefined) { async cancel(reason) { await reader.cancel(reason); } - }, { highWaterMark: 0 }); + }, queuingStrategy); } const result1 = process(input); const result2 = finish(); From de59619cfab276b760cc00a44c0677f4d0c08c2d Mon Sep 17 00:00:00 2001 From: Daniel Huigens Date: Wed, 3 Sep 2025 15:37:00 +0200 Subject: [PATCH 4/4] Eliminate buffering in `slice` --- lib/streams.js | 49 +++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 39 insertions(+), 10 deletions(-) diff --git a/lib/streams.js b/lib/streams.js index deda1e3..d4bc929 100644 --- a/lib/streams.js +++ b/lib/streams.js @@ -271,7 +271,7 @@ function transform(input, process = () => undefined, finish = () => undefined, q // Read repeatedly until we have a chunk to enqueue or until // we can close the stream, as `pull` won't get called again // until we call `enqueue` or `close`. - while (true) { + while (true) { // eslint-disable-line no-constant-condition const { value, done } = await reader.read(); allDone = done; const result = await (done ? finish : process)(value); @@ -473,19 +473,48 @@ function slice(input, begin=0, end=Infinity) { } if (isStream(input)) { if (begin >= 0 && end >= 0) { + let reader; let bytesRead = 0; - return transformRaw(input, { - transform(value, controller) { - if (bytesRead < end) { - if (bytesRead + value.length >= begin) { - controller.enqueue(slice(value, Math.max(begin - bytesRead, 0), end - bytesRead)); + return new ReadableStream({ + start() { + reader = input.getReader(); + }, + async pull(controller) { + try { + // Read repeatedly until we have a chunk to enqueue or until + // we can close the stream, as `pull` won't get called again + // until we call `enqueue` or `close`. + while (true) { // eslint-disable-line no-constant-condition + if (bytesRead < end) { + const { value, done } = await reader.read(); + if (done) { + controller.close(); + input.releaseLock(); + return; + } + let valueToEnqueue; + if (bytesRead + value.length >= begin) { + valueToEnqueue = slice(value, Math.max(begin - bytesRead, 0), end - bytesRead); + } + bytesRead += value.length; + if (valueToEnqueue) { + controller.enqueue(valueToEnqueue); + return; // `pull` will get called again + } + } else { + controller.close(); + input.releaseLock(); + return; + } } - bytesRead += value.length; - } else { - controller.terminate(); + } catch (e) { + controller.error(e); } + }, + async cancel(reason) { + await reader.cancel(reason); } - }); + }, { highWaterMark: 0 }); } if (begin < 0 && (end < 0 || end === Infinity)) { let lastBytes = [];