Skip to content

Commit 7c9c069

Browse files
committed
stream: align Readable adapter termination with eos
1 parent 9cff872 commit 7c9c069

File tree

3 files changed

+210
-111
lines changed

3 files changed

+210
-111
lines changed

lib/internal/streams/end-of-stream.js

Lines changed: 141 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const {
77
Promise,
88
PromisePrototypeThen,
99
ReflectApply,
10+
Symbol,
1011
SymbolDispose,
1112
} = primordials;
1213

@@ -66,6 +67,125 @@ function bindAsyncResource(fn, type) {
6667
};
6768
}
6869

70+
const kEosImmediateClose = Symbol('kEosImmediateClose');
71+
72+
/**
73+
* Returns the current stream error tracked by eos(), if any.
74+
* @param {import('stream').Stream} stream
75+
* @returns {Error | null}
76+
*/
77+
function getEosErrored(stream) {
78+
const errored = isWritableErrored(stream) || isReadableErrored(stream);
79+
return typeof errored !== 'boolean' && errored || null;
80+
}
81+
82+
/**
83+
* Returns the error eos() would report from an immediate close, including
84+
* premature close detection for unfinished readable or writable sides.
85+
* @param {import('stream').Stream} stream
86+
* @param {boolean} readable
87+
* @param {boolean | null} readableFinished
88+
* @param {boolean} writable
89+
* @param {boolean | null} writableFinished
90+
* @returns {Error | typeof kEosImmediateClose}
91+
*/
92+
function getEosOnCloseError(stream, readable, readableFinished, writable, writableFinished) {
93+
const errored = getEosErrored(stream);
94+
if (errored) {
95+
return errored;
96+
}
97+
98+
if (readable && !readableFinished && isReadableNodeStream(stream, true)) {
99+
if (!isReadableFinished(stream, false)) {
100+
return new ERR_STREAM_PREMATURE_CLOSE();
101+
}
102+
}
103+
if (writable && !writableFinished) {
104+
if (!isWritableFinished(stream, false)) {
105+
return new ERR_STREAM_PREMATURE_CLOSE();
106+
}
107+
}
108+
109+
return kEosImmediateClose;
110+
}
111+
112+
function getEosInitialState(stream, options = kEmptyObject) {
113+
const readable = options.readable ?? isReadableNodeStream(stream);
114+
const writable = options.writable ?? isWritableNodeStream(stream);
115+
116+
// TODO (ronag): Improve soft detection to include core modules and
117+
// common ecosystem modules that do properly emit 'close' but fail
118+
// this generic check.
119+
const willEmitClose = (
120+
_willEmitClose(stream) &&
121+
isReadableNodeStream(stream) === readable &&
122+
isWritableNodeStream(stream) === writable
123+
);
124+
125+
return {
126+
readable,
127+
writable,
128+
willEmitClose,
129+
writableFinished: isWritableFinished(stream, false),
130+
readableFinished: isReadableFinished(stream, false),
131+
closed: isClosed(stream),
132+
};
133+
}
134+
135+
/**
136+
* Classifies whether eos() can synchronously determine the result for the
137+
* current stream snapshot, or if it must defer to future events.
138+
* @param {import('stream').Stream} stream
139+
* @param {object} [options]
140+
* @param {boolean} [options.readable]
141+
* @param {boolean} [options.writable]
142+
* @param {ReturnType<typeof getEosInitialState>} [state]
143+
* @returns {Error | typeof kEosImmediateClose | null}
144+
*/
145+
function getEosImmediateResult(stream, options = kEmptyObject, state = getEosInitialState(stream, options)) {
146+
const {
147+
readable,
148+
writable,
149+
willEmitClose,
150+
writableFinished,
151+
readableFinished,
152+
closed,
153+
} = state;
154+
const wState = stream._writableState;
155+
const rState = stream._readableState;
156+
157+
if (closed) {
158+
return getEosOnCloseError(
159+
stream,
160+
readable,
161+
readableFinished,
162+
writable,
163+
writableFinished,
164+
);
165+
} else if (wState?.errorEmitted || rState?.errorEmitted) {
166+
if (!willEmitClose) {
167+
return getEosErrored(stream) ?? kEosImmediateClose;
168+
}
169+
} else if (
170+
!readable &&
171+
(!willEmitClose || isReadable(stream)) &&
172+
(writableFinished || isWritable(stream) === false) &&
173+
(wState == null || wState.pendingcb === undefined || wState.pendingcb === 0)
174+
) {
175+
return getEosErrored(stream) ?? kEosImmediateClose;
176+
} else if (
177+
!writable &&
178+
(!willEmitClose || isWritable(stream)) &&
179+
(readableFinished || isReadable(stream) === false)
180+
) {
181+
return getEosErrored(stream) ?? kEosImmediateClose;
182+
} else if ((rState && stream.req && stream.aborted)) {
183+
return getEosErrored(stream) ?? kEosImmediateClose;
184+
}
185+
186+
return null;
187+
}
188+
69189
function eos(stream, options, callback) {
70190
if (arguments.length === 2) {
71191
callback = options;
@@ -94,28 +214,22 @@ function eos(stream, options, callback) {
94214
throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream);
95215
}
96216

97-
const readable = options.readable ?? isReadableNodeStream(stream);
98-
const writable = options.writable ?? isWritableNodeStream(stream);
99-
217+
const eosState = getEosInitialState(stream, options);
100218
const wState = stream._writableState;
101-
const rState = stream._readableState;
102219

103220
const onlegacyfinish = () => {
104221
if (!stream.writable) {
105222
onfinish();
106223
}
107224
};
108225

109-
// TODO (ronag): Improve soft detection to include core modules and
110-
// common ecosystem modules that do properly emit 'close' but fail
111-
// this generic check.
112-
let willEmitClose = (
113-
_willEmitClose(stream) &&
114-
isReadableNodeStream(stream) === readable &&
115-
isWritableNodeStream(stream) === writable
116-
);
117-
118-
let writableFinished = isWritableFinished(stream, false);
226+
const { readable, writable } = eosState;
227+
let {
228+
willEmitClose,
229+
writableFinished,
230+
readableFinished,
231+
closed,
232+
} = eosState;
119233
const onfinish = () => {
120234
writableFinished = true;
121235
// Stream should not be destroyed here. If it is that
@@ -134,7 +248,6 @@ function eos(stream, options, callback) {
134248
}
135249
};
136250

137-
let readableFinished = isReadableFinished(stream, false);
138251
const onend = () => {
139252
readableFinished = true;
140253
// Stream should not be destroyed here. If it is that
@@ -157,41 +270,15 @@ function eos(stream, options, callback) {
157270
callback.call(stream, err);
158271
};
159272

160-
let closed = isClosed(stream);
161-
162273
const onclose = () => {
163274
closed = true;
164275

165-
const errored = isWritableErrored(stream) || isReadableErrored(stream);
166-
167-
if (errored && typeof errored !== 'boolean') {
168-
return callback.call(stream, errored);
169-
}
170-
171-
if (readable && !readableFinished && isReadableNodeStream(stream, true)) {
172-
if (!isReadableFinished(stream, false))
173-
return callback.call(stream,
174-
new ERR_STREAM_PREMATURE_CLOSE());
175-
}
176-
if (writable && !writableFinished) {
177-
if (!isWritableFinished(stream, false))
178-
return callback.call(stream,
179-
new ERR_STREAM_PREMATURE_CLOSE());
180-
}
181-
182-
callback.call(stream);
183-
};
184-
185-
const onclosed = () => {
186-
closed = true;
187-
188-
const errored = isWritableErrored(stream) || isReadableErrored(stream);
189-
190-
if (errored && typeof errored !== 'boolean') {
191-
return callback.call(stream, errored);
276+
const error = getEosOnCloseError(stream, readable, readableFinished, writable, writableFinished);
277+
if (error === kEosImmediateClose) {
278+
callback.call(stream);
279+
} else {
280+
callback.call(stream, error);
192281
}
193-
194-
callback.call(stream);
195282
};
196283

197284
const onrequest = () => {
@@ -225,27 +312,13 @@ function eos(stream, options, callback) {
225312
}
226313
stream.on('close', onclose);
227314

228-
if (closed) {
229-
process.nextTick(onclose);
230-
} else if (wState?.errorEmitted || rState?.errorEmitted) {
231-
if (!willEmitClose) {
232-
process.nextTick(onclosed);
315+
const immediateResult = getEosImmediateResult(stream, options, eosState);
316+
if (immediateResult !== null) {
317+
if (immediateResult === kEosImmediateClose) {
318+
process.nextTick(() => callback.call(stream));
319+
} else {
320+
process.nextTick(() => callback.call(stream, immediateResult));
233321
}
234-
} else if (
235-
!readable &&
236-
(!willEmitClose || isReadable(stream)) &&
237-
(writableFinished || isWritable(stream) === false) &&
238-
(wState == null || wState.pendingcb === undefined || wState.pendingcb === 0)
239-
) {
240-
process.nextTick(onclosed);
241-
} else if (
242-
!writable &&
243-
(!willEmitClose || isWritable(stream)) &&
244-
(readableFinished || isReadable(stream) === false)
245-
) {
246-
process.nextTick(onclosed);
247-
} else if ((rState && stream.req && stream.aborted)) {
248-
process.nextTick(onclosed);
249322
}
250323

251324
const cleanup = () => {
@@ -347,4 +420,6 @@ function finished(stream, opts) {
347420
module.exports = {
348421
eos,
349422
finished,
423+
getEosImmediateResult,
424+
kEosImmediateClose,
350425
};

0 commit comments

Comments
 (0)