Skip to content

Commit 87e8a1c

Browse files
committed
Addressing final comment
1 parent 9e81bc1 commit 87e8a1c

1 file changed

Lines changed: 6 additions & 2 deletions

File tree

packages/kafka/lib/utils/KafkaMessageBatchStream.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,12 @@ export class KafkaMessageBatchStream<TMessage extends MessageWithTopicAndPartiti
110110
}
111111

112112
override _final(callback: CallbackFunction) {
113-
this.flushMessages()
114-
this.push(null) // Signal end-of-stream to the readable side
113+
// Clean timeout
114+
clearTimeout(this.existingTimeout)
115+
this.existingTimeout = undefined
116+
// If there are remaining messages -> skip them
117+
// As they are not committed, the next consumer will process them
118+
this.messages = []
115119
callback()
116120
}
117121

0 commit comments

Comments
 (0)