feat(infra): add Kafka Dead Letter Queue config to platform-infra (STA-218)#193
Conversation
…A-218) Shared KafkaDlqConfig with DefaultErrorHandler + DeadLetterPublishingRecoverer. Failed messages retry 3x with exponential backoff (1s, 2s, 4s) then route to <topic>.dlt. Auto-activates via @ConditionalOnClass for all Kafka consumers. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
Warning Rate limit exceeded
⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: ASSERTIVE Plan: Pro Run ID: 📒 Files selected for processing (2)
WalkthroughAdds Kafka Dead Letter Queue support: a Spring configuration providing a DeadLetterPublishingRecoverer and a DefaultErrorHandler with exponential backoff retries, plus test dependencies and unit tests verifying bean creation. Changes
Sequence Diagram(s)sequenceDiagram
participant Listener as Spring Kafka Listener
participant ErrorHandler as DefaultErrorHandler
participant Recoverer as DeadLetterPublishingRecoverer
participant Kafka as Kafka Broker
Listener->>ErrorHandler: onListenerException(record, exception)
ErrorHandler->>ErrorHandler: apply ExponentialBackOff retries (3, 1s→10s)
ErrorHandler-->>Recoverer: delegate after retries exhausted
Recoverer->>Kafka: publish to "<original-topic>.dlt" (same partition)
Recoverer-->>ErrorHandler: log publishing result
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes 🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
📝 Coding Plan
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@platform-infra/src/main/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfig.java`:
- Around line 49-62: The KafkaDlqConfig is creating a DefaultErrorHandler bean
unconditionally which can conflict with downstream services; update the bean
definitions (the kafkaErrorHandler method that returns DefaultErrorHandler and
the bean method that produces DeadLetterPublishingRecoverer) to be conditional
by adding `@ConditionalOnMissingBean`(DefaultErrorHandler.class) on the
DefaultErrorHandler bean and
`@ConditionalOnMissingBean`(DeadLetterPublishingRecoverer.class) on the recoverer
bean so the shared infra only provides these beans when an application hasn’t
defined its own; keep the existing configuration logic inside the methods
(ExponentialBackOffWithMaxRetries, logging, etc.) unchanged.
- Around line 37-47: The current deadLetterPublishingRecoverer in KafkaDlqConfig
uses record.partition() when constructing the DLT TopicPartition (in
deadLetterPublishingRecoverer -> recoverer), which can fail if the .dlt topic
has fewer partitions; replace the custom lambda resolver with the single-arg
DeadLetterPublishingRecoverer constructor (new
DeadLetterPublishingRecoverer(kafkaOperations)) so Spring Kafka's default DLT
suffix and destination resolving handles partition selection, or alternately
modify the lambda to return a destination that lets the producer choose
partition (i.e., do not hardcode record.partition()); update the
deadLetterPublishingRecoverer method to use that change.
- Around line 24-25: The Javadoc comment is inconsistent with the MAX_RETRIES
constant: MAX_RETRIES = 3 results in 4 total attempts (1 initial + 3 retries).
Fix by either updating the Javadoc text to state "4 total attempts (1 initial +
3 retries) with exponential back-off (1s, 2s, 4s)" or change the constant
MAX_RETRIES to 2 to preserve "3 attempts" semantics; update the comment on the
same block (and any other mention at line 32) to match whichever choice you make
so documentation and the MAX_RETRIES constant are consistent.
In
`@platform-infra/src/test/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfigTest.java`:
- Around line 20-41: The tests only assert types — enhance them to assert
behavior: call config.deadLetterPublishingRecoverer(kafkaOperations) and inspect
its destination resolver (via reflection or by exposing it) to assert it maps a
ConsumerRecord with topic "orders" to a TopicPartition "orders.dlt"; inspect the
recoverer's BackOff (or the BackOff passed into DeadLetterPublishingRecoverer)
to assert expected interval/retry settings; and when creating
config.kafkaErrorHandler(recoverer) assert the returned DefaultErrorHandler is
configured with the same recoverer instance (use reflection to read the
handler's recoverer field or use any provided accessor) so the error handler
actually delegates to the recoverer.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: b5d979bf-5baf-4809-9e50-92117612b348
📒 Files selected for processing (3)
platform-infra/build.gradle.ktsplatform-infra/src/main/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfig.javaplatform-infra/src/test/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfigTest.java
- Remove verbose Javadoc from config class - Replace inline mock() with @ExtendWith(MockitoExtension.class) + @mock - Remove @SuppressWarnings("unchecked") no longer needed Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (3)
platform-infra/src/main/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfig.java (2)
5-7: 🛠️ Refactor suggestion | 🟠 MajorMake shared DLQ beans conditional to prevent downstream bean collisions.
In platform-infra, these beans should not override service-specific handlers. Add
@ConditionalOnMissingBeanfor both beans.Suggested change
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.kafka.listener.CommonErrorHandler; @@ `@Bean` + `@ConditionalOnMissingBean`(DeadLetterPublishingRecoverer.class) public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer( KafkaOperations<String, Object> kafkaOperations) { @@ `@Bean` + `@ConditionalOnMissingBean`(CommonErrorHandler.class) public DefaultErrorHandler kafkaErrorHandler(DeadLetterPublishingRecoverer recoverer) {In Spring Boot, if two beans of type CommonErrorHandler/DefaultErrorHandler are present without qualifiers, how does autowiring and Kafka listener container factory resolution behave?Also applies to: 24-25, 36-37
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@platform-infra/src/main/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfig.java` around lines 5 - 7, The shared DLQ beans in KafkaDlqConfig are overriding service-specific handlers; annotate the two `@Bean` methods that return CommonErrorHandler and DefaultErrorHandler (in class KafkaDlqConfig) with `@ConditionalOnMissingBean` for their respective types (e.g., `@ConditionalOnMissingBean`(CommonErrorHandler.class) and `@ConditionalOnMissingBean`(DefaultErrorHandler.class)) so these shared beans are only created if no service-specific beans exist.
31-31:⚠️ Potential issue | 🟠 MajorDLT partition pinning can fail when topic partition counts differ.
Line 31 hardcodes the source partition for the DLT publish. If
<topic>.dlthas fewer partitions, publishing can fail. Prefer letting the producer choose partition (-1) or default resolver behavior.In Spring Kafka DeadLetterPublishingRecoverer, what is the recommended destination resolver behavior when the dead-letter topic has fewer partitions than the source topic?🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@platform-infra/src/main/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfig.java` at line 31, KafkaDlqConfig currently pins the DLT partition by returning new TopicPartition(record.topic() + ".dlt", record.partition()), which fails when the dead-letter topic has fewer partitions; change the resolver to let the producer choose the partition by returning new TopicPartition(record.topic() + ".dlt", -1) (or otherwise use the default resolver behavior) so the DeadLetterPublishingRecoverer does not hardcode the source partition and avoids partition count mismatches.platform-infra/src/test/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfigTest.java (1)
23-44:⚠️ Potential issue | 🟠 MajorStrengthen tests with behavioral assertions, not type-only checks.
Current assertions only validate instantiation. Please assert behavior that matters: DLT destination naming (
<topic>.dlt), retry/backoff configuration, and thatDefaultErrorHandlerdelegates to the configured recoverer.As per coding guidelines, "Assert on meaningful values — avoid assertTrue(result != null)".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@platform-infra/src/test/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfigTest.java` around lines 23 - 44, Tests currently only assert types; update them to assert meaningful behavior: for deadLetterPublishingRecoverer, verify it resolves the expected DLT topic name (e.g., "<originalTopic>.dlt") by invoking its destination resolver or resolving with a sample ConsumerRecord and asserting the produced TopicPartition name, and assert any configured topic mapper if present; for kafkaErrorHandler, assert its backoff/retry configuration by inspecting the DefaultErrorHandler's backOff/backOffExecution policy (or exposing the BackOff via getter/reflection) to match the expected retry/backoff values; finally assert the DefaultErrorHandler delegates to the provided recoverer instance (e.g., compare the handler's recoverer field to the recoverer passed in via reflection or trigger handle() for a failed record and verify the recoverer is invoked). Ensure you reference the existing methods/variables deadLetterPublishingRecoverer(kafkaOperations), kafkaErrorHandler(recoverer), recoverer, and errorHandler when locating code to change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@platform-infra/src/main/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfig.java`:
- Around line 29-30: The error log in KafkaDlqConfig currently logs record.key()
directly which may expose PII; update the log.error call in KafkaDlqConfig to
avoid raw keys by either omitting record.key(), logging only non-identifying
metadata (topic/partition/offset) or replacing the key with a deterministic,
non-reversible representation (e.g., a hash or masked form) before logging, and
keep the exception (ex) as the throwable parameter; locate the log.error
invocation that references record.topic(), record.partition(), record.offset(),
record.key() and change the logged key to a safe representation.
---
Duplicate comments:
In
`@platform-infra/src/main/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfig.java`:
- Around line 5-7: The shared DLQ beans in KafkaDlqConfig are overriding
service-specific handlers; annotate the two `@Bean` methods that return
CommonErrorHandler and DefaultErrorHandler (in class KafkaDlqConfig) with
`@ConditionalOnMissingBean` for their respective types (e.g.,
`@ConditionalOnMissingBean`(CommonErrorHandler.class) and
`@ConditionalOnMissingBean`(DefaultErrorHandler.class)) so these shared beans are
only created if no service-specific beans exist.
- Line 31: KafkaDlqConfig currently pins the DLT partition by returning new
TopicPartition(record.topic() + ".dlt", record.partition()), which fails when
the dead-letter topic has fewer partitions; change the resolver to let the
producer choose the partition by returning new TopicPartition(record.topic() +
".dlt", -1) (or otherwise use the default resolver behavior) so the
DeadLetterPublishingRecoverer does not hardcode the source partition and avoids
partition count mismatches.
In
`@platform-infra/src/test/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfigTest.java`:
- Around line 23-44: Tests currently only assert types; update them to assert
meaningful behavior: for deadLetterPublishingRecoverer, verify it resolves the
expected DLT topic name (e.g., "<originalTopic>.dlt") by invoking its
destination resolver or resolving with a sample ConsumerRecord and asserting the
produced TopicPartition name, and assert any configured topic mapper if present;
for kafkaErrorHandler, assert its backoff/retry configuration by inspecting the
DefaultErrorHandler's backOff/backOffExecution policy (or exposing the BackOff
via getter/reflection) to match the expected retry/backoff values; finally
assert the DefaultErrorHandler delegates to the provided recoverer instance
(e.g., compare the handler's recoverer field to the recoverer passed in via
reflection or trigger handle() for a failed record and verify the recoverer is
invoked). Ensure you reference the existing methods/variables
deadLetterPublishingRecoverer(kafkaOperations), kafkaErrorHandler(recoverer),
recoverer, and errorHandler when locating code to change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: ca6f2652-1f94-4364-a35b-164983425724
📒 Files selected for processing (2)
platform-infra/src/main/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfig.javaplatform-infra/src/test/java/com/stablecoin/payments/platform/infrastructure/messaging/KafkaDlqConfigTest.java
…n (STA-218) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…(STA-218) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
KafkaDlqConfigtoplatform-infraproviding aDefaultErrorHandlerwithDeadLetterPublishingRecovererthat routes failed consumer records to<topic>.dltafter exponential back-off retries (3 attempts: 1s → 2s → 4s)CommonErrorHandlerbean and wires it into all@KafkaListenercontainer factoriesCloses STA-218
Test plan
KafkaDlqConfigTest— verifiesDeadLetterPublishingRecovererandDefaultErrorHandlerbeans are created@KafkaListener🤖 Generated with Claude Code
Summary by CodeRabbit
Tests
New Features
Chores