Skip to content

feat(infra): add Kafka Dead Letter Queue config to platform-infra (STA-218)#193

Merged
Puneethkumarck merged 5 commits into
mainfrom
feature/STA-218-kafka-dlq
Mar 17, 2026
Merged

feat(infra): add Kafka Dead Letter Queue config to platform-infra (STA-218)#193
Puneethkumarck merged 5 commits into
mainfrom
feature/STA-218-kafka-dlq

Conversation

@Puneethkumarck

@Puneethkumarck Puneethkumarck commented Mar 17, 2026

Copy link
Copy Markdown
Owner

Summary

  • Add shared KafkaDlqConfig to platform-infra providing a DefaultErrorHandler with DeadLetterPublishingRecoverer that routes failed consumer records to <topic>.dlt after exponential back-off retries (3 attempts: 1s → 2s → 4s)
  • Spring Boot auto-detects the CommonErrorHandler bean and wires it into all @KafkaListener container factories
  • Includes unit tests for bean creation

Closes STA-218

Test plan

  • KafkaDlqConfigTest — verifies DeadLetterPublishingRecoverer and DefaultErrorHandler beans are created
  • Integration: verify DLQ routing in a service with @KafkaListener

🤖 Generated with Claude Code

Summary by CodeRabbit

  • Tests

    • Added unit tests validating Dead Letter Queue and error handler bean creation.
  • New Features

    • Implemented Dead Letter Queue (DLQ) messaging infrastructure with configurable exponential backoff retry strategy and automatic routing to dedicated error-recovery topics.
  • Chores

    • Added Spring Boot test starter and JUnit Platform launcher dependencies to support testing.

…A-218)

Shared KafkaDlqConfig with DefaultErrorHandler + DeadLetterPublishingRecoverer.
Failed messages retry 3x with exponential backoff (1s, 2s, 4s) then route
to <topic>.dlt. Auto-activates via @ConditionalOnClass for all Kafka consumers.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@Puneethkumarck Puneethkumarck added the infra Infrastructure / build label Mar 17, 2026
@coderabbitai

coderabbitai Bot commented Mar 17, 2026

Copy link
Copy Markdown

Warning

Rate limit exceeded

@Puneethkumarck has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 3 minutes and 47 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: b71b162b-9b2c-45bf-bcb0-b8bbfb56cd30

📥 Commits

Reviewing files that changed from the base of the PR and between 1c468d1 and 46a1d72.

📒 Files selected for processing (2)
  • platform-infra/src/main/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfig.java
  • platform-infra/src/test/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfigTest.java

Walkthrough

Adds Kafka Dead Letter Queue support: a Spring configuration providing a DeadLetterPublishingRecoverer and a DefaultErrorHandler with exponential backoff retries, plus test dependencies and unit tests verifying bean creation.

Changes

Cohort / File(s) Summary
Build Dependencies
platform-infra/build.gradle.kts
Added test dependencies: org.springframework.boot:spring-boot-starter-test and runtime org.junit.platform:junit-platform-launcher.
Kafka DLQ Configuration
platform-infra/src/main/java/.../messaging/KafkaDlqConfig.java
New @Configuration class (conditional on KafkaTemplate) exposing DeadLetterPublishingRecoverer (publishes to <topic>.dlt on same partition, logs errors) and DefaultErrorHandler with ExponentialBackOffWithMaxRetries (3 retries, 1s initial, multiplier 2.0, max 10s).
Configuration Tests
platform-infra/src/test/java/.../messaging/KafkaDlqConfigTest.java
New unit tests verifying bean instantiation for DeadLetterPublishingRecoverer and DefaultErrorHandler using a mocked KafkaOperations.

Sequence Diagram(s)

