Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions .knip.jsonc
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
{
"$schema": "https://unpkg.com/knip@5/schema.json",
"entry": [
"index.js!",
"benchmark/*.js"
]
"$schema": "https://unpkg.com/knip@5/schema.json"
}
150 changes: 84 additions & 66 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// TODO: THERE'S ACTUALLY A "throw" method MENTION IN https://tc39.es/ecma262/#sec-generator-function-definitions-runtime-semantics-evaluation: "NOTE: Exceptions from the inner iterator throw method are propagated. Normal completions from an inner throw method are processed similarly to an inner next." THOUGH NOT SURE HOW TO TRIGGER IT IN PRACTICE, SEE yield.spec.js

import { findLeastTargeted } from './lib/find-least-targeted.js';
import { arrayDeleteInPlace, makeIterableAsync } from './lib/misc.js';
import { arrayDeleteInPlace, makeIterableAsync, normalizeError } from './lib/misc.js';
import { isAsyncIterable, isIterable, isPartOfArray } from './lib/type-checks.js';

/**
Expand All @@ -19,6 +19,20 @@ async function * yieldIterable (item) {
yield * item;
}

/**
* Generator that yields items from array and an additional item
* Avoids array spread in hot paths
*
* @template T, U
* @param {T[]} array
* @param {U} item
* @returns {Iterable<T | U>}
*/
function * yieldArrayWithItem (array, item) {
yield * array;
yield item;
}

