Skip to content

Commit 8841268

Browse files
Gautam-amanspring-builds
authored andcommitted
GH-4384: Ensure non-string override properties are consistent
Fixes: #4384 * Fixes gh-4384: Ensure non-string override properties are consistently applied in DefaultKafkaConsumerFactory * Fixes gh-4384: Ensure consistent propagation of both string and non-string override properties * Fix inconsistent handling of non-string properties in DefaultKafkaConsumerFactory * Fix tests to validate final consumer config propagation and align with Kafka behavior * Fix failing tests by using valid Kafka configs and proper base configs Signed-off-by: Aman Gautam <amangautam2128@gmail.com> (cherry picked from commit f62f0bc)
1 parent 03f96a2 commit 8841268

2 files changed

Lines changed: 77 additions & 16 deletions

File tree

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.Objects;
2727
import java.util.Optional;
2828
import java.util.Properties;
29-
import java.util.Set;
3029
import java.util.concurrent.ConcurrentHashMap;
3130
import java.util.function.Supplier;
3231

@@ -368,13 +367,14 @@ protected Consumer<K, V> createKafkaConsumer(@Nullable String groupId, @Nullable
368367
boolean shouldModifyClientId = (this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)
369368
&& StringUtils.hasText(clientIdSuffix)) || overrideClientIdPrefix;
370369
if (groupId == null
371-
&& (properties == null || properties.stringPropertyNames().isEmpty())
370+
&& (properties == null || properties.isEmpty())
372371
&& !shouldModifyClientId) {
372+
373373
return createKafkaConsumer(new HashMap<>(this.configs));
374374
}
375375
else {
376-
return createConsumerWithAdjustedProperties(groupId, clientIdPrefix, properties, overrideClientIdPrefix,
377-
clientIdSuffix, shouldModifyClientId);
376+
return createConsumerWithAdjustedProperties(groupId, clientIdPrefix, properties,
377+
overrideClientIdPrefix, clientIdSuffix, shouldModifyClientId);
378378
}
379379
}
380380

@@ -392,18 +392,23 @@ private Consumer<K, V> createConsumerWithAdjustedProperties(@Nullable String gro
392392
: modifiedConfigs.get(ConsumerConfig.CLIENT_ID_CONFIG)) + clientIdSuffix);
393393
}
394394
if (properties != null) {
395-
Set<String> stringPropertyNames = properties.stringPropertyNames(); // to get any nested default Properties
396-
stringPropertyNames
397-
.stream()
398-
.filter(name -> !name.equals(ConsumerConfig.CLIENT_ID_CONFIG)
399-
&& !name.equals(ConsumerConfig.GROUP_ID_CONFIG))
400-
.forEach(name -> modifiedConfigs.put(name, properties.getProperty(name)));
401-
properties.entrySet().stream()
402-
.filter(entry -> !entry.getKey().equals(ConsumerConfig.CLIENT_ID_CONFIG)
403-
&& !entry.getKey().equals(ConsumerConfig.GROUP_ID_CONFIG)
404-
&& !stringPropertyNames.contains(entry.getKey())
405-
&& entry.getKey() instanceof String)
406-
.forEach(entry -> modifiedConfigs.put((String) entry.getKey(), entry.getValue()));
395+
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
396+
397+
if (entry.getKey() instanceof String key
398+
&& !ConsumerConfig.CLIENT_ID_CONFIG.equals(key)
399+
&& !ConsumerConfig.GROUP_ID_CONFIG.equals(key)) {
400+
401+
Object value = entry.getValue();
402+
403+
if (value == null) {
404+
value = properties.getProperty(key);
405+
}
406+
407+
if (value != null) {
408+
modifiedConfigs.put(key, value);
409+
}
410+
}
411+
}
407412
checkInaccessible(properties, modifiedConfigs);
408413
}
409414
return createKafkaConsumer(modifiedConfigs);

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,62 @@ public void testSupplierProvidedDeserializersAreNotShared() {
9999
assertThat(target.getKeyDeserializer()).isNotSameAs(target.getKeyDeserializer());
100100
}
101101

102+
@Test
103+
public void testMixedTypeOverridesApplied() {
104+
Map<String, Object> configs = KafkaTestUtils.consumerProps(this.embeddedKafka, "testGroup", false);
105+
106+
Properties overrides = new Properties();
107+
overrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
108+
overrides.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
109+
110+
AtomicReference<ConsumerConfig> capturedConfig = new AtomicReference<>();
111+
112+
DefaultKafkaConsumerFactory<String, String> factory =
113+
new DefaultKafkaConsumerFactory<String, String>(configs) {
114+
115+
@Override
116+
protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object> configProps) {
117+
capturedConfig.set(new ConsumerConfig(configProps));
118+
return mock(KafkaConsumer.class);
119+
}
120+
};
121+
122+
factory.createConsumer(null, null, null, overrides);
123+
124+
ConsumerConfig consumerConfig = capturedConfig.get();
125+
126+
assertThat(consumerConfig.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo(2);
127+
assertThat(consumerConfig.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");
128+
}
129+
130+
@Test
131+
public void testIntegerAndStringOverridesTogether() {
132+
Map<String, Object> configs = KafkaTestUtils.consumerProps(this.embeddedKafka, "testGroup", false);
133+
134+
Properties overrides = new Properties();
135+
overrides.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
136+
overrides.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
137+
138+
AtomicReference<ConsumerConfig> capturedConfig = new AtomicReference<>();
139+
140+
DefaultKafkaConsumerFactory<String, String> factory =
141+
new DefaultKafkaConsumerFactory<String, String>(configs) {
142+
143+
@Override
144+
protected Consumer<String, String> createRawConsumer(Map<String, Object> configProps) {
145+
capturedConfig.set(new ConsumerConfig(configProps));
146+
return mock(Consumer.class);
147+
}
148+
};
149+
150+
factory.createConsumer(null, null, null, overrides);
151+
152+
ConsumerConfig consumerConfig = capturedConfig.get();
153+
154+
assertThat(consumerConfig.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo(5);
155+
assertThat(consumerConfig.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");
156+
}
157+
102158
@Test
103159
public void testNoOverridesWhenCreatingConsumer() {
104160
Map<String, Object> originalConfig = Collections.emptyMap();

0 commit comments

Comments
 (0)