sequenceDiagram
  participant Listener as Spring Kafka Listener
  participant ErrorHandler as DefaultErrorHandler
  participant Recoverer as DeadLetterPublishingRecoverer
  participant Kafka as Kafka Broker

  Listener->>ErrorHandler: onListenerException(record, exception)
  ErrorHandler->>ErrorHandler: apply ExponentialBackOff retries (3, 1s→10s)
  ErrorHandler-->>Recoverer: delegate after retries exhausted
  Recoverer->>Kafka: publish to "<original-topic>.dlt" (same partition)
  Recoverer-->>ErrorHandler: log publishing result
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarizes the primary change—adding Kafka DLQ configuration to platform-infra—and is clear, specific, and references the ticket (STA-218).
Description check ✅ Passed The description covers Summary, Related Issue (STA-218), Type of Change, What Changed, and How Was It Tested with test results. Security Considerations is marked N/A appropriately. Only the Checklist items are incomplete, but core content is present.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feature/STA-218-kafka-dlq
📝 Coding Plan
  • Generate coding plan for human review comments

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@platform-infra/src/main/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfig.java`:
- Around line 49-62: The KafkaDlqConfig is creating a DefaultErrorHandler bean
unconditionally which can conflict with downstream services; update the bean
definitions (the kafkaErrorHandler method that returns DefaultErrorHandler and
the bean method that produces DeadLetterPublishingRecoverer) to be conditional
by adding `@ConditionalOnMissingBean`(DefaultErrorHandler.class) on the
DefaultErrorHandler bean and
`@ConditionalOnMissingBean`(DeadLetterPublishingRecoverer.class) on the recoverer
bean so the shared infra only provides these beans when an application hasn’t
defined its own; keep the existing configuration logic inside the methods
(ExponentialBackOffWithMaxRetries, logging, etc.) unchanged.
- Around line 37-47: The current deadLetterPublishingRecoverer in KafkaDlqConfig
uses record.partition() when constructing the DLT TopicPartition (in
deadLetterPublishingRecoverer -> recoverer), which can fail if the .dlt topic
has fewer partitions; replace the custom lambda resolver with the single-arg
DeadLetterPublishingRecoverer constructor (new
DeadLetterPublishingRecoverer(kafkaOperations)) so Spring Kafka's default DLT
suffix and destination resolving handles partition selection, or alternately
modify the lambda to return a destination that lets the producer choose
partition (i.e., do not hardcode record.partition()); update the
deadLetterPublishingRecoverer method to use that change.
- Around line 24-25: The Javadoc comment is inconsistent with the MAX_RETRIES
constant: MAX_RETRIES = 3 results in 4 total attempts (1 initial + 3 retries).
Fix by either updating the Javadoc text to state "4 total attempts (1 initial +
3 retries) with exponential back-off (1s, 2s, 4s)" or change the constant
MAX_RETRIES to 2 to preserve "3 attempts" semantics; update the comment on the
same block (and any other mention at line 32) to match whichever choice you make
so documentation and the MAX_RETRIES constant are consistent.

In
`@platform-infra/src/test/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfigTest.java`:
- Around line 20-41: The tests only assert types — enhance them to assert
behavior: call config.deadLetterPublishingRecoverer(kafkaOperations) and inspect
its destination resolver (via reflection or by exposing it) to assert it maps a
ConsumerRecord with topic "orders" to a TopicPartition "orders.dlt"; inspect the
recoverer's BackOff (or the BackOff passed into DeadLetterPublishingRecoverer)
to assert expected interval/retry settings; and when creating
config.kafkaErrorHandler(recoverer) assert the returned DefaultErrorHandler is
configured with the same recoverer instance (use reflection to read the
handler's recoverer field or use any provided accessor) so the error handler
actually delegates to the recoverer.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: b5d979bf-5baf-4809-9e50-92117612b348

📥 Commits

Reviewing files that changed from the base of the PR and between f34136a and ca74332.

📒 Files selected for processing (3)
  • platform-infra/build.gradle.kts
  • platform-infra/src/main/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfig.java
  • platform-infra/src/test/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfigTest.java

- Remove verbose Javadoc from config class
- Replace inline mock() with @ExtendWith(MockitoExtension.class) + @mock
- Remove @SuppressWarnings("unchecked") no longer needed

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (3)
platform-infra/src/main/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfig.java (2)

5-7: 🛠️ Refactor suggestion | 🟠 Major

Make shared DLQ beans conditional to prevent downstream bean collisions.

In platform-infra, these beans should not override service-specific handlers. Add @ConditionalOnMissingBean for both beans.

Suggested change
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.kafka.listener.CommonErrorHandler;
@@
     `@Bean`
+    `@ConditionalOnMissingBean`(DeadLetterPublishingRecoverer.class)
     public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
             KafkaOperations<String, Object> kafkaOperations) {
@@
     `@Bean`
+    `@ConditionalOnMissingBean`(CommonErrorHandler.class)
     public DefaultErrorHandler kafkaErrorHandler(DeadLetterPublishingRecoverer recoverer) {
In Spring Boot, if two beans of type CommonErrorHandler/DefaultErrorHandler are present without qualifiers, how does autowiring and Kafka listener container factory resolution behave?

Also applies to: 24-25, 36-37

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@platform-infra/src/main/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfig.java`
around lines 5 - 7, The shared DLQ beans in KafkaDlqConfig are overriding
service-specific handlers; annotate the two `@Bean` methods that return
CommonErrorHandler and DefaultErrorHandler (in class KafkaDlqConfig) with
`@ConditionalOnMissingBean` for their respective types (e.g.,
`@ConditionalOnMissingBean`(CommonErrorHandler.class) and
`@ConditionalOnMissingBean`(DefaultErrorHandler.class)) so these shared beans are
only created if no service-specific beans exist.

31-31: ⚠️ Potential issue | 🟠 Major

DLT partition pinning can fail when topic partition counts differ.

Line 31 hardcodes the source partition for the DLT publish. If <topic>.dlt has fewer partitions, publishing can fail. Prefer letting the producer choose partition (-1) or default resolver behavior.

