Skip to content

Commit fe028e4

Browse files
authored
GH-4328: Expose native Kafka Streams DLQ configuration
Fixes: #4328 Extract the DLT record header-building logic into a new DeadLetterRecordManager class so it can be used outside of DeadLetterPublishingRecoverer. Add two new handlers, RecoveringProcessingExceptionHandler and RecoveringProductionExceptionHandler, to handle processing and production errors, respectively. Add a common AbstractRecoveringExceptionHandler that centralizes the shared error-handling logic for all provided recovering exception handlers. Add a KafkaStreamsDeadLetterDestinationResolver to allow users to define dead-letter routing logic used by the Kafka Streams native DLQ in the provided exception handler implementations. Update StreamsBuilderFactoryBean to expose a dead-letter queue topic name as a possible destination for all provided recovering exception handler implementations. Signed-off-by: Loïc Greffier <loic.greffier@michelin.com>
1 parent bbe192f commit fe028e4

14 files changed

Lines changed: 1490 additions & 312 deletions

spring-kafka-docs/src/main/antora/modules/ROOT/pages/streams.adoc

Lines changed: 139 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -297,26 +297,103 @@ Spring Integration automatically provides an implementation using its `GatewayPr
297297
It also requires a `MessagingMessageConverter` to convert the key, value and metadata (including headers) to/from a Spring Messaging `Message<?>`.
298298
See {spring-integration-url}/kafka.html#streams-integration[Calling a Spring Integration Flow from a `KStream`] for more information.
299299

300+
[[recovery-strategies]]
301+
== Recovery Strategies
302+
303+
The framework provides the following exception handlers, which follow the same recovery strategies:
304+
305+
- xref:streams.adoc#streams-deser-recovery[`RecoveringDeserializationExceptionHandler`]
306+
- xref:streams.adoc#streams-processing-recovery[`RecoveringProcessingExceptionHandler`]
307+
- xref:streams.adoc#streams-production-recovery[`RecoveringProductionExceptionHandler`]
308+
309+
The recovery strategies are as follows, by priority order:
310+
311+
- If a xref:streams.adoc#dead-letter-destination-resolver[`KafkaStreamsDeadLetterDestinationResolver`] is defined, resume the stream and forward the failed record to the resolved topic-partition using the native Kafka Streams DLQ.
312+
- If xref:streams.adoc#dead-letter-queue-topic-name-property[`errors.dead.letter.queue.topic.name`] is defined and set to a topic name, resume the stream and forward the failed record to that topic using the native Kafka Streams DLQ.
313+
- If a `ConsumerRecordRecoverer` implementation is defined, invoke it and resume the stream without producing dead-letter records, as handling is delegated to the `ConsumerRecordRecoverer`. For example, the provided xref:streams.adoc#dead-letter-publishing-recoverer[`DeadLetterPublishingRecoverer`] implementation can be used.
314+
- Fail the stream without producing dead-letter records.
315+
316+
When a dead-letter record is published to a dead-letter topic, whether through the native Kafka Streams DLQ or via a `ConsumerRecordRecoverer` implementation, the record is enriched with the xref:kafka/annotation-error-handling.adoc#dead-letters[Spring for Apache Kafka DLT headers].
317+
300318
[[streams-deser-recovery]]
301319
== Recovery from Deserialization Exceptions
302320

303-
Version 2.3 introduced the `RecoveringDeserializationExceptionHandler` which can take some action when a deserialization exception occurs.
304-
Refer to the Kafka documentation about `DeserializationExceptionHandler`, of which the `RecoveringDeserializationExceptionHandler` is an implementation.
305-
The `RecoveringDeserializationExceptionHandler` is configured with a `ConsumerRecordRecoverer` implementation.
306-
The framework provides the `DeadLetterPublishingRecoverer` which sends the failed record to a dead-letter topic.
307-
See xref:kafka/annotation-error-handling.adoc#dead-letters[Publishing Dead-letter Records] for more information about this recoverer.
321+
Version 2.3 introduced the `RecoveringDeserializationExceptionHandler`, which can take some action when a deserialization exception occurs.
322+
It implements the `DeserializationExceptionHandler` interface (refer to the Kafka documentation for details) and follows the xref:streams.adoc#recovery-strategies[Spring for Apache Kafka recovery strategies].
323+
324+
To configure the `RecoveringDeserializationExceptionHandler`, add the following property to your streams configuration:
325+
326+
[source, java]
327+
----
328+
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
329+
public KafkaStreamsConfiguration kStreamsConfigs() {
330+
Map<String, Object> props = new HashMap<>();
331+
...
332+
props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
333+
RecoveringDeserializationExceptionHandler.class);
334+
...
335+
return new KafkaStreamsConfiguration(props);
336+
}
337+
----
338+
339+
[[streams-processing-recovery]]
340+
== Recovery from Processing Exceptions
341+
342+
Version 4.1 introduces the `RecoveringProcessingExceptionHandler`, which can take some action when an exception occurs during stream processing.
343+
It implements the `ProcessingExceptionHandler` interface (refer to the Kafka documentation for details), introduced by https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occurring+during+processing[KIP-1033]
344+
and follows the xref:streams.adoc#recovery-strategies[Spring for Apache Kafka recovery strategies].
345+
346+
To enable the `RecoveringProcessingExceptionHandler`, add the following property to your streams configuration:
347+
348+
[source, java]
349+
----
350+
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
351+
public KafkaStreamsConfiguration kStreamsConfigs() {
352+
Map<String, Object> props = new HashMap<>();
353+
...
354+
props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
355+
RecoveringProcessingExceptionHandler.class);
356+
...
357+
}
358+
----
359+
360+
[[streams-production-recovery]]
361+
== Recovery from Production Exceptions
362+
363+
Version 4.1 introduces the `RecoveringProductionExceptionHandler`, which can take some action when an exception occurs during record production or serialization.
364+
It implements the `ProductionExceptionHandler` interface (refer to the Kafka documentation for details) and follows the xref:streams.adoc#recovery-strategies[Spring for Apache Kafka recovery strategies].
365+
366+
To configure the `RecoveringProductionExceptionHandler`, add the following property to your streams configuration:
367+
368+
[source, java]
369+
----
370+
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
371+
public KafkaStreamsConfiguration kStreamsConfigs() {
372+
Map<String, Object> props = new HashMap<>();
373+
...
374+
props.put(StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
375+
RecoveringProductionExceptionHandler.class);
376+
...
377+
}
378+
----
379+
380+
[[dead-letter-publishing-recoverer]]
381+
== Dead Letter Publishing Recoverer
382+
383+
The framework provides the `DeadLetterPublishingRecoverer`, which sends the failed record to a dead-letter topic.
384+
See xref:kafka/annotation-error-handling.adoc#dead-letters[Publishing Dead-letter records] for more information about this recoverer.
308385

309-
To configure the recoverer, add the following properties to your streams configuration:
386+
To configure the recoverer for an exception handler, add the following properties to your streams configuration:
310387

311388
[source, java]
312389
----
313390
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
314391
public KafkaStreamsConfiguration kStreamsConfigs() {
315392
Map<String, Object> props = new HashMap<>();
316393
...
317-
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
394+
props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
318395
RecoveringDeserializationExceptionHandler.class);
319-
props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
396+
props.put(RecoveringDeserializationExceptionHandler.RECOVERER, recoverer());
320397
...
321398
return new KafkaStreamsConfiguration(props);
322399
}
@@ -330,6 +407,60 @@ public DeadLetterPublishingRecoverer recoverer() {
330407

331408
Of course, the `recoverer()` bean can be your own implementation of `ConsumerRecordRecoverer`.
332409

410+
[[dead-letter-destination-resolver]]
411+
== Dead Letter Destination Resolver
412+
413+
A bean of type `KafkaStreamsDeadLetterDestinationResolver` can be defined to activate native DLQ routing in the exception handlers.
414+
415+
It determines the DLQ topic name and partition resolution logic to use, based on the error handler context, the input record of the failed processor, and the exception thrown:
416+
417+
[source, java]
418+
----
419+
@Bean
420+
public KafkaStreamsDeadLetterDestinationResolver resolver() {
421+
return (context, record, ex) -> {
422+
if (ex instanceof FooException) return new TopicPartition("dlqTopic1", -1);
423+
if (record instanceof Foo) return new TopicPartition("dlqTopic2", -1);
424+
if (context.processorNodeId().equals("processor-1")) return new TopicPartition("dlqTopic3", -1);
425+
return new TopicPartition("defaultDlqTopic", -1);
426+
};
427+
}
428+
----
429+
430+
A negative partition number indicates that the partition will be determined by the default partitioner.
431+
432+
The `KafkaStreamsDeadLetterDestinationResolver` can then be injected into the exception handlers as follows:
433+
434+
[source, java]
435+
----
436+
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
437+
public KafkaStreamsConfiguration kStreamsConfigs() {
438+
Map<String, Object> props = new HashMap<>();
439+
...
440+
props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
441+
RecoveringProcessingExceptionHandler.class);
442+
props.put(RecoveringProcessingExceptionHandler.DLQ_DESTINATION_RESOLVER, resolver());
443+
...
444+
return new KafkaStreamsConfiguration(props);
445+
}
446+
----
447+
448+
[[dead-letter-queue-topic-name-property]]
449+
== Dead Letter Queue Topic Name Property
450+
451+
The Kafka Streams property `errors.dead.letter.queue.topic.name` can be defined and set to a topic name to activate native DLQ routing in the exception handlers.
452+
This directly specifies the DLQ topic to which the enabled exception handlers will route all failed records.
453+
454+
Alternatively, this can be set programmatically through the `StreamsBuilderFactoryBean`:
455+
456+
[source, java]
457+
----
458+
@Bean
459+
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
460+
return sfb -> sfb.setDeadLetterTopicName("deadLetterQueueTopic");
461+
}
462+
----
463+
333464
[[kafka-streams-iq-support]]
334465
== Interactive Query Support
335466

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,20 @@ See xref:kafka/kafka-queues.adoc#share-acknowledgment-api[ShareAcknowledgment AP
3636

3737
The default value of `sameIntervalTopicReuseStrategy` in `RetryTopicConfigurationBuilder` has been changed from `MULTIPLE_TOPICS` to `SINGLE_TOPIC` to align with the `@RetryableTopic` annotation default.
3838
See xref:retrytopic/topic-naming.adoc[Topic Naming] for more information.
39+
40+
[[x41-kafka-streams-native-dlq]]
41+
=== Kafka Streams Native DLQ Support
42+
43+
Spring for Apache Kafka now provides exception handlers that support Kafka Streams DLQ (KIP-1034, Kafka 4.2):
44+
45+
- xref:streams.adoc#streams-deser-recovery[`RecoveringDeserializationExceptionHandler`] (updated).
46+
- xref:streams.adoc#streams-processing-recovery[`RecoveringProcessingExceptionHandler`] (new).
47+
- xref:streams.adoc#streams-production-recovery[`RecoveringProductionExceptionHandler`] (new).
48+
49+
The provided exception handlers support multiple Kafka Streams DLQ enabling strategies and dead-letter topic resolution options:
50+
51+
- The Kafka Streams property xref:streams.adoc#dead-letter-queue-topic-name-property[`errors.dead.letter.queue.topic.name`].
52+
- An implementation of the new xref:streams.adoc#dead-letter-destination-resolver[`KafkaStreamsDeadLetterDestinationResolver`] functional interface for dynamic resolution.
53+
- The `setDeadLetterTopicName()` method of the xref:streams.adoc#dead-letter-queue-topic-name-property[`StreamsBuilderFactoryBean`].
54+
55+
`RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER` has been deprecated in favor of `RecoveringDeserializationExceptionHandler.RECOVERER` to align with the new handlers properties.

spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBean.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ public class StreamsBuilderFactoryBean extends AbstractFactoryBean<StreamsBuilde
105105

106106
private @Nullable StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler;
107107

108+
private @Nullable String deadLetterTopicName;
109+
108110
private boolean autoStartup = true;
109111

110112
private int phase = Integer.MAX_VALUE - 1000; // NOSONAR magic #
@@ -163,6 +165,24 @@ public synchronized void setBeanName(String name) {
163165
this.beanName = name;
164166
}
165167

168+
/**
169+
* Set the dead letter topic name.
170+
* @param deadLetterTopicName the dead letter topic name.
171+
* @since 4.1.0
172+
*/
173+
public void setDeadLetterTopicName(String deadLetterTopicName) {
174+
this.deadLetterTopicName = deadLetterTopicName;
175+
}
176+
177+
/**
178+
* Get the dead letter topic name.
179+
* @return the dead letter topic name.
180+
* @since 4.1.0
181+
*/
182+
public @Nullable String getDeadLetterTopicName() {
183+
return this.deadLetterTopicName;
184+
}
185+
166186
/**
167187
* Set the streams configuration {@link Properties} on this factory.
168188
* @param streamsConfig the streams configuration.
@@ -365,6 +385,9 @@ public void start() {
365385
try {
366386
Assert.state(this.properties != null,
367387
"streams configuration properties must not be null");
388+
if (this.deadLetterTopicName != null) {
389+
this.properties.put(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, this.deadLetterTopicName);
390+
}
368391
this.kafkaStreams = this.kafkaStreamsCustomizer.initKafkaStreams(
369392
this.topology, this.properties, this.clientSupplier
370393
);

0 commit comments

Comments
 (0)