/**
* @template T
* @param {Array<AsyncIterable<T> | Iterable<T> | T[]>} input
Expand Down Expand Up @@ -114,8 +128,12 @@ export function bufferedAsyncMap (input, callback, options) {
if (ordered) {
currentSubIterator = subIterators[0];
} else {
const targets = mainReturnedDone
? subIterators
: yieldArrayWithItem(subIterators, asyncIterator);

const iterator = findLeastTargeted(
mainReturnedDone ? subIterators : [...subIterators, asyncIterator],
targets,
bufferedPromises,
promisesToSourceIteratorMap
);
Expand All @@ -126,83 +144,83 @@ export function bufferedAsyncMap (input, callback, options) {
/** @type {BufferPromise} */
const bufferPromise = currentSubIterator
? Promise.resolve(currentSubIterator.next())
.catch(err => ({
err: err instanceof Error ? err : new Error('Unknown subiterator error'),
}))
.then(async result => {
if (typeof result !== 'object') {
throw new TypeError('Expected an object value');
}
if ('err' in result || result.done) {
arrayDeleteInPlace(subIterators, currentSubIterator);
}

/** @type {Awaited<BufferPromise>} */
const promiseValue = {
bufferPromise,
fromSubIterator: true,
...(
'err' in result
? { done: true, value: undefined, ...result }
: result
),
};

return promiseValue;
})
: Promise.resolve(asyncIterator.next())
.catch(err => ({
err: err instanceof Error ? err : new Error('Unknown iterator error'),
}))
.then(async result => {
if (typeof result !== 'object') {
throw new TypeError('Expected an object value');
}
if ('err' in result || result.done) {
mainReturnedDone = true;
return {
.catch(err => ({
err: normalizeError(err, 'Unknown subiterator error'),
}))
.then(async result => {
if (typeof result !== 'object') {
throw new TypeError('Expected an object value');
}
if ('err' in result || result.done) {
arrayDeleteInPlace(subIterators, currentSubIterator);
}

/** @type {Awaited<BufferPromise>} */
const promiseValue = {
bufferPromise,
fromSubIterator: true,
...(
'err' in result
? { done: true, value: undefined, ...result }
: result
),
};
}

// eslint-disable-next-line promise/no-callback-in-promise
const callbackResult = callback(result.value);
const isSubIterator = isAsyncIterable(callbackResult);

/** @type {Awaited<BufferPromise>} */
let promiseValue;

try {
const value = await callbackResult;

promiseValue = {
bufferPromise,
isSubIterator,
value,
};
} catch (err) {
promiseValue = {
bufferPromise,
done: true,
err: err instanceof Error ? err : new Error('Unknown callback error'),
value: undefined,
};
}

return promiseValue;
});
return promiseValue;
})
: Promise.resolve(asyncIterator.next())
.catch(err => ({
err: normalizeError(err, 'Unknown iterator error'),
}))
.then(async result => {
if (typeof result !== 'object') {
throw new TypeError('Expected an object value');
}
if ('err' in result || result.done) {
mainReturnedDone = true;
return {
bufferPromise,
...(
'err' in result
? { done: true, value: undefined, ...result }
: result
),
};
}

// eslint-disable-next-line promise/no-callback-in-promise
const callbackResult = callback(result.value);
const isSubIterator = isAsyncIterable(callbackResult);

/** @type {Awaited<BufferPromise>} */
let promiseValue;

try {
const value = await callbackResult;

promiseValue = {
bufferPromise,
isSubIterator,
value,
};
} catch (err) {
promiseValue = {
bufferPromise,
done: true,
err: normalizeError(err, 'Unknown callback error'),
value: undefined,
};
}

return promiseValue;
});

promisesToSourceIteratorMap.set(bufferPromise, currentSubIterator || asyncIterator);

if (ordered && currentSubIterator) {
let i = 0;

while (promisesToSourceIteratorMap.get(/** @type {BufferPromise} */ (bufferedPromises[i])) === currentSubIterator) {
while (i < bufferedPromises.length && promisesToSourceIteratorMap.get(/** @type {BufferPromise} */ (bufferedPromises[i])) === currentSubIterator) {
i += 1;
}

Expand Down Expand Up @@ -241,7 +259,7 @@ export function bufferedAsyncMap (input, callback, options) {
return { done: true, value: undefined };
} else if (err || done) {
if (err && !hasError) {
hasError = err instanceof Error ? err : new Error('Unknown error');
hasError = normalizeError(err, 'Unknown error');
}

if (fromSubIterator || subIterators.length > 0) {
Expand Down
11 changes: 11 additions & 0 deletions lib/misc.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,14 @@ export function arrayDeleteInPlace (list, value) {
list.splice(index, 1);
}
}

/**
* Normalizes an error to ensure it's an Error instance
*
* @param {unknown} err
* @param {string} defaultMessage
* @returns {Error}
*/
export function normalizeError (err, defaultMessage) {
return err instanceof Error ? err : new Error(defaultMessage);
}
12 changes: 10 additions & 2 deletions lib/type-checks.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
/**
* Internal helper to check if a value is an object
*
* @param {unknown} value
* @returns {value is object}
*/
const isValueObject = (value) => Boolean(value && typeof value === 'object');

/**
* @param {unknown} value
* @returns {value is Iterable<unknown>}
*/
export const isIterable = (value) => Boolean(value && typeof value === 'object' && Symbol.iterator in value);
export const isIterable = (value) => isValueObject(value) && Symbol.iterator in value;

/**
* @param {unknown} value
* @returns {value is AsyncIterable<unknown>}
*/
export const isAsyncIterable = (value) => Boolean(value && typeof value === 'object' && Symbol.asyncIterator in value);
export const isAsyncIterable = (value) => isValueObject(value) && Symbol.asyncIterator in value;

/**
* @template Values
Expand Down
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"author": "Pelle Wessman <pelle@kodfabrik.se> (http://kodfabrik.se/)",
"license": "MIT",
"engines": {
"node": ">=18.6.0"
"node": ">=18.18.0"
},
"type": "module",
"exports": "./index.js",
Expand Down Expand Up @@ -49,7 +49,6 @@
"devDependencies": {
"@types/chai": "^4.3.19",
"@types/chai-as-promised": "^7.1.8",
"@types/chai-quantifiers": "^1.0.4",
"@types/mocha": "^10.0.8",
"@types/node": "^18.19.50",
"@types/sinon": "^17.0.3",
Expand Down
Loading