Skip to content

Commit fefd539

Browse files
Madhavan and claude committed
feat(kafka): complete & validate agent-side dual-provider (Pulsar+Kafka) CDC
Make the CDC agent publish change events to either Pulsar or Kafka/Confluent via the messaging abstraction, with configurable serialization. Pulsar wire format is unchanged; the active provider is selected at runtime.

Serialization
- commons: add MutationValueCodec, a canonical registry-less Avro codec for MutationValue shared by producer and (future) consumer.
- messaging-kafka: introduce a KafkaSerde strategy (RawAvroSerde registry-less default + RegistryAvroSerde Confluent), replacing the registry-mandatory KafkaSchemaProvider. Wire producer/consumer/client to it.
- agent: AbstractMessagingMutationSender branches by provider — Pulsar keeps its exact schema/format; Kafka uses raw Avro (default) or Confluent registry when kafkaSchemaRegistryUrl is set.

Bug fixes (also affecting CI)
- Agents passed Platform.PULSAR, rejecting all Kafka params; pass Platform.ALL and treat ALL as a wildcard in AgentConfig.configure.
- ProducerConfigBuilder.sendTimeoutMs(0) was rejected though 0 = infinite (Pulsar-compatible); allow 0, reject only negative.
- CassandraContainer.createCassandraContainerWithAgentKafka now sets messagingProvider=kafka.
- Bump Testcontainers 1.19.1 -> 1.20.6 and force the docker-java API version via the api.version system property (newer Docker engines reject the 1.32 default); CI passes -Papi.version=1.43.

Tests & CI
- Add unit tests for the messaging modules (codec, raw-avro serde, serde selection, SPI discovery, producer config validation).
- Add KafkaSingleNodeC4Tests (Testcontainers cp-kafka), tagged @Tag("kafka"); excluded from default runs, included via -PkafkaTests.
- Re-enable the test-kafka CI job; add -Papi.version to the test jobs.

Docs
- docs/KAFKA_SUPPORT.md describing agent Kafka config and serialization modes.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 72601ad commit fefd539

32 files changed

Lines changed: 1261 additions & 445 deletions

File tree

.github/workflows/ci.yaml

