Skip to content

Commit 9af21cb

Browse files
jacoscazRubenVerborgh
authored andcommitted
Add destroySource to WrappingIterator.
1 parent 348e53f commit 9af21cb

3 files changed

Lines changed: 115 additions & 11 deletions

File tree

asynciterator.ts

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1998,9 +1998,11 @@ class HistoryReader<T> {
19981998
*/
19991999
export class WrappingIterator<T> extends AsyncIterator<T> {
20002000
protected _source: InternalSource<T> | null = null;
2001+
protected _destroySource: boolean;
20012002

2002-
constructor(source?: MaybePromise<IterableSource<T>>) {
2003+
constructor(source?: MaybePromise<IterableSource<T>>, opts?: SourcedIteratorOptions) {
20032004
super();
2005+
this._destroySource = opts?.destroySource !== false;
20042006

20052007
// If promise, set up a temporary source and replace when ready
20062008
if (isPromise(source)) {
@@ -2018,9 +2020,6 @@ export class WrappingIterator<T> extends AsyncIterator<T> {
20182020

20192021
set source(value: IterableSource<T>) {
20202022
let source: InternalSource<T> = value as any;
2021-
// Do not change sources if the iterator is already done
2022-
if (this.done)
2023-
return;
20242023
if (this._source !== null)
20252024
throw new Error('The source cannot be changed after it has been set');
20262025

@@ -2051,6 +2050,13 @@ export class WrappingIterator<T> extends AsyncIterator<T> {
20512050
source = ensureSourceAvailable(source);
20522051
}
20532052

2053+
// Do not change sources if the iterator is already done
2054+
if (this.done) {
2055+
if (this._destroySource && isFunction(source.destroy))
2056+
source.destroy();
2057+
return;
2058+
}
2059+
20542060
// Set up event handling
20552061
source[DESTINATION] = this;
20562062
source.on('end', destinationClose);
@@ -2073,15 +2079,17 @@ export class WrappingIterator<T> extends AsyncIterator<T> {
20732079
}
20742080

20752081
protected _end(destroy: boolean = false) {
2076-
super._end(destroy);
2077-
// Clean up event handlers
20782082
if (this._source !== null) {
20792083
this._source.removeListener('end', destinationClose);
20802084
this._source.removeListener('error', destinationEmitError);
20812085
this._source.removeListener('readable', destinationSetReadable);
20822086
delete this._source[DESTINATION];
2087+
2088+
if (this._destroySource && isFunction(this._source.destroy))
2089+
this._source.destroy();
20832090
this._source = null;
20842091
}
2092+
super._end(destroy);
20852093
}
20862094
}
20872095

@@ -2098,7 +2106,7 @@ export class WrappingIterator<T> extends AsyncIterator<T> {
20982106
export function wrap<T>(source?: MaybePromise<IterableSource<T>> | null,
20992107
options?: TransformIteratorOptions<T>): AsyncIterator<T> {
21002108
// TransformIterator if TransformIteratorOptions were specified
2101-
if (options)
2109+
if (options && ('autoStart' in options || 'optional' in options || 'source' in options || 'maxBufferSize' in options))
21022110
return new TransformIterator<T>(source as MaybePromise<AsyncIterator<T>>, options);
21032111

21042112
// Empty iterator if no source specified
@@ -2107,7 +2115,7 @@ export function wrap<T>(source?: MaybePromise<IterableSource<T>> | null,
21072115

21082116
// Unwrap promised sources
21092117
if (isPromise<T>(source))
2110-
return new WrappingIterator(source);
2118+
return new WrappingIterator(source, options);
21112119

21122120
// Directly return any AsyncIterator
21132121
if (source instanceof AsyncIterator)
@@ -2117,7 +2125,7 @@ export function wrap<T>(source?: MaybePromise<IterableSource<T>> | null,
21172125
if (Array.isArray(source))
21182126
return fromArray<T>(source);
21192127
if (isIterable(source) || isIterator(source) || isEventEmitter(source))
2120-
return new WrappingIterator<T>(source);
2128+
return new WrappingIterator<T>(source, options);
21212129

21222130
// Other types are unsupported
21232131
throw new TypeError(`Invalid source: ${source}`);

test/TransformIterator-test.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ describe('TransformIterator', () => {
3434
});
3535
});
3636

37-
describe('the result when called through `wrap`', () => {
37+
describe('the result when called through `wrap` and a TransformIterator option', () => {
3838
let instance;
39-
before(() => { instance = wrap({}, {}); });
39+
before(() => { instance = wrap({}, { autoStart: false }); });
4040

4141
it('should be an TransformIterator object', () => {
4242
instance.should.be.an.instanceof(TransformIterator);

test/WrappingIterator-test.js

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,4 +585,100 @@ describe('wrap', () => {
585585
expect(await iterator.toArray()).to.deep.equal([0, 1, 2, 3, 4]);
586586
});
587587
});
588+
589+
describe('with a source that is explicitly set to be destroyed', () => {
590+
let source;
591+
592+
before(async () => {
593+
source = new IntegerIterator({ start: 1, step: 1, end: 4 });
594+
sinon.spy(source, 'destroy');
595+
await wrap(Promise.resolve(source), { destroySource: true }).toArray();
596+
});
597+
598+
it('should have destroyed the source', async () => {
599+
source.destroy.should.have.been.calledOnce;
600+
});
601+
});
602+
603+
describe('with a source that is explicitly set not to be destroyed', () => {
604+
let source;
605+
606+
before(async () => {
607+
source = new IntegerIterator({ start: 1, step: 1, end: 4 });
608+
sinon.spy(source, 'destroy');
609+
await wrap(Promise.resolve(source), { destroySource: false }).toArray();
610+
});
611+
612+
it('should have destroyed the source', async () => {
613+
source.destroy.should.not.have.been.called;
614+
});
615+
});
616+
617+
describe('with a source that is implicitly set to be destroyed', () => {
618+
let source;
619+
620+
before(async () => {
621+
source = new IntegerIterator({ start: 1, step: 1, end: 4 });
622+
sinon.spy(source, 'destroy');
623+
await wrap(Promise.resolve(source)).toArray();
624+
});
625+
626+
it('should have destroyed the source', async () => {
627+
source.destroy.should.have.been.calledOnce;
628+
});
629+
});
630+
631+
describe('with a source provided by a slow-resolving promise that is explicitly set to be destroyed', () => {
632+
let source;
633+
let iterator;
634+
let resolvePromise;
635+
636+
before(async () => {
637+
source = new IntegerIterator({ start: 1, step: 1, end: 4 });
638+
sinon.spy(source, 'destroy');
639+
iterator = wrap(new Promise(resolve => { resolvePromise = resolve; }), { destroySource: true });
640+
});
641+
642+
describe('Before the promise resolves', () => {
643+
it('should allow destruction', () => {
644+
iterator.destroy();
645+
});
646+
});
647+
648+
describe('After the promise resolves', () => {
649+
before(() => {
650+
resolvePromise(source);
651+
});
652+
it('should have destroyed the source', async () => {
653+
source.destroy.should.have.been.calledOnce;
654+
});
655+
});
656+
});
657+
658+
describe('with a source provided by a slow-resolving promise that is explicitly set not to be destroyed', () => {
659+
let source;
660+
let iterator;
661+
let resolvePromise;
662+
663+
before(async () => {
664+
source = new IntegerIterator({ start: 1, step: 1, end: 4 });
665+
sinon.spy(source, 'destroy');
666+
iterator = wrap(new Promise(resolve => { resolvePromise = resolve; }), { destroySource: false });
667+
});
668+
669+
describe('Before the promise resolves', () => {
670+
it('should allow destruction', () => {
671+
iterator.destroy();
672+
});
673+
});
674+
675+
describe('After the promise resolves', () => {
676+
before(() => {
677+
resolvePromise(source);
678+
});
679+
it('should not have destroyed the source', async () => {
680+
source.destroy.should.not.have.been.called;
681+
});
682+
});
683+
});
588684
});

0 commit comments

Comments
 (0)