Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion lib/internal/streams/iter/broadcast.js
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,10 @@ function wireBroadcastWriteSignal(entry, signal, resolve, reject, self) {
signal.addEventListener('abort', onAbort, { __proto__: null, once: true });
}

function onBroadcastCancel(broadcastImpl, signal) {
onSignalAbort(signal, () => broadcastImpl.cancel(signal.reason));
}

// =============================================================================
// Public API
// =============================================================================
Expand Down Expand Up @@ -720,7 +724,7 @@ function broadcast(options = { __proto__: null }) {
broadcastImpl.setWriter(writer);

if (signal) {
onSignalAbort(signal, () => broadcastImpl.cancel());
onBroadcastCancel(broadcastImpl, signal);
}

return { __proto__: null, writer, broadcast: broadcastImpl };
Expand Down
51 changes: 45 additions & 6 deletions lib/internal/streams/iter/classic.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const {
const {
toAsyncStreamable: kToAsyncStreamable,
kValidatedSource,
kSyncWriteAccepted,
drainableProtocol,
} = require('internal/streams/iter/types');

Expand Down Expand Up @@ -764,13 +765,41 @@ function toWritable(writer) {
const hasEndSync = hasEnd &&
typeof writer.endSync === 'function';
const hasFail = typeof writer.fail === 'function';
const hasSyncWriteAccepted =
typeof writer[kSyncWriteAccepted] === 'function';

// Try-sync-first pattern: attempt the synchronous method and
// fall back to the async method if it returns false (indicating
// the sync path was not accepted) or throws. When the sync path
// succeeds, the callback is deferred via queueMicrotask to
// preserve the async resolution contract that Writable internals
// expect from _write/_writev/_final callbacks.
function syncWriteAccepted() {
return hasSyncWriteAccepted && writer[kSyncWriteAccepted]();
}

function finishAfterSyncBackpressure(cb) {
let ondrain;
try {
if (typeof writer[drainableProtocol] === 'function') {
ondrain = writer[drainableProtocol]();
}
} catch (err) {
cb(err);
return;
}
if (ondrain !== null && ondrain !== undefined) {
PromisePrototypeThen(ondrain, (drained) => {
if (drained === false) {
cb(new ERR_INVALID_STATE.TypeError('Stream closed by consumer'));
return;
}
cb();
}, cb);
return;
}
queueMicrotask(cb);
}

// Try-sync-first pattern: attempt the synchronous method and fall back to the
// async method if it returns false without accepting the data, or if it
// throws. When the sync path succeeds, the callback is deferred via
// queueMicrotask to preserve the async resolution contract that Writable
// internals expect from _write/_writev/_final callbacks.

function _write(chunk, encoding, cb) {
const bytes = typeof chunk === 'string' ?
Expand All @@ -781,6 +810,11 @@ function toWritable(writer) {
queueMicrotask(cb);
return;
}
if (syncWriteAccepted()) {
// The chunk was accepted; false only signaled backpressure.
finishAfterSyncBackpressure(cb);
return;
}
} catch {
// Sync path threw -- fall through to async.
}
Expand All @@ -805,6 +839,11 @@ function toWritable(writer) {
queueMicrotask(cb);
return;
}
if (syncWriteAccepted()) {
// The chunks were accepted; false only signaled backpressure.
finishAfterSyncBackpressure(cb);
return;
}
} catch {
// Sync path threw -- fall through to async.
}
Expand Down
12 changes: 12 additions & 0 deletions lib/internal/streams/iter/push.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const {

const {
drainableProtocol,
kSyncWriteAccepted,
kSyncWriteAcceptedOnFalse,
} = require('internal/streams/iter/types');

Expand Down Expand Up @@ -545,11 +546,16 @@ class PushQueue {

class PushWriter {
#queue;
#syncWriteAccepted = false;

constructor(queue) {
this.#queue = queue;
}

[kSyncWriteAccepted]() {
return this.#syncWriteAccepted;
}

[drainableProtocol]() {
const desired = this.desiredSize;
if (desired === null) return null;
Expand Down Expand Up @@ -589,19 +595,23 @@ class PushWriter {
}

writeSync(chunk) {
this.#syncWriteAccepted = false;
const bytes = toUint8Array(chunk);
const result = this.#queue.writeSync([bytes]);
if (!result && this.#queue.backpressurePolicy === 'block' &&
this.#queue.desiredSize === 0) {
// Block policy: force-enqueue and return false as backpressure signal.
// Data IS accepted; false tells caller to slow down.
this.#queue.forceEnqueue([bytes]);
this.#syncWriteAccepted = true;
return false;
}
this.#syncWriteAccepted = result;
return result;
}

writevSync(chunks) {
this.#syncWriteAccepted = false;
if (!ArrayIsArray(chunks)) {
throw new ERR_INVALID_ARG_TYPE('chunks', 'Array', chunks);
}
Expand All @@ -610,8 +620,10 @@ class PushWriter {
if (!result && this.#queue.backpressurePolicy === 'block' &&
this.#queue.desiredSize === 0) {
this.#queue.forceEnqueue(bytes);
this.#syncWriteAccepted = true;
return false;
}
this.#syncWriteAccepted = result;
return result;
}

Expand Down
7 changes: 6 additions & 1 deletion lib/internal/streams/iter/share.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ class ShareImpl {
// cursor must re-pull rather than terminating prematurely.
for (;;) {
if (state.detached) {
if (self.#sourceError) throw self.#sourceError;
return { __proto__: null, done: true, value: undefined };
}

Expand Down Expand Up @@ -628,6 +629,10 @@ class SyncShareImpl {
}
}

function onShareCancel(shareImpl, signal) {
onSignalAbort(signal, () => shareImpl.cancel(signal.reason));
}

// =============================================================================
// Public API
// =============================================================================
Expand Down Expand Up @@ -657,7 +662,7 @@ function share(source, options = { __proto__: null }) {
const shareImpl = new ShareImpl(normalized, opts);

if (signal) {
onSignalAbort(signal, () => shareImpl.cancel());
onShareCancel(shareImpl, signal);
}

return shareImpl;
Expand Down
13 changes: 13 additions & 0 deletions lib/internal/streams/iter/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,24 @@ const kValidatedTransform = Symbol('kValidatedTransform');
*/
const kValidatedSource = Symbol('kValidatedSource');

/**
* Internal sentinel for writers whose sync write methods can return false
* after accepting data as a backpressure signal.
*/
const kSyncWriteAccepted = Symbol('kSyncWriteAccepted');

/**
* Internal sentinel for writers whose sync write methods may return false
* after accepting data when backpressure is applied. Such writers must expose
* desiredSize so callers can distinguish accepted backpressure from a sync
* write that was not performed.
*/
const kSyncWriteAcceptedOnFalse = Symbol('kSyncWriteAcceptedOnFalse');

module.exports = {
broadcastProtocol,
drainableProtocol,
kSyncWriteAccepted,
kSyncWriteAcceptedOnFalse,
kValidatedSource,
kValidatedTransform,
Expand Down
22 changes: 12 additions & 10 deletions test/parallel/test-stream-iter-broadcast-from.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,12 @@ async function testAbortSignal() {

ac.abort();

const batches = [];
for await (const batch of consumer) {
batches.push(batch);
}
assert.strictEqual(batches.length, 0);
await assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of consumer) {
assert.fail('Should not reach here');
}
}, { name: 'AbortError' });
}

async function testAlreadyAbortedSignal() {
Expand All @@ -84,11 +85,12 @@ async function testAlreadyAbortedSignal() {
const { broadcast: bc } = broadcast({ signal: ac.signal });
const consumer = bc.push();

const batches = [];
for await (const batch of consumer) {
batches.push(batch);
}
assert.strictEqual(batches.length, 0);
await assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of consumer) {
assert.fail('Should not reach here');
}
}, { name: 'AbortError' });
}

// =============================================================================
Expand Down
76 changes: 64 additions & 12 deletions test/parallel/test-stream-iter-share-async.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,66 @@ async function testShareCancelWithReason() {

async function testShareAbortSignal() {
const ac = new AbortController();
const shared = share(from('data'), { signal: ac.signal });
const consumer = shared.pull();
const reason = new Error('share aborted');
const enc = new TextEncoder();
async function* source() {
yield [enc.encode('a')];
yield [enc.encode('b')];
}
const shared = share(source(), {
highWaterMark: 1,
backpressure: 'block',
signal: ac.signal,
});
const fast = shared.pull()[Symbol.asyncIterator]();
shared.pull();

await fast.next();
const read = fast.next();
const rejected = assert.rejects(read, (error) => error === reason);
ac.abort(reason);

await rejected;
}

async function testShareAbortSignalWhileSourcePullPending() {
const ac = new AbortController();
const {
promise: resumePromise,
resolve: resume,
} = Promise.withResolvers();
const {
promise: sourceStartedPromise,
resolve: sourceStarted,
} = Promise.withResolvers();

const source = {
__proto__: null,
[Symbol.asyncIterator]() {
return {
__proto__: null,
async next() {
sourceStarted();
await resumePromise;
return { __proto__: null, done: true, value: undefined };
},
};
},
};

const shared = share(source, { signal: ac.signal });
const iter1 = shared.pull()[Symbol.asyncIterator]();
const iter2 = shared.pull()[Symbol.asyncIterator]();
const read1 = iter1.next();
const read2 = iter2.next();
const rejected1 = assert.rejects(read1, { name: 'AbortError' });
const rejected2 = assert.rejects(read2, { name: 'AbortError' });

await sourceStartedPromise;
ac.abort();
resume();

const batches = [];
for await (const batch of consumer) {
batches.push(batch);
}
assert.strictEqual(batches.length, 0);
await Promise.all([rejected1, rejected2]);
}

async function testShareAlreadyAborted() {
Expand All @@ -153,11 +203,12 @@ async function testShareAlreadyAborted() {
const shared = share(from('data'), { signal: ac.signal });
const consumer = shared.pull();

const batches = [];
for await (const batch of consumer) {
batches.push(batch);
}
assert.strictEqual(batches.length, 0);
await assert.rejects(async () => {
// eslint-disable-next-line no-unused-vars
for await (const _ of consumer) {
assert.fail('Should not reach here');
}
}, { name: 'AbortError' });
}

// =============================================================================
Expand Down Expand Up @@ -273,6 +324,7 @@ Promise.all([
testShareCancelMidIteration(),
testShareCancelWithReason(),
testShareAbortSignal(),
testShareAbortSignalWhileSourcePullPending(),
testShareAlreadyAborted(),
testShareSourceError(),
testShareLateJoiningConsumer(),
Expand Down
Loading
Loading