Lines changed: 57 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -86,62 +86,64 @@ jobs:
8686
PULSAR_IMAGE_TAG=${PULSAR_FULL_IMAGE[1]}
8787
8888
./gradlew -Pdse4 -PdseRepoUsername=$DSE_REPO_USERNAME -PdseRepoPassword=$DSE_REPO_PASSWORD \
89+
-Papi.version=1.43 \
8990
-PtestPulsarImage=$PULSAR_IMAGE \
9091
-PtestPulsarImageTag=$PULSAR_IMAGE_TAG \
9192
${{ matrix.module }}:test
9293
93-
# PHASE 1 STABILIZATION - TEMPORARILY DISABLED
94-
# Kafka tests will be re-enabled in Phase 4 after Pulsar tests are stable
95-
# See docs/CI_FAILURE_COMPREHENSIVE_RECOVERY_PLAN.md for details
96-
#
97-
# test-kafka:
98-
# needs: build
99-
# name: Test Kafka
100-
# runs-on: ubuntu-latest
101-
# timeout-minutes: 360
102-
# strategy:
103-
# fail-fast: false
104-
# matrix:
105-
# module: ['agent-c4']
106-
# jdk: ['11', '17']
107-
# kafkaImage: ['apache/kafka:4.2.0', 'confluentinc/cp-kafka:7.9.6', 'confluentinc/cp-kafka:8.1.0']
108-
# steps:
109-
# - uses: actions/checkout@v6
110-
# - name: Set up JDK ${{ matrix.jdk }}
111-
# uses: actions/setup-java@v5
112-
# with:
113-
# java-version: ${{ matrix.jdk }}
114-
# distribution: 'adopt'
115-
#
116-
# - name: Get project version
117-
# uses: HardNorth/github-version-generate@v1.4.0
118-
# with:
119-
# version-source: file
120-
# version-file: gradle.properties
121-
# version-file-extraction-pattern: '(?<=version=).+'
122-
#
123-
# - name: Cache Docker layers
124-
# uses: actions/cache@v5
125-
# with:
126-
# path: /tmp/.buildx-cache
127-
# key: ${{ runner.os }}-buildx-${{ github.sha }}
128-
# restore-keys: |
129-
# ${{ runner.os }}-buildx-
130-
#
131-
# - name: Test with Gradle (Kafka)
132-
# env:
133-
# DSE_REPO_USERNAME: ${{ secrets.DSE_REPO_USERNAME }}
134-
# DSE_REPO_PASSWORD: ${{ secrets.DSE_REPO_PASSWORD }}
135-
# run: |
136-
# set -e
137-
# PREV_IFS=$IFS
138-
# IFS=':'
139-
# read -ra KAFKA_FULL_IMAGE <<< "${{ matrix.kafkaImage }}"
140-
# IFS=$PREV_IFS
141-
# KAFKA_IMAGE=${KAFKA_FULL_IMAGE[0]}
142-
# KAFKA_IMAGE_TAG=${KAFKA_FULL_IMAGE[1]}
143-
#
144-
# ./gradlew -Pdse4 -PdseRepoUsername=$DSE_REPO_USERNAME -PdseRepoPassword=$DSE_REPO_PASSWORD \
145-
# -PtestKafkaImage=$KAFKA_IMAGE \
146-
# -PtestKafkaImageTag=$KAFKA_IMAGE_TAG \
147-
# ${{ matrix.module }}:test
94+
# Kafka integration-test job: once the build job has completed, runs the module's Gradle
# tests with -PkafkaTests for every (module, jdk, kafkaImage) combination in the matrix.
test-kafka:
95+
needs: build
96+
name: Test Kafka
97+
runs-on: ubuntu-latest
98+
timeout-minutes: 90
99+
strategy:
100+
fail-fast: false
101+
matrix:
102+
module: ['agent-c4']
103+
jdk: ['11']
104+
# Each entry is "image:tag"; the run step below splits it on ':' into KAFKA_IMAGE and KAFKA_IMAGE_TAG.
kafkaImage: ['confluentinc/cp-kafka:7.8.0', 'confluentinc/cp-kafka:7.9.6']
105+
steps:
106+
- uses: actions/checkout@v6
107+
- name: Set up JDK ${{ matrix.jdk }}
108+
uses: actions/setup-java@v5
109+
with:
110+
java-version: ${{ matrix.jdk }}
111+
distribution: 'adopt'
112+
113+
- name: Get project version
114+
uses: HardNorth/github-version-generate@v1.4.0
115+
with:
116+
version-source: file
117+
version-file: gradle.properties
118+
version-file-extraction-pattern: '(?<=version=).+'
119+
120+
- name: Cache Docker layers
121+
uses: actions/cache@v5
122+
with:
123+
path: /tmp/.buildx-cache
124+
key: ${{ runner.os }}-buildx-${{ github.sha }}
125+
restore-keys: |
126+
${{ runner.os }}-buildx-
127+
128+
- name: Test with Gradle (Kafka)
129+
env:
130+
DSE_REPO_USERNAME: ${{ secrets.DSE_REPO_USERNAME }}
131+
DSE_REPO_PASSWORD: ${{ secrets.DSE_REPO_PASSWORD }}
132+
MAVEN_OPTS: "-Xmx2g -XX:MaxMetaspaceSize=512m"
133+
GRADLE_OPTS: "-Xmx2g -Dorg.gradle.daemon=false"
134+
run: |
135+
set -e
136+
PREV_IFS=$IFS
137+
IFS=':'
138+
read -ra KAFKA_FULL_IMAGE <<< "${{ matrix.kafkaImage }}"
139+
IFS=$PREV_IFS
140+
KAFKA_IMAGE=${KAFKA_FULL_IMAGE[0]}
141+
KAFKA_IMAGE_TAG=${KAFKA_FULL_IMAGE[1]}
142+
143+
# -PkafkaTests includes the @Tag("kafka") integration tests (excluded by default).
144+
./gradlew -Pdse4 -PdseRepoUsername=$DSE_REPO_USERNAME -PdseRepoPassword=$DSE_REPO_PASSWORD \
145+
-Papi.version=1.43 \
146+
-PkafkaTests \
147+
-PtestKafkaImage=$KAFKA_IMAGE \
148+
-PtestKafkaImageTag=$KAFKA_IMAGE_TAG \
149+
${{ matrix.module }}:test

