Skip to content

Commit a9039e0

Browse files
author
Madhavan
committed
Fix ClassCastException by reverting NativeSchemaWrapper and CassandraSource to original implementation
- Reverted NativeSchemaWrapper.encode() to simple pass-through (return bytes) - Reverted CassandraSource JsonValueRecord.getValue() and getKey() to original cast - Root cause: Cannot handle GenericRecord in encode() due to method signature forcing byte[] cast - The original simple implementation works correctly with Pulsar's internal handling
1 parent a02376c commit a9039e0

2 files changed

Lines changed: 8 additions & 89 deletions

File tree

commons/src/main/java/com/datastax/oss/cdc/NativeSchemaWrapper.java

Lines changed: 2 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,16 @@
1616
package com.datastax.oss.cdc;
1717

1818
import org.apache.avro.Schema;
19-
import org.apache.avro.generic.GenericDatumWriter;
20-
import org.apache.avro.generic.GenericRecord;
21-
import org.apache.avro.io.BinaryEncoder;
22-
import org.apache.avro.io.EncoderFactory;
2319
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
2420
import org.apache.pulsar.common.schema.SchemaInfo;
2521
import org.apache.pulsar.common.schema.SchemaType;
26-
import org.slf4j.Logger;
27-
import org.slf4j.LoggerFactory;
2822

29-
import java.io.ByteArrayOutputStream;
30-
import java.io.IOException;
3123
import java.nio.charset.StandardCharsets;
3224
import java.util.HashMap;
3325
import java.util.Optional;
3426

3527
public class NativeSchemaWrapper implements org.apache.pulsar.client.api.Schema<byte[]> {
3628

37-
private static final Logger log = LoggerFactory.getLogger(NativeSchemaWrapper.class);
38-
3929
private final SchemaInfo pulsarSchemaInfo;
4030
private final Schema nativeSchema;
4131

@@ -53,41 +43,8 @@ public NativeSchemaWrapper(Schema nativeSchema, SchemaType pulsarSchemaType) {
5343
}
5444

5545
@Override
56-
public byte[] encode(byte[] data) {
57-
if (data == null) {
58-
throw new IllegalArgumentException("Cannot encode null data");
59-
}
60-
61-
// The parameter is declared as byte[] to match the Schema<byte[]> interface,
62-
// but Pulsar internally may pass GenericRecord objects. We need to handle both cases.
63-
Object actualData = (Object) data;
64-
65-
// Handle byte[] input (backward compatibility)
66-
if (actualData instanceof byte[]) {
67-
return (byte[]) actualData;
68-
}
69-
70-
// Handle GenericRecord input (Pulsar internal usage)
71-
if (actualData instanceof GenericRecord) {
72-
try {
73-
GenericRecord record = (GenericRecord) actualData;
74-
ByteArrayOutputStream out = new ByteArrayOutputStream();
75-
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
76-
GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(nativeSchema);
77-
writer.write(record, encoder);
78-
encoder.flush();
79-
return out.toByteArray();
80-
} catch (IOException e) {
81-
log.error("Failed to serialize GenericRecord to bytes", e);
82-
throw new RuntimeException("Failed to serialize GenericRecord", e);
83-
}
84-
}
85-
86-
// Unexpected type
87-
throw new IllegalArgumentException(
88-
"Unsupported data type for encoding: " + actualData.getClass().getName() +
89-
". Expected byte[] or GenericRecord."
90-
);
46+
public byte[] encode(byte[] bytes) {
47+
return bytes;
9148
}
9249

9350
@Override

connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java

Lines changed: 6 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -999,27 +999,7 @@ private class JsonValueRecord implements CassandraRecord {
999999
@Override
10001000
public byte[] getValue() {
10011001
try {
1002-
Object value = kvRecord.getValue().getValue();
1003-
// Handle both byte[] (from NativeConverter) and GenericRecord (from GenericConverter)
1004-
if (value instanceof byte[]) {
1005-
return (byte[]) value;
1006-
} else if (value instanceof org.apache.pulsar.client.api.schema.GenericRecord) {
1007-
// GenericRecord from Pulsar - need to serialize to bytes
1008-
org.apache.pulsar.client.api.schema.GenericRecord genericRecord =
1009-
(org.apache.pulsar.client.api.schema.GenericRecord) value;
1010-
1011-
// Use the converter to serialize the GenericRecord to bytes
1012-
// The converter's fromConnectData method handles the serialization
1013-
@SuppressWarnings("unchecked")
1014-
Converter<byte[], org.apache.pulsar.client.api.schema.GenericRecord, ?, byte[]> converter =
1015-
(Converter<byte[], org.apache.pulsar.client.api.schema.GenericRecord, ?, byte[]>)
1016-
kvRecord.converterAndQueryFinal.converter;
1017-
return converter.fromConnectData(genericRecord);
1018-
} else {
1019-
throw new IllegalStateException("Unexpected value type: " +
1020-
(value != null ? value.getClass().getName() : "null") +
1021-
". Expected byte[] or GenericRecord");
1022-
}
1002+
return (byte[]) kvRecord.getValue().getValue();
10231003
} catch (Exception err) {
10241004
throw new RuntimeException(err);
10251005
}
@@ -1033,30 +1013,12 @@ public Schema getSchema() {
10331013
@Override
10341014
public Optional<String> getKey() {
10351015
Object key = kvRecord.getValue().getKey();
1036-
// Handle both byte[] and GenericRecord for keys
1037-
if (key instanceof byte[]) {
1038-
// returns a json string in plain text. E.g.: key:[{"a":"38878"}]
1039-
return Optional.of(new String((byte[])key, StandardCharsets.UTF_8));
1040-
} else if (key instanceof org.apache.pulsar.client.api.schema.GenericRecord) {
1041-
// GenericRecord from Pulsar - need to serialize to bytes
1042-
org.apache.pulsar.client.api.schema.GenericRecord genericRecord =
1043-
(org.apache.pulsar.client.api.schema.GenericRecord) key;
1044-
1045-
try {
1046-
// Use the key converter to serialize the GenericRecord to bytes
1047-
@SuppressWarnings("unchecked")
1048-
Converter<byte[], org.apache.pulsar.client.api.schema.GenericRecord, ?, byte[]> converter =
1049-
(Converter<byte[], org.apache.pulsar.client.api.schema.GenericRecord, ?, byte[]>) keyConverter;
1050-
byte[] keyBytes = converter.fromConnectData(genericRecord);
1051-
return Optional.of(new String(keyBytes, StandardCharsets.UTF_8));
1052-
} catch (Exception e) {
1053-
throw new RuntimeException("Failed to serialize key", e);
1054-
}
1055-
} else {
1056-
throw new IllegalStateException("Invalid key type: " +
1057-
(key != null ? key.getClass().getName() : "null") +
1058-
". Expected byte[] or GenericRecord");
1016+
if (!(key instanceof byte[])) {
1017+
throw new IllegalStateException("Invalid key type " + key.getClass().getName());
10591018
}
1019+
1020+
// returns a json string in plain text. E.g.: key:[{"a":"38878"}]
1021+
return Optional.of(new String((byte[])key, StandardCharsets.UTF_8));
10601022
}
10611023

10621024
@Override

0 commit comments

Comments
 (0)