Skip to content

Commit 42cae48

Browse files
jeswrRubenVerborgh
authored andcommitted
Add MappingIterator.
1 parent d036485 commit 42cae48

6 files changed

Lines changed: 1171 additions & 164 deletions

File tree

asynciterator.ts

Lines changed: 114 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ export function setTaskScheduler(scheduler: TaskScheduler): void {
2525
taskScheduler = scheduler;
2626
}
2727

28+
// Returns a function that calls `fn` with `self` as `this` pointer. */
29+
function bind<T extends Function>(fn: T, self?: object): T {
30+
return self ? fn.bind(self) : fn;
31+
}
32+
2833
/**
2934
ID of the INIT state.
3035
An iterator is initializing if it is preparing main item generation.
@@ -161,7 +166,7 @@ export class AsyncIterator<T> extends EventEmitter {
161166
@param {object?} self The `this` pointer for the callback
162167
*/
163168
forEach(callback: (item: T) => void, self?: object) {
164-
this.on('data', self ? callback.bind(self) : callback);
169+
this.on('data', bind(callback, self));
165170
}
166171

167172
/**
@@ -455,8 +460,8 @@ export class AsyncIterator<T> extends EventEmitter {
455460
@param {object?} self The `this` pointer for the mapping function
456461
@returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator
457462
*/
458-
map<D>(map: (item: T) => D, self?: any): AsyncIterator<D> {
459-
return this.transform({ map: self ? map.bind(self) : map });
463+
map<D>(map: MapFunction<T, D>, self?: any): AsyncIterator<D> {
464+
return new MappingIterator(this, bind(map, self));
460465
}
461466

462467
/**
@@ -469,7 +474,8 @@ export class AsyncIterator<T> extends EventEmitter {
469474
filter<K extends T>(filter: (item: T) => item is K, self?: any): AsyncIterator<K>;
470475
filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T>;
471476
filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T> {
472-
return this.transform({ filter: self ? filter.bind(self) : filter });
477+
filter = bind(filter, self);
478+
return this.map(item => filter(item) ? item : null);
473479
}
474480

475481
/**
@@ -510,7 +516,7 @@ export class AsyncIterator<T> extends EventEmitter {
510516
@returns {module:asynciterator.AsyncIterator} A new iterator that skips the given number of items
511517
*/
512518
skip(offset: number): AsyncIterator<T> {
513-
return this.transform({ offset });
519+
return this.map(item => offset-- > 0 ? null : item);
514520
}
515521

516522
/**
@@ -575,6 +581,14 @@ function addSingleListener(source: EventEmitter, eventName: string,
575581
source.on(eventName, listener);
576582
}
577583

584+
// Validates an AsyncIterator for use as a source within another AsyncIterator
585+
function ensureSourceAvailable<S>(source?: AsyncIterator<S>, allowDestination = false) {
586+
if (!source || !isFunction(source.read) || !isFunction(source.on))
587+
throw new Error(`Invalid source: ${source}`);
588+
if (!allowDestination && (source as any)._destination)
589+
throw new Error('The source already has a destination');
590+
return source as InternalSource<S>;
591+
}
578592

579593
/**
580594
An iterator that doesn't emit any items.
@@ -777,9 +791,90 @@ export class IntegerIterator extends AsyncIterator<number> {
777791
}
778792
}
779793

794+
/**
795+
* A synchronous mapping function from one element to another.
796+
* A return value of `null` means that nothing should be emitted for a particular item.
797+
*/
798+
export type MapFunction<S, D = S> = (item: S) => D | null;
799+
800+
/**
801+
* Function that maps an element to itself.
802+
*/
803+
export function identity<S>(item: S): typeof item {
804+
return item;
805+
}
806+
807+
/**
808+
An iterator that synchronously transforms every item from its source
809+
by applying a mapping function.
810+
@extends module:asynciterator.AsyncIterator
811+
*/
812+
export class MappingIterator<S, D = S> extends AsyncIterator<D> {
813+
protected readonly _map: MapFunction<S, D>;
814+
protected readonly _source: InternalSource<S>;
815+
protected readonly _destroySource: boolean;
816+
817+
/**
818+
* Applies the given mapping to the source iterator.
819+
*/
820+
constructor(
821+
source: AsyncIterator<S>,
822+
map: MapFunction<S, D> = identity as MapFunction<S, D>,
823+
options: SourcedIteratorOptions = {}
824+
) {
825+
super();
826+
this._map = map;
827+
this._source = ensureSourceAvailable(source);
828+
this._destroySource = options.destroySource !== false;
829+
830+
// Close if the source is already empty
831+
if (source.done) {
832+
this.close();
833+
}
834+
// Otherwise, wire up the source for reading
835+
else {
836+
this._source._destination = this;
837+
this._source.on('end', destinationClose);
838+
this._source.on('error', destinationEmitError);
839+
this._source.on('readable', destinationSetReadable);
840+
this.readable = this._source.readable;
841+
}
842+
}
843+
844+
/** Tries to read the next item from the iterator. */
845+
read(): D | null {
846+
if (!this.done) {
847+
// Try to read an item that maps to a non-null value
848+
if (this._source.readable) {
849+
let item: S | null, mapped: D | null;
850+
while ((item = this._source.read()) !== null) {
851+
if ((mapped = this._map(item)) !== null)
852+
return mapped;
853+
}
854+
}
855+
this.readable = false;
856+
857+
// Close this iterator if the source is empty
858+
if (this._source.done)
859+
this.close();
860+
}
861+
return null;
862+
}
863+
864+
/* Cleans up the source iterator and ends. */
865+
protected _end(destroy: boolean) {
866+
this._source.removeListener('end', destinationClose);
867+
this._source.removeListener('error', destinationEmitError);
868+
this._source.removeListener('readable', destinationSetReadable);
869+
delete this._source._destination;
870+
if (this._destroySource)
871+
this._source.destroy();
872+
super._end(destroy);
873+
}
874+
}
780875

781876
/**
782-
A iterator that maintains an internal buffer of items.
877+
An iterator that maintains an internal buffer of items.
783878
This class serves as a base class for other iterators
784879
with a typically complex item generation process.
785880
@extends module:asynciterator.AsyncIterator
@@ -1150,14 +1245,10 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> {
11501245
@param {object} source The source to validate
11511246
@param {boolean} allowDestination Whether the source can already have a destination
11521247
*/
1153-
protected _validateSource(source?: AsyncIterator<S>, allowDestination = false) {
1248+
protected _validateSource(source?: AsyncIterator<S>, allowDestination = false): InternalSource<S> {
11541249
if (this._source || typeof this._createSource !== 'undefined')
11551250
throw new Error('The source cannot be changed after it has been set');
1156-
if (!source || !isFunction(source.read) || !isFunction(source.on))
1157-
throw new Error(`Invalid source: ${source}`);
1158-
if (!allowDestination && (source as any)._destination)
1159-
throw new Error('The source already has a destination');
1160-
return source as InternalSource<S>;
1251+
return ensureSourceAvailable(source, allowDestination);
11611252
}
11621253

11631254
/**
@@ -1240,9 +1331,15 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> {
12401331
}
12411332
}
12421333

1334+
function destinationSetReadable<S>(this: InternalSource<S>) {
1335+
this._destination!.readable = true;
1336+
}
12431337
function destinationEmitError<S>(this: InternalSource<S>, error: Error) {
12441338
this._destination!.emit('error', error);
12451339
}
1340+
function destinationClose<S>(this: InternalSource<S>) {
1341+
this._destination!.close();
1342+
}
12461343
function destinationCloseWhenDone<S>(this: InternalSource<S>) {
12471344
(this._destination as any)._closeWhenDone();
12481345
}
@@ -1956,15 +2053,18 @@ function isSourceExpression<T>(object: any): object is SourceExpression<T> {
19562053
return object && (isEventEmitter(object) || isPromise(object) || isFunction(object));
19572054
}
19582055

2056+
export interface SourcedIteratorOptions {
2057+
destroySource?: boolean;
2058+
}
2059+
19592060
export interface BufferedIteratorOptions {
19602061
maxBufferSize?: number;
19612062
autoStart?: boolean;
19622063
}
19632064

1964-
export interface TransformIteratorOptions<S> extends BufferedIteratorOptions {
2065+
export interface TransformIteratorOptions<S> extends SourcedIteratorOptions, BufferedIteratorOptions {
19652066
source?: SourceExpression<S>;
19662067
optional?: boolean;
1967-
destroySource?: boolean;
19682068
}
19692069

19702070
export interface TransformOptions<S, D> extends TransformIteratorOptions<S> {

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
"test:microtask": "npm run mocha",
3232
"test:immediate": "npm run mocha -- --require test/config/useSetImmediate.js",
3333
"mocha": "c8 mocha",
34-
"lint": "eslint asynciterator.ts test",
34+
"lint": "eslint asynciterator.ts test perf",
3535
"docs": "npm run build:module && npm run jsdoc",
3636
"jsdoc": "jsdoc -c jsdoc.json"
3737
},

perf/.eslintrc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
rules: {
3+
no-console: off,
4+
},
5+
}
6+

perf/MappingIterator-perf.js

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import { ArrayIterator, range } from '../dist/asynciterator.js';
2+
3+
function noop() {
4+
// empty function to drain an iterator
5+
}
6+
7+
async function perf(warmupIterator, iterator, description) {
8+
return new Promise(res => {
9+
const now = performance.now();
10+
iterator.on('data', noop);
11+
iterator.on('end', () => {
12+
console.log(description, performance.now() - now);
13+
res();
14+
});
15+
});
16+
}
17+
18+
function run(iterator) {
19+
return new Promise(res => {
20+
iterator.on('data', noop);
21+
iterator.on('end', () => {
22+
res();
23+
});
24+
});
25+
}
26+
27+
function baseIterator() {
28+
let i = 0;
29+
return new ArrayIterator(new Array(20000000).fill(true).map(() => i++));
30+
}
31+
32+
function createMapped(filter) {
33+
let iterator = baseIterator();
34+
for (let j = 0; j < 20; j++) {
35+
iterator = iterator.map(item => item);
36+
if (filter)
37+
iterator = iterator.filter(item => item % (j + 2) === 0);
38+
}
39+
return iterator;
40+
}
41+
42+
(async () => {
43+
await run(baseIterator()); // warm-up run
44+
45+
await perf(baseIterator(), createMapped(), '20000000 elems 20 maps\t\t\t\t\t');
46+
await perf(createMapped(true), createMapped(true), '20000000 elems 20 maps 20 filter\t\t\t');
47+
48+
const now = performance.now();
49+
for (let j = 0; j < 100_000; j++) {
50+
let it = range(1, 100);
51+
for (let k = 0; k < 5; k++)
52+
it = it.map(item => item);
53+
54+
await new Promise((resolve, reject) => {
55+
it.on('data', () => null);
56+
it.on('end', resolve);
57+
it.on('error', reject);
58+
});
59+
}
60+
console.log('100_000 iterators each with 5 maps and 100 elements\t', performance.now() - now);
61+
})();

0 commit comments

Comments
 (0)