Skip to content

Commit e0059b5

Browse files
committed
Propagate stream.readable.destroy()
1 parent 477b12a commit e0059b5

8 files changed

Lines changed: 204 additions & 85 deletions

File tree

.mocharc.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,5 @@
33
"watch-files": ["lib/**/*.ts", "test/**/*.ts"],
44
"spec": ["test/*.ts"],
55
"loader": ["ts-node/esm"],
6-
"extensions": ["ts", "tsx"],
7-
"timeout": 40000
6+
"extensions": ["ts", "tsx"]
87
}

README.md

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,11 @@ For TypeScript / CommonJS projects, not using Node.js ≥ 22, check [load-esm](h
5858

5959
## API
6060

61-
**constructor(stream: ReadableStream): Promise<void>**
61+
**constructor(stream: ReadableStream, options): Promise<void>**
6262

63-
`stream: ReadableStream`: the [Web-API readable stream](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader).
64-
65-
**close(): Promise<void>**
66-
Will cancel close the Readable-node stream, and will release Web-API-readable-stream.
67-
68-
**waitForReadToComplete(): Promise<void>**
69-
If there is no unresolved read call to Web-API Readable​Stream immediately returns, otherwise it will wait until the read is resolved.
63+
- `stream: ReadableStream`: the [Web-API readable stream](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader).
64+
- `options?: {propagateDestroy: boolean}`
65+
- `propagateDestroy`, default value is `false`, if set to `true`, will also cancel the _stream_ when the Node.js Readable is destroyed.
7066

7167
## Licence
7268

biome.jsonc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@
5858
"rules": {
5959
"correctness": {
6060
"noNodejsModules": "off"
61+
},
62+
"suspicious": {
63+
"noExplicitAny": "off"
6164
}
6265
}
6366
}

lib/common.ts

Lines changed: 54 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,80 +1,91 @@
1-
import type { Readable as UserLandReadable } from 'readable-stream';
2-
import type { Readable as NodeReadable } from 'node:stream';
1+
import type {Readable as UserLandReadable} from 'readable-stream';
2+
import type {Readable as NodeReadable} from 'node:stream';
3+
4+
export interface ReadableWebToNodeStreamOptions {
5+
propagateDestroy: boolean;
6+
}
37

