diff --git a/.github/workflows/nodejs.yml b/.github/workflows/nodejs.yml
index 3d526c5..aca345d 100644
--- a/.github/workflows/nodejs.yml
+++ b/.github/workflows/nodejs.yml
@@ -17,4 +17,4 @@ jobs:
   test:
     uses: voxpelli/ghatemplates/.github/workflows/test.yml@main
     with:
-      node-versions: '18,20,21'
+      node-versions: '22,24'
diff --git a/.knip.jsonc b/.knip.jsonc
index 156b81a..c1873cd 100644
--- a/.knip.jsonc
+++ b/.knip.jsonc
@@ -1,7 +1,6 @@
 {
   "$schema": "https://unpkg.com/knip@5/schema.json",
   "entry": [
-    "index.js!",
-    "benchmark/*.js"
+    "benchmark/index.js"
   ]
 }
diff --git a/.mocharc.json b/.mocharc.json
new file mode 100644
index 0000000..61b6615
--- /dev/null
+++ b/.mocharc.json
@@ -0,0 +1,5 @@
+{
+  "node-option": [
+    "expose-gc"
+  ]
+}
diff --git a/CLAUDE.md b/CLAUDE.md
new file mode 100644
index 0000000..f4bfd3a
--- /dev/null
+++ b/CLAUDE.md
@@ -0,0 +1,97 @@
+# CLAUDE.md
+
+This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
+
+## Build, test, lint
+
+The project is pure ESM JavaScript with **types written in JSDoc** (no TypeScript source files). `.d.ts` declarations are emitted from JSDoc by `tsc -p declaration.tsconfig.json` at publish time only — do not commit them.
+
+- `npm test` — full check chain (lint, tsc, knip, type-coverage, installed-check) followed by mocha + c8 coverage. This is what the pre-push husky hook runs; if it fails, fix the cause rather than `--no-verify`.
+- `npm run check` — only the static checks (lint + tsc + knip + type-coverage + installed-check), no tests.
+- `npx mocha test/<file>.spec.js` — run a single spec file.
+- `npx mocha test/<file>.spec.js -g "<pattern>"` — filter to specific `it()` blocks within a file.
+- `npm run build` — clean and emit `.d.ts` declarations.
+- `npm run bench` — run the mitata benchmark suite (see "Benchmarks" below).
+
+Type-coverage is enforced at **≥95% strict** (excluding `test/*.spec.js` and `benchmark/**/*.js`). Lint is `@voxpelli/eslint-config` (neostandard).
Knip's "unused devDependency" findings are treated as errors by `npm test`. `benchmark/index.js` is the knip entry point for the benchmark dir; `benchmark/*.js` is still tsc-checked (it's in `tsconfig.json`'s `include`) but excluded from `.d.ts` emit and type-coverage. + +Commits must follow Conventional Commits (validated by the `commit-msg` husky hook via `validate-conventional-commit`); `release-please` cuts releases automatically from `main`, so `feat:` bumps minor and `fix:` bumps patch. + +Engines: Node ≥22.0.0 (the well-known `Symbol.asyncDispose` is required natively). The CI matrix in `.github/workflows/nodejs.yml` should match. + +## Architecture + +The library is one core function (`bufferedAsyncMap`) plus a thin wrapper (`mergeIterables`). Everything lives in `index.js`; `lib/` contains three small helpers worth knowing about. + +### `bufferedAsyncMap(input, callback, options)` — the state machine + +The function returns a stateful `AsyncIterableIterator` with these closure variables forming the state machine: + +- **`bufferedPromises[]`** — in-flight promises (size capped at `bufferSize`). Each is the `callback(item, {signal})` result wrapped to never reject (errors are caught into `{err}` envelopes). +- **`subIterators[]`** — stack of nested iterators spawned when `callback` returns an `AsyncIterable` (async-generator callbacks). +- **`promisesToSourceIteratorMap`** — WeakMap tracking which iterator produced each buffer slot; consulted by `findLeastTargeted` (`lib/find-least-targeted.js`) for load-balancing. +- **`internalAbortController`** — an `AbortController` minted per call. Its signal is **always** the second arg to `callback`, regardless of whether the consumer passed `options.signal`. It fires from `markAsEnded()` on iterator close, from `options.signal` aborting (linked via `addEventListener('abort', …)`), and from the first error in `errors: 'fail-fast'` mode. This is what lets in-flight callbacks fast-path on shutdown. 
+- **`abortReason: { reason, delivered: boolean } | undefined`** — drives the "reject the next `.next()` once with `signal.reason`, then `done:true` forever" contract. Set by external abort, pre-aborted signal, or first fail-fast error. +- **`capturedErrors[]`** — accumulates errors in `'fail-eventually'` mode; on drain, throws the single error directly (identity-preserved) or wraps in `AggregateError` for ≥2. +- **`isDone`** — set once by `markAsEnded()` to make all close paths idempotent. + +### Two pull/dispatch loops + +`fillQueue()` is the **producer**: pulls from source up to `bufferSize`, dispatches via `callback(item, {signal})`, pushes the wrapped promise into `bufferedPromises`. In `ordered: true` mode it always feeds from `subIterators[0]`; in `ordered: false` it picks the least-targeted iterator via `findLeastTargeted` to prevent starvation. + +`nextValue()` is the **consumer**: in a single flat `Promise.race`, races `bufferedPromises[0]` (ordered) or all of `bufferedPromises` (unordered) against a **fresh per-pull park promise**. The park is a `{ resolve }` deferred created each pull, stored in the mutable `currentPark`, and cleared once the race settles. The single construction-time `{ once: true }` listener on `internalAbortController.signal` resolves whatever park is currently waiting. It is deliberately *not* a single long-lived promise: racing the same never-settling promise every pull leaves a `PromiseReaction` on it per item (the nodejs/node#51452 retention pattern). Abort always wins over a buffered value resolving in the same tick — the post-race code re-checks `abortReason` regardless of which entry won the race. + +`markAsEnded()` is the **single cleanup path**: sets `isDone`, fires `internalAbortController.abort()`, calls `Promise.allSettled(...iterators.map(it => it.return()))`, clears buffers. Called from `return()`, `throw()`, `Symbol.asyncDispose`, source-exhaustion, and abort delivery. Idempotent via the `isDone` guard. 
+ +### Iterator chaining via `currentStep` + +`next()` chains each call's promise via `currentStep.then(nextValue, nextValue)` (both fulfilled and rejected handlers are `nextValue`) so that one rejection doesn't poison every subsequent call — the next call still re-enters `nextValue`, which then observes the post-rejection state machine (most often returning `{done:true}`). + +### Lib helpers (reuse these, don't reimplement) + +- `lib/find-least-targeted.js` — load-balancing: given a list of iterators and the current buffer, picks the iterator with fewest in-flight slots. +- `lib/misc.js` — `makeIterableAsync(input)` (sync iterable → async iterable), `arrayDeleteInPlace(list, value)` (in-place splice by value), and `normalizeError(err, defaultMessage)` (coerce non-`Error` rejections at every catch site — reuse this rather than open-coding `err instanceof Error ? err : new Error(...)`). +- `lib/type-checks.js` — `isObject` (truthy and `typeof === 'object'`; closes the `typeof null === 'object'` hole), plus `isAsyncIterable`, `isIterable`, `isPartOfArray` guards built on it. + +### Public-API contracts worth preserving + +- Callback receives `(item, { signal })` where `signal` is **always present** (the internal one) even when no `options.signal` is provided. +- Aborts cancel **consumption**, not in-flight callback work. Promises cannot be cancelled — the library propagates the signal so user code can voluntarily exit; it does not race-and-discard. The README documents this explicitly. +- `errors: 'fail-eventually'` (default) keeps the historical "drain then throw" semantics; `'fail-fast'` mirrors `Promise.all`. External abort always wins over queued/captured errors. +- Existing one-arg callbacks (`async (item) => …`) keep working — JS ignores extra args, so the second-arg widening is non-breaking. 
+ +## Implementation invariants worth preserving + +- **Zero runtime dependencies.** `package.json` has no `dependencies` block; keep it that way unless a new feature genuinely cannot be implemented without one. The harden branch's `@voxpelli/typed-utils` import was reverted for this reason. +- **One abort listener per call; never race a long-lived promise.** The only `addEventListener` against `internalAbortController.signal` or `externalSignal` is the construction-time linkage. Do not add per-pull listeners and do not reintroduce a single long-lived "abort promise" into `nextValue`'s `Promise.race` — racing the same never-settling promise every pull retains a `PromiseReaction` per item (nodejs/node#51452). The per-pull park is collectable; keep it that way. `test/memory.spec.js` guards this. +- **`internalAbortController` is unconditional — do not lazify.** It is minted on every call regardless of whether `options.signal` or `errors: 'fail-fast'` are used. `iterator.return()` deliberately bypasses the `currentStep` chain (so it can run concurrently with a parked `next()`) and fires `markAsEnded()` → `internalAbortController.abort()`; that abort is what wakes the parked `nextValue()` by resolving its per-pull park. The per-callback signal tests (`test/per-task-signal.spec.js`) and the parallel-return+abort test (`test/abort.spec.js`) pin this in the no-options case, and the README / CLAUDE.md promise the per-callback `{signal}` is always present. + +## Style notes + +- Helpers and exports use American spelling (`normalizeError`, `lib/misc.js`); local variables follow the helper they wrap (e.g. `normalizedErr`). + +## Test conventions + +Mocha + chai + sinon. Tests use `sinon.useFakeTimers()` plus `clock.runAllAsync()` / `clock.tickAsync(ms)` for deterministic timing. The standard pattern for an async flow that needs the clock to advance is: + +```js +const flow = (async () => { for await (...) { ... 
} })();
+await clock.runAllAsync();
+await flow;
+```
+
+Inline `for await` blocks **without** the IIFE wrapper will deadlock under fake timers when the source uses real `setTimeout`. Test helpers in `test/utils.js` (`yieldValuesOverTime`, `nestedYieldValuesOverTime`, `promisableTimeout`) are the source of truth — reuse them.
+
+For testing rejections, prefer the `.catch(err => ({ rejectedWith: err }))` envelope pattern (used across `test/abort.spec.js` and `test/errors-fail-fast.spec.js`) over chai-as-promised's `should.be.rejectedWith` when asserting identity-equality on non-Error reasons.
+
+## Benchmarks
+
+`npm run bench` runs the [mitata](https://github.com/evanwashere/mitata) suite (`node --expose-gc --allow-natives-syntax`). `benchmark/fixtures.js` holds the shared helpers; three theme files register benches on import and `benchmark/index.js` is the only one that calls `run()`:
+
+- `benchmark/throughput.js` — overhead vs raw `for await`, `bufferSize` scaling, ordered vs unordered dispatch, input shape (async generator / sync iterable / array).
+- `benchmark/abort.js` — always-on abort-wiring cost, plus abort & error *delivery* (pre-aborted signal, mid-stream external abort, `fail-fast` triggering, `fail-eventually` `AggregateError` aggregation).
+- `benchmark/nested.js` — nested sub-iterators, `mergeIterables` wrapper overhead.
+
+`npm run bench:json` emits JSON — capture it before and after a change for a local diff (no baseline is committed; the numbers are machine-specific). `node benchmark/index.js <filter>` passes a filter to `run()`, matched against **bench names** (so `… abort` runs every bench with "abort" in its name, not a whole group). `run({ throw: true })` makes a broken bench fail the process loudly.
+
+The non-negotiable rule, enforced by `benchmark/fixtures.js`: **benchmark fixtures never use timers.** `asyncRange` / `syncRange` yield with no artificial delay, so the numbers reflect the library's per-item bookkeeping overhead and not simulated I/O.
A `setTimeout`-based source would make every benchmark measure the event loop instead. `do_not_optimize` wraps the drained result so the JIT cannot eliminate the loop; mitata handles warmup and flags dead-code-eliminated results with a `!`. The allocation-heavy groups (overhead, `bufferSize`, input shape, nested) use `.gc('inner')` — GC before each iteration — to remove cross-iteration GC noise; this makes those benches slower but their distribution tighter. The abort & error *delivery* benches are deliberately left on the default `.gc('once')`: they are composite metrics (construct + partial consume + teardown) and run noisier than the steady-state throughput benches — read them for the *shape* of the teardown cost, not a precise number. Re-run before/after any change to `fillQueue`, `nextValue`, or the abort wiring. diff --git a/README.md b/README.md index b9d29e7..e932dec 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,8 @@ Buffered parallel processing of async iterables / generators. +**Requirements**: Node.js ≥22.0.0 (native `Symbol.asyncDispose` is required). 
+ [![npm version](https://img.shields.io/npm/v/buffered-async-iterable.svg?style=flat)](https://www.npmjs.com/package/buffered-async-iterable) [![npm downloads](https://img.shields.io/npm/dm/buffered-async-iterable.svg?style=flat)](https://www.npmjs.com/package/buffered-async-iterable) [![Module type: ESM](https://img.shields.io/badge/module%20type-esm-brightgreen)](https://github.com/voxpelli/badges-cjs-esm) @@ -75,33 +77,133 @@ Iterates and applies the `callback` to up to `bufferSize` items from `input` yie #### Syntax -`bufferedAsyncMap(input, callback[, { bufferSize=6, ordered=false }]) => AsyncIterableIterator` +`bufferedAsyncMap(input, callback[, { bufferSize=6, ordered=false, signal, errors='fail-eventually' }]) => AsyncIterableIterator` #### Arguments -* `input` – either an async iterable, an ordinare iterable or an array -* `callback(item)` – should be either an async generator or an ordinary async function. Items from async generators are buffered in the main buffer and the buffer is refilled by the one that has least items in the current buffer (`input` is considered equal to sub iterators in this regard when refilling the buffer) +* `input` – either an async iterable, an ordinary iterable or an array +* `callback(item, { signal })` – should be either an async generator or an ordinary async function. Items from async generators are buffered in the main buffer and the buffer is refilled by the one that has least items in the current buffer (`input` is considered equal to sub iterators in this regard when refilling the buffer). The second argument is an `{ signal: AbortSignal }` that aborts on cancellation — see [Cancellation](#cancellation). #### Options -* `bufferSize` – _optional_ – defaults to `6`, sets the max amount of simultanoeus items that processed at once in the buffer. 
-* `ordered` – _optional_ – defaults to `false`, when `true` the result will be returned in order instead of unordered +* `bufferSize` – _optional_ – defaults to `6`, sets the max amount of simultaneous items processed at once in the buffer. +* `ordered` – _optional_ – defaults to `false`, when `true` the result will be returned in order instead of unordered. +* `signal` – _optional_ – an `AbortSignal`. When aborted, iteration stops pulling from the source, the next pending or freshly-called `iterator.next()` rejects with `signal.reason` exactly once, and all subsequent calls return `{ done: true, value: undefined }`. See [Cancellation](#cancellation). +* `errors` – _optional_ – defaults to `'fail-eventually'`. Controls how errors from the callback or the source surface to the consumer. See [Errors](#errors). + +The returned iterator also implements `Symbol.asyncDispose`, so it can be used with `await using` for deterministic cleanup. See [Resource management](#resource-management). + +## Cancellation + +Pass an `AbortSignal` and abort it whenever you want to stop iteration: + +```javascript +import { bufferedAsyncMap } from 'buffered-async-iterable'; + +const ac = new AbortController(); +setTimeout(() => ac.abort(new Error('took too long')), 5000); + +try { + for await (const item of bufferedAsyncMap(source, async (item) => { + return await fetchItem(item); + }, { signal: ac.signal })) { + console.log(item); + } +} catch (err) { + // err === ac.signal.reason +} +``` + +Aborting cancels *consumption* of the source. In-flight callbacks continue running until they settle. 
To cancel network/IO inside your callback, forward the per-callback `signal` (the second argument) into `fetch`/`undici`/etc: + +```javascript +bufferedAsyncMap(source, async (item, { signal }) => { + const res = await fetch(`/items/${item}`, { signal }); + return res.json(); +}, { signal: ac.signal }); +``` + +The per-callback `signal` is always present (even when no `options.signal` is passed) and aborts on iterator close (return / throw / dispose / source-exhaustion-with-cleanup), so callbacks can fast-path on shutdown. Callbacks observe `signal.aborted === true` within one microtask of iterator close — they continue running (Promises are not cancellable) until they reach the next `await` of something signal-aware (`fetch`, `undici`, etc.) or until they voluntarily exit via a check on `signal.aborted`. + +If `options.signal` is already aborted at construction time, the source is never read and the first `iterator.next()` rejects with `signal.reason`. External abort always wins over queued errors. + +## Errors + +There are two error modes: + +### `'fail-eventually'` (default) + +Iteration continues after errors. Captured errors are thrown when the iterator drains: + +* If exactly one error was captured, it is thrown directly (identity preserved). +* If two or more errors were captured, they are wrapped in an [`AggregateError`](https://developer.mozilla.org/docs/Web/JavaScript/Reference/Global_Objects/AggregateError) (in capture order). + +In-flight callbacks may still complete in the background after an error is captured. Wrap your callback in `try/catch` if you need per-item isolation. + +### `'fail-fast'` + +Mirrors `Promise.all` semantics: the first error from the callback or the source short-circuits iteration. The next `iterator.next()` rejects with the original error (no `AggregateError` wrapping); subsequent calls return `{ done: true }`. 
The source's `.next()` is not called again, the source's `.return()` is called once, and in-flight callbacks observe `signal.aborted === true` on their per-callback signal within one microtask. + +```javascript +for await (const item of bufferedAsyncMap(source, fn, { errors: 'fail-fast' })) { + // first thrown error halts iteration immediately +} +``` + +External abort always takes precedence over either error mode: if `options.signal` aborts while errors are queued, the consumer sees `signal.reason`, not the captured errors. + +## Resource management + +The returned iterator implements `Symbol.asyncDispose`, so it can be used with [`await using`](https://github.com/tc39/proposal-explicit-resource-management) for deterministic cleanup: + +```javascript +{ + await using iterator = bufferedAsyncMap(source, fn); + for await (const item of iterator) { + if (shouldStop(item)) break; + } +} // source.return() runs here, regardless of how the block exited +``` + +`Symbol.asyncDispose` is equivalent to calling `iterator.return()` for cleanup and is idempotent. Native `await using` requires Node 22+ (or a transpiler). ### mergeIterables() -Merges all given (async) iterables in parallel, returning the values as they resolve +Merges all given (async) iterables in parallel, returning the values as they resolve. Thin wrapper over [`bufferedAsyncMap`](#bufferedasyncmap) — see that section for the full semantics of each option. #### Syntax -`mergeIterables(input[, { bufferSize=6 }]) => AsyncIterableIterator` +`mergeIterables(input[, { bufferSize=6, ordered=false, signal, errors='fail-eventually' }]) => AsyncIterableIterator` #### Arguments -* `input` – an array of async iterables, ordinare iterables and/or arrays +* `input` – an array of async iterables, ordinary iterables and/or arrays #### Options -* `bufferSize` – _optional_ – defaults to `6`, sets the max amount of simultanoeus items that processed at once in the buffer. 
+* `bufferSize` – _optional_ – defaults to `6`, sets the max amount of simultaneous items processed at once in the buffer. +* `ordered` – _optional_ – defaults to `false`. When `false` (the default), values are interleaved as they resolve; when `true`, the merge preserves the input array order (drains the first iterable before pulling from the second, etc.). +* `signal` – _optional_ – an `AbortSignal`. Aborts the merge. See [Cancellation](#cancellation). +* `errors` – _optional_ – defaults to `'fail-eventually'`. See [Errors](#errors). + +## Performance + +`npm run bench` runs a [mitata](https://github.com/evanwashere/mitata) suite covering the main design decisions. The findings: + +* **There is a per-item buffering tax.** Routing values through `bufferedAsyncMap` still costs more than a bare `for await` loop — roughly **20–25×** on synchronous-ish work. The library pays for itself when the callback is genuinely async / IO-bound and benefits from prefetching up to `bufferSize` items in parallel — for trivial synchronous transforms, a plain loop wins. +* **`bufferSize` is a throughput/overhead trade-off.** Larger buffers keep more work in flight but cost more per pull (the internal `Promise.race` grows with the buffer). The default of `6` is a reasonable midpoint. +* **The optional machinery is effectively free.** Passing `options.signal`, choosing an `errors` mode, feeding a sync iterable or array instead of an async generator, and using `mergeIterables` instead of a direct call all measure within a few percent of the base case. + +### Changes vs. 
earlier 2.0.0 pre-release builds + +Two optimisations during the 2.0.0 cycle, each guarded by the benchmark suite: + +* **Skip the load-balancer when there are no sub-iterators.** `fillQueue` no longer runs the `findLeastTargeted` load-balancer (a `Map` allocation + a per-item scan) on the common path where the callback returns plain values — it only runs once a nested async-generator callback actually creates a sub-iterator. ~10–15% faster throughput. +* **Per-pull abort "park" instead of a long-lived race promise.** `nextValue` previously raced every pull against a single abort promise that never settled until the iterator closed — which left a `Promise` reaction record per item ([nodejs/node#51452](https://github.com/nodejs/node/issues/51452)), a real memory-retention issue on long-lived/unbounded streams. It now races a fresh, collectable per-pull "park" instead. This both removes the retention (≈0 vs ≈530 bytes/item on an unbounded stream — see `test/memory.spec.js`) and, by keeping the live-object set small during iteration, cuts GC pressure enough for a further ~20–40% throughput gain. + +Net: throughput is **~25–45% faster** than the first 2.0.0 pre-release across the buffered-map, ordered/unordered, `bufferSize`, input-shape and `mergeIterables` benchmarks, with no regression in the abort/error paths and the long-stream memory retention eliminated. + +These ratios are *indicative of the shape of the cost* — measured on the maintainer's machine, not a benchmark report. `npm run bench` reproduces them locally; `npm run bench:json` captures a JSON snapshot for before/after diffing. See `CLAUDE.md` for the methodology. ## Similar modules diff --git a/benchmark/abort.js b/benchmark/abort.js new file mode 100644 index 0000000..24de8cf --- /dev/null +++ b/benchmark/abort.js @@ -0,0 +1,77 @@ +// Abort & error-handling benchmarks. Registers benches on import — the +// run() call lives in benchmark/index.js. 
+
+import {
+  bench, do_not_optimize as doNotOptimize, group, summary,
+} from 'mitata';
+
+import { bufferedAsyncMap } from '../index.js';
+import {
+  asyncRange, drain, identity, rejectingCallback,
+} from './fixtures.js';
+
+// 1. The always-on abort wiring. internalAbortController is minted on every
+// call, the per-callback `{ signal }` object is allocated per dispatch, and
+// nextValue races a fresh per-pull park promise. These three benches must stay
+// within noise of each other — that proves passing `options.signal` /
+// `errors: 'fail-fast'` does not add a hot-path cost over the no-options
+// case, and that the per-pull park machinery did not regress per-pull overhead.
+group('abort wiring: always-on cost', () => {
+  const count = 5000;
+
+  summary(() => {
+    bench('no options', async () => {
+      doNotOptimize(await drain(bufferedAsyncMap(asyncRange(count), identity)));
+    });
+    bench('options.signal (never aborted)', async () => {
+      const controller = new AbortController();
+      doNotOptimize(await drain(bufferedAsyncMap(asyncRange(count), identity, { signal: controller.signal })));
+    });
+    bench("errors: 'fail-fast' (happy path)", async () => {
+      doNotOptimize(await drain(bufferedAsyncMap(asyncRange(count), identity, { errors: 'fail-fast' })));
+    });
+  });
+});
+
+// 2. Abort & error *delivery* — the teardown paths the happy-path group above
+// never exercises. These are composite metrics (construct + partial consume
+// + teardown) and run noisier than the steady-state throughput benches; the
+// point is to catch a regression in the *shape* of the teardown cost, not a
+// precise per-item number.
+group('abort & error delivery', () => {
+  const count = 5000;
+
+  summary(() => {
+    // Pre-aborted signal: the source is never read, the first .next() rejects
+    // with signal.reason. The construct + immediate-reject fast path.
+ bench('pre-aborted signal', async () => { + const iterator = bufferedAsyncMap(asyncRange(count), identity, { signal: AbortSignal.abort() }); + await iterator.next().catch(doNotOptimize); + }); + + // Mid-stream external abort: pull a few items, then abort and observe the + // rejecting .next(). Measures abort delivery + markAsEnded teardown. + bench('mid-stream external abort', async () => { + const controller = new AbortController(); + const iterator = bufferedAsyncMap(asyncRange(count), identity, { signal: controller.signal }); + for (let i = 0; i < 10; i++) { + doNotOptimize(await iterator.next()); + } + controller.abort(); + await iterator.next().catch(doNotOptimize); + }); + + // fail-fast: the first settled callback error short-circuits iteration. + bench("errors: 'fail-fast' triggering", async () => { + await drain(bufferedAsyncMap(asyncRange(count), rejectingCallback, { errors: 'fail-fast' })) + .catch(doNotOptimize); + }); + + // fail-eventually: every callback rejects, so every error lands in + // capturedErrors[] and an AggregateError is built on drain. + bench("errors: 'fail-eventually' aggregation", async () => { + await drain(bufferedAsyncMap(asyncRange(count), rejectingCallback, { errors: 'fail-eventually' })) + .catch(doNotOptimize); + }); + }); +}); diff --git a/benchmark/fixtures.js b/benchmark/fixtures.js new file mode 100644 index 0000000..87f6001 --- /dev/null +++ b/benchmark/fixtures.js @@ -0,0 +1,82 @@ +// Shared fixtures for the mitata benchmark suite. +// +// IMPORTANT: benchmarks here measure *library overhead* — the bookkeeping cost +// of bufferedAsyncMap / mergeIterables per item — not simulated I/O latency. +// Fixtures therefore never use timers; every value resolves on the microtask +// queue. A `setTimeout`-based source would make every benchmark measure the +// event loop instead of the library. + +/** + * Async iterable yielding integers `0 .. count - 1` with no artificial delay. 
+ * + * @param {number} count + * @returns {AsyncGenerator} + */ +export async function * asyncRange (count) { + for (let i = 0; i < count; i++) { + yield i; + } +} + +/** + * Sync iterable yielding integers `0 .. count - 1`. Exercises the + * `isIterable` → `makeIterableAsync` branch — no async-generator fixture + * reaches it. + * + * @param {number} count + * @returns {Generator} + */ +export function * syncRange (count) { + for (let i = 0; i < count; i++) { + yield i; + } +} + +/** + * Minimal-work async callback — isolates the library's per-item dispatch + * overhead from the callback's own cost. + * + * @param {number} item + * @returns {Promise} + */ +export const identity = async (item) => item; + +/** + * Callback that always rejects — drives the `fail-fast` short-circuit and the + * `fail-eventually` `AggregateError` accumulation paths. + * + * @param {number} item + * @returns {Promise} + */ +export const rejectingCallback = async (item) => { + throw new Error('benchmark rejection ' + item); +}; + +/** + * Async-generator callback: each input item fans out into 4 values. Exercises + * the sub-iterator path (subIterators stack + findLeastTargeted load + * balancing). + * + * @param {number} item + * @returns {AsyncGenerator} + */ +export async function * fanOut (item) { + for (let i = 0; i < 4; i++) { + yield item * 4 + i; + } +} + +/** + * Drains an async iterable and returns the last value seen, so callers can + * feed it to `do_not_optimize` and keep the JIT from eliminating the loop. + * + * @param {AsyncIterable} iterable + * @returns {Promise} + */ +export async function drain (iterable) { + let last; + for await (const value of iterable) { + last = value; + } + return last; +} diff --git a/benchmark/index.js b/benchmark/index.js new file mode 100644 index 0000000..6c21cff --- /dev/null +++ b/benchmark/index.js @@ -0,0 +1,30 @@ +// Entry point for the mitata benchmark suite. 
+// +// npm run bench — run everything +// npm run bench:json — emit JSON (for capture/diff against a +// local baseline; no baseline is committed) +// node benchmark/index.js abort — run only benches whose name matches /abort/ +// +// The suite measures *library overhead* — the per-item bookkeeping cost of +// bufferedAsyncMap / mergeIterables — not simulated I/O. Theme files register +// their benches on import; this file is the only one that calls run(). See +// CLAUDE.md "Benchmarks" for the methodology and the per-group rationale. + +import { run } from 'mitata'; + +import './throughput.js'; +import './abort.js'; +import './nested.js'; + +const args = process.argv.slice(2); +const json = args.includes('--json'); +const filter = args.find((arg) => !arg.startsWith('--')); + +await run({ + // Fail loudly if a bench throws instead of silently degrading the numbers. + 'throw': true, + ...(json ? { format: 'json' } : {}), + // The filter is a maintainer-supplied CLI arg matched against bench names. + // eslint-disable-next-line security/detect-non-literal-regexp + ...(filter ? { filter: new RegExp(filter) } : {}), +}); diff --git a/benchmark/nested.js b/benchmark/nested.js new file mode 100644 index 0000000..7362506 --- /dev/null +++ b/benchmark/nested.js @@ -0,0 +1,50 @@ +// Nested sub-iterator & mergeIterables benchmarks. Registers benches on +// import — the run() call lives in benchmark/index.js. + +import { + bench, do_not_optimize as doNotOptimize, group, summary, +} from 'mitata'; + +import { bufferedAsyncMap, mergeIterables } from '../index.js'; +import { + asyncRange, drain, fanOut, identity, +} from './fixtures.js'; + +// 1. Nested sub-iterators. When the callback returns an AsyncIterable, each +// result is unshifted onto the subIterators stack and drained through the +// same buffer. Guards the sub-iterator machinery and the promisesToSource +// WeakMap bookkeeping. .gc('inner') removes GC noise from the extra +// per-sub-iterator allocation. 
+group('nested sub-iterators (async-generator callback)', () => { + for (const count of [250, 2500]) { + summary(() => { + // Flat baseline producing the same number of output values (count * 4). + bench(`flat callback • ${count * 4} values`, async () => { + doNotOptimize(await drain(bufferedAsyncMap(asyncRange(count * 4), identity))); + }).gc('inner'); + bench(`fan-out callback • ${count} × 4`, async () => { + doNotOptimize(await drain(bufferedAsyncMap(asyncRange(count), fanOut))); + }).gc('inner'); + }); + } +}); + +// 2. mergeIterables wrapper overhead. mergeIterables is a thin wrapper over +// bufferedAsyncMap with a `yield *` callback. This proves the wrapper adds +// no measurable cost over calling bufferedAsyncMap with the equivalent +// callback directly. +group('mergeIterables wrapper overhead', () => { + const count = 2500; + const sources = () => [asyncRange(count), asyncRange(count), asyncRange(count), asyncRange(count)]; + + summary(() => { + bench('mergeIterables([...])', async () => { + doNotOptimize(await drain(mergeIterables(sources()))); + }).gc('inner'); + bench('bufferedAsyncMap([...], yield *)', async () => { + doNotOptimize(await drain(bufferedAsyncMap(sources(), async function * (iterable) { + yield * iterable; + }))); + }).gc('inner'); + }); +}); diff --git a/benchmark/throughput.js b/benchmark/throughput.js new file mode 100644 index 0000000..7349605 --- /dev/null +++ b/benchmark/throughput.js @@ -0,0 +1,82 @@ +// Throughput & overhead benchmarks. Registers benches on import — the +// run() call lives in benchmark/index.js. + +import { + bench, do_not_optimize as doNotOptimize, group, summary, +} from 'mitata'; + +import { bufferedAsyncMap } from '../index.js'; +import { + asyncRange, drain, identity, syncRange, +} from './fixtures.js'; + +// 1. The per-item "tax": how much does routing an async iterable through +// bufferedAsyncMap cost versus a plain `for await` over the source? 
This is +// the headline regression guard — if the library gets slower per item, it +// shows up here first. .gc('inner') removes cross-iteration GC noise from +// the heavy per-item allocation (promise envelopes, { signal } objects). +group('overhead: bufferedAsyncMap vs raw for-await', () => { + for (const count of [100, 1000, 10000]) { + summary(() => { + bench(`raw for-await • ${count}`, async () => { + doNotOptimize(await drain(asyncRange(count))); + }).gc('inner'); + bench(`bufferedAsyncMap • ${count}`, async () => { + doNotOptimize(await drain(bufferedAsyncMap(asyncRange(count), identity))); + }).gc('inner'); + }); + } +}); + +// 2. bufferSize scaling. Larger buffers mean more in-flight promises and a +// bigger Promise.race each pull — this shows the throughput/parallelism +// trade-off curve and guards against a pathological cost at large buffers. +group('bufferSize scaling', () => { + const count = 10000; + + summary(() => { + for (const bufferSize of [1, 4, 16, 64]) { + bench(`bufferSize: ${bufferSize}`, async () => { + doNotOptimize(await drain(bufferedAsyncMap(asyncRange(count), identity, { bufferSize }))); + }).gc('inner'); + } + }); +}); + +// 3. The two dispatch loops. ordered:false picks the least-targeted iterator +// via findLeastTargeted; ordered:true feeds from subIterators[0] and splices +// new buffer slots into place. Guards both loops and the ordered-insertion +// splice. +group('dispatch loop: ordered vs unordered', () => { + for (const count of [1000, 10000]) { + summary(() => { + bench(`ordered: false • ${count}`, async () => { + doNotOptimize(await drain(bufferedAsyncMap(asyncRange(count), identity, { ordered: false }))); + }).gc('inner'); + bench(`ordered: true • ${count}`, async () => { + doNotOptimize(await drain(bufferedAsyncMap(asyncRange(count), identity, { ordered: true }))); + }).gc('inner'); + }); + } +}); + +// 4. Input shape. 
A sync iterable or a plain array is wrapped by +// makeIterableAsync before iteration — a distinct code path from feeding an +// async generator. Guards that wrapping cost stays in line with the native +// async-iterable case. +group('input shape: async generator vs sync iterable vs array', () => { + const count = 10000; + const fixedArray = [...syncRange(count)]; + + summary(() => { + bench('async generator', async () => { + doNotOptimize(await drain(bufferedAsyncMap(asyncRange(count), identity))); + }).gc('inner'); + bench('sync iterable', async () => { + doNotOptimize(await drain(bufferedAsyncMap(syncRange(count), identity))); + }).gc('inner'); + bench('array', async () => { + doNotOptimize(await drain(bufferedAsyncMap(fixedArray, identity))); + }).gc('inner'); + }); +}); diff --git a/index.js b/index.js index 6f1a2fd..3d7a7e3 100644 --- a/index.js +++ b/index.js @@ -1,14 +1,10 @@ /* eslint-disable promise/prefer-await-to-then */ -// TODO: Get inspired by Matteos https://github.com/mcollina/hwp/blob/main/index.js, eg AbortController is nice? -// TODO: Check docs here https://tc39.es/ecma262/#sec-operations-on-iterator-objects -// TODO: Look into https://tc39.es/ecma262/#sec-iteratorclose / https://tc39.es/ecma262/#sec-asynciteratorclose -// TODO: See "iteratorKind" in https://tc39.es/ecma262/#sec-runtime-semantics-forin-div-ofbodyevaluation-lhs-stmt-iterator-lhskind-labelset – see how it loops and validates the returned values -// TODO: THERE'S ACTUALLY A "throw" method MENTION IN https://tc39.es/ecma262/#sec-generator-function-definitions-runtime-semantics-evaluation: "NOTE: Exceptions from the inner iterator throw method are propagated. Normal completions from an inner throw method are processed similarly to an inner next." 
THOUGH NOT SURE HOW TO TRIGGER IT IN PRACTICE, SEE yield.spec.js - import { findLeastTargeted } from './lib/find-least-targeted.js'; -import { arrayDeleteInPlace, makeIterableAsync } from './lib/misc.js'; -import { isAsyncIterable, isIterable, isPartOfArray } from './lib/type-checks.js'; +import { arrayDeleteInPlace, makeIterableAsync, normalizeError } from './lib/misc.js'; +import { + isAsyncIterable, isIterable, isObject, isPartOfArray, +} from './lib/type-checks.js'; /** * @template T @@ -20,28 +16,42 @@ async function * yieldIterable (item) { } /** + * Merge several async (or sync) iterables in parallel. Items are + * yielded as they become available. Thin wrapper over + * `bufferedAsyncMap` — see that function for the full semantics of + * each option. + * * @template T * @param {Array | Iterable | T[]>} input - * @param {{ bufferSize?: number|undefined }} [options] + * @param {{ bufferSize?: number|undefined, errors?: 'fail-eventually'|'fail-fast'|undefined, ordered?: boolean|undefined, signal?: AbortSignal|undefined }} [options] * @returns {AsyncIterable} */ -export async function * mergeIterables (input, { bufferSize } = {}) { - yield * bufferedAsyncMap(input, yieldIterable, { bufferSize }); +export async function * mergeIterables (input, { bufferSize, errors, ordered, signal } = {}) { + yield * bufferedAsyncMap(input, yieldIterable, { bufferSize, errors, ordered, signal }); } /** + * Iterates `input` concurrently, applying `callback` to each item with up to + * `bufferSize` calls in flight. The per-callback `signal` is **always** + * present (even when no `options.signal` is passed) and aborts on iterator + * close — `return()`, `throw()`, `Symbol.asyncDispose`, source exhaustion, + * external abort, or first error in `errors: 'fail-fast'` mode — so callbacks + * can fast-path on shutdown. 
+ * * @template T * @template R * @param {AsyncIterable | Iterable | T[]} input - * @param {(item: T) => (Promise|AsyncIterable)} callback - * @param {{ bufferSize?: number|undefined, ordered?: boolean|undefined }} [options] - * @returns {AsyncIterableIterator & { return: NonNullable["return"]>, throw: NonNullable["throw"]> }} + * @param {(item: T, opts: { signal: AbortSignal }) => (Promise|AsyncIterable)} callback + * @param {{ bufferSize?: number|undefined, ordered?: boolean|undefined, signal?: AbortSignal|undefined, errors?: 'fail-eventually'|'fail-fast'|undefined }} [options] + * @returns {AsyncIterableIterator & { return: NonNullable["return"]>, throw: NonNullable["throw"]>, [Symbol.asyncDispose]: () => Promise }} */ export function bufferedAsyncMap (input, callback, options) { /** @typedef {Promise> & { bufferPromise: BufferPromise, fromSubIterator?: boolean, isSubIterator?: boolean, err?: unknown }>} BufferPromise */ const { bufferSize = 6, + errors: errorsMode = 'fail-eventually', ordered = false, + signal: externalSignal, } = options || {}; /** @type {AsyncIterable} */ @@ -53,6 +63,8 @@ export function bufferedAsyncMap (input, callback, options) { if (!isAsyncIterable(asyncIterable)) throw new TypeError('Expected asyncIterable to have a Symbol.asyncIterator function'); if (typeof callback !== 'function') throw new TypeError('Expected callback to be a function'); if (typeof bufferSize !== 'number') throw new TypeError('Expected bufferSize to be a number'); + if (externalSignal !== undefined && !(externalSignal instanceof AbortSignal)) throw new TypeError('Expected signal to be an AbortSignal'); + if (errorsMode !== 'fail-eventually' && errorsMode !== 'fail-fast') throw new TypeError("Expected errors to be 'fail-eventually' or 'fail-fast'"); /** @type {AsyncIterator} */ const asyncIterator = asyncIterable[Symbol.asyncIterator](); @@ -72,18 +84,92 @@ export function bufferedAsyncMap (input, callback, options) { /** @type {boolean} */ let isDone; - /** @type 
{Error|undefined} */ - let hasError; + /** @type {Error[]} */ + const capturedErrors = []; + + // Internal controller, minted unconditionally regardless of whether + // options.signal or errors:'fail-fast' are used. The per-callback `signal` + // contract (README "Cancellation" + CLAUDE.md) requires it: a `for await + // … break` desugars to iterator.return() → markAsEnded() → + // internalAbortController.abort(), which is what wakes a parked + // nextValue() and lets in-flight callbacks observe signal.aborted=true. + // Don't try to lazify this — the per-callback signal tests in + // test/per-task-signal.spec.js (and test/abort.spec.js for the parallel + // return()+abort case) pin the no-options case. + const internalAbortController = new AbortController(); + + /** @type {{ reason: unknown, delivered: boolean } | undefined} */ + let abortReason; + + if (externalSignal) { + if (externalSignal.aborted) { + abortReason = { reason: externalSignal.reason, delivered: false }; + internalAbortController.abort(externalSignal.reason); + } else { + externalSignal.addEventListener('abort', () => { + // If the iterator already closed via return()/throw()/dispose, abort is too late: no-op. + if (isDone) return; + if (!abortReason) { + abortReason = { reason: externalSignal.reason, delivered: false }; + } + if (!internalAbortController.signal.aborted) { + internalAbortController.abort(externalSignal.reason); + } + }, { once: true }); + } + } + + // Sentinel value distinguishing "abort fired" from any buffered promise's + // resolution in nextValue()'s Promise.race. + const ABORT_SENTINEL = Symbol('abort'); + + // Per-pull "park": nextValue() creates a fresh deferred each pull and races + // it alongside the buffered promises, so an abort can wake a parked pull + // even when no buffered promise will ever settle. 
It is deliberately NOT a + // single long-lived promise — racing the same never-settling promise on + // every pull leaves a PromiseReaction on it per item, which accumulates + // until it finally settles at iterator close (the documented + // nodejs/node#51452 retention pattern). Per-pull parks are collectable, so + // nothing accumulates. The single construction-time listener below resolves + // whichever park is currently waiting (read at fire time via the mutable + // `currentPark`); internalAbortController aborts at most once, so one + // listener suffices. If the signal is already aborted (pre-aborted external + // signal) the listener never fires — fine, because handleAbortIfPending() + // short-circuits nextValue() via `abortReason` before it reaches the race. + /** @type {{ resolve: (value: typeof ABORT_SENTINEL) => void } | undefined} */ + let currentPark; + + internalAbortController.signal.addEventListener( + 'abort', + () => currentPark?.resolve(ABORT_SENTINEL), + { once: true } + ); /** + * Single cleanup path. Idempotent via `isDone`. Called from `return()`, + * `throw()`, `Symbol.asyncDispose`, source exhaustion, and abort delivery. + * Always fires `internalAbortController.abort()` — this is what wakes a parked + * nextValue() and signals in-flight callbacks via the per-task signal. + * + * The cleanup body runs once; the resolved result still reflects *this* + * call's `value` (so `return(v)` is spec-correct even after the iterator + * has already closed). + * * @param {boolean} [throwAnyError] - * @returns {Promise>} + * @param {R} [value] + * @returns {Promise>} */ - const markAsEnded = async (throwAnyError) => { + const markAsEnded = async (throwAnyError, value) => { if (!isDone) { isDone = true; - // TODO: Errors from here, how to handle? 
allSettled() ensures they will be caught at least + if (!internalAbortController.signal.aborted) { + internalAbortController.abort(); + } + + // Source .return() rejections are intentionally swallowed: allSettled + // keeps cleanup going even if one source's return() rejects, so a broken + // cleanup can't mask the consumer-facing error or leave buffers uncleared. await Promise.allSettled( [ // Ensure the main iterators are completed @@ -93,26 +179,34 @@ export function bufferedAsyncMap (input, callback, options) { .map(item => item.return && item.return()) ); - // TODO: Could we use an AbortController to improve this? See eg. https://github.com/mcollina/hwp/pull/10 bufferedPromises.splice(0); subIterators.splice(0); - if (throwAnyError && hasError) { - throw hasError; + if (throwAnyError && capturedErrors.length > 0) { + throw capturedErrors.length === 1 + ? capturedErrors[0] + : new AggregateError(capturedErrors, 'Multiple errors in bufferedAsyncMap'); } } - return { done: true, value: undefined }; + return { done: true, value }; }; + // Producer: pulls from source up to bufferSize, dispatches via callback, + // pushes the wrapped promise into bufferedPromises. In `ordered: true` + // mode it always feeds from subIterators[0]; in `ordered: false` mode it + // picks the least-targeted iterator via findLeastTargeted to prevent + // starvation — but only once a sub-iterator actually exists, since with + // none there is nothing to balance and the main iterator is the only + // source. const fillQueue = () => { - if (hasError || isDone) return; + if (capturedErrors.length > 0 || isDone || abortReason) return; /** @type {AsyncIterator|undefined} */ let currentSubIterator; - if (ordered) { - currentSubIterator = subIterators[0]; + if (ordered || subIterators.length === 0) { + currentSubIterator = ordered ? subIterators[0] : undefined; } else { const iterator = findLeastTargeted( mainReturnedDone ? 
subIterators : [...subIterators, asyncIterator], @@ -127,11 +221,11 @@ export function bufferedAsyncMap (input, callback, options) { const bufferPromise = currentSubIterator ? Promise.resolve(currentSubIterator.next()) .catch(err => ({ - err: err instanceof Error ? err : new Error('Unknown subiterator error'), + err: normalizeError(err, 'Unknown subiterator error'), })) .then(async result => { - if (typeof result !== 'object') { - throw new TypeError('Expected an object value'); + if (!isObject(result)) { + throw new TypeError('Expected sub-iterator next() result to be an object'); } if ('err' in result || result.done) { arrayDeleteInPlace(subIterators, currentSubIterator); @@ -152,11 +246,11 @@ export function bufferedAsyncMap (input, callback, options) { }) : Promise.resolve(asyncIterator.next()) .catch(err => ({ - err: err instanceof Error ? err : new Error('Unknown iterator error'), + err: normalizeError(err, 'Unknown iterator error'), })) .then(async result => { - if (typeof result !== 'object') { - throw new TypeError('Expected an object value'); + if (!isObject(result)) { + throw new TypeError('Expected source iterator next() result to be an object'); } if ('err' in result || result.done) { mainReturnedDone = true; @@ -171,7 +265,7 @@ export function bufferedAsyncMap (input, callback, options) { } // eslint-disable-next-line promise/no-callback-in-promise - const callbackResult = callback(result.value); + const callbackResult = callback(result.value, { signal: internalAbortController.signal }); const isSubIterator = isAsyncIterable(callbackResult); /** @type {Awaited} */ @@ -189,7 +283,7 @@ export function bufferedAsyncMap (input, callback, options) { promiseValue = { bufferPromise, done: true, - err: err instanceof Error ? 
err : new Error('Unknown callback error'), + err: normalizeError(err, 'Unknown callback error'), value: undefined, }; } @@ -200,9 +294,14 @@ export function bufferedAsyncMap (input, callback, options) { promisesToSourceIteratorMap.set(bufferPromise, currentSubIterator || asyncIterator); if (ordered && currentSubIterator) { + // Insert after any buffer slots already produced by this sub-iterator so + // its values stay contiguous and in order. In practice consumption is + // 1-to-1 with production in ordered mode, so the buffer never holds a + // second slot from the same sub-iterator at insert time and the loop + // body below stays unentered — it is kept as a guard for that invariant. let i = 0; - while (promisesToSourceIteratorMap.get(/** @type {BufferPromise} */ (bufferedPromises[i])) === currentSubIterator) { + while (i < bufferedPromises.length && promisesToSourceIteratorMap.get(/** @type {BufferPromise} */ (bufferedPromises[i])) === currentSubIterator) { i += 1; } @@ -216,15 +315,110 @@ export function bufferedAsyncMap (input, callback, options) { } }; - /** @type {AsyncIterator["next"]} */ + /** + * Drives the "reject the next .next() once with abortReason.reason, then + * done:true forever" contract. + * + * Returns a descriptor rather than throwing directly so the caller can + * run cleanup (markAsEnded) before propagating the reason. Returns + * `undefined` when no abort is pending — caller continues normally. + * + * @returns {{ kind: 'throw', reason: unknown } | { kind: 'done' } | undefined} + */ + const handleAbortIfPending = () => { + if (abortReason && !abortReason.delivered) { + abortReason.delivered = true; + return { kind: 'throw', reason: abortReason.reason }; + } + if (abortReason && abortReason.delivered) { + return { kind: 'done' }; + } + // Implicit `undefined` return = "no abort pending, caller continues normally". + // Lint rejects an explicit `return undefined` / `return` here. 
+ }; + + /** + * Routes a stream error — from the source, the callback, or a malformed + * sub-iterable — through the configured error mode. In `fail-fast` mode the + * first error short-circuits iteration via the abort machinery: this either + * throws the reason or returns the terminal `{ done: true }`. In + * `fail-eventually` mode it captures the error and returns `undefined`, so + * the caller keeps draining. + * + * @param {Error} normalizedErr + * @returns {Promise<{ done: true, value: undefined } | undefined>} + */ + const handleStreamError = async (normalizedErr) => { + // In fail-fast mode the first captured error short-circuits iteration: + // route it through the same abort machinery so the next .next() rejects + // with the original error and in-flight callbacks see signal.aborted=true. + if (errorsMode === 'fail-fast' && !abortReason) { + abortReason = { reason: normalizedErr, delivered: false }; + if (!internalAbortController.signal.aborted) { + internalAbortController.abort(normalizedErr); + } + await markAsEnded(); + if (abortReason && !abortReason.delivered) { + abortReason.delivered = true; + throw normalizedErr; + } + return { done: true, value: undefined }; + } + + // fail-eventually: capture and fall through — implicit `undefined` return + // tells the caller to keep draining. Lint rejects an explicit `return`. + capturedErrors.push(normalizedErr); + }; + + // Consumer: races buffered promises against a fresh per-pull park promise. + // Abort always wins over a buffered value that may have settled in the + // same tick — the post-race code re-checks abortReason regardless of + // which entry won the race. Typed as a plain zero-arg thunk (it never + // reads an argument) so it can be passed straight to currentStep.then(). 
+ /** @type {() => Promise<IteratorResult<R>>} */
err : new Error('Unknown error'); - } - + // Refill if a sub-iterator is in play, then either close on drain or + // recurse for the next value. Shared by the error and the + // malformed-sub-iterable paths. + /** @returns {Promise>} */ + const drainOrContinue = () => { if (fromSubIterator || subIterators.length > 0) { fillQueue(); } @@ -251,37 +442,79 @@ export function bufferedAsyncMap (input, callback, options) { return bufferedPromises.length === 0 ? markAsEnded(true) : nextValue(); + }; + + // We are mandated by the spec to always do this return if the iterator is done + if (isDone) { + return { done: true, value: undefined }; + } else if (err || done) { + if (err) { + const handled = await handleStreamError(normalizeError(err, 'Unknown error')); + if (handled) return handled; + } + + return drainOrContinue(); } else if (isSubIterator && isAsyncIterable(value)) { - // TODO: Handle possible error here? Or too obscure? - subIterators.unshift(value[Symbol.asyncIterator]()); + /** @type {AsyncIterator} */ + let subIterator; + + try { + subIterator = value[Symbol.asyncIterator](); + } catch (subIterableErr) { + // The callback returned a malformed async iterable — the + // Symbol.asyncIterator property exists but invoking it threw. + // Surface it like any other stream error. 
+ const handled = await handleStreamError(normalizeError(subIterableErr, 'Unknown sub-iterator error')); + if (handled) return handled; + return drainOrContinue(); + } + + subIterators.unshift(subIterator); fillQueue(); return nextValue(); } else { fillQueue(); - return /** @type {{ value: R }} */ ({ value }); + return /** @type {IteratorYieldResult} */ ({ value }); } }; /** @type {Promise>} */ let currentStep; - /** @type {AsyncIterableIterator & { return: NonNullable["return"]>, throw: NonNullable["throw"]> }} */ + /** @type {AsyncIterableIterator & { return: NonNullable["return"]>, throw: NonNullable["throw"]>, [Symbol.asyncDispose]: () => Promise }} */ const resultAsyncIterableIterator = { async next () { - currentStep = currentStep ? currentStep.then(() => nextValue()) : nextValue(); + // Chain via then(nextValue, nextValue) so a rejection on one .next() does + // not poison every subsequent call — the next call still reaches + // nextValue() which observes the post-rejection state machine. + currentStep = currentStep + ? currentStep.then(nextValue, nextValue) + : nextValue(); return currentStep; }, - // TODO: Accept an argument, as in the spec. Look into what happens if one call return() multiple times + look into if the value provided to return is the one returned forever after - 'return': () => markAsEnded(), - // TODO: Add "throw", see reference in https://tc39.es/ecma262/ ? And https://twitter.com/matteocollina/status/1392056117128306691 + // return() deliberately bypasses the currentStep chain: it calls + // markAsEnded() directly (and thus internalAbortController.abort()) so a parked + // next() awaiting a buffered promise wakes up via its per-pull park. This is + // what lets `for await … break` work — break desugars to return() running + // concurrently with the in-flight next(). 
+ // `value` is awaited (matching AsyncGenerator.prototype.return) so a + // thenable argument never lands in the IteratorResult's value field; + // cleanup runs once but each call's result reflects its own argument. + 'return': async (value) => markAsEnded(false, await value), + /** @type {NonNullable["throw"]>} */ 'throw': async (err) => { - // TODO: Should remember the throw? And return a rejected promise always? + // Spec-correct as-is: throw(err) rejects once, markAsEnded() closes the + // iterator, and subsequent .next() returns { done: true } — no need to + // "remember" the throw or reject forever. Pinned by test/throw.spec.js. await markAsEnded(); throw err; }, [Symbol.asyncIterator]: () => resultAsyncIterableIterator, + [Symbol.asyncDispose]: async () => { + await markAsEnded(); + }, }; fillQueue(); diff --git a/lib/misc.js b/lib/misc.js index 0f642ca..a43e425 100644 --- a/lib/misc.js +++ b/lib/misc.js @@ -22,3 +22,12 @@ export function arrayDeleteInPlace (list, value) { list.splice(index, 1); } } + +/** + * @param {unknown} err + * @param {string} defaultMessage + * @returns {Error} + */ +export function normalizeError (err, defaultMessage) { + return err instanceof Error ? 
err : new Error(defaultMessage); +} diff --git a/lib/type-checks.js b/lib/type-checks.js index 70909c3..74d15c2 100644 --- a/lib/type-checks.js +++ b/lib/type-checks.js @@ -1,14 +1,20 @@ +/** + * @param {unknown} value + * @returns {value is object} + */ +export const isObject = (value) => Boolean(value && typeof value === 'object'); + /** * @param {unknown} value * @returns {value is Iterable} */ -export const isIterable = (value) => Boolean(value && typeof value === 'object' && Symbol.iterator in value); +export const isIterable = (value) => isObject(value) && Symbol.iterator in value; /** * @param {unknown} value * @returns {value is AsyncIterable} */ -export const isAsyncIterable = (value) => Boolean(value && typeof value === 'object' && Symbol.asyncIterator in value); +export const isAsyncIterable = (value) => isObject(value) && Symbol.asyncIterator in value; /** * @template Values diff --git a/package.json b/package.json index 2909d87..b1d281a 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,7 @@ "author": "Pelle Wessman (http://kodfabrik.se/)", "license": "MIT", "engines": { - "node": ">=18.6.0" + "node": ">=22.0.0" }, "type": "module", "exports": "./index.js", @@ -29,6 +29,8 @@ "index.d.ts.map" ], "scripts": { + "bench": "node --expose-gc --allow-natives-syntax benchmark/index.js", + "bench:json": "node --expose-gc --allow-natives-syntax benchmark/index.js --json", "build": "run-s build:*", "build:0": "run-s clean", "build:1-declaration": "tsc -p declaration.tsconfig.json", @@ -37,7 +39,7 @@ "check:knip": "knip", "check:lint": "eslint --report-unused-disable-directives .", "check:tsc": "tsc", - "check:type-coverage": "type-coverage --detail --strict --at-least 95 --ignore-files 'test/*.spec.js'", + "check:type-coverage": "type-coverage --detail --strict --at-least 95 --ignore-files 'test/*.spec.js' --ignore-files 'benchmark/**/*.js'", "clean": "run-p clean:*", "clean:declarations": "rm -rf $(find . 
-maxdepth 2 -type f -name '*.d.ts*')", "prepare": "husky", @@ -49,9 +51,8 @@ "devDependencies": { "@types/chai": "^4.3.19", "@types/chai-as-promised": "^7.1.8", - "@types/chai-quantifiers": "^1.0.4", "@types/mocha": "^10.0.8", - "@types/node": "^18.19.50", + "@types/node": "^22.0.0", "@types/sinon": "^17.0.3", "@types/sinon-chai": "^3.2.12", "@voxpelli/eslint-config": "^25.0.2", @@ -64,6 +65,7 @@ "husky": "^9.1.6", "installed-check": "^9.3.0", "knip": "^5.30.2", + "mitata": "^1.0.34", "mocha": "^10.7.3", "npm-run-all2": "^6.2.2", "sinon": "^19.0.2", diff --git a/test/abort.spec.js b/test/abort.spec.js new file mode 100644 index 0000000..4427871 --- /dev/null +++ b/test/abort.spec.js @@ -0,0 +1,371 @@ +/* eslint-disable promise/prefer-await-to-then */ + +import chai from 'chai'; +import chaiAsPromised from 'chai-as-promised'; +import sinon from 'sinon'; +import sinonChai from 'sinon-chai'; + +import { + bufferedAsyncMap, +} from '../index.js'; +import { + promisableTimeout, + yieldValuesOverTime, +} from './utils.js'; + +/** + * @param {number} delayBeforeFirstYield + * @returns {AsyncIterable} + */ +async function * slowSource (delayBeforeFirstYield) { + await promisableTimeout(delayBeforeFirstYield); + yield 0; + await promisableTimeout(delayBeforeFirstYield); + yield 1; + await promisableTimeout(delayBeforeFirstYield); + yield 2; +} + +chai.use(chaiAsPromised); +chai.use(sinonChai); +const should = chai.should(); + +describe('bufferedAsyncMap() options.signal', () => { + /** @type {import('sinon').SinonFakeTimers} */ + let clock; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + sinon.restore(); + }); + + // --- Validation --- + + it('throws TypeError when signal is not an AbortSignal', () => { + should.Throw(() => { + bufferedAsyncMap( + yieldValuesOverTime(1, 100), + async (item) => item, + // @ts-expect-error + { signal: 'not-a-signal' } + ); + }, TypeError, 'Expected signal to be an AbortSignal'); + }); + + it('omitting 
signal behaves identically to commit 3 (no regression)', async () => { + /** @type {number[]} */ + const result = []; + const iterator = bufferedAsyncMap(yieldValuesOverTime(3, 100), async (item) => item); + + const flow = (async () => { + for await (const v of iterator) result.push(v); + })(); + + await clock.runAllAsync(); + await flow; + + result.should.have.members([0, 1, 2]); + }); + + // --- Pre-aborted signal --- + + it('pre-aborted signal: source.next never called, first .next() rejects with reason, subsequent return done', async () => { + const reason = new Error('Pre-aborted'); + const ac = new AbortController(); + ac.abort(reason); + + const source = yieldValuesOverTime(6, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const nextSpy = sinon.spy(sourceIterator, 'next'); + + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + async (item) => item, + { signal: ac.signal } + ); + + const first = iterator.next().catch(err => ({ rejectedWith: err })); + await clock.runAllAsync(); + + chai.expect(await first).to.deep.equal({ rejectedWith: reason }); + nextSpy.should.not.have.been.called; + + await iterator.next().should.eventually.deep.equal({ done: true, value: undefined }); + await iterator.next().should.eventually.deep.equal({ done: true, value: undefined }); + }); + + it('return() after pre-abort resolves done without throwing', async () => { + const ac = new AbortController(); + ac.abort(new Error('Pre-aborted')); + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(3, 100), + async (item) => item, + { signal: ac.signal } + ); + + const ret = iterator.return(); + await clock.runAllAsync(); + await ret.should.eventually.deep.equal({ done: true, value: undefined }); + }); + + // --- Mid-iteration abort --- + + it('parked .next() rejects with signal.reason (identity preserved)', async () => { + const reason = { custom: 'reason-object' }; + const ac = new AbortController(); + + const iterator = 
bufferedAsyncMap( + slowSource(1000), + async (item) => item, + { signal: ac.signal } + ); + + const parkedNext = iterator.next().catch(err => ({ rejectedWith: err })); + + // Defer the abort to fire while the .next() is parked on the slow source. + setTimeout(() => ac.abort(reason), 10); + + await clock.runAllAsync(); + + chai.expect(await parkedNext).to.deep.equal({ rejectedWith: reason }); + }); + + it('fresh .next() after abort rejects with reason', async () => { + const reason = new Error('Aborted'); + const ac = new AbortController(); + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item) => item, + { signal: ac.signal } + ); + + const first = iterator.next(); + await clock.runAllAsync(); + await first; + + ac.abort(reason); + + const next = iterator.next().catch(err => ({ rejectedWith: err })); + await clock.runAllAsync(); + chai.expect(await next).to.deep.equal({ rejectedWith: reason }); + }); + + it('exactly one .next() rejects with reason; subsequent calls return done', async () => { + const reason = new Error('Once'); + const ac = new AbortController(); + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item) => item, + { signal: ac.signal } + ); + + const first = iterator.next(); + await clock.runAllAsync(); + await first; + + ac.abort(reason); + + /** @type {Array<{ rejected: boolean, value?: unknown }>} */ + const results = []; + + const sequence = (async () => { + for (let i = 0; i < 3; i += 1) { + try { + const r = await iterator.next(); + results.push({ rejected: false, value: r }); + } catch (err) { + results.push({ rejected: true, value: err }); + } + } + })(); + + await clock.runAllAsync(); + await sequence; + + chai.expect(results[0]).to.deep.equal({ rejected: true, value: reason }); + chai.expect(results[1]).to.deep.equal({ rejected: false, value: { done: true, value: undefined } }); + chai.expect(results[2]).to.deep.equal({ rejected: false, value: { done: true, value: undefined } }); + 
}); + + it('source.next not called after abort; source.return called once', async () => { + const ac = new AbortController(); + + const source = yieldValuesOverTime(20, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const nextSpy = sinon.spy(sourceIterator, 'next'); + const returnSpy = sinon.spy(sourceIterator, 'return'); + + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + async (item) => item, + { signal: ac.signal, bufferSize: 2 } + ); + + const first = iterator.next(); + await clock.runAllAsync(); + await first; + + const callsBeforeAbort = nextSpy.callCount; + + ac.abort(new Error('stop')); + + const next = iterator.next().catch(err => ({ rejectedWith: err })); + await clock.runAllAsync(); + const r = await next; + should.exist(r); + + // After the first rejecting .next() resolves, markAsEnded must have + // already run: source.return is called as part of the abort-delivery + // path, not deferred to the next consumer pull. + returnSpy.should.have.been.calledOnce; + + // Drain any further calls (should be none). 
+ await iterator.next(); + await clock.runAllAsync(); + + nextSpy.callCount.should.equal(callsBeforeAbort); + returnSpy.should.have.been.calledOnce; + }); + + it('in-flight callbacks observe signal.aborted=true after external abort', async () => { + const ac = new AbortController(); + /** @type {AbortSignal | undefined} */ + let captured; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item, { signal }) => { + captured = signal; + return item; + }, + { signal: ac.signal } + ); + + const first = iterator.next(); + await clock.runAllAsync(); + await first; + chai.expect(captured?.aborted).to.equal(false); + + ac.abort(new Error('stop')); + + const n = iterator.next().catch(err => err); + await clock.runAllAsync(); + await n; + + chai.expect(captured?.aborted).to.equal(true); + }); + + // --- Close races --- + + it('return() before abort makes subsequent .next() return done without throwing', async () => { + const ac = new AbortController(); + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item) => item, + { signal: ac.signal } + ); + + const first = iterator.next(); + await clock.runAllAsync(); + await first; + + const returned = iterator.return(); + await clock.runAllAsync(); + await returned; + + ac.abort(new Error('late')); + + const final = iterator.next(); + await clock.runAllAsync(); + await final.should.eventually.deep.equal({ done: true, value: undefined }); + }); + + it('return() and abort fired together: cleanup runs once, no double-throw', async () => { + const ac = new AbortController(); + + // 200 items so the source is not naturally exhausted between consuming + // the first item and calling return() — that way markAsEnded actually + // has work to do on the source iterator. 
+ const source = yieldValuesOverTime(200, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const returnSpy = sinon.spy(sourceIterator, 'return'); + + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + async (item) => item, + { signal: ac.signal, bufferSize: 2 } + ); + + const first = iterator.next(); + // Advance only enough for the first item to land, leaving the source live. + await clock.tickAsync(0); + await first; + + const ret = iterator.return(); + ac.abort(new Error('parallel')); + + await clock.runAllAsync(); + await ret; + + returnSpy.should.have.been.calledOnce; + }); + + it('throw(err) after abort delivered behaves like throw on a closed iterator', async () => { + const ac = new AbortController(); + const reason = new Error('aborted'); + const tossed = new Error('post-abort throw'); + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item) => item, + { signal: ac.signal } + ); + + const first = iterator.next(); + await clock.runAllAsync(); + await first; + + ac.abort(reason); + + const aborted = iterator.next().catch(err => err); + await clock.runAllAsync(); + await aborted; + + const tossedNext = iterator.throw(tossed).catch(err => ({ rejectedWith: err })); + await clock.runAllAsync(); + chai.expect(await tossedNext).to.deep.equal({ rejectedWith: tossed }); + }); + + // --- ordered:true coverage --- + + it('abort works in ordered:true mode', async () => { + const ac = new AbortController(); + const reason = new Error('ordered-abort'); + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item) => item, + { signal: ac.signal, ordered: true } + ); + + const first = iterator.next(); + await clock.runAllAsync(); + await first; + + ac.abort(reason); + + const next = iterator.next().catch(err => ({ rejectedWith: err })); + await clock.runAllAsync(); + chai.expect(await next).to.deep.equal({ rejectedWith: reason }); + + const after = iterator.next(); + 
await clock.runAllAsync(); + await after.should.eventually.deep.equal({ done: true, value: undefined }); + }); +}); diff --git a/test/dispose.spec.js b/test/dispose.spec.js new file mode 100644 index 0000000..0e70bf3 --- /dev/null +++ b/test/dispose.spec.js @@ -0,0 +1,113 @@ +import chai from 'chai'; +import chaiAsPromised from 'chai-as-promised'; +import sinon from 'sinon'; +import sinonChai from 'sinon-chai'; + +import { + bufferedAsyncMap, +} from '../index.js'; +import { + yieldValuesOverTime, +} from './utils.js'; + +chai.use(chaiAsPromised); +chai.use(sinonChai); +chai.should(); + +describe('bufferedAsyncMap() Symbol.asyncDispose', () => { + /** @type {import('sinon').SinonFakeTimers} */ + let clock; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + sinon.restore(); + }); + + it('should expose Symbol.asyncDispose as a function returning a Promise', async () => { + const iterator = bufferedAsyncMap(yieldValuesOverTime(3, 100), async (item) => item); + + chai.expect(iterator[Symbol.asyncDispose]).to.be.a('function'); + const result = iterator[Symbol.asyncDispose](); + chai.expect(result).to.be.a('promise'); + + await clock.runAllAsync(); + await result; + }); + + it('dispose should run the same cleanup as return() (source.return called)', async () => { + const source = yieldValuesOverTime(6, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const returnSpy = sinon.spy(sourceIterator, 'return'); + + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + async (item) => item + ); + + await iterator.next(); + + const disposeResult = iterator[Symbol.asyncDispose](); + await clock.runAllAsync(); + await disposeResult.should.eventually.equal(undefined); + + returnSpy.should.have.been.calledOnce; + + await iterator.next().should.eventually.deep.equal({ done: true, value: undefined }); + }); + + // Simulates the desugaring of `await using iterator = bufferedAsyncMap(...)` — + // the 
runtime support for the syntax landed after Node 22.x, but the + // semantics we promise (cleanup on scope exit via Symbol.asyncDispose) are + // testable directly. + it('should run cleanup on scope exit (await using desugared)', async () => { + const source = yieldValuesOverTime(6, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const returnSpy = sinon.spy(sourceIterator, 'return'); + + const promised = (async () => { + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + async (item) => item + ); + try { + // eslint-disable-next-line no-unreachable-loop + for await (const v of iterator) { + chai.expect(v).to.equal(0); + break; + } + } finally { + await iterator[Symbol.asyncDispose](); + } + })(); + + await clock.runAllAsync(); + await promised; + + returnSpy.should.have.been.calledOnce; + }); + + it('dispose should be idempotent after return()', async () => { + const source = yieldValuesOverTime(6, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const returnSpy = sinon.spy(sourceIterator, 'return'); + + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + async (item) => item + ); + + const flow = (async () => { + await iterator.next(); + await iterator.return(); + await iterator[Symbol.asyncDispose](); + })(); + + await clock.runAllAsync(); + await flow; + + returnSpy.should.have.been.calledOnce; + }); +}); diff --git a/test/errors-fail-fast.spec.js b/test/errors-fail-fast.spec.js new file mode 100644 index 0000000..a0baedd --- /dev/null +++ b/test/errors-fail-fast.spec.js @@ -0,0 +1,360 @@ +import chai from 'chai'; +import chaiAsPromised from 'chai-as-promised'; +import sinon from 'sinon'; +import sinonChai from 'sinon-chai'; + +import { + bufferedAsyncMap, +} from '../index.js'; +import { + promisableTimeout, + yieldValuesOverTime, +} from './utils.js'; + +chai.use(chaiAsPromised); +chai.use(sinonChai); +const should = chai.should(); + +/** + * @template T 
+ * @param {T[]} items + * @returns {AsyncIterable} + */ +async function * fromArray (items) { + for (const item of items) { + yield item; + } +} + +/** + * @param {Error} expected + * @returns {AsyncIterable} + */ +async function * sourceThatThrows (expected) { + yield 0; + yield 1; + throw expected; +} + +describe('bufferedAsyncMap() errors: fail-fast', () => { + /** @type {import('sinon').SinonFakeTimers} */ + let clock; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + sinon.restore(); + }); + + // --- Validation & default --- + + it("throws TypeError when errors is not 'fail-eventually'/'fail-fast'", () => { + should.Throw(() => { + bufferedAsyncMap( + fromArray([0, 1]), + async (item) => item, + // @ts-expect-error + { errors: 'isolate' } + ); + }, TypeError, "Expected errors to be 'fail-eventually' or 'fail-fast'"); + }); + + it("omitting errors option (or 'fail-eventually') keeps current AggregateError behaviour", async () => { + const errA = new Error('A'); + const errB = new Error('B'); + const callback = sinon.stub() + .returnsArg(0) + .onCall(0).rejects(errA) + .onCall(1).rejects(errB); + + /** @type {Error | undefined} */ + let caught; + try { + // eslint-disable-next-line no-unused-vars + for await (const _ of bufferedAsyncMap(fromArray([0, 1, 2]), callback, { bufferSize: 3 })) { + // drain + } + } catch (err) { + caught = /** @type {Error} */ (err); + } + + chai.expect(caught).to.be.instanceOf(AggregateError); + }); + + // --- fail-fast semantics --- + + it('first callback error rejects next .next() with that error; subsequent calls return done', async () => { + const reason = new Error('cb-error'); + const callback = sinon.stub() + .returnsArg(0) + .onCall(1).rejects(reason); + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(20, 100), + callback, + { errors: 'fail-fast', bufferSize: 2 } + ); + + /** @type {Array<{ rejected: boolean, value?: unknown }>} */ + const results = []; + + const flow = (async () 
=> { + for (let i = 0; i < 5; i += 1) { + try { + const r = await iterator.next(); + results.push({ rejected: false, value: r }); + } catch (err) { + results.push({ rejected: true, value: err }); + } + } + })(); + + await clock.runAllAsync(); + await flow; + + const firstReject = results.findIndex(r => r.rejected); + chai.expect(firstReject).to.be.greaterThan(-1); + chai.expect(results[firstReject]?.value).to.equal(reason); + for (const r of results.slice(firstReject + 1)) { + chai.expect(r).to.deep.equal({ rejected: false, value: { done: true, value: undefined } }); + } + }); + + it('source error fails fast', async () => { + const sourceError = new Error('src-error'); + + const iterator = bufferedAsyncMap( + sourceThatThrows(sourceError), + async (item) => item, + { errors: 'fail-fast' } + ); + + /** @type {unknown} */ + let caught; + try { + // eslint-disable-next-line no-unused-vars + for await (const _ of iterator) { + // drain + } + } catch (err) { + caught = err; + } + + await clock.runAllAsync(); + chai.expect(caught).to.equal(sourceError); + }); + + it('source.next not called after first error; source.return called once', async () => { + const reason = new Error('halt'); + const source = yieldValuesOverTime(50, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const nextSpy = sinon.spy(sourceIterator, 'next'); + const returnSpy = sinon.spy(sourceIterator, 'return'); + + const callback = sinon.stub() + .returnsArg(0) + .onCall(0).rejects(reason); + + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + callback, + { errors: 'fail-fast', bufferSize: 1 } + ); + + /** @type {unknown} */ + let caught; + try { + // eslint-disable-next-line no-unused-vars + for await (const _ of iterator) { + // drain + } + } catch (err) { + caught = err; + } + + await clock.runAllAsync(); + chai.expect(caught).to.equal(reason); + + const callsAfterFail = nextSpy.callCount; + + await iterator.next(); + await clock.runAllAsync(); + 
+ nextSpy.callCount.should.equal(callsAfterFail); + returnSpy.should.have.been.calledOnce; + }); + + it('in-flight callbacks observe signal.aborted=true after fail-fast error', async () => { + const reason = new Error('halt'); + /** @type {AbortSignal | undefined} */ + let captured; + + let callIdx = 0; + const callback = async ( + /** @type {number} */ item, + /** @type {{ signal: AbortSignal }} */ { signal } + ) => { + const i = callIdx++; + captured = signal; + if (i === 1) { + // Slow second callback so the first failure has time to trigger fail-fast + // before this one settles. + await promisableTimeout(500); + throw reason; + } + if (i === 2) { + await promisableTimeout(2000); + return item; + } + return item; + }; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(20, 50), + callback, + { errors: 'fail-fast', bufferSize: 4 } + ); + + /** @type {unknown} */ + let caught; + const flow = (async () => { + try { + // eslint-disable-next-line no-unused-vars + for await (const _ of iterator) { + // drain + } + } catch (err) { + caught = err; + } + })(); + + await clock.runAllAsync(); + await flow; + chai.expect(caught).to.equal(reason); + chai.expect(captured?.aborted).to.equal(true); + }); + + it('rejected error is the original (not AggregateError)', async () => { + const reason = new Error('original'); + const callback = sinon.stub() + .returnsArg(0) + .onCall(0).rejects(reason); + + /** @type {unknown} */ + let caught; + try { + // eslint-disable-next-line no-unused-vars + for await (const _ of bufferedAsyncMap(fromArray([0, 1, 2]), callback, { errors: 'fail-fast' })) { + // drain + } + } catch (err) { + caught = err; + } + + await clock.runAllAsync(); + chai.expect(caught).to.equal(reason); + chai.expect(caught).to.not.be.instanceOf(AggregateError); + }); + + it('fail-fast works in ordered:true mode', async () => { + const reason = new Error('ordered-fail'); + const callback = sinon.stub() + .returnsArg(0) + .onCall(1).rejects(reason); + + /** @type 
{unknown} */ + let caught; + const flow = (async () => { + try { + for await (const v of bufferedAsyncMap( + yieldValuesOverTime(10, 100), + callback, + { errors: 'fail-fast', ordered: true, bufferSize: 3 } + )) { + // drain — should yield 0 then fail on 1 + chai.expect(v).to.equal(0); + } + } catch (err) { + caught = err; + } + })(); + + await clock.runAllAsync(); + await flow; + chai.expect(caught).to.equal(reason); + }); + + // --- Interaction with abort --- + + it('external abort wins over a fail-fast error not yet captured', async () => { + const externalReason = new Error('external'); + const ac = new AbortController(); + + let callIdx = 0; + const callback = async (/** @type {number} */ item) => { + const i = callIdx++; + if (i === 0) return item; + await promisableTimeout(2000); + throw new Error('would-have-failed'); + }; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(10, 50), + callback, + { signal: ac.signal, errors: 'fail-fast', bufferSize: 2 } + ); + + const first = iterator.next(); + await clock.tickAsync(60); + await first; + + ac.abort(externalReason); + + /** @type {unknown} */ + let caught; + try { + const n = iterator.next(); + await clock.runAllAsync(); + await n; + } catch (err) { + caught = err; + } + + chai.expect(caught).to.equal(externalReason); + }); + + it('external abort wins over queued errors in fail-eventually mode', async () => { + const externalReason = new Error('external'); + const ac = new AbortController(); + + const callback = sinon.stub() + .returnsArg(0) + .onCall(0).rejects(new Error('cb-error-A')) + .onCall(1).rejects(new Error('cb-error-B')); + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(10, 100), + callback, + { signal: ac.signal, bufferSize: 3 } + ); + + // Let some items load and fail in the background; the errors get queued. 
+ await clock.tickAsync(50); + ac.abort(externalReason); + + /** @type {unknown} */ + let caught; + try { + const n = iterator.next(); + await clock.runAllAsync(); + await n; + } catch (err) { + caught = err; + } + + chai.expect(caught).to.equal(externalReason); + chai.expect(caught).to.not.be.instanceOf(AggregateError); + }); +}); diff --git a/test/errors.spec.js b/test/errors.spec.js new file mode 100644 index 0000000..7daa60b --- /dev/null +++ b/test/errors.spec.js @@ -0,0 +1,153 @@ +import chai from 'chai'; +import chaiAsPromised from 'chai-as-promised'; +import sinon from 'sinon'; + +import { + bufferedAsyncMap, +} from '../index.js'; + +chai.use(chaiAsPromised); +chai.should(); + +/** + * @template T + * @param {T[]} items + * @returns {AsyncIterable} + */ +async function * fromArray (items) { + for (const item of items) { + yield item; + } +} + +/** + * @param {Error} expected + * @returns {AsyncIterable} + */ +async function * sourceThatThrows (expected) { + yield 0; + yield 1; + throw expected; +} + +describe('bufferedAsyncMap() errors', () => { + /** @type {import('sinon').SinonFakeTimers} */ + let clock; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + sinon.restore(); + }); + + it('should reject with the original error (identity preserved) when exactly one error is captured', async () => { + const rejectionError = new Error('Single error'); + const callback = sinon.stub() + .returnsArg(0) + .onSecondCall().rejects(rejectionError); + + const promised = (async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of bufferedAsyncMap(fromArray([0, 1, 2]), callback)) { + // drain + } + })(); + + await clock.runAllAsync(); + await promised.should.be.rejectedWith(rejectionError); + }); + + it('should reject with AggregateError containing all errors when 2+ errors are captured', async () => { + const errA = new Error('Error A'); + const errB = new Error('Error B'); + const callback = sinon.stub() + 
.returnsArg(0) + .onCall(0).rejects(errA) + .onCall(1).rejects(errB); + + /** @type {Error | undefined} */ + let caught; + try { + // eslint-disable-next-line no-unused-vars + for await (const _ of bufferedAsyncMap(fromArray([0, 1, 2]), callback, { bufferSize: 3 })) { + // drain + } + } catch (err) { + caught = /** @type {Error} */ (err); + } + + await clock.runAllAsync(); + + chai.expect(caught).to.be.instanceOf(AggregateError); + /** @type {AggregateError} */ (caught).errors.should.deep.equal([errA, errB]); + }); + + it('should include both source and callback errors in the AggregateError', async () => { + const sourceError = new Error('Source error'); + const callbackError = new Error('Callback error'); + const callback = sinon.stub() + .returnsArg(0) + .onCall(0).rejects(callbackError); + + /** @type {Error | undefined} */ + let caught; + try { + // eslint-disable-next-line no-unused-vars + for await (const _ of bufferedAsyncMap(sourceThatThrows(sourceError), callback, { bufferSize: 3 })) { + // drain + } + } catch (err) { + caught = /** @type {Error} */ (err); + } + + await clock.runAllAsync(); + + chai.expect(caught).to.be.instanceOf(AggregateError); + const { errors } = /** @type {AggregateError} */ (caught); + errors.should.include(sourceError); + errors.should.include(callbackError); + }); + + it('should resolve cleanly when no errors are captured (no regression)', async () => { + /** @type {number[]} */ + const result = []; + + const promised = (async () => { + for await (const v of bufferedAsyncMap(fromArray([0, 1, 2]), async (item) => item * 2)) { + result.push(v); + } + })(); + + await clock.runAllAsync(); + await promised; + + result.should.have.members([0, 2, 4]); + }); + + it('should not retract values delivered before the throw', async () => { + const rejectionError = new Error('Late error'); + const callback = sinon.stub() + .returnsArg(0) + .onCall(2).rejects(rejectionError); + + /** @type {number[]} */ + const delivered = []; + /** @type {Error 
| undefined} */ + let caught; + + try { + for await (const v of bufferedAsyncMap(fromArray([0, 1, 2]), callback, { bufferSize: 1 })) { + delivered.push(v); + } + } catch (err) { + caught = /** @type {Error} */ (err); + } + + await clock.runAllAsync(); + + delivered.should.deep.equal([0, 1]); + chai.expect(caught).to.equal(rejectionError); + }); +}); diff --git a/test/memory.spec.js b/test/memory.spec.js new file mode 100644 index 0000000..14bb6a8 --- /dev/null +++ b/test/memory.spec.js @@ -0,0 +1,57 @@ +import chai from 'chai'; + +import { + bufferedAsyncMap, +} from '../index.js'; + +chai.should(); + +/** @returns {AsyncGenerator} */ +async function * endlessSource () { + for (let i = 0; ; i++) { + yield i; + } +} + +describe('bufferedAsyncMap() memory', () => { + it('does not retain memory per pull on a long-lived unordered source', async function () { + const { gc } = globalThis; + + if (typeof gc !== 'function') { + // Needs --expose-gc — wired via .mocharc.json. Skip if run without it. + this.skip(); + return; + } + + const iterator = bufferedAsyncMap(endlessSource(), async (item) => item); + + const settle = async () => { + for (let i = 0; i < 3; i++) { + gc(); + await new Promise(resolve => { setImmediate(resolve); }); + } + }; + + // Warm up, then take a heap baseline once allocation has stabilised. + for (let i = 0; i < 2000; i++) { + await iterator.next(); + } + await settle(); + const baseline = process.memoryUsage().heapUsed; + + const items = 20000; + for (let i = 0; i < items; i++) { + await iterator.next(); + } + await settle(); + const growth = process.memoryUsage().heapUsed - baseline; + + await iterator.return(); + + // Before the per-pull park fix, nextValue() raced a single long-lived + // abortPromise every pull, leaving a PromiseReaction on it per item + // (~530 bytes/item — ~10 MB over 20k items). The per-pull park keeps + // this flat; the cap leaves a wide margin for unrelated allocation noise. 
+ growth.should.be.below(3 * 1024 * 1024); + }); +}); diff --git a/test/per-task-signal.spec.js b/test/per-task-signal.spec.js new file mode 100644 index 0000000..8824fe5 --- /dev/null +++ b/test/per-task-signal.spec.js @@ -0,0 +1,184 @@ +import chai from 'chai'; +import chaiAsPromised from 'chai-as-promised'; +import sinon from 'sinon'; +import sinonChai from 'sinon-chai'; + +import { + bufferedAsyncMap, +} from '../index.js'; +import { + yieldValuesOverTime, +} from './utils.js'; + +chai.use(chaiAsPromised); +chai.use(sinonChai); +chai.should(); + +describe('bufferedAsyncMap() per-task AbortSignal', () => { + /** @type {import('sinon').SinonFakeTimers} */ + let clock; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + sinon.restore(); + }); + + it('invokes callback with (item, { signal }) where signal.aborted === false', async () => { + /** @type {Array<{ item: number, signalIsAbortSignal: boolean, abortedAtCall: boolean }>} */ + const observations = []; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(3, 100), + async (item, opts) => { + observations.push({ + item, + signalIsAbortSignal: opts.signal instanceof AbortSignal, + abortedAtCall: opts.signal.aborted, + }); + return item; + } + ); + + const flow = (async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of iterator) { + // drain + } + })(); + + await clock.runAllAsync(); + await flow; + + observations.should.have.length(3); + for (const o of observations) { + o.signalIsAbortSignal.should.equal(true); + o.abortedAtCall.should.equal(false); + } + }); + + it('in-flight callbacks observe signal.aborted=true after iterator.return()', async () => { + /** @type {AbortSignal | undefined} */ + let capturedSignal; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item, { signal }) => { + capturedSignal = signal; + return item; + } + ); + + await iterator.next(); + chai.expect(capturedSignal).to.exist; + 
chai.expect(capturedSignal?.aborted).to.equal(false); + + const returned = iterator.return(); + await clock.runAllAsync(); + await returned; + + chai.expect(capturedSignal?.aborted).to.equal(true); + }); + + it('in-flight callbacks observe signal.aborted=true after iterator.throw()', async () => { + /** @type {AbortSignal | undefined} */ + let capturedSignal; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item, { signal }) => { + capturedSignal = signal; + return item; + } + ); + + await iterator.next(); + + const flow = (async () => { + try { + await iterator.throw(new Error('boom')); + } catch { + // expected + } + })(); + + await clock.runAllAsync(); + await flow; + + chai.expect(capturedSignal?.aborted).to.equal(true); + }); + + it('in-flight callbacks observe signal.aborted=true after Symbol.asyncDispose', async () => { + /** @type {AbortSignal | undefined} */ + let capturedSignal; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item, { signal }) => { + capturedSignal = signal; + return item; + } + ); + + await iterator.next(); + + const disposed = iterator[Symbol.asyncDispose](); + await clock.runAllAsync(); + await disposed; + + chai.expect(capturedSignal?.aborted).to.equal(true); + }); + + it('in-flight callbacks do NOT observe signal.aborted=true on natural source exhaustion', async () => { + /** @type {AbortSignal | undefined} */ + let lastSignal; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(2, 100), + async (item, { signal }) => { + lastSignal = signal; + return item; + } + ); + + /** @type {number[]} */ + const result = []; + const flow = (async () => { + for await (const v of iterator) { + result.push(v); + // Capture the signal state *after* the callback has been invoked but + // before the iterator finalises by reading the signal aborted state + // before draining further. 
+ chai.expect(lastSignal?.aborted).to.equal(false); + } + })(); + + await clock.runAllAsync(); + await flow; + + result.should.have.length(2); + }); + + it('callbacks ignoring the second arg keep working unmodified', async () => { + /** @type {number[]} */ + const result = []; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(3, 100), + async (item) => item * 10 + ); + + const flow = (async () => { + for await (const v of iterator) { + result.push(v); + } + })(); + + await clock.runAllAsync(); + await flow; + + result.should.have.members([0, 10, 20]); + }); +}); diff --git a/test/return.spec.js b/test/return.spec.js index 604d609..602fe9a 100644 --- a/test/return.spec.js +++ b/test/return.spec.js @@ -57,6 +57,57 @@ describe('bufferedAsyncMap() AsyncInterface return()', () => { await nextAfterReturn.should.eventually.deep.equal({ done: true, value: undefined }); }); + it('should resolve return(value) to { done: true, value } per the iterator protocol', async () => { + const iterator = bufferedAsyncMap(baseAsyncIterable, async (item) => item); + + await iterator.next().should.eventually.deep.equal({ value: 0 }); + + // return(value) reflects the passed argument... + const firstReturn = iterator.return('sentinel'); + await clock.runAllAsync(); + await firstReturn.should.eventually.deep.equal({ done: true, value: 'sentinel' }); + + // ...and each subsequent call reflects its own argument (not "remembered"). + const secondReturn = iterator.return('other'); + await clock.runAllAsync(); + await secondReturn.should.eventually.deep.equal({ done: true, value: 'other' }); + + // A no-argument return() still resolves to { done: true, value: undefined }. 
+ const bareReturn = iterator.return(); + await clock.runAllAsync(); + await bareReturn.should.eventually.deep.equal({ done: true, value: undefined }); + }); + + it('should await a thenable return(value) argument', async () => { + const iterator = bufferedAsyncMap(baseAsyncIterable, async (item) => item); + + const returnValue = iterator.return(Promise.resolve('awaited')); + await clock.runAllAsync(); + + await returnValue.should.eventually.deep.equal({ done: true, value: 'awaited' }); + }); + + it('should call the source iterator return() only once across repeated return() calls', async () => { + const source = yieldValuesOverTime(count, (i) => i % 2 === 1 ? 2000 : 100); + const sourceIterator = source[Symbol.asyncIterator](); + const returnSpy = sinon.spy(sourceIterator, 'return'); + + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + async (item) => item + ); + + await iterator.next().should.eventually.deep.equal({ value: 0 }); + + const first = iterator.return('a'); + const second = iterator.return('b'); + await clock.runAllAsync(); + + await first.should.eventually.deep.equal({ done: true, value: 'a' }); + await second.should.eventually.deep.equal({ done: true, value: 'b' }); + returnSpy.should.have.been.calledOnce; + }); + it('should be called when a loop breaks', async () => { const iterator = bufferedAsyncMap(baseAsyncIterable, async (item) => item, { bufferSize: 3 }); const returnSpy = sinon.spy(iterator, 'return'); diff --git a/test/throw.spec.js b/test/throw.spec.js index 52fc2e8..913fa90 100644 --- a/test/throw.spec.js +++ b/test/throw.spec.js @@ -1,3 +1,5 @@ +/* eslint-disable promise/prefer-await-to-then */ + import chai from 'chai'; import chaiAsPromised from 'chai-as-promised'; import chaiQuantifiers from 'chai-quantifiers'; @@ -17,60 +19,85 @@ chai.use(sinonChai); chai.should(); -describe.skip('bufferedAsyncMap() AsyncInterface throw()', () => { +describe('bufferedAsyncMap() AsyncInterface throw()', () => { 
const count = 6; + /** @type {import('sinon').SinonFakeTimers} */ + let clock; /** @type {AsyncIterable} */ let baseAsyncIterable; - /** @type {number[]} */ - let expectedResult; beforeEach(() => { + clock = sinon.useFakeTimers(); baseAsyncIterable = yieldValuesOverTime(count, (i) => i % 2 === 1 ? 2000 : 100); + }); - expectedResult = []; - for (let i = 0; i < count; i++) { - expectedResult.push(i); - } + afterEach(() => { + sinon.restore(); }); - it('should end the iterator when called', async () => { - const errorToThrow = new Error('Yet another error'); + it('throw(err) rejects with err and subsequent .next() returns done', async () => { + const errorToThrow = new Error('thrown'); const iterator = bufferedAsyncMap(baseAsyncIterable, async (item) => item); - await iterator.next().should.eventually.deep.equal({ value: 0 }); - await iterator.throw(errorToThrow).should.eventually.be.rejectedWith(errorToThrow); - await iterator.next().should.eventually.deep.equal({ done: true, value: undefined }); + const first = iterator.next(); + await clock.runAllAsync(); + await first.should.eventually.deep.equal({ value: 0 }); + + const tossed = iterator.throw(errorToThrow).catch(err => ({ rejectedWith: err })); + await clock.runAllAsync(); + chai.expect(await tossed).to.deep.equal({ rejectedWith: errorToThrow }); + + const after = iterator.next(); + await clock.runAllAsync(); + await after.should.eventually.deep.equal({ done: true, value: undefined }); }); - it('should be called when a loop throws', async () => { - const iterator = bufferedAsyncMap(baseAsyncIterable, async (item) => item); - const returnSpy = sinon.spy(iterator, 'return'); - const throwSpy = sinon.spy(iterator, 'throw'); - const errorToThrow = new Error('Yet another error'); - - let caught; - - // Inspired by https://github.com/WebKit/WebKit/blob/1a09d8d95ba6085df4ef44306c4bfc9fc86fdbc7/JSTests/test262/test/language/expressions/yield/star-rhs-iter-thrw-thrw-get-err.js - async function * g () { - try { - yield * 
iterator; - } catch (err) { - caught = err; - throw err; + it('throw(err) calls source.return() once', async () => { + const errorToThrow = new Error('thrown'); + const source = yieldValuesOverTime(50, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const returnSpy = sinon.spy(sourceIterator, 'return'); + + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + async (item) => item + ); + + const first = iterator.next(); + await clock.tickAsync(0); + await first; + + const tossed = iterator.throw(errorToThrow).catch(err => err); + await clock.runAllAsync(); + await tossed; + + returnSpy.should.have.been.calledOnce; + }); + + it('in-flight callbacks observe signal.aborted=true after throw()', async () => { + const errorToThrow = new Error('thrown'); + /** @type {AbortSignal | undefined} */ + let captured; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(50, 100), + async (item, { signal }) => { + captured = signal; + return item; } - } + ); - const wrappedIterator = g(); + const first = iterator.next(); + await clock.tickAsync(0); + await first; + chai.expect(captured?.aborted).to.equal(false); - await wrappedIterator.next().should.eventually.deep.equal({ done: false, value: 0 }); - await wrappedIterator.throw(errorToThrow).should.eventually.be.rejectedWith(errorToThrow); - await wrappedIterator.next().should.eventually.deep.equal({ done: true, value: undefined }); - await iterator.next().should.eventually.deep.equal({ done: true, value: undefined }); + const tossed = iterator.throw(errorToThrow).catch(err => err); + await clock.runAllAsync(); + await tossed; - (caught || {}).should.equal(errorToThrow); - throwSpy.should.have.been.calledOnceWithExactly(errorToThrow); - returnSpy.should.not.have.been.called; + chai.expect(captured?.aborted).to.equal(true); }); }); diff --git a/test/values.spec.js b/test/values.spec.js index a490bae..efac79e 100644 --- a/test/values.spec.js +++ b/test/values.spec.js @@ 
-612,7 +612,8 @@ describe('bufferedAsyncMap() values', () => { throw new Error('Expected a rejection'); }, err => { - err.should.equal(rejectionError); + const captured = err instanceof AggregateError ? err.errors[0] : err; + captured.should.equal(rejectionError); } ); @@ -648,6 +649,47 @@ describe('bufferedAsyncMap() values', () => { ); }); + it('should normalize a non-Error rejection from the callback into an Error', async () => { + baseAsyncIterable = yieldValuesOverTime(count, (i) => i % 2 === 1 ? 2000 : 100); + + let callCount = 0; + /** + * @param {number} item + * @returns {Promise} + */ + const callback = async (item) => { + callCount += 1; + if (callCount === 2) { + // Reject with a non-Error value — exercises normalizeError's fallback. + // eslint-disable-next-line no-throw-literal + throw 'a plain string rejection'; + } + return item; + }; + + /** @type {number[]} */ + const drained = []; + + // Create the promise first, then have it be fully executed using clock.runAllAsync() + const promisedResult = (async () => { + for await (const value of bufferedAsyncMap(baseAsyncIterable, callback)) { + drained.push(value); + } + })() + .then( + () => { + throw new Error('Expected a rejection'); + }, + err => ({ rejectedWith: err }) + ); + + await clock.runAllAsync(); + const outcome = await promisedResult; + + outcome.rejectedWith.should.be.an.instanceOf(Error); + outcome.rejectedWith.message.should.equal('Unknown callback error'); + }); + it('should throw TypeError on non-object value from AsyncIterator interface', async () => { const { asyncIterable, @@ -673,7 +715,7 @@ describe('bufferedAsyncMap() values', () => { }, err => { if (err instanceof TypeError) { - err.message.should.equal('Expected an object value'); + err.message.should.equal('Expected source iterator next() result to be an object'); } else { throw new TypeError('Expected a TypeError'); } @@ -709,7 +751,7 @@ describe('bufferedAsyncMap() values', () => { }, err => { if (err instanceof TypeError) { 
- err.message.should.equal('Expected an object value'); + err.message.should.equal('Expected sub-iterator next() result to be an object'); } else { throw new TypeError('Expected a TypeError'); } @@ -720,6 +762,66 @@ describe('bufferedAsyncMap() values', () => { await promisedResult; }); + it('should surface a malformed async iterable from the callback (fail-eventually)', async () => { + const malformedError = new Error('bad iterable'); + + /** @returns {AsyncIterable} */ + const malformedIterable = () => ({ + [Symbol.asyncIterator]: () => { + throw malformedError; + }, + }); + + // Create the promise first, then have it be fully executed using clock.runAllAsync() + const promisedResult = (async () => { + /** @type {string[]} */ + const result = []; + + for await (const value of bufferedAsyncMap(baseAsyncIterable, malformedIterable)) { + result.push(value); + } + + return result; + })() + .then( + () => { + throw new Error('Expected a rejection'); + }, + err => ({ rejectedWith: err }) + ); + + await clock.runAllAsync(); + const outcome = await promisedResult; + + const captured = outcome.rejectedWith instanceof AggregateError + ? 
outcome.rejectedWith.errors[0] + : outcome.rejectedWith; + captured.should.equal(malformedError); + }); + + it('should surface a malformed async iterable from the callback (fail-fast)', async () => { + const malformedError = new Error('bad iterable'); + + /** @returns {AsyncIterable} */ + const malformedIterable = () => ({ + [Symbol.asyncIterator]: () => { + throw malformedError; + }, + }); + + const iterator = bufferedAsyncMap( + baseAsyncIterable, + malformedIterable, + { errors: 'fail-fast' } + )[Symbol.asyncIterator](); + + const rejected = iterator.next().catch(err => ({ rejectedWith: err })); + await clock.runAllAsync(); + const outcome = await rejected; + + outcome.should.deep.equal({ rejectedWith: malformedError }); + }); + it('should give back pressure', async () => { baseAsyncIterable = yieldValuesOverTime(100, (i) => i % 2 === 1 ? 2000 : 100); const baseAsyncIterator = baseAsyncIterable[Symbol.asyncIterator](); @@ -923,5 +1025,95 @@ describe('bufferedAsyncMap() values', () => { ]); duration.should.equal(6300); }); + + it('forwards options.signal to the underlying bufferedAsyncMap', async () => { + const reason = new Error('merge-aborted'); + const ac = new AbortController(); + + const iterator = mergeIterables([ + yieldValuesOverTimeWithPrefix(6, 100, 'a-'), + yieldValuesOverTimeWithPrefix(6, 100, 'b-'), + ], { signal: ac.signal })[Symbol.asyncIterator](); + + const first = iterator.next(); + await clock.runAllAsync(); + await first; + + ac.abort(reason); + + const next = iterator.next().catch(err => ({ rejectedWith: err })); + await clock.runAllAsync(); + chai.expect(await next).to.deep.equal({ rejectedWith: reason }); + }); + + it("forwards errors: 'fail-fast' to surface the original rejection", async () => { + const sourceError = new Error('merge-fail-fast'); + + async function * throwingInput () { + yield 'b-0'; + await promisableTimeout(100); + throw sourceError; + } + + const iterator = mergeIterables([ + yieldValuesOverTimeWithPrefix(6, 100, 
'a-'), + throwingInput(), + ], { errors: 'fail-fast' })[Symbol.asyncIterator](); + + /** @type {Array<{ rejected: boolean, value?: unknown }>} */ + const results = []; + + const flow = (async () => { + for (let i = 0; i < 6; i += 1) { + try { + const r = await iterator.next(); + results.push({ rejected: false, value: r }); + if (r.done) break; + } catch (err) { + results.push({ rejected: true, value: err }); + } + } + })(); + + await clock.runAllAsync(); + await flow; + + const firstReject = results.findIndex(r => r.rejected); + chai.expect(firstReject).to.be.greaterThan(-1); + chai.expect(results[firstReject]?.value).to.equal(sourceError); + }); + + it('forwards ordered: true to drain inputs in source order', async () => { + // Asymmetric timing: the second source is ten times faster than the first. + // Under the default ordered:false this would interleave (second-* would + // arrive between first-0 and first-1); under ordered:true the first + // iterable is drained completely before any value from the second. 
+ // Create the promise first, then have it be fully executed using clock.runAllAsync() + const promisedResult = (async () => { + /** @type {string[]} */ + const rawResult = []; + + for await (const value of mergeIterables([ + yieldValuesOverTimeWithPrefix(3, 1000, 'first-'), + yieldValuesOverTimeWithPrefix(3, 100, 'second-'), + ], { ordered: true })) { + rawResult.push(value); + } + + return rawResult; + })(); + + await clock.runAllAsync(); + const result = await promisedResult; + + result.should.deep.equal([ + 'first-0', + 'first-1', + 'first-2', + 'second-0', + 'second-1', + 'second-2', + ]); + }); }); }); diff --git a/test/yield.spec.js b/test/yield.spec.js index e26c239..e17ed42 100644 --- a/test/yield.spec.js +++ b/test/yield.spec.js @@ -96,8 +96,10 @@ describe('The built in "yield *" functionality', () => { } asyncIterator.next.should.have.been.calledTwice; + // Native `yield *` does not call the inner iterator's .return() when the + // inner .next() rejects — the delegation is already completing abruptly, + // so there is nothing to close. .throw() likewise isn't called here. asyncIterator.return.should.not.have.been.called; - // TODO: I thougt it should have been? asyncIterator.throw.should.not.have.been.called; await mainIterator[Symbol.asyncIterator]().next().should.eventually.deep.equal({ done: true, value: undefined }); diff --git a/tsconfig.json b/tsconfig.json index 28b2974..48916c9 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,5 +1,5 @@ { - "extends": "@voxpelli/tsconfig/node18.json", + "extends": "@voxpelli/tsconfig/node20.json", "files": [ "index.js", ],