Skip to content

Commit ef36e2d

Browse files
GH-3164 Support multiple StreamsBuilderFactoryBeanConfigurers
1 parent 161f91b commit ef36e2d

5 files changed

Lines changed: 80 additions & 11 deletions

File tree

binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.Arrays;
2020
import java.util.HashMap;
21+
import java.util.List;
2122
import java.util.Locale;
2223
import java.util.Map;
2324
import java.util.Objects;
@@ -190,7 +191,7 @@ else if (parameterType.isAssignableFrom(GlobalKTable.class)) {
190191
protected StreamsBuilderFactoryBean buildStreamsBuilderAndRetrieveConfig(String beanNamePostPrefix,
191192
ApplicationContext applicationContext, String inboundName,
192193
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
193-
StreamsBuilderFactoryBeanConfigurer customizer,
194+
List<StreamsBuilderFactoryBeanConfigurer> customizers,
194195
ConfigurableEnvironment environment, BindingProperties bindingProperties) {
195196
ConfigurableListableBeanFactory beanFactory = this.applicationContext
196197
.getBeanFactory();
@@ -347,8 +348,8 @@ else if (deserializationExceptionHandler == DeserializationExceptionHandler.skip
347348

348349
extendedConsumerProperties.setApplicationId((String) streamConfiguration.get(StreamsConfig.APPLICATION_ID_CONFIG));
349350

350-
if (customizer != null) {
351-
customizer.configure(streamsBuilderFactoryBean);
351+
if (!CollectionUtils.isEmpty(customizers)) {
352+
customizers.forEach(customizer -> customizer.configure(streamsBuilderFactoryBean));
352353
}
353354
return applicationContext.getBean(
354355
"&stream-builder-" + beanNamePostPrefix, StreamsBuilderFactoryBean.class);

binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ public KafkaStreamsFunctionProcessor kafkaStreamsFunctionProcessor(BindingServic
387387
return new KafkaStreamsFunctionProcessor(bindingServiceProperties, kafkaStreamsExtendedBindingProperties,
388388
keyValueSerdeResolver, kafkaStreamsBindingInformationCatalogue, kafkaStreamsMessageConversionDelegate,
389389
cleanupConfig.getIfUnique(), streamFunctionProperties, kafkaStreamsBinderConfigurationProperties,
390-
customizerProvider.getIfUnique(), environment);
390+
customizerProvider.orderedStream().toList(), environment);
391391
}
392392

393393
@Bean

binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
8787
private BeanFactory beanFactory;
8888
private final StreamFunctionProperties streamFunctionProperties;
8989
private final KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties;
90-
StreamsBuilderFactoryBeanConfigurer customizer;
90+
final List<StreamsBuilderFactoryBeanConfigurer> customizers;
9191
ConfigurableEnvironment environment;
9292

9393
public KafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProperties,
@@ -98,7 +98,8 @@ public KafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProp
9898
CleanupConfig cleanupConfig,
9999
StreamFunctionProperties streamFunctionProperties,
100100
KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
101-
StreamsBuilderFactoryBeanConfigurer customizer, ConfigurableEnvironment environment) {
101+
List<StreamsBuilderFactoryBeanConfigurer> customizers,
102+
ConfigurableEnvironment environment) {
102103
super(bindingServiceProperties, kafkaStreamsBindingInformationCatalogue, kafkaStreamsExtendedBindingProperties,
103104
keyValueSerdeResolver, cleanupConfig);
104105
this.bindingServiceProperties = bindingServiceProperties;
@@ -108,7 +109,7 @@ public KafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProp
108109
this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate;
109110
this.streamFunctionProperties = streamFunctionProperties;
110111
this.kafkaStreamsBinderConfigurationProperties = kafkaStreamsBinderConfigurationProperties;
111-
this.customizer = customizer;
112+
this.customizers = customizers != null ? customizers : List.of();
112113
this.environment = environment;
113114
}
114115

@@ -523,7 +524,7 @@ private Object[] adaptAndRetrieveInboundArguments(Map<String, ResolvableType> st
523524
//Otherwise, create the StreamsBuilderFactory and get the underlying config.
524525
if (!this.methodStreamsBuilderFactoryBeanMap.containsKey(functionName)) {
525526
StreamsBuilderFactoryBean streamsBuilderFactoryBean = buildStreamsBuilderAndRetrieveConfig(functionName, applicationContext,
526-
input, kafkaStreamsBinderConfigurationProperties, customizer, this.environment, bindingProperties);
527+
input, kafkaStreamsBinderConfigurationProperties, customizers, this.environment, bindingProperties);
527528
this.methodStreamsBuilderFactoryBeanMap.put(functionName, streamsBuilderFactoryBean);
528529
}
529530
try {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright 2026-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.stream.binder.kafka.streams;
18+
19+
import java.util.stream.Stream;
20+
21+
import org.junit.jupiter.api.Test;
22+
23+
import org.springframework.beans.factory.ObjectProvider;
24+
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
25+
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
26+
import org.springframework.cloud.stream.config.BindingServiceProperties;
27+
import org.springframework.cloud.stream.function.StreamFunctionProperties;
28+
import org.springframework.core.env.ConfigurableEnvironment;
29+
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
30+
import org.springframework.kafka.core.CleanupConfig;
31+
32+
import static org.assertj.core.api.Assertions.assertThat;
33+
import static org.mockito.Mockito.mock;
34+
import static org.mockito.Mockito.when;
35+
36+
/**
37+
* Tests for {@link KafkaStreamsBinderSupportAutoConfiguration}.
38+
*
39+
* @author wadhwaroh-lang
40+
*/
41+
class KafkaStreamsBinderSupportAutoConfigurationTests {
42+
43+
@Test
44+
@SuppressWarnings("unchecked")
45+
void kafkaStreamsFunctionProcessorUsesAllStreamsBuilderFactoryBeanConfigurers() {
46+
StreamsBuilderFactoryBeanConfigurer first = mock(StreamsBuilderFactoryBeanConfigurer.class);
47+
StreamsBuilderFactoryBeanConfigurer second = mock(StreamsBuilderFactoryBeanConfigurer.class);
48+
ObjectProvider<StreamsBuilderFactoryBeanConfigurer> customizerProvider = mock(ObjectProvider.class);
49+
ObjectProvider<CleanupConfig> cleanupConfigProvider = mock(ObjectProvider.class);
50+
when(customizerProvider.orderedStream()).thenReturn(Stream.of(first, second));
51+
52+
KafkaStreamsFunctionProcessor processor = new KafkaStreamsBinderSupportAutoConfiguration()
53+
.kafkaStreamsFunctionProcessor(
54+
mock(BindingServiceProperties.class),
55+
mock(KafkaStreamsExtendedBindingProperties.class),
56+
mock(KeyValueSerdeResolver.class),
57+
mock(KafkaStreamsBindingInformationCatalogue.class),
58+
mock(KafkaStreamsMessageConversionDelegate.class),
59+
cleanupConfigProvider,
60+
mock(StreamFunctionProperties.class),
61+
mock(KafkaStreamsBinderConfigurationProperties.class),
62+
customizerProvider,
63+
mock(ConfigurableEnvironment.class)
64+
);
65+
66+
assertThat(processor.customizers).containsExactly(first, second);
67+
}
68+
}

docs/modules/ROOT/pages/kafka/kafka-streams-binder/streamsbuilderfactorybean-customizer.adoc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,8 @@ public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer()
4343

4444
`KafkaStreamsCustomizer` will be called by the `StreamsBuilderFactoryBeabn` right before the underlying `KafkaStreams` gets started.
4545

46-
There can only be one `StreamsBuilderFactoryBeanConfigurer` in the entire application.
47-
Then how do we account for multiple Kafka Streams processors as each of them are backed up by individual `StreamsBuilderFactoryBean` objects?
48-
In that case, if the customization needs to be different for those processors, then the application needs to apply some filter based on the application ID.
46+
The binder invokes all `StreamsBuilderFactoryBeanConfigurer` beans in order before the factory bean is started.
47+
If the customization needs to be different for multiple Kafka Streams processors, each backed by its own `StreamsBuilderFactoryBean`, then the application needs to apply some filter based on the application ID.
4948

5049
For e.g,
5150

0 commit comments

Comments
 (0)