48
/**
5-
* Hybrid implementation for Node.js / Web for conversion of a Web-API stream into a Node.js stream.Readable class
6-
* Node stream readable: https://nodejs.org/api/stream.html#stream_readable_streams
7-
* Web API readable-stream: https://developer.mozilla.org/docs/Web/API/ReadableStream
9+
* A hybrid implementation that converts a Web-API ReadableStream
10+
* into a Node.js Readable stream.
11+
*
12+
* Node.js Readable docs: https://nodejs.org/api/stream.html#stream_readable_streams
13+
* Web API ReadableStream docs: https://developer.mozilla.org/docs/Web/API/ReadableStream
814
*/
915
export class CommonReadableWebToNodeStream {
10-
16+
/** Total bytes pushed to the Node.js stream. */
1117
public bytesRead = 0;
18+
19+
/** Flag indicating that the stream has been released/closed. */
1220
public released = false;
1321

14-
private pendingRead: Promise<void> | undefined;
22+
/** Holds the currently pending read, if any. */
23+
private pendingRead: Promise<void> | null = null;
1524

16-
/**
17-
* Default web API stream reader
18-
* https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader
19-
*/
25+
/** The underlying Web-API stream reader. */
2026
private reader: ReadableStreamDefaultReader<Uint8Array>;
2127

2228
/**
23-
* @param stream ReadableStream: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream
29+
* @param stream The Web-API ReadableStream to be wrapped.
30+
* @param options Options: `{propagateDestroy: boolean}`
2431
*/
25-
constructor(stream: ReadableStream | ReadableStream<Uint8Array>) {
32+
constructor(stream: ReadableStream<Uint8Array> | ReadableStream, private options: ReadableWebToNodeStreamOptions = {propagateDestroy: false}) {
2633
this.reader = stream.getReader();
2734
}
2835

2936
/**
30-
* Implementation of readable._read(size).
31-
* When readable._read() is called, if data is available from the resource,
32-
* the implementation should begin pushing that data into the read queue
33-
* https://nodejs.org/api/stream.html#stream_readable_read_size_1
37+
* Should be bound to the Node.js Readable._read() method.
38+
* This method pushes data into the Node stream's internal queue.
39+
*
40+
* @param nodeReadable The Node.js stream instance.
3441
*/
3542
public read(nodeReadable: NodeReadable | UserLandReadable): void {
36-
// Should start pushing data into the queue
37-
// Read data from the underlying Web-API-readable-stream
3843
if (this.released) {
3944
nodeReadable.push(null); // Signal EOF
4045
return;
4146
}
42-
this.pendingRead = this.reader
43-
.read()
44-
.then((data) => {
45-
this.pendingRead = undefined;
46-
if (data.done || this.released) {
47+
48+
// Use an async IIFE to handle asynchronous reading.
49+
this.pendingRead = (async () => {
50+
try {
51+
const result = await this.reader.read();
52+
this.pendingRead = null;
53+
54+
if (result.done || this.released) {
4755
nodeReadable.push(null); // Signal EOF
4856
} else {
49-
this.bytesRead += data.value.length;
50-
nodeReadable.push(data.value); // Push new data to the queue
57+
this.bytesRead += result.value.length;
58+
nodeReadable.push(result.value); // Push the chunk into the Node.js stream
5159
}
52-
})
53-
.catch((err) => {
54-
nodeReadable.destroy(err);
55-
});
60+
} catch (error) {
61+
nodeReadable.destroy(error as Error);
62+
}
63+
})();
5664
}
5765

5866
/**
59-
* If there is no unresolved read call to Web-API Readable​Stream immediately returns;
60-
* otherwise will wait until the read is resolved.
67+
* Closes the stream and releasing the underlying stream lock.
68+
* Implementation is Readable._destroy()
6169
*/
62-
public async waitForReadToComplete() {
63-
if (this.pendingRead) {
64-
await this.pendingRead;
70+
public destroy(error: Error | null, callback: (error?: Error | null) => void) {
71+
if (this.options.propagateDestroy ?? false) {
72+
// Propagate cancelling stream to Web API Stream
73+
this.reader.cancel().then(() => {
74+
this.release();
75+
callback();
76+
}, error => callback(error));
77+
} else {
78+
this.release();
79+
callback(error);
6580
}
6681
}
6782

6883
/**
69-
* Close wrapper
84+
* Marks the stream as released, waits for pending operations,
85+
* and releases the underlying reader lock.
7086
*/
71-
public async close(): Promise<void> {
72-
await this.syncAndRelease();
73-
}
74-
75-
private async syncAndRelease() {
87+
private release() {
7688
this.released = true;
77-
await this.waitForReadToComplete();
78-
await this.reader.releaseLock();
89+
this.reader.releaseLock();
7990
}
8091
}

lib/default.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { Readable } from 'readable-stream'; // Userland Readable
2-
import { CommonReadableWebToNodeStream } from './common.js';
2+
import { CommonReadableWebToNodeStream, type ReadableWebToNodeStreamOptions } from './common.js';
3+
4+
export type { ReadableWebToNodeStreamOptions } from './common.js';
35

46
/**
57
* Converts a Web-API stream into Node stream.Readable class
@@ -12,12 +14,12 @@ export class ReadableWebToNodeStream extends Readable {
1214
private converter: CommonReadableWebToNodeStream;
1315

1416
/**
15-
*
1617
* @param stream ReadableStream: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream
18+
* @param options Options: `{propagateDestroy: boolean}`
1719
*/
18-
constructor(stream: ReadableStream | ReadableStream<Uint8Array>) {
20+
constructor(stream: ReadableStream | ReadableStream<Uint8Array>, options?: ReadableWebToNodeStreamOptions) {
1921
super();
20-
this.converter = new CommonReadableWebToNodeStream(stream);
22+
this.converter = new CommonReadableWebToNodeStream(stream, options);
2123
}
2224

2325
/**
@@ -30,10 +32,8 @@ export class ReadableWebToNodeStream extends Readable {
3032
this.converter.read(this);
3133
}
3234

33-
/**
34-
* Close wrapper
35-
*/
36-
public async close(): Promise<void> {
37-
await this.converter.close();
35+
_destroy(error: Error | null, callback: (error?: Error | null) => void) {
36+
this.converter.destroy(error, callback);
3837
}
38+
3939
}

lib/node.ts

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { Readable } from 'node:stream'; // Node.js dependency
2-
import { CommonReadableWebToNodeStream } from './common.js';
2+
import { CommonReadableWebToNodeStream, type ReadableWebToNodeStreamOptions } from './common.js';
3+
4+
export type { ReadableWebToNodeStreamOptions } from './common.js';
35

46
/**
57
* Converts a Web-API stream into Node stream.Readable class
@@ -14,10 +16,11 @@ export class ReadableWebToNodeStream extends Readable {
1416
/**
1517
*
1618
* @param stream ReadableStream: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream
19+
* @param options Options: `{propagateDestroy: boolean}`
1720
*/
18-
constructor(stream: ReadableStream | ReadableStream<Uint8Array>) {
21+
constructor(stream: ReadableStream | ReadableStream<Uint8Array>, options?: ReadableWebToNodeStreamOptions) {
1922
super();
20-
this.converter = new CommonReadableWebToNodeStream(stream);
23+
this.converter = new CommonReadableWebToNodeStream(stream, options);
2124
}
2225

2326
/**
@@ -30,10 +33,8 @@ export class ReadableWebToNodeStream extends Readable {
3033
this.converter.read(this);
3134
}
3235

33-
/**
34-
* Close wrapper
35-
*/
36-
public async close(): Promise<void> {
37-
await this.converter.close();
36+
_destroy(error: Error | null, callback: (error?: Error | null) => void) {
37+
this.converter.destroy(error, callback);
3838
}
39+
3940
}

test/test.ts

Lines changed: 56 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import {assert} from 'chai';
2-
import {ReadableWebToNodeStream as NodeReadableWebToNodeStream} from '../lib/node.js';
2+
import {ReadableWebToNodeStream as NodeReadableWebToNodeStream, type ReadableWebToNodeStreamOptions} from '../lib/node.js';
33
import {ReadableWebToNodeStream as DefaultReadableWebToNodeStream} from '../lib/default.js';
44
import {fileTypeFromStream} from 'file-type';
55
import {parseStream} from 'music-metadata';
6+
import {MockedReadableStream, NodeReadableListener} from "./util.js";
7+
import type {Readable} from "node:stream";
68

79
const jpegDataBase64 = '/9j/4AAQSkZJRgABAQEAAAAAAAD/2wCEAAEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQE';
810

@@ -17,14 +19,14 @@ async function httpGetByUrl(url: string): Promise<Response> {
1719
return response;
1820
}
1921

20-
type makeReadableWebToNodeStream = (stream: ReadableStream | ReadableStream<Uint8Array>) => NodeReadableWebToNodeStream | DefaultReadableWebToNodeStream;
22+
type makeReadableWebToNodeStream = (stream: ReadableStream | ReadableStream<Uint8Array>, options?: ReadableWebToNodeStreamOptions) => Readable;
2123

2224
const entryPoints: { label: string, makeNodeStream: makeReadableWebToNodeStream } [] = [{
2325
label: 'Node.js',
24-
makeNodeStream: stream => new NodeReadableWebToNodeStream(stream)
26+
makeNodeStream: (stream, options) => new NodeReadableWebToNodeStream(stream, options)
2527
}, {
2628
label: 'default (userland Readable)',
27-
makeNodeStream: stream => new DefaultReadableWebToNodeStream(stream),
29+
makeNodeStream: (stream, options) => new DefaultReadableWebToNodeStream(stream, options)
2830
}
2931
];
3032

@@ -41,7 +43,7 @@ entryPoints.forEach(entryPoint => {
4143
assert.isDefined(fileType, 'Detected file-type');
4244
assert.strictEqual(fileType.mime, 'image/jpeg', 'fileType.mime');
4345
} finally {
44-
await nodeReadable.close();
46+
nodeReadable.destroy()
4547
}
4648
} finally {
4749
await webReadableStream.cancel();
@@ -57,21 +59,60 @@ entryPoints.forEach(entryPoint => {
5759
const url = `https://raw.githubusercontent.com/Borewit/test-audio/958e057${trackPath}`;
5860
const response = await httpGetByUrl(url);
5961
const contentLength = response.headers.get('Content-Length');
60-
if (!response.body) {
62+
const webStream = response.body;
63+
if (!webStream) {
6164
assert.fail('response.body (Web API ReadableStream) not defined')
6265
}
63-
const nodeReadable = entryPoint.makeNodeStream(response.body);
6466
try {
65-
const metadata = await parseStream(nodeReadable, {
66-
size: contentLength ? Number.parseInt(contentLength, 10) : undefined,
67-
mimeType: response.headers.get('Content-Type') ?? undefined
68-
});
69-
assert.isDefined(metadata, 'metadata');
70-
assert.strictEqual(metadata.common.artist, 'Diablo Swing Orchestra', 'metadata.common.artist');
71-
assert.strictEqual(metadata.common.title, 'Heroines', 'metadata.common.title');
67+
const nodeReadable = entryPoint.makeNodeStream(webStream);
68+
try {
69+
const metadata = await parseStream(nodeReadable, {
70+
size: contentLength ? Number.parseInt(contentLength, 10) : undefined,
71+
mimeType: response.headers.get('Content-Type') ?? undefined
72+
});
73+
assert.isDefined(metadata, 'metadata');
74+
assert.strictEqual(metadata.common.artist, 'Diablo Swing Orchestra', 'metadata.common.artist');
75+
assert.strictEqual(metadata.common.title, 'Heroines', 'metadata.common.title');
76+
} finally {
77+
nodeReadable.destroy();
78+
}
7279
} finally {
73-
await nodeReadable.close();
80+
await webStream.cancel();
81+
}
82+
});
83+
84+
describe('destroying Node.js stream.Readable', () => {
85+
86+
async function destroyReadable(propagateDestroy: boolean): Promise<void> {
87+
const mockedWebReadableStream = new MockedReadableStream();
88+
89+
const nodeReadable = entryPoint.makeNodeStream(mockedWebReadableStream.stream, {propagateDestroy});
90+
const nodeReadableListener = new NodeReadableListener(nodeReadable);
91+
92+
assert.isFalse(mockedWebReadableStream.cancelled, 'Web API ReadableStream is not cancelled');
93+
assert.strictEqual(nodeReadableListener.closed, 0, 'Node ReadableStream is not ended');
94+
assert.strictEqual(nodeReadableListener.errors.length, 0, 'Node ReadableStream did not receive any error');
95+
assert.isFalse(mockedWebReadableStream.cancelled, 'Web API ReadableStream is not cancelled yet');
96+
nodeReadable.destroy();
97+
await nodeReadableListener.waitUntilClosed();
98+
assert.strictEqual(nodeReadableListener.errors.length, 0, 'Node ReadableStream did not receive any error');
99+
assert.strictEqual(nodeReadableListener.closed, 1, 'Node ReadableStream ended, once');
100+
101+
assert.strictEqual(mockedWebReadableStream.cancelled, propagateDestroy, 'Web API ReadableStream is cancelled');
102+
if (!propagateDestroy) {
103+
await mockedWebReadableStream.stream.cancel();
104+
assert.isTrue(mockedWebReadableStream.cancelled, 'Web API ReadableStream is cancelled');
105+
}
106+
74107
}
108+
109+
it('should be able to abort a pending read', async () => {
110+
await destroyReadable(false);
111+
});
112+
113+
it('should be able to abort a pending read with cancellation propagation', async () => {
114+
await destroyReadable(true);
115+
});
75116
});
76117
});
77118
});

0 commit comments

Comments
 (0)