In Spring Kafka DeadLetterPublishingRecoverer, what is the recommended destination resolver behavior when the dead-letter topic has fewer partitions than the source topic?
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@platform-infra/src/main/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfig.java`
at line 31, KafkaDlqConfig currently pins the DLT partition by returning new
TopicPartition(record.topic() + ".dlt", record.partition()), which fails when
the dead-letter topic has fewer partitions; change the resolver to let the
producer choose the partition by returning new TopicPartition(record.topic() +
".dlt", -1) (or otherwise use the default resolver behavior) so the
DeadLetterPublishingRecoverer does not hardcode the source partition and avoids
partition count mismatches.
platform-infra/src/test/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfigTest.java (1)

23-44: ⚠️ Potential issue | 🟠 Major

Strengthen tests with behavioral assertions, not type-only checks.

Current assertions only validate instantiation. Please assert behavior that matters: DLT destination naming (<topic>.dlt), retry/backoff configuration, and that DefaultErrorHandler delegates to the configured recoverer.

As per coding guidelines, "Assert on meaningful values — avoid assertTrue(result != null)".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@platform-infra/src/test/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfigTest.java`
around lines 23 - 44, Tests currently only assert types; update them to assert
meaningful behavior: for deadLetterPublishingRecoverer, verify it resolves the
expected DLT topic name (e.g., "<originalTopic>.dlt") by invoking its
destination resolver or resolving with a sample ConsumerRecord and asserting the
produced TopicPartition name, and assert any configured topic mapper if present;
for kafkaErrorHandler, assert its backoff/retry configuration by inspecting the
DefaultErrorHandler's backOff/backOffExecution policy (or exposing the BackOff
via getter/reflection) to match the expected retry/backoff values; finally
assert the DefaultErrorHandler delegates to the provided recoverer instance
(e.g., compare the handler's recoverer field to the recoverer passed in via
reflection or trigger handle() for a failed record and verify the recoverer is
invoked). Ensure you reference the existing methods/variables
deadLetterPublishingRecoverer(kafkaOperations), kafkaErrorHandler(recoverer),
recoverer, and errorHandler when locating code to change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@platform-infra/src/main/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfig.java`:
- Around line 29-30: The error log in KafkaDlqConfig currently logs record.key()
directly which may expose PII; update the log.error call in KafkaDlqConfig to
avoid raw keys by either omitting record.key(), logging only non-identifying
metadata (topic/partition/offset) or replacing the key with a deterministic,
non-reversible representation (e.g., a hash or masked form) before logging, and
keep the exception (ex) as the throwable parameter; locate the log.error
invocation that references record.topic(), record.partition(), record.offset(),
record.key() and change the logged key to a safe representation.

---

Duplicate comments:
In
`@platform-infra/src/main/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfig.java`:
- Around line 5-7: The shared DLQ beans in KafkaDlqConfig are overriding
service-specific handlers; annotate the two `@Bean` methods that return
CommonErrorHandler and DefaultErrorHandler (in class KafkaDlqConfig) with
`@ConditionalOnMissingBean` for their respective types (e.g.,
`@ConditionalOnMissingBean`(CommonErrorHandler.class) and
`@ConditionalOnMissingBean`(DefaultErrorHandler.class)) so these shared beans are
only created if no service-specific beans exist.
- Line 31: KafkaDlqConfig currently pins the DLT partition by returning new
TopicPartition(record.topic() + ".dlt", record.partition()), which fails when
the dead-letter topic has fewer partitions; change the resolver to let the
producer choose the partition by returning new TopicPartition(record.topic() +
".dlt", -1) (or otherwise use the default resolver behavior) so the
DeadLetterPublishingRecoverer does not hardcode the source partition and avoids
partition count mismatches.

In
`@platform-infra/src/test/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfigTest.java`:
- Around line 23-44: Tests currently only assert types; update them to assert
meaningful behavior: for deadLetterPublishingRecoverer, verify it resolves the
expected DLT topic name (e.g., "<originalTopic>.dlt") by invoking its
destination resolver or resolving with a sample ConsumerRecord and asserting the
produced TopicPartition name, and assert any configured topic mapper if present;
for kafkaErrorHandler, assert its backoff/retry configuration by inspecting the
DefaultErrorHandler's backOff/backOffExecution policy (or exposing the BackOff
via getter/reflection) to match the expected retry/backoff values; finally
assert the DefaultErrorHandler delegates to the provided recoverer instance
(e.g., compare the handler's recoverer field to the recoverer passed in via
reflection or trigger handle() for a failed record and verify the recoverer is
invoked). Ensure you reference the existing methods/variables
deadLetterPublishingRecoverer(kafkaOperations), kafkaErrorHandler(recoverer),
recoverer, and errorHandler when locating code to change.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: ca6f2652-1f94-4364-a35b-164983425724

📥 Commits

Reviewing files that changed from the base of the PR and between ca74332 and 1c468d1.

📒 Files selected for processing (2)
  • platform-infra/src/main/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfig.java
  • platform-infra/src/test/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfigTest.java

Puneethkumarck and others added 3 commits March 17, 2026 22:51
…n (STA-218)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…(STA-218)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@Puneethkumarck Puneethkumarck merged commit 4038e1f into main Mar 17, 2026
13 of 14 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

infra Infrastructure / build

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant