Skip to content

Commit e8b50b8

Browse files
author
Madhavan
committed
fix(kafka_support): correct SSL keystore/truststore mapping and restore infinite producer timeout
1 parent 2bbb52d commit e8b50b8

3 files changed

Lines changed: 186 additions & 350 deletions

File tree

agent/src/main/java/com/datastax/oss/cdc/agent/AbstractMessagingMutationSender.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,9 +207,9 @@ protected SslConfig buildSslConfig(AgentConfig config) {
207207

208208
if (config.useKeyStoreTls) {
209209
builder.keyStorePath(config.sslKeystorePath)
210-
.keyStorePassword(config.sslTruststorePassword)
211-
.keyStoreType(config.sslTruststoreType)
212-
.trustStorePath(config.sslKeystorePath)
210+
.keyStorePassword(config.sslKeystorePassword) // Fixed: use keystore password
211+
.keyStoreType(config.sslKeystoreType) // Fixed: use keystore type
212+
.trustStorePath(config.sslTruststorePath) // Fixed: use truststore path
213213
.trustStorePassword(config.sslTruststorePassword)
214214
.trustStoreType(config.sslTruststoreType);
215215
}
@@ -305,7 +305,7 @@ protected MessageProducer<byte[], MutationValue> getProducer(final TableInfo tm)
305305
ProducerConfigBuilder.<byte[], MutationValue>builder()
306306
.topic(k)
307307
.producerName("cdc-producer-" + getHostId() + "-" + tm.key())
308-
.sendTimeoutMs(30000) // 30 seconds (Pulsar default when timeout=0 means no timeout)
308+
.sendTimeoutMs(0) // 0 = infinite timeout for backward compatibility
309309
.maxPendingMessages(config.pulsarMaxPendingMessages)
310310
.blockIfQueueFull(true)
311311
.keySchema(keySchema)

agent/src/main/java/com/datastax/oss/cdc/agent/AgentConfig.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,21 +225,29 @@ public static long getEnvAsLong(String varName, long defaultValue) {
225225
null, "CDC_SSL_KEYSTORE_PASSWORD", Setting::getEnvAsString,
226226
"String", "ssl", 6);
227227

228+
public static final String SSL_KEYSTORE_TYPE = "sslKeystoreType";
229+
public String sslKeystoreType;
230+
public static final Setting<String> SSL_KEYSTORE_TYPE_SETTING =
231+
new Setting<>(SSL_KEYSTORE_TYPE, Platform.ALL, (c, s) -> c.sslKeystoreType = s, c -> c.sslKeystoreType,
232+
"The type of the SSL/TLS keystore.",
233+
"JKS", "CDC_SSL_KEYSTORE_TYPE", Setting::getEnvAsString,
234+
"String", "ssl", 7);
235+
228236
public static final String SSL_CIPHER_SUITES = "sslCipherSuites";
229237
public String sslCipherSuites;
230238
public static final Setting<String> SSL_CIPHER_SUITES_SETTING =
231239
new Setting<>(SSL_CIPHER_SUITES, Platform.ALL, (c, s) -> c.sslCipherSuites = s, c -> c.sslCipherSuites,
232240
"Defines one or more cipher suites to use for negotiating the SSL/TLS connection.",
233241
null, "CDC_SSL_CIPHER_SUITES", Setting::getEnvAsString,
234-
"String", "ssl", 7);
242+
"String", "ssl", 8);
235243

236244
public static final String SSL_ENABLED_PROTOCOLS = "sslEnabledProtocols";
237245
public String sslEnabledProtocols;
238246
public static final Setting<String> SSL_ENABLED_PROTOCOLS_SETTING =
239247
new Setting<>(SSL_ENABLED_PROTOCOLS, Platform.ALL, (c, s) -> c.sslEnabledProtocols = s, c -> c.sslEnabledProtocols,
240248
"Enabled SSL/TLS protocols",
241249
"TLSv1.2,TLSv1.1,TLSv1", "CDC_SSL_ENABLED_PROTOCOLS", Setting::getEnvAsString,
242-
"String", "ssl", 8);
250+
"String", "ssl", 9);
243251

244252
public static final String SSL_ALLOW_INSECURE_CONNECTION = "sslAllowInsecureConnection";
245253
public boolean sslAllowInsecureConnection;
@@ -407,6 +415,7 @@ public static long getEnvAsLong(String varName, long defaultValue) {
407415
set.add(SSL_TRUSTSTORE_TYPE_SETTING);
408416
set.add(SSL_KEYSTORE_PATH_SETTING);
409417
set.add(SSL_KEYSTORE_PASSWORD_SETTING);
418+
set.add(SSL_KEYSTORE_TYPE_SETTING);
410419
set.add(SSL_CIPHER_SUITES_SETTING);
411420
set.add(SSL_ENABLED_PROTOCOLS_SETTING);
412421
set.add(SSL_ALLOW_INSECURE_CONNECTION_SETTING);
@@ -448,6 +457,7 @@ public AgentConfig() {
448457
this.sslTruststoreType = SSL_TRUSTSTORE_TYPE_SETTING.initDefault();
449458
this.sslKeystorePath = SSL_KEYSTORE_PATH_SETTING.initDefault();
450459
this.sslKeystorePassword = SSL_KEYSTORE_PASSWORD_SETTING.initDefault();
460+
this.sslKeystoreType = SSL_KEYSTORE_TYPE_SETTING.initDefault();
451461
this.sslCipherSuites = SSL_CIPHER_SUITES_SETTING.initDefault();
452462
this.sslEnabledProtocols = SSL_ENABLED_PROTOCOLS_SETTING.initDefault();
453463
this.sslAllowInsecureConnection = SSL_ALLOW_INSECURE_CONNECTION_SETTING.initDefault();

0 commit comments

Comments
 (0)