Skip to content

Commit 3ecdfb3

Browse files
Madhavanclaude
andcommitted
feat(kafka): add Kafka Connect source connector (events -> Cassandra -> data)
New connector-kafka module: a Kafka Connect SinkConnector (com.datastax.oss.kafka.sink.CassandraSinkConnector) that mirrors the Pulsar CassandraSource for Kafka. It consumes the events-<ks>.<table> topic produced by the CDC agent, de-duplicates mutations, queries Cassandra for the current row, and publishes the row to data-<ks>.<table>.

Maximal reuse of the existing connector module: CassandraClient, MutationCache, CassandraSourceConnectorConfig and the NativeAvroConverter/NativeJsonConverter (whose toConnectData(Row) already returns raw AVRO/JSON bytes, provider-agnostic). The events key (raw AVRO primary key) is decoded via the key converter's native schema; the data record key reuses the event key bytes and the value is the AVRO/JSON row (or null tombstone for a delete). Registry-less serialization, matching the agent default.

- CassandraSinkConfig delegates Cassandra/cache/output settings to CassandraSourceConnectorConfig and injects a default events.topic.
- CassandraKafkaSinkE2ETest validates the full pipeline end-to-end (agent -> events -> connector -> data) for both AVRO and JSON output.
- CI: test-kafka job matrix extended to ['agent-c4','connector-kafka'].
- docs/KAFKA_SUPPORT.md documents deployment, config and known follow-ups.

Verified no regressions: Pulsar agent (testSchema) and Pulsar connector (testSinglePk) still pass under the Testcontainers 1.20.6 bump.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent fefd539 commit 3ecdfb3

11 files changed

Lines changed: 2419 additions & 7 deletions

File tree

.github/workflows/ci.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ jobs:
9999
strategy:
100100
fail-fast: false
101101
matrix:
102-
module: ['agent-c4']
102+
module: ['agent-c4', 'connector-kafka']
103103
jdk: ['11']
104104
kafkaImage: ['confluentinc/cp-kafka:7.8.0', 'confluentinc/cp-kafka:7.9.6']
105105
steps:

connector-kafka/build.gradle

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Plugins applied to this module: the Java / Java-library plugins and the Shadow plugin
// (the shadowJar task configured further down in this script comes from the latter).
plugins {
    id('java')
    id('java-library')
    id('com.github.johnrengelman.shadow')
}

project.description = 'Cassandra source connector for Kafka Connect'
8+
9+
dependencies {
    // Sibling modules. ':connector' is the Pulsar connector module; it is reused here for the
    // Cassandra query, AVRO/JSON conversion, de-duplication and configuration code
    // (NativeAvroConverter.toConnectData already returns raw AVRO bytes, provider-agnostic).
    implementation(project(':connector'))
    implementation(project(':commons'))
    implementation(project(':messaging-api'))
    implementation(project(':messaging-kafka'))

    // Lombok is needed at compile time only.
    compileOnly("org.projectlombok:lombok:${lombokVersion}")
    annotationProcessor("org.projectlombok:lombok:${lombokVersion}")

    // Kafka Connect plugin API, plus the producer client used for the data topic.
    implementation("org.apache.kafka:connect-api:${kafkaVersion}")
    implementation("org.apache.kafka:kafka-clients:${kafkaVersion}")

    // Serialization, Cassandra driver, functional helpers and logging facade.
    implementation("org.apache.avro:avro:${avroVersion}")
    implementation("org.apache.cassandra:java-driver-core:${ossDriverVersion}")
    implementation("io.vavr:vavr:${vavrVersion}")
    implementation("org.slf4j:slf4j-api:${slf4jVersion}")

    // Test stack: JUnit 5, the Cassandra driver + query builder, and Testcontainers
    // (including the project's own ':testcontainers' helper module).
    testImplementation("org.junit.jupiter:junit-jupiter-api:${junitJupiterVersion}")
    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${junitJupiterVersion}")
    testImplementation("org.apache.cassandra:java-driver-core:${ossDriverVersion}")
    testImplementation("org.apache.cassandra:java-driver-query-builder:${ossDriverVersion}")
    testImplementation("org.testcontainers:testcontainers:${testContainersVersion}")
    testImplementation("org.testcontainers:kafka:${testContainersVersion}")
    testImplementation(project(':testcontainers'))
    testRuntimeOnly("ch.qos.logback:logback-classic:${logbackVersion}")
}
38+
39+
// Build the deployable plugin jar with an empty classifier.
shadowJar {
    archiveClassifier = ''
    // SPI service files (messaging providers) from the bundled jars must be merged,
    // not overwritten, when they are combined into the single shadow jar.
    mergeServiceFiles()
}

