Skip to content

Commit 4038e1f

Browse files
feat(infra): add Kafka Dead Letter Queue config to platform-infra (STA-218) (#193)
* feat(infra): add Kafka Dead Letter Queue config to platform-infra (STA-218) Shared KafkaDlqConfig with DefaultErrorHandler + DeadLetterPublishingRecoverer. Failed messages retry 3x with exponential backoff (1s, 2s, 4s) then route to <topic>.dlt. Auto-activates via @ConditionalOnClass for all Kafka consumers. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(infra): apply coding standards to KafkaDlqConfig (STA-218) - Remove verbose Javadoc from config class - Replace inline mock() with @ExtendWith(MockitoExtension.class) + @mock - Remove @SuppressWarnings("unchecked") no longer needed Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * refactor(infra): simplify KafkaDlqConfig with ConditionalOnMissingBean (STA-218) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * test(infra): add behavioral backoff assertions to KafkaDlqConfigTest (STA-218) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 9f9f802 commit 4038e1f

2 files changed

Lines changed: 99 additions & 0 deletions

File tree

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package com.stablecoin.payments.platform.infrastructure.messaging;
2+
3+
import lombok.extern.slf4j.Slf4j;
4+
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
5+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
6+
import org.springframework.context.annotation.Bean;
7+
import org.springframework.context.annotation.Configuration;
8+
import org.springframework.kafka.core.KafkaOperations;
9+
import org.springframework.kafka.core.KafkaTemplate;
10+
import org.springframework.kafka.listener.CommonErrorHandler;
11+
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
12+
import org.springframework.kafka.listener.DefaultErrorHandler;
13+
import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries;
14+
15+
@Slf4j
16+
@Configuration
17+
@ConditionalOnClass(KafkaTemplate.class)
18+
public class KafkaDlqConfig {
19+
20+
private static final int MAX_RETRIES = 3;
21+
private static final long INITIAL_INTERVAL_MS = 1_000L;
22+
private static final double MULTIPLIER = 2.0;
23+
private static final long MAX_INTERVAL_MS = 10_000L;
24+
25+
@Bean
26+
@ConditionalOnMissingBean(DeadLetterPublishingRecoverer.class)
27+
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
28+
KafkaOperations<String, Object> kafkaOperations) {
29+
return new DeadLetterPublishingRecoverer(kafkaOperations);
30+
}
31+
32+
@Bean
33+
@ConditionalOnMissingBean(CommonErrorHandler.class)
34+
public DefaultErrorHandler kafkaErrorHandler(DeadLetterPublishingRecoverer recoverer) {
35+
var backOff = new ExponentialBackOffWithMaxRetries(MAX_RETRIES);
36+
backOff.setInitialInterval(INITIAL_INTERVAL_MS);
37+
backOff.setMultiplier(MULTIPLIER);
38+
backOff.setMaxInterval(MAX_INTERVAL_MS);
39+
40+
log.info("Configured Kafka DLQ error handler: maxRetries={}, initialInterval={}ms, multiplier={}",
41+
MAX_RETRIES, INITIAL_INTERVAL_MS, MULTIPLIER);
42+
43+
return new DefaultErrorHandler(recoverer, backOff);
44+
}
45+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package com.stablecoin.payments.platform.infrastructure.messaging;
2+
3+
import org.junit.jupiter.api.DisplayName;
4+
import org.junit.jupiter.api.Test;
5+
import org.junit.jupiter.api.extension.ExtendWith;
6+
import org.mockito.Mock;
7+
import org.mockito.junit.jupiter.MockitoExtension;
8+
import org.springframework.kafka.core.KafkaOperations;
9+
import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries;
10+
import org.springframework.test.util.ReflectionTestUtils;
11+
import org.springframework.util.backoff.BackOff;
12+
13+
import static org.assertj.core.api.Assertions.assertThat;
14+
15+
@ExtendWith(MockitoExtension.class)
16+
@DisplayName("KafkaDlqConfig")
17+
class KafkaDlqConfigTest {
18+
19+
private final KafkaDlqConfig config = new KafkaDlqConfig();
20+
21+
@Mock
22+
private KafkaOperations<String, Object> kafkaOperations;
23+
24+
@Test
25+
@DisplayName("should create DeadLetterPublishingRecoverer wired with KafkaOperations")
26+
void shouldCreateDeadLetterPublishingRecoverer() {
27+
// when
28+
var recoverer = config.deadLetterPublishingRecoverer(kafkaOperations);
29+
30+
// then
31+
assertThat(recoverer).isNotNull();
32+
}
33+
34+
@Test
35+
@DisplayName("should configure error handler with exponential backoff: 3 retries, 1s initial, 2x multiplier")
36+
void shouldConfigureErrorHandlerWithExponentialBackoff() {
37+
// given
38+
var recoverer = config.deadLetterPublishingRecoverer(kafkaOperations);
39+
var expectedBackOff = new ExponentialBackOffWithMaxRetries(3);
40+
expectedBackOff.setInitialInterval(1_000L);
41+
expectedBackOff.setMultiplier(2.0);
42+
expectedBackOff.setMaxInterval(10_000L);
43+
44+
// when
45+
var errorHandler = config.kafkaErrorHandler(recoverer);
46+
47+
// then
48+
var failureTracker = ReflectionTestUtils.getField(errorHandler, "failureTracker");
49+
var actualBackOff = (BackOff) ReflectionTestUtils.getField(failureTracker, "backOff");
50+
assertThat(actualBackOff)
51+
.usingRecursiveComparison()
52+
.isEqualTo(expectedBackOff);
53+
}
54+
}

0 commit comments

Comments
 (0)