Skip to content

Commit 876f49a

Browse files
committed
Error handling
1 parent fb7e355 commit 876f49a

1 file changed

Lines changed: 4 additions & 2 deletions

File tree

packages/kafka/lib/AbstractKafkaConsumer.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,6 @@ export abstract class AbstractKafkaConsumer<
191191
})
192192

193193
this.consumerStream = await this.consumer.consume({ ...consumeOptions, topics })
194-
this.consumerStream.on('error', (error) => this.handlerError(error))
195194

196195
if (this.options.batchProcessingEnabled && this.options.batchProcessingOptions) {
197196
this.messageBatchStream = new KafkaMessageBatchStream<
@@ -201,10 +200,13 @@ export abstract class AbstractKafkaConsumer<
201200
timeoutMilliseconds: this.options.batchProcessingOptions.timeoutMilliseconds,
202201
})
203202

204-
// Use pipeline for better error handling and backpressure management
203+
// Use pipeline for better error handling and backpressure management.
204+
// pipeline() internally listens for errors on all streams, so no separate
205205
pipeline(this.consumerStream, this.messageBatchStream).catch((error) =>
206206
this.handlerError(error),
207207
)
208+
} else {
209+
this.consumerStream.on('error', (error) => this.handlerError(error))
208210
}
209211
} catch (error) {
210212
throw new InternalError({

0 commit comments

Comments
 (0)