agent-c3/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ test {
7373
useJUnitPlatform()
7474

7575
environment 'PULSAR_IMAGE', testPulsarImage + ':' + testPulsarImageTag
76+
environment 'KAFKA_IMAGE', testKafkaImage + ':' + testKafkaImageTag
7677
environment 'CASSANDRA_IMAGE', 'cassandra:' + cassandra3Version
7778

7879
systemProperty "buildDir", buildDir

agent-c3/src/main/java/com/datastax/oss/cdc/agent/Agent.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@ static void main(String agentArgs, Instrumentation inst) throws Exception {
5858

5959
static void startCdcAgent(String agentArgs) throws Exception {
6060
log.info("Starting CDC agent, cdc_raw_directory={}", DatabaseDescriptor.getCDCLogLocation());
61-
AgentConfig config = AgentConfig.create(AgentConfig.Platform.PULSAR, agentArgs);
61+
// Platform.ALL: the agent is provider-agnostic and accepts both Pulsar and Kafka
62+
// parameters; the active provider is selected at runtime via 'messagingProvider'.
63+
AgentConfig config = AgentConfig.create(AgentConfig.Platform.ALL, agentArgs);
6264

6365
// With C* 3.11, CL are immutable, we don't need to keep the last sent position.
6466
SegmentOffsetWriter segmentOffsetFileWriter = new SegmentOffsetDummyWriter(config.cdcWorkingDir);

agent-c4/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ test {
8282
useJUnitPlatform()
8383

8484
environment 'PULSAR_IMAGE', testPulsarImage + ':' + testPulsarImageTag
85+
environment 'KAFKA_IMAGE', testKafkaImage + ':' + testKafkaImageTag
8586
environment 'CASSANDRA_IMAGE', 'cassandra:' + cassandra4Version
8687

8788
systemProperty "buildDir", buildDir

agent-c4/src/main/java/com/datastax/oss/cdc/agent/Agent.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@ static void main(String agentArgs, Instrumentation inst) throws Exception {
5858

5959
static void startCdcAgent(String agentArgs) throws Exception {
6060
log.info("Starting CDC agent, cdc_raw_directory={}", DatabaseDescriptor.getCDCLogLocation());
61-
AgentConfig config = AgentConfig.create(AgentConfig.Platform.PULSAR, agentArgs);
61+
// Platform.ALL: the agent is provider-agnostic and accepts both Pulsar and Kafka
62+
// parameters; the active provider is selected at runtime via 'messagingProvider'.
63+
AgentConfig config = AgentConfig.create(AgentConfig.Platform.ALL, agentArgs);
6264

6365
SegmentOffsetFileWriter segmentOffsetFileWriter = new SegmentOffsetFileWriter(config.cdcWorkingDir);
6466
segmentOffsetFileWriter.loadOffsets();
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* Copyright DataStax, Inc 2021.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.cdc.agent;
17+
18+
import com.datastax.oss.cdc.AgentTestUtil;
19+
import com.datastax.oss.cdc.KafkaSingleNodeTests;
20+
import com.datastax.testcontainers.cassandra.CassandraContainer;
21+
import lombok.extern.slf4j.Slf4j;
22+
import org.testcontainers.containers.Network;
23+
import org.testcontainers.utility.DockerImageName;
24+
25+
import java.util.Optional;
26+
27+
/**
 * Kafka single-node agent tests bound to the C4 agent: extends {@code KafkaSingleNodeTests},
 * passes {@code AgentTestUtil.Version.C4} to the parent, and supplies the Cassandra container
 * and segment size the parent asks for.
 */
@Slf4j
28+
public class KafkaSingleNodeC4Tests extends KafkaSingleNodeTests {
29+
30+
// Cassandra image under test: the CASSANDRA_IMAGE environment variable when set, otherwise
// "cassandra:" followed by the cassandraVersion system property; declared as a compatible
// substitute for the "cassandra" image.
public static final DockerImageName CASSANDRA_IMAGE = DockerImageName.parse(
31+
Optional.ofNullable(System.getenv("CASSANDRA_IMAGE"))
32+
.orElse("cassandra:" + System.getProperty("cassandraVersion"))
33+
).asCompatibleSubstituteFor("cassandra");
34+
35+
public KafkaSingleNodeC4Tests() {
36+
super(AgentTestUtil.Version.C4);
37+
}
38+
39+
// Delegates to CassandraContainer.createCassandraContainerWithAgentKafka with CASSANDRA_IMAGE,
// the test network, the node index, the literal "c4" and the Kafka bootstrap servers.
// NOTE(review): "c4" presumably selects the agent build to attach — confirm against CassandraContainer.
@Override
40+
public CassandraContainer<?> createCassandraContainer(int nodeIndex, String kafkaBootstrapServers, Network testNetwork) {
41+
return CassandraContainer.createCassandraContainerWithAgentKafka(
42+
CASSANDRA_IMAGE, testNetwork, nodeIndex, "c4", kafkaBootstrapServers);
43+
}
44+
45+
// Returns 1024 * 1024; presumably the commit-log segment size in bytes (1 MiB) — confirm
// against how KafkaSingleNodeTests consumes it.
@Override
46+
public int getSegmentSize() {
47+
return 1024 * 1024;
48+
}
49+
}

agent-dse4/src/main/java/com/datastax/oss/cdc/agent/Agent.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,9 @@ static void main(String agentArgs, Instrumentation inst) throws Exception {
5959
static void startCdcAgent(String agentArgs) throws Exception {
6060
String agentVersion = Agent.class.getPackage().getImplementationVersion();
6161
log.info("Starting CDC agent v{}, cdc_raw_directory={}", agentVersion, DatabaseDescriptor.getCDCLogLocation());
62-
AgentConfig config = AgentConfig.create(AgentConfig.Platform.PULSAR, agentArgs);
62+
// Platform.ALL: the agent is provider-agnostic and accepts both Pulsar and Kafka
63+
// parameters; the active provider is selected at runtime via 'messagingProvider'.
64+
AgentConfig config = AgentConfig.create(AgentConfig.Platform.ALL, agentArgs);
6365

6466
SegmentOffsetFileWriter segmentOffsetFileWriter = new SegmentOffsetFileWriter(config.cdcWorkingDir);
6567
segmentOffsetFileWriter.loadOffsets();

0 commit comments

Comments
 (0)