Skip to content

Commit 45b4119

Browse files
committed
quic: add proper inbound/outbound for stream
1 parent f0af79e commit 45b4119

File tree

11 files changed

+597
-15
lines changed

11 files changed

+597
-15
lines changed

doc/api/errors.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2633,6 +2633,19 @@ added:
26332633
26342634
Opening a QUIC stream failed.
26352635

2636+
<a id="ERR_QUIC_STREAM_RESET"></a>
2637+
2638+
### `ERR_QUIC_STREAM_RESET`
2639+
2640+
<!-- YAML
2641+
added: REPLACEME
2642+
-->
2643+
2644+
> Stability: 1 - Experimental
2645+
2646+
A QUIC stream was reset by the peer. The error includes the reset code
2647+
provided by the peer.
2648+
26362649
<a id="ERR_QUIC_TRANSPORT_ERROR"></a>
26372650

26382651
### `ERR_QUIC_TRANSPORT_ERROR`

doc/api/quic.md

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1149,13 +1149,85 @@ Sets the priority of the stream. Throws `ERR_INVALID_STATE` if the session
11491149
does not support priority (e.g. non-HTTP/3). Has no effect if the stream
11501150
has been destroyed.
11511151

1152-
### `stream.readable`
1152+
### `stream[Symbol.asyncIterator]()`
11531153

11541154
<!-- YAML
1155-
added: v23.8.0
1155+
added: REPLACEME
11561156
-->
11571157

1158-
* Type: {ReadableStream}
1158+
* Returns: {AsyncIterableIterator} yielding {Uint8Array\[]}
1159+
1160+
The stream implements `Symbol.asyncIterator`, making it directly usable
1161+
in `for await...of` loops. Each iteration yields a batch of `Uint8Array`
1162+
chunks.
1163+
1164+
Only one async iterator can be obtained per stream. A second call throws
1165+
`ERR_INVALID_STATE`. Non-readable streams (outbound-only unidirectional
1166+
or closed) return an immediately-finished iterator.
1167+
1168+
```mjs
1169+
for await (const chunks of stream) {
1170+
for (const chunk of chunks) {
1171+
// Process each Uint8Array chunk
1172+
}
1173+
}
1174+
```
1175+
1176+
Compatible with stream/iter utilities:
1177+
1178+
```mjs
1179+
import Stream from 'node:stream/iter';
1180+
const body = await Stream.bytes(stream);
1181+
const text = await Stream.text(stream);
1182+
await Stream.pipeTo(stream, someWriter);
1183+
```
1184+
1185+
### `stream.writer`
1186+
1187+
<!-- YAML
1188+
added: REPLACEME
1189+
-->
1190+
1191+
* Type: {Object}
1192+
1193+
Returns a Writer object for pushing data to the stream incrementally.
1194+
The Writer implements the stream/iter Writer interface with the
1195+
try-sync-fallback-to-async pattern.
1196+
1197+
Only available when no `body` source was provided at creation time or via
1198+
[`stream.setBody()`][]. Non-writable streams return an already-closed
1199+
Writer. Throws `ERR_INVALID_STATE` if the outbound is already configured.
1200+
1201+
The Writer has the following methods:
1202+
1203+
* `writeSync(chunk)` — Synchronous write. Returns `true` if accepted,
1204+
`false` if flow-controlled. Data is NOT accepted on `false`.
1205+
* `write(chunk[, options])` — Async write with drain wait. `options.signal`
1206+
is checked at entry but not observed during the write.
1207+
* `writevSync(chunks)` — Synchronous vectored write. All-or-nothing.
1208+
* `writev(chunks[, options])` — Async vectored write.
1209+
* `endSync()` — Synchronous close. Returns total bytes or `-1`.
1210+
* `end([options])` — Async close.
1211+
* `fail(reason)` — Errors the stream (sends RESET\_STREAM to peer).
1212+
* `desiredSize` — Available capacity in bytes, or `null` if closed/errored.
1213+
1214+
### `stream.setBody(body)`
1215+
1216+
<!-- YAML
1217+
added: REPLACEME
1218+
-->
1219+
1220+
* `body` {string|ArrayBuffer|SharedArrayBuffer|TypedArray|Blob|AsyncIterable|Iterable|Promise|null}
1221+
1222+
Sets the outbound body source for the stream. Can only be called once.
1223+
Mutually exclusive with [`stream.writer`][].
1224+
1225+
If `body` is `null`, the writable side is closed immediately (FIN sent).
1226+
If `body` is a `Promise`, it is awaited and the resolved value is used.
1227+
Other types are handled per their optimization tier (see below).
1228+
1229+
Throws `ERR_INVALID_STATE` if the outbound is already configured or if
1230+
the writer has been accessed.
11591231

