@@ -61,6 +61,19 @@ export class KafkaMessageBatchStream<TMessage extends MessageWithTopicAndPartiti
6161 private pendingCallback : CallbackFunction | undefined
6262 private isBackPressured : boolean
6363
64+ // Debug counters for diagnosing flow issues
65+ public readonly debug = {
66+ writeCalls : 0 ,
67+ readCalls : 0 ,
68+ flushCalls : 0 ,
69+ pushCalls : 0 ,
70+ pushBackpressured : 0 ,
71+ callbacksHeld : 0 ,
72+ callbacksReleased : 0 ,
73+ flushDeferredByBackpressure : 0 ,
74+ finalCalled : false ,
75+ }
76+
6477 constructor ( options : KafkaMessageBatchOptions ) {
6578 super ( { objectMode : true , readableHighWaterMark : options . readableHighWaterMark } )
6679 this . batchSize = options . batchSize
@@ -75,9 +88,11 @@ export class KafkaMessageBatchStream<TMessage extends MessageWithTopicAndPartiti
7588 * by calling the pending callback that was held during backpressure.
7689 */
7790 override _read ( ) {
91+ this . debug . readCalls ++
7892 this . isBackPressured = false
7993 if ( ! this . pendingCallback ) return
8094
95+ this . debug . callbacksReleased ++
8196 const cb = this . pendingCallback
8297 this . pendingCallback = undefined
8398 cb ( ) // Resume the writable side
@@ -89,6 +104,7 @@ export class KafkaMessageBatchStream<TMessage extends MessageWithTopicAndPartiti
89104 * Implements backpressure by holding the callback when downstream cannot consume.
90105 */
91106 override _write ( message : TMessage , _encoding : BufferEncoding , callback : CallbackFunction ) {
107+ this . debug . writeCalls ++
92108 let canContinue = true
93109
94110 try {
@@ -104,12 +120,17 @@ export class KafkaMessageBatchStream<TMessage extends MessageWithTopicAndPartiti
104120 }
105121 } finally {
106122 // Backpressure handling: hold the callback if push() returned false
107- if ( ! canContinue ) this . pendingCallback = callback
108- else callback ( )
123+ if ( ! canContinue ) {
124+ this . debug . callbacksHeld ++
125+ this . pendingCallback = callback
126+ } else {
127+ callback ( )
128+ }
109129 }
110130 }
111131
112132 override _final ( callback : CallbackFunction ) {
133+ this . debug . finalCalled = true
113134 // Clean timeout
114135 clearTimeout ( this . existingTimeout )
115136 this . existingTimeout = undefined
@@ -120,11 +141,21 @@ export class KafkaMessageBatchStream<TMessage extends MessageWithTopicAndPartiti
120141 callback ( )
121142 }
122143
144+ public getPendingCount ( ) : number {
145+ return this . messages . length
146+ }
147+
148+ public hasPendingCallback ( ) : boolean {
149+ return this . pendingCallback !== undefined
150+ }
151+
123152 private flushMessages ( ) : boolean {
124153 clearTimeout ( this . existingTimeout )
125154 this . existingTimeout = undefined
155+ this . debug . flushCalls ++
126156
127157 if ( this . isBackPressured ) {
158+ this . debug . flushDeferredByBackpressure ++
128159 this . existingTimeout = setTimeout ( ( ) => this . flushMessages ( ) , this . timeout )
129160 return false
130161 }
@@ -146,7 +177,9 @@ export class KafkaMessageBatchStream<TMessage extends MessageWithTopicAndPartiti
146177 // same tick also return false, so the last value correctly reflects backpressure.
147178 let canContinue = true
148179 for ( const messagesForKey of Object . values ( messagesByTopicPartition ) ) {
180+ this . debug . pushCalls ++
149181 canContinue = this . push ( messagesForKey )
182+ if ( ! canContinue ) this . debug . pushBackpressured ++
150183 }
151184
152185 if ( ! canContinue ) this . isBackPressured = true
0 commit comments