-
Notifications
You must be signed in to change notification settings - Fork 7
feat: Improve kafka backpreassure #410
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 18 commits
ea6bb3a
a633009
1b967fa
8fed538
6b516d4
13aaacb
f54213e
37b9574
f3b2848
094ad19
1814df8
f2adcea
36dbdb5
719d19f
3e19de7
6d733b2
9e81bc1
87e8a1c
fb7e355
876f49a
983424f
041eefd
8b76727
b68c633
259ac31
75f6c40
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,4 +1,5 @@ | ||||||||||||||||||||||
| import { randomUUID } from 'node:crypto' | ||||||||||||||||||||||
| import { pipeline } from 'node:stream/promises' | ||||||||||||||||||||||
| import { setTimeout } from 'node:timers/promises' | ||||||||||||||||||||||
| import { | ||||||||||||||||||||||
| InternalError, | ||||||||||||||||||||||
|
|
@@ -195,14 +196,15 @@ export abstract class AbstractKafkaConsumer< | |||||||||||||||||||||
| if (this.options.batchProcessingEnabled && this.options.batchProcessingOptions) { | ||||||||||||||||||||||
| this.messageBatchStream = new KafkaMessageBatchStream< | ||||||||||||||||||||||
| DeserializedMessage<SupportedMessageValues<TopicsConfig>> | ||||||||||||||||||||||
| >( | ||||||||||||||||||||||
| (batch) => | ||||||||||||||||||||||
| this.consume(batch.topic, batch.messages).catch((error) => this.handlerError(error)), | ||||||||||||||||||||||
| this.options.batchProcessingOptions, | ||||||||||||||||||||||
| >({ | ||||||||||||||||||||||
| batchSize: this.options.batchProcessingOptions.batchSize, | ||||||||||||||||||||||
| timeoutMilliseconds: this.options.batchProcessingOptions.timeoutMilliseconds, | ||||||||||||||||||||||
| }) | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Use pipeline for better error handling and backpressure management | ||||||||||||||||||||||
| pipeline(this.consumerStream, this.messageBatchStream).catch((error) => | ||||||||||||||||||||||
| this.handlerError(error), | ||||||||||||||||||||||
| ) | ||||||||||||||||||||||
|
Comment on lines
+204
to
208
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Incomplete comment. The comment appears to be truncated mid-sentence: "so no separate" doesn't complete the thought. Consider completing it, e.g., "so no separate error handler is needed for consumerStream." 📝 Suggested fix- // Use pipeline for better error handling and backpressure management.
- // pipeline() internally listens for errors on all streams, so no separate
+ // Use pipeline for better error handling and backpressure management.
+ // pipeline() internally listens for errors on all streams, so no separate error handler is needed.
pipeline(this.consumerStream, this.messageBatchStream).catch((error) =>
this.handlerError(error),
)📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||
| this.consumerStream.pipe(this.messageBatchStream) | ||||||||||||||||||||||
| } else { | ||||||||||||||||||||||
| this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error)) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
CarlosGamero marked this conversation as resolved.
|
||||||||||||||||||||||
| } catch (error) { | ||||||||||||||||||||||
| throw new InternalError({ | ||||||||||||||||||||||
|
|
@@ -211,6 +213,12 @@ export abstract class AbstractKafkaConsumer< | |||||||||||||||||||||
| cause: error, | ||||||||||||||||||||||
| }) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| if (this.messageBatchStream) { | ||||||||||||||||||||||
| this.handleSyncStreamBatch(this.messageBatchStream).catch((error) => this.handlerError(error)) | ||||||||||||||||||||||
| } else { | ||||||||||||||||||||||
| this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error)) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| private async handleSyncStream( | ||||||||||||||||||||||
|
|
@@ -223,6 +231,13 @@ export abstract class AbstractKafkaConsumer< | |||||||||||||||||||||
| ) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| private async handleSyncStreamBatch( | ||||||||||||||||||||||
| stream: KafkaMessageBatchStream<DeserializedMessage<SupportedMessageValues<TopicsConfig>>>, | ||||||||||||||||||||||
| ): Promise<void> { | ||||||||||||||||||||||
| for await (const messageBatch of stream) { | ||||||||||||||||||||||
| await this.consume(messageBatch[0].topic, messageBatch) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| async close(): Promise<void> { | ||||||||||||||||||||||
| if (!this.consumerStream && !this.messageBatchStream) { | ||||||||||||||||||||||
|
|
@@ -371,6 +386,7 @@ export abstract class AbstractKafkaConsumer< | |||||||||||||||||||||
| ): Promise<MessageProcessingResult> { | ||||||||||||||||||||||
| try { | ||||||||||||||||||||||
| const isBatch = Array.isArray(messageOrBatch) | ||||||||||||||||||||||
| /* v8 ignore start */ | ||||||||||||||||||||||
| if (this.options.batchProcessingEnabled && !isBatch) { | ||||||||||||||||||||||
| throw new Error( | ||||||||||||||||||||||
| 'Batch processing is enabled, but a single message was passed to the handler', | ||||||||||||||||||||||
|
|
@@ -381,6 +397,7 @@ export abstract class AbstractKafkaConsumer< | |||||||||||||||||||||
| 'Batch processing is disabled, but a batch of messages was passed to the handler', | ||||||||||||||||||||||
| ) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| /* v8 ignore stop */ | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| await handler( | ||||||||||||||||||||||
| // We need casting to match message type with handler type - it is safe as we verify the type above | ||||||||||||||||||||||
|
|
@@ -395,10 +412,7 @@ export abstract class AbstractKafkaConsumer< | |||||||||||||||||||||
| const errorContext = Array.isArray(messageOrBatch) | ||||||||||||||||||||||
| ? { batchSize: messageOrBatch.length } | ||||||||||||||||||||||
| : { message: stringValueSerializer(messageOrBatch.value) } | ||||||||||||||||||||||
| this.handlerError(error, { | ||||||||||||||||||||||
| topic, | ||||||||||||||||||||||
| ...errorContext, | ||||||||||||||||||||||
| }) | ||||||||||||||||||||||
| this.handlerError(error, { topic, ...errorContext }) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| return { status: 'error', errorReason: 'handlerError' } | ||||||||||||||||||||||
|
|
@@ -443,7 +457,7 @@ export abstract class AbstractKafkaConsumer< | |||||||||||||||||||||
| } catch (error) { | ||||||||||||||||||||||
| this.logger.debug(logDetails, 'Message commit failed') | ||||||||||||||||||||||
| if (error instanceof ResponseError) return this.handleResponseErrorOnCommit(error) | ||||||||||||||||||||||
| throw error | ||||||||||||||||||||||
| this.handlerError(error) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
|
@@ -455,7 +469,7 @@ export abstract class AbstractKafkaConsumer< | |||||||||||||||||||||
| error.apiCode && | ||||||||||||||||||||||
| commitErrorCodesToIgnore.has(error.apiCode) | ||||||||||||||||||||||
| ) { | ||||||||||||||||||||||
| this.logger.error( | ||||||||||||||||||||||
| this.logger.warn( | ||||||||||||||||||||||
| { | ||||||||||||||||||||||
| apiCode: error.apiCode, | ||||||||||||||||||||||
| apiId: error.apiId, | ||||||||||||||||||||||
|
|
@@ -466,8 +480,7 @@ export abstract class AbstractKafkaConsumer< | |||||||||||||||||||||
| `Failed to commit message: ${error.message}`, | ||||||||||||||||||||||
| ) | ||||||||||||||||||||||
| } else { | ||||||||||||||||||||||
| // If error is not recognized, rethrow it | ||||||||||||||||||||||
| throw responseError | ||||||||||||||||||||||
| this.handlerError(error) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.