11601232
### `stream.session`
11611233

@@ -2235,4 +2307,6 @@ added: v23.8.0
22352307
[`stream.onwanttrailers`]: #streamonwanttrailers
22362308
[`stream.pendingTrailers`]: #streampendingtrailers
22372309
[`stream.sendTrailers()`]: #streamsendtrailersheaders
2310+
[`stream.setBody()`]: #streamsetbodybody
22382311
[`stream.setPriority()`]: #streamsetpriorityoptions
2312+
[`stream.writer`]: #streamwriter

lib/internal/blob.js

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
const {
44
ArrayFrom,
5+
ArrayPrototypePush,
56
MathMax,
67
MathMin,
78
ObjectDefineProperties,
@@ -523,13 +524,75 @@ function createBlobReaderStream(reader) {
523524
}, { highWaterMark: 0 });
524525
}
525526

527+
// Maximum number of chunks to collect in a single batch to prevent
528+
// unbounded memory growth when the DataQueue has a large burst of data.
529+
const kMaxBatchChunks = 16;
530+
531+
async function* createBlobReaderIterable(reader, options = {}) {
532+
const { getReadError } = options;
533+
let wakeup = PromiseWithResolvers();
534+
reader.setWakeup(wakeup.resolve);
535+
536+
try {
537+
while (true) {
538+
const batch = [];
539+
let blocked = false;
540+
let eos = false;
541+
let error = null;
542+
543+
// Pull as many chunks as available synchronously.
544+
// reader.pull(callback) calls the callback synchronously via
545+
// MakeCallback, so we can collect multiple chunks per iteration
546+
// step without any async overhead.
547+
while (true) {
548+
let pullResult;
549+
reader.pull((status, buffer) => {
550+
pullResult = { status, buffer };
551+
});
552+
553+
if (pullResult.status === 0) {
554+
eos = true;
555+
break;
556+
}
557+
if (pullResult.status < 0) {
558+
error = typeof getReadError === 'function' ?
559+
getReadError(pullResult.status) :
560+
new ERR_INVALID_STATE('The reader is not readable');
561+
break;
562+
}
563+
if (pullResult.status === 2) {
564+
blocked = true;
565+
break;
566+
}
567+
ArrayPrototypePush(batch, new Uint8Array(pullResult.buffer));
568+
if (batch.length >= kMaxBatchChunks) break;
569+
}
570+
571+
if (batch.length > 0) {
572+
yield batch;
573+
}
574+
575+
if (eos) return;
576+
if (error) throw error;
577+
578+
if (blocked) {
579+
await wakeup.promise;
580+
wakeup = PromiseWithResolvers();
581+
}
582+
}
583+
} finally {
584+
reader.setWakeup(undefined);
585+
}
586+
}
587+
526588
module.exports = {
527589
Blob,
528590
createBlob,
529591
createBlobFromFilePath,
592+
createBlobReaderIterable,
593+
createBlobReaderStream,
530594
isBlob,
531595
kHandle,
532596
resolveObjectURL,
533597
TransferableBlob,
534-
createBlobReaderStream,
535598
};

lib/internal/errors.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1686,6 +1686,8 @@ E('ERR_QUIC_APPLICATION_ERROR', 'A QUIC application error occurred. %d [%s]', Er
16861686
E('ERR_QUIC_CONNECTION_FAILED', 'QUIC connection failed', Error);
16871687
E('ERR_QUIC_ENDPOINT_CLOSED', 'QUIC endpoint closed: %s (%d)', Error);
16881688
E('ERR_QUIC_OPEN_STREAM_FAILED', 'Failed to open QUIC stream', Error);
1689+
E('ERR_QUIC_STREAM_RESET',
1690+
'The QUIC stream was reset by the peer with error code %d', Error);
16891691
E('ERR_QUIC_TRANSPORT_ERROR', 'A QUIC transport error occurred. %d [%s]', Error);
16901692
E('ERR_QUIC_VERSION_NEGOTIATION_ERROR', 'The QUIC session requires version negotiation', Error);
16911693
E('ERR_REQUIRE_ASYNC_MODULE', function(filename, parentFilename) {

0 commit comments

Comments
 (0)