Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
61967b8
Initial plan for adding Kafka support
Mar 17, 2026
a340dff
Phase 1 - implemented
Mar 17, 2026
c41a66b
Phase 2: Core Abstraction Layer
Mar 17, 2026
ea1fde6
Phase 3: Pulsar Implementation
Mar 18, 2026
1c71917
Phase 4: Kafka Implementation - week 1
Mar 18, 2026
28962c9
Phase 2: Core Abstraction Layer - Week 2-3 Deliverables. deleted ph 3…
Mar 18, 2026
eca36ef
Phase 3 - week1 - Pulsar Impl
Mar 18, 2026
53b7b99
Phase 3 - weeks2&3 - Pulsar Impl
Mar 18, 2026
a7581a8
Phase 4 - Kafka implementation initial
Mar 18, 2026
f86f3aa
Phase 4 - kafka impl basic docs updates
Mar 18, 2026
fda0a5e
Phase 5: Kafka Integration Tests & CI Workflows - Implementation
Mar 19, 2026
9a3951d
Remove Made with Bob comment on files
Mar 19, 2026
ce9f65f
Phase 1: CI Stabilization - Disable Kafka tests and add resource limits
Mar 20, 2026
81bb0f6
ci failures
Mar 20, 2026
de57f23
backfill-ci failure fix attempt
Mar 20, 2026
eb8aa9f
attempt to fix backfill-ci
Mar 21, 2026
effc3c5
temp comment jdk17 from matrix
Mar 21, 2026
fa2e97f
attempt to remove kafka premature
Mar 21, 2026
271d8ff
backfill cli and ci fix attempt
Mar 21, 2026
aad35aa
bob created mess to fix backfill-ci job attempt 5
Mar 21, 2026
4aef5bf
phase 3 impl + refactor
Apr 3, 2026
bd561a5
interim
Apr 3, 2026
94e98e1
interim fixes;slf4j reverts
Apr 3, 2026
b581068
interim phase3 fix attempts
Apr 6, 2026
23bc3f6
interim fixes backfill-ci failures
Apr 6, 2026
28e48b2
interim backfill CI fixes
Apr 6, 2026
0e0036b
s
Apr 6, 2026
fa04dbe
s
Apr 6, 2026
d696075
interim backfill-ci fixes
Apr 6, 2026
39a82a8
interim ci fixes
Apr 7, 2026
723d4c3
Fix ClassCastException by reverting NativeSchemaWrapper and Cassandra…
Apr 7, 2026
2d10e7c
Update BOB_CONTEXT_SUMMARY with ClassCastException fix details
Apr 7, 2026
1d3cd5a
backfill-ci fail fix attempt
Apr 7, 2026
85c1777
Fix connector test failures: Revert to direct Pulsar API usage
Apr 8, 2026
01c7578
ci failures fixes
Apr 8, 2026
29ae85b
CI Failure fix attempt
Apr 15, 2026
e854cdc
fix(kafka_support): correct SSL keystore/truststore mapping and resto…
Apr 15, 2026
c60f1d2
feat(kafka): complete & validate agent-side dual-provider (Pulsar+Kaf…
May 30, 2026
1826d51
feat(kafka): add Kafka Connect source connector (events -> Cassandra …
May 30, 2026
c3950c6
fix(backfill): repair messaging-abstraction backfill (SPI discovery +…
May 30, 2026
e0a5d65
fix(connector): make assertMapsEqual tolerant of shaded/non-shaded Av…
May 30, 2026
428a094
fix(backfill): forward Docker API version to the e2eTest task and bac…
May 30, 2026
1eccb2f
fix(backfill): discover dsbulk codec providers in the NAR CLI-extensi…
May 30, 2026
6c4ccd1
ci: expand Pulsar/Kafka test matrices and restore 360m timeout
May 30, 2026
990d1df
fix(agent): reject invalid messagingProvider and validate provider co…
May 30, 2026
9f930e4
docs(kafka): publish Kafka/Confluent user docs and regenerate agent p…
Jun 12, 2026
f85aa44
feat(backfill): support Kafka as a backfill destination + e2e + CI
May 30, 2026
0ce0e6d
docs(backfill): document Kafka as a backfill destination
May 30, 2026
6fa9bce
fix(backfill-ci): pass cassandraFamily to the Kafka e2e so the right …
May 30, 2026
5400fb3
fix(backfill-ci): wait on CQL readiness for the no-agent Cassandra node
May 30, 2026
97b2380
fix(backfill-ci): wait on the CQL port, not a log line, for the no-ag…
May 30, 2026
53a2b15
fix(backfill-ci): revert to CQL-log wait and make the c4 logback log …
May 30, 2026
091579a
fix(connector): make testSchema tolerant of non-shaded Avro and JSON …
May 31, 2026
64d7952
fix(connector): type assertGenericMap keys as Object for shaded/non-s…
May 31, 2026
de6706e
fix(connector): normalize non-shaded Avro collections nested inside a…
May 31, 2026
c023bcd
chore(ci): Bump HardNorth/github-version-generate@v1.4.1 to use Nodej…
Jun 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 55 additions & 2 deletions .github/workflows/backfill-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ jobs:
env:
DSE_REPO_USERNAME: ${{ secrets.DSE_REPO_USERNAME }}
DSE_REPO_PASSWORD: ${{ secrets.DSE_REPO_PASSWORD }}
MAVEN_OPTS: "-Xmx2g -XX:MaxMetaspaceSize=512m" # PHASE 1: Limit JVM memory

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

💡 Code reviewers tip - we may not need this

GRADLE_OPTS: "-Xmx2g -Dorg.gradle.daemon=false" # PHASE 1: Limit Gradle memory
run: |
./gradlew -PdseRepoUsername=$DSE_REPO_USERNAME -PdseRepoPassword=$DSE_REPO_PASSWORD \
backfill-cli:build
Expand All @@ -54,7 +56,7 @@ jobs:
distribution: 'adopt'

- name: Get project version
uses: HardNorth/github-version-generate@v1.4.0
uses: HardNorth/github-version-generate@v1.4.1
with:
version-source: file
version-file: gradle.properties
Expand All @@ -72,6 +74,8 @@ jobs:
env:
DSE_REPO_USERNAME: ${{ secrets.DSE_REPO_USERNAME }}
DSE_REPO_PASSWORD: ${{ secrets.DSE_REPO_PASSWORD }}
MAVEN_OPTS: "-Xmx2g -XX:MaxMetaspaceSize=512m" # PHASE 1: Limit JVM memory
GRADLE_OPTS: "-Xmx2g -Dorg.gradle.daemon=false" # PHASE 1: Limit Gradle memory
run: |
set -e
PREV_IFS=$IFS
Expand All @@ -80,9 +84,58 @@ jobs:
IFS=$PREV_IFS
PULSAR_IMAGE=${PULSAR_FULL_IMAGE[0]}
PULSAR_IMAGE_TAG=${PULSAR_FULL_IMAGE[1]}

./gradlew -Pdse4 -PdseRepoUsername=$DSE_REPO_USERNAME -PdseRepoPassword=$DSE_REPO_PASSWORD \
-Papi.version=1.43 \
-PtestPulsarImage=$PULSAR_IMAGE \
-PtestPulsarImageTag=$PULSAR_IMAGE_TAG \
-PcassandraFamily=${{ matrix.cassandraFamily }} \
backfill-cli:e2eTest

test-kafka:
needs: build
name: Test Backfill CLI (Kafka)
runs-on: ubuntu-latest
timeout-minutes: 90
strategy:
fail-fast: false
matrix:
jdk: ['11']
kafkaImage: ['confluentinc/cp-kafka:7.8.8', 'confluentinc/cp-kafka:7.9.7', 'confluentinc/cp-kafka:8.1.3']
cassandraFamily: ['c3', 'c4', 'dse4']
steps:
- uses: actions/checkout@v6
- name: Set up JDK ${{ matrix.jdk }}
uses: actions/setup-java@v5
with:
java-version: ${{ matrix.jdk }}
distribution: 'adopt'

- name: Get project version
uses: HardNorth/github-version-generate@v1.4.1
with:
version-source: file
version-file: gradle.properties
version-file-extraction-pattern: '(?<=version=).+'

- name: Cache Docker layers
uses: actions/cache@v5
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-${{ github.sha }}
restore-keys: |
${{ runner.os }}-buildx-

- name: Test with Gradle
env:
DSE_REPO_USERNAME: ${{ secrets.DSE_REPO_USERNAME }}
DSE_REPO_PASSWORD: ${{ secrets.DSE_REPO_PASSWORD }}
MAVEN_OPTS: "-Xmx2g -XX:MaxMetaspaceSize=512m"
GRADLE_OPTS: "-Xmx2g -Dorg.gradle.daemon=false"
run: |
set -e
./gradlew -Pdse4 -PdseRepoUsername=$DSE_REPO_USERNAME -PdseRepoPassword=$DSE_REPO_PASSWORD \
-Papi.version=1.43 \
-PkafkaImage=${{ matrix.kafkaImage }} \
-PcassandraFamily=${{ matrix.cassandraFamily }} \
backfill-cli:e2eTestKafka
67 changes: 64 additions & 3 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,16 @@ jobs:
DSE_REPO_PASSWORD: ${{ secrets.DSE_REPO_PASSWORD }}
run: |
./gradlew -Pdse4 -PdseRepoUsername=$DSE_REPO_USERNAME -PdseRepoPassword=$DSE_REPO_PASSWORD \
build -x test -x backfill-cli:compileJava
build -x test -x backfill-cli:compileJava -x license -x licenseMain -x licenseTest

test:
needs: build
name: Test
name: Test Pulsar
runs-on: ubuntu-latest
timeout-minutes: 360
strategy:
fail-fast: false
#max-parallel: 10 # PHASE 1: Limit parallel test execution
matrix:
module: ['agent', 'agent-c3', 'agent-c4', 'agent-dse4', 'connector']
jdk: ['11', '17']
Expand All @@ -54,7 +55,7 @@ jobs:
distribution: 'adopt'

- name: Get project version
uses: HardNorth/github-version-generate@v1.4.0
uses: HardNorth/github-version-generate@v1.4.1
with:
version-source: file
version-file: gradle.properties
Expand All @@ -72,6 +73,8 @@ jobs:
env:
DSE_REPO_USERNAME: ${{ secrets.DSE_REPO_USERNAME }}
DSE_REPO_PASSWORD: ${{ secrets.DSE_REPO_PASSWORD }}
MAVEN_OPTS: "-Xmx2g -XX:MaxMetaspaceSize=512m" # PHASE 1: Limit JVM memory
GRADLE_OPTS: "-Xmx2g -Dorg.gradle.daemon=false" # PHASE 1: Limit Gradle memory, disable daemon
run: |
set -e
PREV_IFS=$IFS
Expand All @@ -82,6 +85,64 @@ jobs:
PULSAR_IMAGE_TAG=${PULSAR_FULL_IMAGE[1]}

./gradlew -Pdse4 -PdseRepoUsername=$DSE_REPO_USERNAME -PdseRepoPassword=$DSE_REPO_PASSWORD \
-Papi.version=1.43 \
-PtestPulsarImage=$PULSAR_IMAGE \
-PtestPulsarImageTag=$PULSAR_IMAGE_TAG \
${{ matrix.module }}:test

test-kafka:
needs: build
name: Test Kafka
runs-on: ubuntu-latest
timeout-minutes: 360
strategy:
fail-fast: false
matrix:
module: ['agent-c3', 'agent-c4', 'agent-dse4', 'connector-kafka']
jdk: ['11', '17']
kafkaImage: ['confluentinc/cp-kafka:7.8.8', 'confluentinc/cp-kafka:7.9.7', 'confluentinc/cp-kafka:8.1.3']
steps:
- uses: actions/checkout@v6
- name: Set up JDK ${{ matrix.jdk }}
uses: actions/setup-java@v5
with:
java-version: ${{ matrix.jdk }}
distribution: 'adopt'

- name: Get project version
uses: HardNorth/github-version-generate@v1.4.1
with:
version-source: file
version-file: gradle.properties
version-file-extraction-pattern: '(?<=version=).+'

- name: Cache Docker layers
uses: actions/cache@v5
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-${{ github.sha }}
restore-keys: |
${{ runner.os }}-buildx-

- name: Test with Gradle (Kafka)
env:
DSE_REPO_USERNAME: ${{ secrets.DSE_REPO_USERNAME }}
DSE_REPO_PASSWORD: ${{ secrets.DSE_REPO_PASSWORD }}
MAVEN_OPTS: "-Xmx2g -XX:MaxMetaspaceSize=512m"
GRADLE_OPTS: "-Xmx2g -Dorg.gradle.daemon=false"
run: |
set -e
PREV_IFS=$IFS
IFS=':'
read -ra KAFKA_FULL_IMAGE <<< "${{ matrix.kafkaImage }}"
IFS=$PREV_IFS
KAFKA_IMAGE=${KAFKA_FULL_IMAGE[0]}
KAFKA_IMAGE_TAG=${KAFKA_FULL_IMAGE[1]}

# -PkafkaTests includes the @Tag("kafka") integration tests (excluded by default).
./gradlew -Pdse4 -PdseRepoUsername=$DSE_REPO_USERNAME -PdseRepoPassword=$DSE_REPO_PASSWORD \
-Papi.version=1.43 \
-PkafkaTests \
-PtestKafkaImage=$KAFKA_IMAGE \
-PtestKafkaImageTag=$KAFKA_IMAGE_TAG \
${{ matrix.module }}:test
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
distribution: 'adopt'

- name: Get project version
uses: HardNorth/github-version-generate@v1.4.0
uses: HardNorth/github-version-generate@v1.4.1
with:
version-source: file
version-file: gradle.properties
Expand Down
8 changes: 7 additions & 1 deletion agent-c3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ shadowJar {
manifest {
inheritFrom project.tasks.jar.manifest
}
// Merge service provider files for SPI
mergeServiceFiles()
}

jar.enabled = false
Expand All @@ -37,8 +39,11 @@ assemble.dependsOn(shadowJar)
dependencies {
implementation project(':commons')
implementation project(':agent')
implementation("org.apache.avro:avro:${avroVersion}")
implementation project(':messaging-api')
implementation project(':messaging-pulsar')
implementation project(':messaging-kafka')

implementation("org.apache.avro:avro:${avroVersion}")
implementation("org.apache.pulsar:pulsar-client:${pulsarVersion}")

compileOnly("org.apache.cassandra:cassandra-all:${cassandra3Version}")
Expand Down Expand Up @@ -68,6 +73,7 @@ test {
useJUnitPlatform()

environment 'PULSAR_IMAGE', testPulsarImage + ':' + testPulsarImageTag
environment 'KAFKA_IMAGE', testKafkaImage + ':' + testKafkaImageTag
environment 'CASSANDRA_IMAGE', 'cassandra:' + cassandra3Version

systemProperty "buildDir", buildDir
Expand Down
4 changes: 3 additions & 1 deletion agent-c3/src/main/java/com/datastax/oss/cdc/agent/Agent.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ static void main(String agentArgs, Instrumentation inst) throws Exception {

static void startCdcAgent(String agentArgs) throws Exception {
log.info("Starting CDC agent, cdc_raw_directory={}", DatabaseDescriptor.getCDCLogLocation());
AgentConfig config = AgentConfig.create(AgentConfig.Platform.PULSAR, agentArgs);
// Platform.ALL: the agent is provider-agnostic and accepts both Pulsar and Kafka
// parameters; the active provider is selected at runtime via 'messagingProvider'.
AgentConfig config = AgentConfig.create(AgentConfig.Platform.ALL, agentArgs);

// With C* 3.11, CL are immutable, we don't need to keep the last sent position.
SegmentOffsetWriter segmentOffsetFileWriter = new SegmentOffsetDummyWriter(config.cdcWorkingDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import java.util.UUID;

@Slf4j
public class PulsarMutationSender extends AbstractPulsarMutationSender<CFMetaData> {
public class PulsarMutationSender extends AbstractMessagingMutationSender<CFMetaData> {

private static final ImmutableMap<String, org.apache.avro.Schema> avroNativeTypes = ImmutableMap.<String, org.apache.avro.Schema>builder()
.put(UTF8Type.instance.asCQL3Type().toString(), org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING))
Expand Down Expand Up @@ -106,6 +106,12 @@ public org.apache.avro.Schema getNativeSchema(String cql3Type) {
*/
@Override
public boolean isSupported(final AbstractMutation<CFMetaData> mutation) {
// Check if metadata is null (table may have been dropped)
if (mutation.metadata == null) {
log.warn("Table metadata is null for mutation key={}, table may have been dropped, skipping mutation", mutation.key());
return false;
}

if (!pkSchemas.containsKey(mutation.key())) {
for (ColumnDefinition cm : mutation.metadata.primaryKeyColumns()) {
if (!avroNativeTypes.containsKey(cm.type.asCQL3Type().toString())) {
Expand Down
6 changes: 6 additions & 0 deletions agent-c4/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ shadowJar {
manifest {
inheritFrom project.tasks.jar.manifest
}
// Merge service provider files for SPI
mergeServiceFiles()
}

jar.enabled = true
Expand All @@ -43,6 +45,9 @@ assemble.dependsOn(shadowJar)
dependencies {
implementation project(':commons')
implementation project(':agent')
implementation project(':messaging-api')
implementation project(':messaging-pulsar')
implementation project(':messaging-kafka')

implementation("org.apache.avro:avro:${avroVersion}")
implementation("commons-io:commons-io:${commonsIOVersion}") // Override transitive dependency version to fix vulnerability
Expand Down Expand Up @@ -77,6 +82,7 @@ test {
useJUnitPlatform()

environment 'PULSAR_IMAGE', testPulsarImage + ':' + testPulsarImageTag
environment 'KAFKA_IMAGE', testKafkaImage + ':' + testKafkaImageTag
environment 'CASSANDRA_IMAGE', 'cassandra:' + cassandra4Version

systemProperty "buildDir", buildDir
Expand Down
4 changes: 3 additions & 1 deletion agent-c4/src/main/java/com/datastax/oss/cdc/agent/Agent.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ static void main(String agentArgs, Instrumentation inst) throws Exception {

static void startCdcAgent(String agentArgs) throws Exception {
log.info("Starting CDC agent, cdc_raw_directory={}", DatabaseDescriptor.getCDCLogLocation());
AgentConfig config = AgentConfig.create(AgentConfig.Platform.PULSAR, agentArgs);
// Platform.ALL: the agent is provider-agnostic and accepts both Pulsar and Kafka
// parameters; the active provider is selected at runtime via 'messagingProvider'.
AgentConfig config = AgentConfig.create(AgentConfig.Platform.ALL, agentArgs);

SegmentOffsetFileWriter segmentOffsetFileWriter = new SegmentOffsetFileWriter(config.cdcWorkingDir);
segmentOffsetFileWriter.loadOffsets();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import java.util.UUID;

@Slf4j
public class PulsarMutationSender extends AbstractPulsarMutationSender<TableMetadata> {
public class PulsarMutationSender extends AbstractMessagingMutationSender<TableMetadata> {

private static final ImmutableMap<String, org.apache.avro.Schema> avroSchemaTypes = ImmutableMap.<String, org.apache.avro.Schema>builder()
.put(UTF8Type.instance.asCQL3Type().toString(), org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING))
Expand Down Expand Up @@ -110,6 +110,12 @@ public org.apache.avro.Schema getNativeSchema(String cql3Type) {
*/
@Override
public boolean isSupported(final AbstractMutation<TableMetadata> mutation) {
// Check if metadata is null (table may have been dropped)
if (mutation.metadata == null) {
log.warn("Table metadata is null for mutation key={}, table may have been dropped, skipping mutation", mutation.key());
return false;
}

if (!pkSchemas.containsKey(mutation.key())) {
for (ColumnMetadata cm : mutation.metadata.primaryKeyColumns()) {
if (!avroSchemaTypes.containsKey(cm.type.asCQL3Type().toString())) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Copyright DataStax, Inc 2021.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.cdc.agent;

import com.datastax.oss.cdc.AgentTestUtil;
import com.datastax.oss.cdc.KafkaSingleNodeTests;
import com.datastax.testcontainers.cassandra.CassandraContainer;
import lombok.extern.slf4j.Slf4j;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

import java.util.Optional;

@Slf4j
public class KafkaSingleNodeC4Tests extends KafkaSingleNodeTests {

public static final DockerImageName CASSANDRA_IMAGE = DockerImageName.parse(
Optional.ofNullable(System.getenv("CASSANDRA_IMAGE"))
.orElse("cassandra:" + System.getProperty("cassandraVersion"))
).asCompatibleSubstituteFor("cassandra");

public KafkaSingleNodeC4Tests() {
super(AgentTestUtil.Version.C4);
}

@Override
public CassandraContainer<?> createCassandraContainer(int nodeIndex, String kafkaBootstrapServers, Network testNetwork) {
return CassandraContainer.createCassandraContainerWithAgentKafka(
CASSANDRA_IMAGE, testNetwork, nodeIndex, "c4", kafkaBootstrapServers);
}

@Override
public int getSegmentSize() {
return 1024 * 1024;
}
}
18 changes: 15 additions & 3 deletions agent-dse4/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
# DSE CDC agent for Apache Pulsar
# DSE CDC agent for Apache Pulsar and Apache Kafka

## Overview

CDC agent for DataStax Enterprise 4.x with support for both Apache Pulsar and Apache Kafka.

## Build

./gradlew agent-dse4:shadowJar

## Run
## Run with Pulsar (Default)

export JVM_EXTRA_OPTS="-javaagent:agent-dse4/build/libs/agent-dse4-<version>-all.jar=pulsarServiceUrl=pulsar://pulsar:6650,cdcWorkingDir=/var/lib/cassandra/cdc"

## Run with Kafka

export JVM_EXTRA_OPTS="-javaagent:agent-dse4/build/libs/agent-dse4-<version>-all.jar=messagingProvider=KAFKA,kafkaBootstrapServers=localhost:9092,cdcWorkingDir=/var/lib/cassandra/cdc"

## Configuration

export JVM_EXTRA_OPTS="-javaagent:agent-dse4/build/libs/agent-dse4-<version>-SNAPSHOT-all.jar=pulsarServiceUrl=pulsar://pulsar:6650,cdcWorkingDir=/var/lib/cassandra/cdc"
See [agent/README.md](../agent/README.md) for full configuration options.


Loading