Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 106 additions & 11 deletions content/microservices/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -440,13 +440,15 @@ Along with the default error handling mechanisms, you can create a custom Except
```typescript
import { Catch, ArgumentsHost, Logger } from '@nestjs/common';
import { BaseExceptionFilter } from '@nestjs/core';
import { KafkaContext } from '../ctx-host';
import { KafkaContext } from '@nestjs/microservices';
import { Producer } from 'kafkajs';

@Catch()
export class KafkaMaxRetryExceptionFilter extends BaseExceptionFilter {
private readonly logger = new Logger(KafkaMaxRetryExceptionFilter.name);

constructor(
private readonly producer: Producer,
private readonly maxRetries: number,
// Optional custom function executed when max retries are exceeded
private readonly skipHandler?: (message: any) => Promise<void>,
Expand Down Expand Up @@ -482,24 +484,60 @@ export class KafkaMaxRetryExceptionFilter extends BaseExceptionFilter {
return; // Stop propagating the exception
}

// If retry count is below the maximum, proceed with the default Exception Filter logic
super.catch(exception, host);
// Republish the message to the same topic with incremented retry count
try {
await this.republishWithRetry(kafkaContext, currentRetryCount + 1);
await this.commitOffset(kafkaContext);
} catch (republishError) {
this.logger.error('Failed to republish message for retry:', republishError);
// Fall back to default exception handling
super.catch(exception, host);
}
}

private getRetryCountFromContext(context: KafkaContext): number {
const headers = context.getMessage().headers || {};
const retryHeader = headers['retryCount'] || headers['retry-count'];
return retryHeader ? Number(retryHeader) : 0;
const retryHeader = headers['retry-count'];
if (!retryHeader) {
return 0;
}
// Header values are Buffers, so convert to string first
const value = Buffer.isBuffer(retryHeader)
? retryHeader.toString()
: String(retryHeader);
return parseInt(value, 10) || 0;
}

private async republishWithRetry(
context: KafkaContext,
retryCount: number,
): Promise<void> {
const topic = context.getTopic();
const message = context.getMessage();

await this.producer.send({
topic,
messages: [
{
key: message.key,
value: message.value,
headers: {
...message.headers,
'retry-count': retryCount.toString(),
},
},
],
});
}

private async commitOffset(context: KafkaContext): Promise<void> {
const consumer = context.getConsumer && context.getConsumer();
const consumer = context.getConsumer();
if (!consumer) {
throw new Error('Consumer instance is not available from KafkaContext.');
}

const topic = context.getTopic && context.getTopic();
const partition = context.getPartition && context.getPartition();
const topic = context.getTopic();
const partition = context.getPartition();
const message = context.getMessage();
const offset = message.offset;

Expand All @@ -521,18 +559,75 @@ export class KafkaMaxRetryExceptionFilter extends BaseExceptionFilter {
}
```

This filter offers a way to retry processing a Kafka event up to a configurable number of times. Once the maximum retries are reached, it triggers a custom `skipHandler` (if provided) and commits the offset, effectively skipping the problematic event. This allows subsequent events to be processed without interruption.
This filter offers a way to retry processing a Kafka event up to a configurable number of times. When an exception occurs, it republishes the message to the same topic with an incremented `retry-count` header, then commits the current offset. Once the maximum retries are reached, it triggers a custom `skipHandler` (if provided) and commits the offset, effectively skipping the problematic event. This allows subsequent events to be processed without interruption.

You can integrate this filter by adding it to your event handlers:
You can integrate this filter by registering it globally or at the controller level. Note that you need to provide a Kafka producer instance:

```typescript
@UseFilters(new KafkaMaxRetryExceptionFilter(5))
@@filename(kafka-retry.filter)
import { Inject, Injectable } from '@nestjs/common';
import { Producer } from 'kafkajs';

@Injectable()
export class AppKafkaRetryFilter extends KafkaMaxRetryExceptionFilter {
constructor(@Inject('KAFKA_PRODUCER') producer: Producer) {
super(producer, 5); // maxRetries = 5
}
}
@@switch
import { Inject, Injectable } from '@nestjs/common';

@Injectable()
export class AppKafkaRetryFilter extends KafkaMaxRetryExceptionFilter {
constructor(@Inject('KAFKA_PRODUCER') producer) {
super(producer, 5); // maxRetries = 5
}
}
```

```typescript
@@filename(my-event.handler)
@Controller()
@UseFilters(AppKafkaRetryFilter)
export class MyEventHandler {
@EventPattern('your-topic')
async handleEvent(@Payload() data: any, @Ctx() context: KafkaContext) {
// Your event processing logic...
}
}
@@switch
@Controller()
@UseFilters(AppKafkaRetryFilter)
export class MyEventHandler {
@Bind(Payload(), Ctx())
@EventPattern('your-topic')
async handleEvent(data, context) {
// Your event processing logic...
}
}
```

Make sure to provide the Kafka producer in your module:

```typescript
@@filename(app.module)
import { Kafka } from 'kafkajs';

@Module({
providers: [
AppKafkaRetryFilter,
{
provide: 'KAFKA_PRODUCER',
useFactory: async () => {
const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer();
await producer.connect();
return producer;
},
},
],
})
export class AppModule {}
```

#### Commit offsets
Expand Down