@@ -527,7 +527,7 @@ export class AsyncIterator<T> extends EventEmitter {
527527 @returns {module:asynciterator.AsyncIterator } A new iterator with at most the given number of items
528528 */
529529 take ( limit : number ) : AsyncIterator < T > {
530- return this . transform ( { limit } ) ;
530+ return new UntilIterator ( this , limit ) ;
531531 }
532532
533533 /**
@@ -795,14 +795,12 @@ export function identity<S>(item: S): typeof item {
795795 return item ;
796796}
797797
798-
799798/**
800799 An iterator that synchronously transforms every item from its source
801800 by applying a mapping function.
802801 @extends module:asynciterator.AsyncIterator
803802*/
804- export class MappingIterator < S , D = S > extends AsyncIterator < D > {
805- protected readonly _map : MapFunction < S , D > ;
803+ export abstract class SynchronousTransformIterator < S , D = S > extends AsyncIterator < D > {
806804 protected readonly _source : InternalSource < S > ;
807805 protected readonly _destroySource : boolean ;
808806
@@ -811,11 +809,9 @@ export class MappingIterator<S, D = S> extends AsyncIterator<D> {
811809 */
812810 constructor (
813811 source : AsyncIterator < S > ,
814- map : MapFunction < S , D > = identity as MapFunction < S , D > ,
815812 options : SourcedIteratorOptions = { }
816813 ) {
817814 super ( ) ;
818- this . _map = map ;
819815 this . _source = ensureSourceAvailable ( source ) ;
820816 this . _destroySource = options . destroySource !== false ;
821817
@@ -833,15 +829,16 @@ export class MappingIterator<S, D = S> extends AsyncIterator<D> {
833829 }
834830 }
835831
832+ protected abstract safeRead ( ) : D | null ;
833+
836834 /* Tries to read the next item from the iterator. */
837835 read ( ) : D | null {
838836 if ( ! this . done ) {
839837 // Try to read an item that maps to a non-null value
840838 if ( this . _source . readable ) {
841- let item : S | null , mapped : D | null ;
842- while ( ( item = this . _source . read ( ) ) !== null ) {
843- if ( ( mapped = this . _map ( item ) ) !== null )
844- return mapped ;
839+ const item = this . safeRead ( ) ;
840+ if ( item !== null ) {
841+ return item ;
845842 }
846843 }
847844 this . readable = false ;
@@ -865,6 +862,89 @@ export class MappingIterator<S, D = S> extends AsyncIterator<D> {
865862 }
866863}
867864
865+ export class MappingIterator < S , D = S > extends SynchronousTransformIterator < S , D > {
866+ private _map : MapFunction < S , D > ;
867+
868+ constructor (
869+ source : AsyncIterator < S > ,
870+ map : MapFunction < S , D > = identity as MapFunction < S , D > ,
871+ options : SourcedIteratorOptions = { }
872+ ) {
873+ super ( source , options ) ;
874+ this . _map = map ;
875+ }
876+
877+ safeRead ( ) {
878+ let item : S | null ;
879+ while ( ( item = this . _source . read ( ) ) !== null ) {
880+ const mapped = this . _map ( item ) ;
881+ if ( mapped !== null )
882+ return mapped ;
883+ }
884+ return null ;
885+ }
886+
887+ map < K > ( map : MapFunction < D , K > , self ?: any ) : AsyncIterator < K > {
888+ return new CompositeMappingIterator ( this . _source , [ this . _map , bind ( map , self ) ] , this ) ;
889+ }
890+ }
891+
892+ export class CompositeMappingIterator < S , D = S > extends SynchronousTransformIterator < S , D > {
893+ constructor (
894+ private root : AsyncIterator < S > ,
895+ private mappings : MapFunction < any , any > [ ] = [ ] ,
896+ source : AsyncIterator < any > ,
897+ options : SourcedIteratorOptions = { } ,
898+ ) {
899+ super ( source , options ) ;
900+ }
901+
902+ safeRead ( ) {
903+ // TODO: See if this is actually necessary
904+ // A source should only be read from if readable is true
905+ if ( ! this . root . readable ) {
906+ this . readable = false ;
907+ // TODO: See if this should be here
908+ if ( this . root . done )
909+ this . close ( ) ;
910+ return null ;
911+ }
912+
913+ let mapped : any = null ;
914+ while ( mapped === null && ( mapped = this . root . read ( ) ) !== null ) {
915+ for ( let i = 0 ; i < this . mappings . length ; i ++ ) {
916+ mapped = this . mappings [ i ] ( mapped ) ;
917+ if ( mapped === null )
918+ break ;
919+ }
920+ }
921+ return mapped ;
922+ }
923+
924+ map < K > ( map : MapFunction < D , K > , self ?: any ) : AsyncIterator < K > {
925+ return new CompositeMappingIterator ( this . root , [ ...this . mappings , bind ( map , self ) ] , this ) ;
926+ }
927+ }
928+
929+ export class UntilIterator < S > extends SynchronousTransformIterator < S > {
930+ constructor (
931+ source : AsyncIterator < S > ,
932+ private limit : number ,
933+ options : SourcedIteratorOptions = { }
934+ ) {
935+ super ( source , options ) ;
936+ }
937+
938+ safeRead ( ) {
939+ if ( this . limit <= 0 ) {
940+ this . close ( ) ;
941+ return null ;
942+ }
943+ this . limit -= 1 ;
944+ return this . _source . read ( ) ;
945+ }
946+ }
947+
868948// Validates an AsyncIterator for use as a source within another AsyncIterator
869949function ensureSourceAvailable < S > ( source ?: AsyncIterator < S > , allowDestination = false ) {
870950 if ( ! source || ! isFunction ( source . read ) || ! isFunction ( source . on ) )
0 commit comments