11import Logger , { ILogger } from 'js-logger' ;
22import { BaseListener , BaseObserver } from './BaseObserver.js' ;
33
4- export type DataStreamOptions = {
4+ export type DataStreamOptions < ParsedData , SourceData > = {
5+ mapLine ?: ( line : SourceData ) => ParsedData ;
6+
57 /**
68 * Close the stream if any consumer throws an error
79 */
@@ -33,8 +35,8 @@ export const DEFAULT_PRESSURE_LIMITS = {
3335 * native JS streams or async iterators.
3436 * This is handy for environments such as React Native which need polyfills for the above.
3537 */
36- export class DataStream < Data extends any = any > extends BaseObserver < DataStreamListener < Data > > {
37- dataQueue : Data [ ] ;
38+ export class DataStream < ParsedData , SourceData = any > extends BaseObserver < DataStreamListener < ParsedData > > {
39+ dataQueue : SourceData [ ] ;
3840
3941 protected isClosed : boolean ;
4042
@@ -43,11 +45,14 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
4345
4446 protected logger : ILogger ;
4547
46- constructor ( protected options ?: DataStreamOptions ) {
48+ protected mapLine : ( line : SourceData ) => ParsedData ;
49+
50+ constructor ( protected options ?: DataStreamOptions < ParsedData , SourceData > ) {
4751 super ( ) ;
4852 this . processingPromise = null ;
4953 this . isClosed = false ;
5054 this . dataQueue = [ ] ;
55+ this . mapLine = options ?. mapLine ?? ( ( line ) => line as any ) ;
5156
5257 this . logger = options ?. logger ?? Logger . get ( 'DataStream' ) ;
5358
@@ -85,7 +90,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
8590 /**
8691 * Enqueues data for the consumers to read
8792 */
88- enqueueData ( data : Data ) {
93+ enqueueData ( data : SourceData ) {
8994 if ( this . isClosed ) {
9095 throw new Error ( 'Cannot enqueue data into closed stream.' ) ;
9196 }
@@ -100,7 +105,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
100105 * Reads data once from the data stream
101106 * @returns a Data payload or Null if the stream closed.
102107 */
103- async read ( ) : Promise < Data | null > {
108+ async read ( ) : Promise < ParsedData | null > {
104109 if ( this . closed ) {
105110 return null ;
106111 }
@@ -129,7 +134,7 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
129134 /**
130135 * Executes a callback for each data item in the stream
131136 */
132- forEach ( callback : DataStreamCallback < Data > ) {
137+ forEach ( callback : DataStreamCallback < ParsedData > ) {
133138 if ( this . dataQueue . length <= this . lowWatermark ) {
134139 this . iterateAsyncErrored ( async ( l ) => l . lowWater ?.( ) ) ;
135140 }
@@ -139,58 +144,40 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
139144 } ) ;
140145 }
141146
142- protected async processQueue ( ) {
147+ protected processQueue ( ) {
143148 if ( this . processingPromise ) {
144149 return ;
145150 }
146151
147- /**
148- * Allow listeners to mutate the queue before processing.
149- * This allows for operations such as dropping or compressing data
150- * on high water or requesting more data on low water.
151- */
152- if ( this . dataQueue . length >= this . highWatermark ) {
153- await this . iterateAsyncErrored ( async ( l ) => l . highWater ?.( ) ) ;
154- }
155-
156152 const promise = ( this . processingPromise = this . _processQueue ( ) ) ;
157153 promise . finally ( ( ) => {
158154 return ( this . processingPromise = null ) ;
159155 } ) ;
160156 return promise ;
161157 }
162158
163- /**
164- * Creates a new data stream which is a map of the original
165- */
166- map < ReturnData > ( callback : ( data : Data ) => ReturnData ) : DataStream < ReturnData > {
167- const stream = new DataStream ( this . options ) ;
168- const l = this . registerListener ( {
169- data : async ( data ) => {
170- stream . enqueueData ( callback ( data ) ) ;
171- } ,
172- closed : ( ) => {
173- stream . close ( ) ;
174- l ?.( ) ;
175- }
176- } ) ;
177-
178- return stream ;
179- }
180-
181159 protected hasDataReader ( ) {
182160 return Array . from ( this . listeners . values ( ) ) . some ( ( l ) => ! ! l . data ) ;
183161 }
184162
185163 protected async _processQueue ( ) {
164+ /**
165+ * Allow listeners to mutate the queue before processing.
166+ * This allows for operations such as dropping or compressing data
167+ * on high water or requesting more data on low water.
168+ */
169+ if ( this . dataQueue . length >= this . highWatermark ) {
170+ await this . iterateAsyncErrored ( async ( l ) => l . highWater ?.( ) ) ;
171+ }
172+
186173 if ( this . isClosed || ! this . hasDataReader ( ) ) {
187- await Promise . resolve ( ) ;
188174 return ;
189175 }
190176
191177 if ( this . dataQueue . length ) {
192178 const data = this . dataQueue . shift ( ) ! ;
193- await this . iterateAsyncErrored ( async ( l ) => l . data ?.( data ) ) ;
179+ const mapped = this . mapLine ( data ) ;
180+ await this . iterateAsyncErrored ( async ( l ) => l . data ?.( mapped ) ) ;
194181 }
195182
196183 if ( this . dataQueue . length <= this . lowWatermark ) {
@@ -202,14 +189,17 @@ export class DataStream<Data extends any = any> extends BaseObserver<DataStreamL
202189 this . notifyDataAdded = null ;
203190 }
204191
205- if ( this . dataQueue . length ) {
192+ if ( this . dataQueue . length > 0 ) {
206193 // Next tick
207194 setTimeout ( ( ) => this . processQueue ( ) ) ;
208195 }
209196 }
210197
211- protected async iterateAsyncErrored ( cb : ( l : BaseListener ) => Promise < void > ) {
212- for ( let i of Array . from ( this . listeners . values ( ) ) ) {
198+ protected async iterateAsyncErrored ( cb : ( l : Partial < DataStreamListener < ParsedData > > ) => Promise < void > ) {
199+ // Important: We need to copy the listeners, as calling a listener could result in adding another
200+ // listener, resulting in infinite loops.
201+ const listeners = Array . from ( this . listeners . values ( ) ) ;
202+ for ( let i of listeners ) {
213203 try {
214204 await cb ( i ) ;
215205 } catch ( ex ) {
0 commit comments