Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 141 additions & 66 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const {
Promise,
PromisePrototypeThen,
ReflectApply,
Symbol,
SymbolDispose,
} = primordials;

Expand Down Expand Up @@ -66,6 +67,125 @@ function bindAsyncResource(fn, type) {
};
}

const kEosImmediateClose = Symbol('kEosImmediateClose');

/**
* Returns the current stream error tracked by eos(), if any.
* @param {import('stream').Stream} stream
* @returns {Error | null}
*/
function getEosErrored(stream) {
const errored = isWritableErrored(stream) || isReadableErrored(stream);
return typeof errored !== 'boolean' && errored || null;
}

/**
* Returns the error eos() would report from an immediate close, including
* premature close detection for unfinished readable or writable sides.
* @param {import('stream').Stream} stream
* @param {boolean} readable
* @param {boolean | null} readableFinished
* @param {boolean} writable
* @param {boolean | null} writableFinished
* @returns {Error | typeof kEosImmediateClose}
*/
function getEosOnCloseError(stream, readable, readableFinished, writable, writableFinished) {
const errored = getEosErrored(stream);
if (errored) {
return errored;
}

if (readable && !readableFinished && isReadableNodeStream(stream, true)) {
if (!isReadableFinished(stream, false)) {
return new ERR_STREAM_PREMATURE_CLOSE();
}
}
if (writable && !writableFinished) {
if (!isWritableFinished(stream, false)) {
return new ERR_STREAM_PREMATURE_CLOSE();
}
}

return kEosImmediateClose;
}

function getEosInitialState(stream, options = kEmptyObject) {
const readable = options.readable ?? isReadableNodeStream(stream);
const writable = options.writable ?? isWritableNodeStream(stream);

// TODO (ronag): Improve soft detection to include core modules and
// common ecosystem modules that do properly emit 'close' but fail
// this generic check.
const willEmitClose = (
_willEmitClose(stream) &&
isReadableNodeStream(stream) === readable &&
isWritableNodeStream(stream) === writable
);

return {
readable,
writable,
willEmitClose,
writableFinished: isWritableFinished(stream, false),
readableFinished: isReadableFinished(stream, false),
closed: isClosed(stream),
};
}

/**
* Classifies whether eos() can synchronously determine the result for the
* current stream snapshot, or if it must defer to future events.
* @param {import('stream').Stream} stream
* @param {object} [options]
* @param {boolean} [options.readable]
* @param {boolean} [options.writable]
* @param {ReturnType<typeof getEosInitialState>} [state]
* @returns {Error | typeof kEosImmediateClose | null}
*/
function getEosImmediateResult(stream, options = kEmptyObject, state = getEosInitialState(stream, options)) {
const {
readable,
writable,
willEmitClose,
writableFinished,
readableFinished,
closed,
} = state;
const wState = stream._writableState;
const rState = stream._readableState;

if (closed) {
return getEosOnCloseError(
stream,
readable,
readableFinished,
writable,
writableFinished,
);
} else if (wState?.errorEmitted || rState?.errorEmitted) {
if (!willEmitClose) {
return getEosErrored(stream) ?? kEosImmediateClose;
}
} else if (
!readable &&
(!willEmitClose || isReadable(stream)) &&
(writableFinished || isWritable(stream) === false) &&
(wState == null || wState.pendingcb === undefined || wState.pendingcb === 0)
) {
return getEosErrored(stream) ?? kEosImmediateClose;
} else if (
!writable &&
(!willEmitClose || isWritable(stream)) &&
(readableFinished || isReadable(stream) === false)
) {
return getEosErrored(stream) ?? kEosImmediateClose;
} else if ((rState && stream.req && stream.aborted)) {
return getEosErrored(stream) ?? kEosImmediateClose;
}

return null;
}

function eos(stream, options, callback) {
if (arguments.length === 2) {
callback = options;
Expand Down Expand Up @@ -94,28 +214,22 @@ function eos(stream, options, callback) {
throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream);
}

const readable = options.readable ?? isReadableNodeStream(stream);
const writable = options.writable ?? isWritableNodeStream(stream);

const eosState = getEosInitialState(stream, options);
const wState = stream._writableState;
const rState = stream._readableState;

const onlegacyfinish = () => {
if (!stream.writable) {
onfinish();
}
};

// TODO (ronag): Improve soft detection to include core modules and
// common ecosystem modules that do properly emit 'close' but fail
// this generic check.
let willEmitClose = (
_willEmitClose(stream) &&
isReadableNodeStream(stream) === readable &&
isWritableNodeStream(stream) === writable
);

let writableFinished = isWritableFinished(stream, false);
const { readable, writable } = eosState;
let {
willEmitClose,
writableFinished,
readableFinished,
closed,
} = eosState;
const onfinish = () => {
writableFinished = true;
// Stream should not be destroyed here. If it is that
Expand All @@ -134,7 +248,6 @@ function eos(stream, options, callback) {
}
};

let readableFinished = isReadableFinished(stream, false);
const onend = () => {
readableFinished = true;
// Stream should not be destroyed here. If it is that
Expand All @@ -157,41 +270,15 @@ function eos(stream, options, callback) {
callback.call(stream, err);
};

let closed = isClosed(stream);

const onclose = () => {
closed = true;

const errored = isWritableErrored(stream) || isReadableErrored(stream);

if (errored && typeof errored !== 'boolean') {
return callback.call(stream, errored);
}

if (readable && !readableFinished && isReadableNodeStream(stream, true)) {
if (!isReadableFinished(stream, false))
return callback.call(stream,
new ERR_STREAM_PREMATURE_CLOSE());
}
if (writable && !writableFinished) {
if (!isWritableFinished(stream, false))
return callback.call(stream,
new ERR_STREAM_PREMATURE_CLOSE());
}

callback.call(stream);
};

const onclosed = () => {
closed = true;

const errored = isWritableErrored(stream) || isReadableErrored(stream);

if (errored && typeof errored !== 'boolean') {
return callback.call(stream, errored);
const error = getEosOnCloseError(stream, readable, readableFinished, writable, writableFinished);
if (error === kEosImmediateClose) {
callback.call(stream);
} else {
callback.call(stream, error);
}

callback.call(stream);
};

const onrequest = () => {
Expand Down Expand Up @@ -225,27 +312,13 @@ function eos(stream, options, callback) {
}
stream.on('close', onclose);

if (closed) {
process.nextTick(onclose);
} else if (wState?.errorEmitted || rState?.errorEmitted) {
if (!willEmitClose) {
process.nextTick(onclosed);
const immediateResult = getEosImmediateResult(stream, options, eosState);
if (immediateResult !== null) {
if (immediateResult === kEosImmediateClose) {
process.nextTick(() => callback.call(stream));
} else {
process.nextTick(() => callback.call(stream, immediateResult));
}
} else if (
!readable &&
(!willEmitClose || isReadable(stream)) &&
(writableFinished || isWritable(stream) === false) &&
(wState == null || wState.pendingcb === undefined || wState.pendingcb === 0)
) {
process.nextTick(onclosed);
} else if (
!writable &&
(!willEmitClose || isWritable(stream)) &&
(readableFinished || isReadable(stream) === false)
) {
process.nextTick(onclosed);
} else if ((rState && stream.req && stream.aborted)) {
process.nextTick(onclosed);
}

const cleanup = () => {
Expand Down Expand Up @@ -347,4 +420,6 @@ function finished(stream, opts) {
module.exports = {
eos,
finished,
getEosImmediateResult,
kEosImmediateClose,
};
Loading
Loading