diff --git a/content/microservices/kafka.md b/content/microservices/kafka.md
index 2660093a67..13cd4d9819 100644
--- a/content/microservices/kafka.md
+++ b/content/microservices/kafka.md
@@ -440,13 +440,15 @@ Along with the default error handling mechanisms, you can create a custom Except
 ```typescript
 import { Catch, ArgumentsHost, Logger } from '@nestjs/common';
 import { BaseExceptionFilter } from '@nestjs/core';
-import { KafkaContext } from '../ctx-host';
+import { KafkaContext } from '@nestjs/microservices';
+import { Producer } from 'kafkajs';
 
 @Catch()
 export class KafkaMaxRetryExceptionFilter extends BaseExceptionFilter {
   private readonly logger = new Logger(KafkaMaxRetryExceptionFilter.name);
 
   constructor(
+    private readonly producer: Producer,
     private readonly maxRetries: number,
     // Optional custom function executed when max retries are exceeded
     private readonly skipHandler?: (message: any) => Promise<void>,
@@ -482,24 +484,60 @@ export class KafkaMaxRetryExceptionFilter extends BaseExceptionFilter {
       return; // Stop propagating the exception
     }
 
-    // If retry count is below the maximum, proceed with the default Exception Filter logic
-    super.catch(exception, host);
+    // Republish the message to the same topic with incremented retry count
+    try {
+      await this.republishWithRetry(kafkaContext, currentRetryCount + 1);
+      await this.commitOffset(kafkaContext);
+    } catch (republishError) {
+      this.logger.error('Failed to republish message for retry:', republishError);
+      // Fall back to default exception handling
+      super.catch(exception, host);
+    }
   }
 
   private getRetryCountFromContext(context: KafkaContext): number {
     const headers = context.getMessage().headers || {};
-    const retryHeader = headers['retryCount'] || headers['retry-count'];
-    return retryHeader ? Number(retryHeader) : 0;
+    const retryHeader = headers['retry-count'];
+    if (!retryHeader) {
+      return 0;
+    }
+    // Header values are Buffers, so convert to string first
+    const value = Buffer.isBuffer(retryHeader)
+      ? retryHeader.toString()
+      : String(retryHeader);
+    return parseInt(value, 10) || 0;
+  }
+
+  private async republishWithRetry(
+    context: KafkaContext,
+    retryCount: number,
+  ): Promise<void> {
+    const topic = context.getTopic();
+    const message = context.getMessage();
+
+    await this.producer.send({
+      topic,
+      messages: [
+        {
+          key: message.key,
+          value: message.value,
+          headers: {
+            ...message.headers,
+            'retry-count': retryCount.toString(),
+          },
+        },
+      ],
+    });
   }
 
   private async commitOffset(context: KafkaContext): Promise<void> {
-    const consumer = context.getConsumer && context.getConsumer();
+    const consumer = context.getConsumer();
     if (!consumer) {
       throw new Error('Consumer instance is not available from KafkaContext.');
     }
 
-    const topic = context.getTopic && context.getTopic();
-    const partition = context.getPartition && context.getPartition();
+    const topic = context.getTopic();
+    const partition = context.getPartition();
     const message = context.getMessage();
     const offset = message.offset;
 
@@ -521,18 +559,75 @@ export class KafkaMaxRetryExceptionFilter extends BaseExceptionFilter {
 }
 ```
 
-This filter offers a way to retry processing a Kafka event up to a configurable number of times. Once the maximum retries are reached, it triggers a custom `skipHandler` (if provided) and commits the offset, effectively skipping the problematic event. This allows subsequent events to be processed without interruption.
+This filter offers a way to retry processing a Kafka event up to a configurable number of times. When an exception occurs, it republishes the message to the same topic with an incremented `retry-count` header, then commits the current offset. Once the maximum retries are reached, it triggers a custom `skipHandler` (if provided) and commits the offset, effectively skipping the problematic event. This allows subsequent events to be processed without interruption.
 
-You can integrate this filter by adding it to your event handlers:
+You can integrate this filter by registering it globally or at the controller level. Note that you need to provide a Kafka producer instance:
 
 ```typescript
-@UseFilters(new KafkaMaxRetryExceptionFilter(5))
+@@filename(kafka-retry.filter)
+import { Inject, Injectable } from '@nestjs/common';
+import { Producer } from 'kafkajs';
+
+@Injectable()
+export class AppKafkaRetryFilter extends KafkaMaxRetryExceptionFilter {
+  constructor(@Inject('KAFKA_PRODUCER') producer: Producer) {
+    super(producer, 5); // maxRetries = 5
+  }
+}
+@@switch
+import { Inject, Injectable } from '@nestjs/common';
+
+@Injectable()
+export class AppKafkaRetryFilter extends KafkaMaxRetryExceptionFilter {
+  constructor(@Inject('KAFKA_PRODUCER') producer) {
+    super(producer, 5); // maxRetries = 5
+  }
+}
+```
+
+```typescript
+@@filename(my-event.handler)
+@Controller()
+@UseFilters(AppKafkaRetryFilter)
 export class MyEventHandler {
   @EventPattern('your-topic')
   async handleEvent(@Payload() data: any, @Ctx() context: KafkaContext) {
     // Your event processing logic...
   }
 }
+@@switch
+@Controller()
+@UseFilters(AppKafkaRetryFilter)
+export class MyEventHandler {
+  @Bind(Payload(), Ctx())
+  @EventPattern('your-topic')
+  async handleEvent(data, context) {
+    // Your event processing logic...
+  }
+}
+```
+
+Make sure to provide the Kafka producer in your module:
+
+```typescript
+@@filename(app.module)
+import { Kafka } from 'kafkajs';
+
+@Module({
+  providers: [
+    AppKafkaRetryFilter,
+    {
+      provide: 'KAFKA_PRODUCER',
+      useFactory: async () => {
+        const kafka = new Kafka({ brokers: ['localhost:9092'] });
+        const producer = kafka.producer();
+        await producer.connect();
+        return producer;
+      },
+    },
+  ],
+})
+export class AppModule {}
 ```
 
 #### Commit offsets