Describe the issue
The Spring Kafka project provides `RecordInterceptor` for regular Kafka consumers, so cross-cutting concerns (logging, metrics, tracing) can be applied automatically before each record is processed. However, the Kafka Streams binder in Spring Cloud Stream has no equivalent mechanism.
Currently, users must manually add `.process()` or `.peek()` calls in every `Function<KStream, KStream>` bean to apply common logic. This is repetitive and error-prone, especially in applications with many stream functions.
Use case
A common need is to apply shared logic to every record across all KStream bindings, for example:
- Structured logging of record metadata (topic, partition, offset)
- Micrometer metrics collection per topic
- Distributed tracing context propagation from record headers
Proposed solution
Introduce a `KafkaStreamsRecordInterceptor` interface that users register as a Spring bean. The binder would auto-detect it and insert a `.process()` side-effect processor node into the topology in `AbstractKafkaStreamsBinderProcessor.getKStream()`, before event type routing and user function processing.
```java
@FunctionalInterface
public interface KafkaStreamsRecordInterceptor {

    void intercept(Record<Object, Object> record, RecordInterceptorContext context);
}
```
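Roughly, the binder could wire this as a forwarding processor node. The following is a sketch only, not actual binder code; `RecordInterceptorContext` and the exact insertion point are assumptions of this proposal:

```java
// Hypothetical wiring inside getKStream(): a pure side-effect node that
// invokes the interceptor and forwards every record unchanged.
KStream<Object, Object> intercepted = stream.process(() ->
        new ContextualProcessor<Object, Object, Object, Object>() {

            @Override
            public void process(Record<Object, Object> record) {
                // Record metadata (topic/partition/offset) is only present for
                // records that originate from an input topic
                context().recordMetadata().ifPresent(metadata ->
                        interceptor.intercept(record, new RecordInterceptorContext(metadata)));
                // Forward unchanged so downstream processing is unaffected
                context().forward(record);
            }
        });
```

Using `process()` rather than `peek()` gives the interceptor access to the full `Record` (including headers) and to the processor context's record metadata.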
Multiple interceptors would be supported via `@Order` and composed through a `CompositeKafkaStreamsRecordInterceptor`.
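The composite could simply delegate in order. A sketch of what is being proposed (this class does not exist yet):

```java
// Delegates to all registered interceptors in @Order sequence.
public class CompositeKafkaStreamsRecordInterceptor implements KafkaStreamsRecordInterceptor {

    private final List<KafkaStreamsRecordInterceptor> delegates;

    public CompositeKafkaStreamsRecordInterceptor(List<KafkaStreamsRecordInterceptor> delegates) {
        // A List injected from the application context is already sorted by @Order
        this.delegates = List.copyOf(delegates);
    }

    @Override
    public void intercept(Record<Object, Object> record, RecordInterceptorContext context) {
        for (KafkaStreamsRecordInterceptor delegate : this.delegates) {
            delegate.intercept(record, context);
        }
    }
}
```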
Example usage
```java
@Bean
public KafkaStreamsRecordInterceptor loggingInterceptor() {
    return (record, ctx) ->
            log.info("topic={}, partition={}, offset={}", ctx.topic(), ctx.partition(), ctx.offset());
}

// Business logic stays clean; the interceptor is applied automatically
@Bean
public Function<KStream<String, Order>, KStream<String, Result>> process() {
    return input -> input.mapValues(this::processOrder);
}
```
Related issues
Version of the framework
Spring Cloud Stream 5.0.x / Kafka Streams binder