Skip to content

Commit 3a4a911

Browse files
jeswrRubenVerborgh
authored andcommitted
Introduce MappingIterator.
1 parent fec14f4 commit 3a4a911

6 files changed

Lines changed: 670 additions & 62 deletions

File tree

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ jobs:
99
fail-fast: false
1010
matrix:
1111
node-version:
12-
- 10.x
12+
- 12.x
1313
- 12.x
1414
- 14.x
1515
- 16.x

asynciterator.ts

Lines changed: 110 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ export function setTaskScheduler(scheduler: TaskScheduler): void {
2525
taskScheduler = scheduler;
2626
}
2727

28+
/** Binds a function to an object */
29+
function bind(fn: Function, self: any) {
30+
return self ? fn.bind(self) : fn;
31+
}
32+
2833
/**
2934
ID of the INIT state.
3035
An iterator is initializing if it is preparing main item generation.
@@ -161,7 +166,7 @@ export class AsyncIterator<T> extends EventEmitter {
161166
@param {object?} self The `this` pointer for the callback
162167
*/
163168
forEach(callback: (item: T) => void, self?: object) {
164-
this.on('data', self ? callback.bind(self) : callback);
169+
this.on('data', bind(callback, self));
165170
}
166171

167172
/**
@@ -451,12 +456,14 @@ export class AsyncIterator<T> extends EventEmitter {
451456
/**
452457
Maps items from this iterator using the given function.
453458
After this operation, only read the returned iterator instead of the current one.
454-
@param {Function} map A mapping function to call on this iterator's (remaining) items
459+
@param {Function} map A mapping function to call on this iterator's (remaining) items.
460+
A `null` value indicates that nothing should be returned for a particular item..
455461
@param {object?} self The `this` pointer for the mapping function
462+
@param {boolean?} close Close the iterator after an item is mapped to null
456463
@returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator
457464
*/
458-
map<D>(map: (item: T) => D, self?: any): AsyncIterator<D> {
459-
return this.transform({ map: self ? map.bind(self) : map });
465+
map<D>(map: (item: T, it: AsyncIterator<any>) => D | null, self?: any): AsyncIterator<D> {
466+
return new MappingIterator<T, D>(this, [bind(map, self)]);
460467
}
461468

462469
/**
@@ -469,7 +476,8 @@ export class AsyncIterator<T> extends EventEmitter {
469476
filter<K extends T>(filter: (item: T) => item is K, self?: any): AsyncIterator<K>;
470477
filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T>;
471478
filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T> {
472-
return this.transform({ filter: self ? filter.bind(self) : filter });
479+
filter = bind(filter, self);
480+
return this.map(item => filter(item) ? item : null);
473481
}
474482

475483
/**
@@ -510,7 +518,7 @@ export class AsyncIterator<T> extends EventEmitter {
510518
@returns {module:asynciterator.AsyncIterator} A new iterator that skips the given number of items
511519
*/
512520
skip(offset: number): AsyncIterator<T> {
513-
return this.transform({ offset });
521+
return this.map(item => offset-- > 0 ? null : item);
514522
}
515523

516524
/**
@@ -520,7 +528,7 @@ export class AsyncIterator<T> extends EventEmitter {
520528
@returns {module:asynciterator.AsyncIterator} A new iterator with at most the given number of items
521529
*/
522530
take(limit: number): AsyncIterator<T> {
523-
return this.transform({ limit });
531+
return this.map((item, it) => limit-- > 0 ? item : (it.close(), null));
524532
}
525533

526534
/**
@@ -531,7 +539,7 @@ export class AsyncIterator<T> extends EventEmitter {
531539
@returns {module:asynciterator.AsyncIterator} A new iterator with items in the given range
532540
*/
533541
range(start: number, end: number): AsyncIterator<T> {
534-
return this.transform({ offset: start, limit: Math.max(end - start + 1, 0) });
542+
return this.skip(start).take(Math.max(end - start + 1, 0));
535543
}
536544

537545
/**
@@ -1055,6 +1063,14 @@ export class BufferedIterator<T> extends AsyncIterator<T> {
10551063
}
10561064
}
10571065

1066+
function _validateSource<S>(source?: AsyncIterator<S>, allowDestination = false) {
1067+
if (!source || !isFunction(source.read) || !isFunction(source.on))
1068+
throw new Error(`Invalid source: ${source}`);
1069+
if (!allowDestination && (source as any)._destination)
1070+
throw new Error('The source already has a destination');
1071+
return source as InternalSource<S>;
1072+
}
1073+
10581074
/**
10591075
An iterator that generates items based on a source iterator.
10601076
This class serves as a base class for other iterators.
@@ -1153,11 +1169,7 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> {
11531169
protected _validateSource(source?: AsyncIterator<S>, allowDestination = false) {
11541170
if (this._source || typeof this._createSource !== 'undefined')
11551171
throw new Error('The source cannot be changed after it has been set');
1156-
if (!source || !isFunction(source.read) || !isFunction(source.on))
1157-
throw new Error(`Invalid source: ${source}`);
1158-
if (!allowDestination && (source as any)._destination)
1159-
throw new Error('The source already has a destination');
1160-
return source as InternalSource<S>;
1172+
return _validateSource(source, allowDestination);
11611173
}
11621174

11631175
/**
@@ -1251,6 +1263,91 @@ function destinationFillBuffer<S>(this: InternalSource<S>) {
12511263
(this._destination as any)._fillBuffer();
12521264
}
12531265

1266+
export class MappingIterator<T, D = T> extends AsyncIterator<D> {
1267+
private _destroySource: boolean;
1268+
1269+
get readable() {
1270+
return this.source.readable;
1271+
}
1272+
1273+
set readable(readable) {
1274+
this.source.readable = readable;
1275+
}
1276+
1277+
constructor(
1278+
protected source: AsyncIterator<T>,
1279+
private transforms: ((item: any, iterator: AsyncIterator<any>) => any)[],
1280+
private upstream: AsyncIterator<any> = source,
1281+
options: { destroySource?: boolean } = {}
1282+
) {
1283+
// Subscribe the iterator directly upstream rather than the original source to avoid over-subscribing
1284+
// listeners to the original source
1285+
super();
1286+
this._destroySource = options.destroySource !== false;
1287+
if (upstream.done) {
1288+
this.close();
1289+
}
1290+
else {
1291+
_validateSource(upstream);
1292+
// @ts-ignore
1293+
upstream._destination = this;
1294+
upstream.on('end', onSourceEnd);
1295+
upstream.on('error', onSourceError);
1296+
upstream.on('readable', onSourceReadable);
1297+
}
1298+
}
1299+
1300+
read(): D | null {
1301+
const { source, transforms } = this;
1302+
let item, i;
1303+
while ((item = source.read()) !== null) {
1304+
i = transforms.length;
1305+
// Applies each of the transforms in sequence, and terminates
1306+
// early if a transform returns null
1307+
//
1308+
// Do not use a for-of loop here, it slows down transformations
1309+
// by approximately a factor of 2.
1310+
while (i-- >= 1 && (item = transforms[i](item, this)) !== null)
1311+
;
1312+
if (item !== null)
1313+
return item;
1314+
}
1315+
return null;
1316+
}
1317+
1318+
map<K>(map: (item: D, it: AsyncIterator<any>) => K | null, self?: any): AsyncIterator<K> {
1319+
return new MappingIterator<T, K>(this.source, [bind(map, self), ...this.transforms], this);
1320+
}
1321+
1322+
destroy(cause?: Error): void {
1323+
this.upstream.destroy(cause);
1324+
super.destroy(cause);
1325+
}
1326+
1327+
public close() {
1328+
this.upstream.removeListener('end', onSourceEnd);
1329+
this.upstream.removeListener('error', onSourceError);
1330+
this.upstream.removeListener('readable', onSourceReadable);
1331+
if (this._destroySource)
1332+
this.upstream.destroy();
1333+
scheduleTask(() => {
1334+
// @ts-ignore
1335+
delete this.upstream._destination;
1336+
delete this.source;
1337+
});
1338+
super.close();
1339+
}
1340+
}
1341+
1342+
function onSourceError<S>(this: InternalSource<S>, error: Error) {
1343+
this._destination.emit('error', error);
1344+
}
1345+
function onSourceEnd<S>(this: InternalSource<S>) {
1346+
this._destination.close();
1347+
}
1348+
function onSourceReadable<S>(this: InternalSource<S>) {
1349+
this._destination.emit('readable');
1350+
}
12541351

12551352
/**
12561353
An iterator that generates items based on a source iterator

test.js

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import { ArrayIterator } from './dist/asynciterator.js'
2+
3+
let i = 0;
4+
const arr = new Array(20_000).fill(true).map(() => i++);
5+
6+
const now = Date.now();
7+
let times = 100;
8+
9+
const loop = () => {
10+
if (times === 0) {
11+
console.log('elapsed', Date.now() - now);
12+
return;
13+
}
14+
const iterator = new ArrayIterator(arr)
15+
.map((item) => item)
16+
.map((item) => item)
17+
.filter((item) => item % 2 === 0)
18+
;
19+
iterator.on('data', () => {}).on('end', () => {
20+
times -= 1;
21+
loop();
22+
});
23+
};
24+
25+
loop();

0 commit comments

Comments
 (0)