Skip to content

Commit b2b5526

Browse files
feat(async/unstable): add AbortSignal and toReadableStream to Channel (#7086)
1 parent 2aca58e commit b2b5526

2 files changed

Lines changed: 508 additions & 45 deletions

File tree

async/unstable_channel.ts

Lines changed: 229 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,15 @@
33

44
import { Deque } from "@std/data-structures/unstable-deque";
55

6+
const RESOLVED: Promise<void> = Promise.resolve();
7+
8+
const EMPTY_RESULT: { readonly state: "empty" } = Object.freeze({
9+
state: "empty",
10+
});
11+
const CLOSED_RESULT: { readonly state: "closed" } = Object.freeze({
12+
state: "closed",
13+
});
14+
615
/** Internal node for the FIFO sender waiting queue. */
716
interface SenderNode<T> {
817
value: T;
@@ -80,6 +89,37 @@ export class ChannelClosedError extends Error {
8089
}
8190
}
8291

92+
/**
93+
* Result of a non-blocking {@linkcode Channel.tryReceive} call. Discriminate
94+
* on the `state` field:
95+
*
96+
* - `"ok"` — a value was available and is provided in `value`.
97+
* - `"empty"` — the channel is open but no value is immediately available.
98+
* - `"closed"` — the channel has been closed and no buffered values remain.
99+
*
100+
* @experimental **UNSTABLE**: New API, yet to be vetted.
101+
*
102+
* @typeParam T The type of the value received from the channel.
103+
*/
104+
export type ChannelReceiveResult<T> =
105+
| { state: "ok"; value: T }
106+
| { state: "empty" }
107+
| { state: "closed" };
108+
109+
/**
110+
* Options for blocking {@linkcode Channel} operations.
111+
*
112+
* @experimental **UNSTABLE**: New API, yet to be vetted.
113+
*/
114+
export interface ChannelOptions {
115+
/**
116+
* An {@linkcode AbortSignal} to cancel a pending `send` or `receive`.
117+
* When the signal is aborted, the operation rejects with the signal's
118+
* {@linkcode AbortSignal.reason}.
119+
*/
120+
signal?: AbortSignal;
121+
}
122+
83123
/**
84124
* An async channel for communicating between concurrent tasks with optional
85125
* bounded buffering and backpressure.
@@ -127,6 +167,7 @@ export class Channel<T>
127167
#closed = false;
128168
#closeReason: unknown = undefined;
129169
#hasCloseReason = false;
170+
#receiveClosedError: ChannelClosedError | undefined;
130171

131172
#senders: Deque<SenderNode<T>>;
132173
#receivers: Deque<ReceiverNode<T>>;
@@ -166,26 +207,64 @@ export class Channel<T>
166207
* ch.close();
167208
* ```
168209
*
210+
* @example Cancelling with an AbortSignal
211+
* ```ts
212+
* import { Channel } from "@std/async/unstable-channel";
213+
* import { assertRejects } from "@std/assert";
214+
*
215+
* const ch = new Channel<number>();
216+
* const controller = new AbortController();
217+
* const p = ch.send(42, { signal: controller.signal });
218+
* controller.abort(new Error("cancelled"));
219+
* await assertRejects(() => p, Error, "cancelled");
220+
* ```
221+
*
169222
* @param value The value to send into the channel.
223+
* @param options Optional settings for the send operation.
170224
* @throws {ChannelClosedError} If the channel is closed. The error's
171225
* `value` property carries the unsent value for recovery.
172226
*/
173-
send(value: T): Promise<void> {
227+
send(value: T, options?: ChannelOptions): Promise<void> {
174228
if (this.#closed) {
175229
return Promise.reject(
176230
new ChannelClosedError("Cannot send to a closed channel", value),
177231
);
178232
}
179233

180-
if (this.#deliverToReceiver(value)) return Promise.resolve();
234+
if (this.#deliverToReceiver(value)) return RESOLVED;
181235

182236
if (this.#buffer.length < this.#capacity) {
183237
this.#buffer.pushBack(value);
184-
return Promise.resolve();
238+
return RESOLVED;
239+
}
240+
241+
if (options?.signal?.aborted) {
242+
return Promise.reject(options.signal.reason);
185243
}
186244

187245
return new Promise<void>((res, rej) => {
188-
this.#senders.pushBack({ value, res, rej });
246+
const node: SenderNode<T> = { value, res, rej };
247+
this.#senders.pushBack(node);
248+
const signal = options?.signal;
249+
if (signal) {
250+
const onAbort = () => {
251+
if (this.#senders.peekFront() === node) {
252+
this.#senders.popFront();
253+
} else {
254+
this.#senders.removeFirst((n) => n === node);
255+
}
256+
node.rej(signal.reason);
257+
};
258+
signal.addEventListener("abort", onAbort, { once: true });
259+
node.res = () => {
260+
signal.removeEventListener("abort", onAbort);
261+
res();
262+
};
263+
node.rej = (reason: unknown) => {
264+
signal.removeEventListener("abort", onAbort);
265+
rej(reason);
266+
};
267+
}
189268
});
190269
}
191270

@@ -205,18 +284,62 @@ export class Channel<T>
205284
* ch.close();
206285
* ```
207286
*
287+
* @example Cancelling with an AbortSignal
288+
* ```ts
289+
* import { Channel } from "@std/async/unstable-channel";
290+
* import { assertRejects } from "@std/assert";
291+
*
292+
* const ch = new Channel<number>();
293+
* const controller = new AbortController();
294+
* const p = ch.receive({ signal: controller.signal });
295+
* controller.abort(new Error("cancelled"));
296+
* await assertRejects(() => p, Error, "cancelled");
297+
* ```
298+
*
299+
* @param options Optional settings for the receive operation.
208300
* @returns A promise that resolves with the next value from the channel.
209301
* @throws {ChannelClosedError} If the channel is closed and empty (no
210302
* `value` property). If `close(reason)` was called, rejects with
211303
* `reason` instead.
212304
*/
213-
receive(): Promise<T> {
305+
receive(options?: ChannelOptions): Promise<T> {
214306
if (this.#buffer.length > 0) return Promise.resolve(this.#dequeue());
215-
if (!this.#senders.isEmpty()) return Promise.resolve(this.#takeSender());
307+
308+
const sender = this.#nextSender();
309+
if (sender) {
310+
sender.res();
311+
return Promise.resolve(sender.value);
312+
}
313+
216314
if (this.#closed) return Promise.reject(this.#receiveError());
217315

316+
if (options?.signal?.aborted) {
317+
return Promise.reject(options.signal.reason);
318+
}
319+
218320
return new Promise<T>((res, rej) => {
219-
this.#receivers.pushBack({ res, rej });
321+
const node: ReceiverNode<T> = { res, rej };
322+
this.#receivers.pushBack(node);
323+
const signal = options?.signal;
324+
if (signal) {
325+
const onAbort = () => {
326+
if (this.#receivers.peekFront() === node) {
327+
this.#receivers.popFront();
328+
} else {
329+
this.#receivers.removeFirst((n) => n === node);
330+
}
331+
node.rej(signal.reason);
332+
};
333+
signal.addEventListener("abort", onAbort, { once: true });
334+
node.res = (value: T) => {
335+
signal.removeEventListener("abort", onAbort);
336+
res(value);
337+
};
338+
node.rej = (reason: unknown) => {
339+
signal.removeEventListener("abort", onAbort);
340+
rej(reason);
341+
};
342+
}
220343
});
221344
}
222345

@@ -248,13 +371,13 @@ export class Channel<T>
248371
}
249372

250373
/**
251-
* Non-blocking receive. The discriminated union avoids ambiguity when `T`
252-
* itself can be `undefined`.
374+
* Non-blocking receive. Discriminate on the `state` field to determine the
375+
* outcome without ambiguity, even when `T` itself can be `undefined`.
253376
*
254-
* @returns `{ ok: true, value }` if a value was available, or
255-
* `{ ok: false }` if the buffer is empty or the channel is closed. A
256-
* close reason passed to {@linkcode Channel.close} is not surfaced here;
257-
* use {@linkcode Channel.receive} to observe it.
377+
* @returns A {@linkcode ChannelReceiveResult} — `{ state: "ok", value }`
378+
* if a value was available, `{ state: "empty" }` if the channel is open
379+
* but no value is ready, or `{ state: "closed" }` if the channel has
380+
* been closed and no buffered values remain.
258381
*
259382
* @example Usage
260383
* ```ts
@@ -263,16 +386,23 @@ export class Channel<T>
263386
*
264387
* const ch = new Channel<number>(1);
265388
* await ch.send(42);
266-
* assertEquals(ch.tryReceive(), { ok: true, value: 42 });
267-
* assertEquals(ch.tryReceive(), { ok: false });
389+
* assertEquals(ch.tryReceive(), { state: "ok", value: 42 });
390+
* assertEquals(ch.tryReceive(), { state: "empty" });
391+
* ch.close();
392+
* assertEquals(ch.tryReceive(), { state: "closed" });
268393
* ```
269394
*/
270-
tryReceive(): { ok: true; value: T } | { ok: false } {
271-
if (this.#buffer.length > 0) return { ok: true, value: this.#dequeue() };
272-
if (!this.#senders.isEmpty()) {
273-
return { ok: true, value: this.#takeSender() };
395+
tryReceive(): ChannelReceiveResult<T> {
396+
if (this.#buffer.length > 0) {
397+
return { state: "ok", value: this.#dequeue() };
274398
}
275-
return { ok: false };
399+
const sender = this.#nextSender();
400+
if (sender) {
401+
sender.res();
402+
return { state: "ok", value: sender.value };
403+
}
404+
if (this.#closed) return CLOSED_RESULT;
405+
return EMPTY_RESULT;
276406
}
277407

278408
/**
@@ -290,11 +420,26 @@ export class Channel<T>
290420
* ch.close();
291421
* assert(ch.closed);
292422
* ```
423+
*/
424+
close(): void;
425+
/**
426+
* Closes the channel with a reason. All pending and future `receive()`
427+
* calls reject with `reason` after draining buffered values. Pending
428+
* `send()` calls reject with {@linkcode ChannelClosedError}. Idempotent.
429+
*
430+
* @example Usage
431+
* ```ts
432+
* import { Channel } from "@std/async/unstable-channel";
433+
* import { assertRejects } from "@std/assert";
434+
*
435+
* const ch = new Channel<number>();
436+
* ch.close(new Error("upstream failure"));
437+
* await assertRejects(() => ch.receive(), Error, "upstream failure");
438+
* ```
293439
*
294-
* @param args If a reason argument is provided, all pending and future
295-
* `receive()` calls reject with that reason (after draining buffered
296-
* values). Enables error propagation from producer to consumer.
440+
* @param reason The reason to reject pending and future receivers with.
297441
*/
442+
close(reason: unknown): void;
298443
close(...args: [reason: unknown] | []): void {
299444
if (this.#closed) return;
300445
this.#closed = true;
@@ -415,6 +560,50 @@ export class Channel<T>
415560
}
416561
}
417562

563+
/**
564+
* Creates a {@linkcode ReadableStream} that yields values from this
565+
* channel. The stream closes when the channel closes after draining
566+
* buffered values. If the channel was closed with a reason, the stream
567+
* errors with that reason. Cancelling the stream closes the channel; a
568+
* non-`undefined` cancel reason is forwarded to {@linkcode Channel.close}
569+
* so other consumers observe it.
570+
*
571+
* @example Usage
572+
* ```ts
573+
* import { Channel } from "@std/async/unstable-channel";
574+
* import { assertEquals } from "@std/assert";
575+
*
576+
* const ch = new Channel<number>(4);
577+
* await ch.send(1);
578+
* await ch.send(2);
579+
* ch.close();
580+
*
581+
* const values = await Array.fromAsync(ch.toReadableStream());
582+
* assertEquals(values, [1, 2]);
583+
* ```
584+
*
585+
* @returns A readable stream of channel values.
586+
*/
587+
toReadableStream(): ReadableStream<T> {
588+
return new ReadableStream<T>({
589+
pull: async (controller) => {
590+
try {
591+
controller.enqueue(await this.receive());
592+
} catch (e) {
593+
if (e instanceof ChannelClosedError && !this.#hasCloseReason) {
594+
controller.close();
595+
} else {
596+
controller.error(e);
597+
}
598+
}
599+
},
600+
cancel: (reason) => {
601+
if (reason === undefined) this.close();
602+
else this.close(reason);
603+
},
604+
});
605+
}
606+
418607
/**
419608
* Calls {@linkcode Channel.close}. Enables `using` for automatic cleanup.
420609
*
@@ -448,12 +637,22 @@ export class Channel<T>
448637
*/
449638
[Symbol.asyncDispose](): Promise<void> {
450639
this.close();
451-
return Promise.resolve();
640+
return RESOLVED;
641+
}
642+
643+
/** Pops the next sender from the queue. */
644+
#nextSender(): SenderNode<T> | undefined {
645+
return this.#senders.popFront();
646+
}
647+
648+
/** Pops the next receiver from the queue. */
649+
#nextReceiver(): ReceiverNode<T> | undefined {
650+
return this.#receivers.popFront();
452651
}
453652

454653
/** Hands `value` to the next waiting receiver, if any. */
455654
#deliverToReceiver(value: T): boolean {
456-
const receiver = this.#receivers.popFront();
655+
const receiver = this.#nextReceiver();
457656
if (!receiver) return false;
458657
receiver.res(value);
459658
return true;
@@ -465,23 +664,19 @@ export class Channel<T>
465664
*/
466665
#dequeue(): T {
467666
const value = this.#buffer.popFront()!;
468-
const sender = this.#senders.popFront();
667+
const sender = this.#nextSender();
469668
if (sender) {
470669
this.#buffer.pushBack(sender.value);
471670
sender.res();
472671
}
473672
return value;
474673
}
475674

476-
/** Takes a value directly from the head of the sender queue (unbuffered path). */
477-
#takeSender(): T {
478-
const sender = this.#senders.popFront()!;
479-
sender.res();
480-
return sender.value;
481-
}
482-
483675
#receiveError(): unknown {
484676
if (this.#hasCloseReason) return this.#closeReason;
485-
return new ChannelClosedError("Cannot receive from a closed channel");
677+
this.#receiveClosedError ??= new ChannelClosedError(
678+
"Cannot receive from a closed channel",
679+
);
680+
return this.#receiveClosedError;
486681
}
487682
}

0 commit comments

Comments
 (0)