// 'assemble' also produces the shadow jar.
assemble.dependsOn shadowJar
46+
47+
test {
    useJUnitPlatform()

    // The e2e test runs the CDC agent inside the Cassandra container, so the agent-c4
    // shadow jar has to be built before the tests start.
    dependsOn(project(':agent-c4').shadowJar)

    // Container images used by the tests.
    environment('KAFKA_IMAGE', testKafkaImage + ':' + testKafkaImageTag)
    environment('CASSANDRA_IMAGE', 'cassandra:' + cassandra4Version)

    // The testcontainers helpers expect the agent-c4 buildDir and the project version.
    systemProperty('buildDir', project(':agent-c4').buildDir)
    systemProperty('projectVersion', project.version)
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/**
2+
* Copyright DataStax, Inc 2021.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.kafka.sink;
17+
18+
import com.datastax.oss.cdc.CassandraSourceConnectorConfig;
19+
import org.apache.kafka.common.config.ConfigDef;
20+
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
24+
/**
25+
* Configuration for the Cassandra Kafka Connect sink connector.
26+
*
27+
* <p>Cassandra connection, cache and output-format settings are delegated to the existing
28+
* {@link CassandraSourceConnectorConfig} (the same keys as the Pulsar connector: {@code keyspace},
29+
* {@code table}, {@code contactPoints}, {@code loadBalancing.localDc}, {@code cache.*},
30+
* {@code outputFormat}, SSL/auth, etc.). This class adds the Kafka-specific settings needed to
31+
* publish the resulting rows to the data topic.
32+
*/
33+
public class CassandraSinkConfig {
34+
35+
/** Bootstrap servers for the producer that writes enriched rows to the data topic. */
36+
public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
37+
38+
/** Prefix for the data topic; the data topic is {@code <prefix><keyspace>.<table>}. */
39+
public static final String DATA_TOPIC_PREFIX = "data.topic.prefix";
40+
public static final String DATA_TOPIC_PREFIX_DEFAULT = "data-";
41+
42+
/**
43+
* Optional Confluent Schema Registry URL. When unset (default) the connector reads the events
44+
* topic and writes the data topic using registry-less raw AVRO, matching the agent default.
45+
*/
46+
public static final String SCHEMA_REGISTRY_URL = "kafka.schema.registry.url";
47+
48+
private final Map<String, String> originals;
49+
private final CassandraSourceConnectorConfig cassandraConfig;
50+
private final String bootstrapServers;
51+
private final String dataTopicPrefix;
52+
private final String schemaRegistryUrl;
53+
54+
public CassandraSinkConfig(Map<String, String> props) {
55+
this.originals = props;
56+
// The reused CassandraSourceConnectorConfig requires the Pulsar-specific 'events.topic'.
57+
// For Kafka, events are delivered by the Connect framework (the input 'topics'); derive a
58+
// sensible default of events-<keyspace>.<table> when not explicitly provided.
59+
Map<String, String> effective = new HashMap<>(props);
60+
if (!effective.containsKey("events.topic")) {
61+
String keyspace = props.get("keyspace");
62+
String table = props.get("table");
63+
if (keyspace != null && table != null) {
64+
effective.put("events.topic", "events-" + keyspace + "." + table);
65+
}
66+
}
67+
this.cassandraConfig = new CassandraSourceConnectorConfig(effective);
68+
this.bootstrapServers = props.get(KAFKA_BOOTSTRAP_SERVERS);
69+
this.dataTopicPrefix = props.getOrDefault(DATA_TOPIC_PREFIX, DATA_TOPIC_PREFIX_DEFAULT);
70+
String registry = props.get(SCHEMA_REGISTRY_URL);
71+
this.schemaRegistryUrl = (registry == null || registry.trim().isEmpty()) ? null : registry.trim();
72+
}
73+
74+
public CassandraSourceConnectorConfig getCassandraConfig() {
75+
return cassandraConfig;
76+
}
77+
78+
public String getBootstrapServers() {
79+
return bootstrapServers;
80+
}
81+
82+
public String getKeyspace() {
83+
return cassandraConfig.getKeyspaceName();
84+
}
85+
86+
public String getTable() {
87+
return cassandraConfig.getTableName();
88+
}
89+
90+
/** Data topic name: {@code <prefix><keyspace>.<table>}. */
91+
public String getDataTopic() {
92+
return dataTopicPrefix + getKeyspace() + "." + getTable();
93+
}
94+
95+
public String getSchemaRegistryUrl() {
96+
return schemaRegistryUrl;
97+
}
98+
99+
public Map<String, String> originals() {
100+
return originals;
101+
}
102+
103+
/**
104+
* Minimal {@link ConfigDef} for Kafka Connect validation/UI. The remaining Cassandra settings
105+
* are read directly from the raw configuration by {@link CassandraSourceConnectorConfig}.
106+
*/
107+
public static ConfigDef configDef() {
108+
return new ConfigDef()
109+
.define("keyspace", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
110+
"Cassandra keyspace to query.")
111+
.define("table", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
112+
"Cassandra table to query.")
113+
.define("contactPoints", ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH,
114+
"Comma-separated Cassandra contact points.")
115+
.define("loadBalancing.localDc", ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH,
116+
"Cassandra local datacenter.")
117+
.define(KAFKA_BOOTSTRAP_SERVERS, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
118+
"Bootstrap servers for the data-topic producer.")
119+
.define(DATA_TOPIC_PREFIX, ConfigDef.Type.STRING, DATA_TOPIC_PREFIX_DEFAULT,
120+
ConfigDef.Importance.MEDIUM, "Prefix for the output data topic.")
121+
.define(SCHEMA_REGISTRY_URL, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM,
122+
"Optional Confluent Schema Registry URL (registry-less raw AVRO if unset).");
123+
}
124+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/**
2+
* Copyright DataStax, Inc 2021.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.kafka.sink;
17+
18+
import com.datastax.oss.cdc.Version;
19+
import org.apache.kafka.common.config.ConfigDef;
20+
import org.apache.kafka.connect.connector.Task;
21+
import org.apache.kafka.connect.sink.SinkConnector;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
import java.util.ArrayList;
26+
import java.util.HashMap;
27+
import java.util.List;
28+
import java.util.Map;
29+
30+
/**
31+
* Kafka Connect sink connector that mirrors the Pulsar {@code CassandraSource}: it consumes CDC
32+
* mutation events from the {@code events-<keyspace>.<table>} topic (produced by the CDC agent),
33+
* queries Cassandra for the current row, and publishes the row to the {@code data-<keyspace>.<table>}
34+
* topic.
35+
*
36+
* <p>Deploy with {@code key.converter} / {@code value.converter} set to
37+
* {@code org.apache.kafka.connect.converters.ByteArrayConverter} so the task receives the raw AVRO
38+
* bytes produced by the agent.
39+
*/
40+
public class CassandraSinkConnector extends SinkConnector {
41+
42+
private static final Logger log = LoggerFactory.getLogger(CassandraSinkConnector.class);
43+
44+
private Map<String, String> configProps;
45+
46+
@Override
47+
public void start(Map<String, String> props) {
48+
this.configProps = new HashMap<>(props);
49+
// Fail fast on invalid configuration.
50+
new CassandraSinkConfig(this.configProps);
51+
log.info("Starting CassandraSinkConnector");
52+
}
53+
54+
@Override
55+
public Class<? extends Task> taskClass() {
56+
return CassandraSinkTask.class;
57+
}
58+
59+
@Override
60+
public List<Map<String, String>> taskConfigs(int maxTasks) {
61+
List<Map<String, String>> configs = new ArrayList<>(maxTasks);
62+
for (int i = 0; i < maxTasks; i++) {
63+
configs.add(new HashMap<>(configProps));
64+
}
65+
return configs;
66+
}
67+
68+
@Override
69+
public void stop() {
70+
log.info("Stopping CassandraSinkConnector");
71+
}
72+
73+
@Override
74+
public ConfigDef config() {
75+
return CassandraSinkConfig.configDef();
76+
}
77+
78+
@Override
79+
public String version() {
80+
return Version.getVersion();
81+
}
82+
}

0 commit comments

Comments
 (0)