Skip to content

Commit be7d374

Browse files
haruki-830春栖
andauthored
[FLINK-39757] Fix kafka sink could not serialize debezium json with column default values (#4416)
When a column has a non-string default value expression, Debezium JSON serialization fails. This fix adds a convertDefaultValue method that converts the string default value expression to the correct Java type matching the Debezium schema, supporting BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, TIME, TIMESTAMP, BINARY, VARBINARY, and other types. Corresponding test cases are also added. Co-authored-by: 春栖 <chunxi.mjy@U-4KXDP7CK-0015.local>
1 parent 31665ac commit be7d374

2 files changed

Lines changed: 198 additions & 1 deletion

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,11 @@
5353
import org.apache.kafka.connect.json.JsonConverter;
5454
import org.apache.kafka.connect.storage.ConverterConfig;
5555
import org.apache.kafka.connect.storage.ConverterType;
56+
import org.slf4j.Logger;
57+
import org.slf4j.LoggerFactory;
5658

59+
import java.math.BigDecimal;
60+
import java.math.RoundingMode;
5761
import java.time.ZoneId;
5862
import java.util.HashMap;
5963
import java.util.List;
@@ -80,6 +84,9 @@
8084
public class DebeziumJsonSerializationSchema implements SerializationSchema<Event> {
8185
private static final long serialVersionUID = 1L;
8286

87+
private static final Logger LOG =
88+
LoggerFactory.getLogger(DebeziumJsonSerializationSchema.class);
89+
8390
private static final StringData OP_INSERT = StringData.fromString("c"); // insert
8491
private static final StringData OP_DELETE = StringData.fromString("d"); // delete
8592
private static final StringData OP_UPDATE = StringData.fromString("u"); // update
@@ -260,14 +267,66 @@ private static SchemaBuilder convertCDCDataTypeToDebeziumDataType(Column column)
260267
field.required();
261268
}
262269
if (column.getDefaultValueExpression() != null) {
263-
field.defaultValue(column.getDefaultValueExpression());
270+
Object convertedDefault =
271+
convertDefaultValue(column.getDefaultValueExpression(), columnType);
272+
if (convertedDefault != null) {
273+
field.defaultValue(convertedDefault);
274+
}
264275
}
265276
if (column.getComment() != null) {
266277
field.doc(column.getComment());
267278
}
268279
return field;
269280
}
270281

282+
/**
283+
* Convert a default value expression string to the Java object matching the Debezium schema
284+
* type.
285+
*/
286+
private static Object convertDefaultValue(
287+
String defaultValueExpression, org.apache.flink.cdc.common.types.DataType columnType) {
288+
try {
289+
switch (columnType.getTypeRoot()) {
290+
case BOOLEAN:
291+
return Boolean.parseBoolean(defaultValueExpression);
292+
case TINYINT:
293+
case SMALLINT:
294+
return Short.parseShort(defaultValueExpression);
295+
case INTEGER:
296+
case DATE:
297+
return Integer.parseInt(defaultValueExpression);
298+
case BIGINT:
299+
case TIME_WITHOUT_TIME_ZONE:
300+
case TIMESTAMP_WITHOUT_TIME_ZONE:
301+
case TIMESTAMP_WITH_TIME_ZONE:
302+
return Long.parseLong(defaultValueExpression);
303+
case FLOAT:
304+
return Float.parseFloat(defaultValueExpression);
305+
case DOUBLE:
306+
return Double.parseDouble(defaultValueExpression);
307+
case DECIMAL:
308+
DecimalType decimalType = (DecimalType) columnType;
309+
return new BigDecimal(defaultValueExpression)
310+
.setScale(decimalType.getScale(), RoundingMode.HALF_UP);
311+
case BINARY:
312+
case VARBINARY:
313+
return defaultValueExpression.getBytes();
314+
case CHAR:
315+
case VARCHAR:
316+
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
317+
default:
318+
return defaultValueExpression;
319+
}
320+
} catch (NumberFormatException e) {
321+
LOG.warn(
322+
"Failed to convert default value '{}' for type {}, skipping default value.",
323+
defaultValueExpression,
324+
columnType.getTypeRoot(),
325+
e);
326+
return null;
327+
}
328+
}
329+
271330
private static SchemaBuilder convertCDCDataTypeToDebeziumDataType(
272331
org.apache.flink.cdc.common.types.DataType columnType) {
273332
final SchemaBuilder field;

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,144 @@ void testSerializeComplexTypes() throws Exception {
341341
assertThat(rowNode.has("f2")).isTrue();
342342
}
343343

344+
@Test
345+
void testSerializeWithNonStringDefaultValues() throws Exception {
346+
ObjectMapper mapper =
347+
JacksonMapperFactory.createObjectMapper()
348+
.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
349+
Map<String, String> properties = new HashMap<>();
350+
properties.put("include-schema.enabled", "true");
351+
Configuration configuration = Configuration.fromMap(properties);
352+
SerializationSchema<Event> serializationSchema =
353+
ChangeLogJsonFormatFactory.createSerializationSchema(
354+
configuration, JsonSerializationType.DEBEZIUM_JSON, ZoneId.systemDefault());
355+
serializationSchema.open(new MockInitializationContext());
356+
357+
// create table covering all types supported by convertDefaultValue
358+
Schema schema =
359+
Schema.newBuilder()
360+
.physicalColumn("_boolean", DataTypes.BOOLEAN(), null, "true")
361+
.physicalColumn("_tinyint", DataTypes.TINYINT(), null, "1")
362+
.physicalColumn("_smallint", DataTypes.SMALLINT(), null, "5")
363+
.physicalColumn("_int", DataTypes.INT(), null, "10")
364+
.physicalColumn("_bigint", DataTypes.BIGINT(), null, "0")
365+
.physicalColumn("_float", DataTypes.FLOAT(), null, "1.5")
366+
.physicalColumn("_double", DataTypes.DOUBLE(), null, "3.14")
367+
.physicalColumn("_decimal", DataTypes.DECIMAL(10, 2), null, "99.99")
368+
.physicalColumn("_char", DataTypes.CHAR(5), null, "abc")
369+
.physicalColumn("_varchar", DataTypes.VARCHAR(10), null, "hello")
370+
.physicalColumn("_string", DataTypes.STRING(), null, "unknown")
371+
.physicalColumn("_date", DataTypes.DATE(), null, "100")
372+
.physicalColumn("_time", DataTypes.TIME(), null, "200000")
373+
.physicalColumn(
374+
"_timestamp", DataTypes.TIMESTAMP(), null, "1672531200000000")
375+
.physicalColumn(
376+
"_timestamp_3", DataTypes.TIMESTAMP(3), null, "1672531200000")
377+
.physicalColumn(
378+
"_timestamp_ltz",
379+
DataTypes.TIMESTAMP_LTZ(),
380+
null,
381+
"2023-01-01T00:00:00Z")
382+
.physicalColumn("_binary", DataTypes.BINARY(3), null, "bin")
383+
.primaryKey("_bigint")
384+
.build();
385+
386+
RowType rowType =
387+
RowType.of(
388+
DataTypes.BOOLEAN(),
389+
DataTypes.TINYINT(),
390+
DataTypes.SMALLINT(),
391+
DataTypes.INT(),
392+
DataTypes.BIGINT(),
393+
DataTypes.FLOAT(),
394+
DataTypes.DOUBLE(),
395+
DataTypes.DECIMAL(10, 2),
396+
DataTypes.CHAR(5),
397+
DataTypes.VARCHAR(10),
398+
DataTypes.STRING(),
399+
DataTypes.DATE(),
400+
DataTypes.TIME(),
401+
DataTypes.TIMESTAMP(),
402+
DataTypes.TIMESTAMP(3),
403+
DataTypes.TIMESTAMP_LTZ(),
404+
DataTypes.BINARY(3));
405+
406+
CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
407+
// This should not throw - previously would fail with
408+
// "Invalid Java object for schema with type INT64: class java.lang.String"
409+
assertThat(serializationSchema.serialize(createTableEvent)).isNull();
410+
411+
BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
412+
413+
DataChangeEvent insertEvent =
414+
DataChangeEvent.insertEvent(
415+
TABLE_1,
416+
generator.generate(
417+
new Object[] {
418+
true,
419+
(byte) 1,
420+
(short) 7,
421+
42,
422+
1L,
423+
2.5f,
424+
9.99,
425+
DecimalData.fromBigDecimal(new BigDecimal("123.45"), 10, 2),
426+
BinaryStringData.fromString("test1"),
427+
BinaryStringData.fromString("test2"),
428+
BinaryStringData.fromString("test3"),
429+
DateData.fromEpochDay(100),
430+
TimeData.fromNanoOfDay(200_000_000L),
431+
TimestampData.fromTimestamp(
432+
java.sql.Timestamp.valueOf("2023-01-01 00:00:00.000")),
433+
TimestampData.fromTimestamp(
434+
java.sql.Timestamp.valueOf("2023-01-01 00:00:00")),
435+
LocalZonedTimestampData.fromInstant(
436+
Instant.parse("2023-01-01T00:00:00.000Z")),
437+
new byte[] {1, 2, 3}
438+
}));
439+
440+
byte[] serialized = serializationSchema.serialize(insertEvent);
441+
JsonNode actual = mapper.readTree(serialized);
442+
443+
String fieldsSchema =
444+
"{\"type\":\"boolean\",\"optional\":true,\"default\":true,\"field\":\"_boolean\"},"
445+
+ "{\"type\":\"int16\",\"optional\":true,\"default\":1,\"field\":\"_tinyint\"},"
446+
+ "{\"type\":\"int16\",\"optional\":true,\"default\":5,\"field\":\"_smallint\"},"
447+
+ "{\"type\":\"int32\",\"optional\":true,\"default\":10,\"field\":\"_int\"},"
448+
+ "{\"type\":\"int64\",\"optional\":true,\"default\":0,\"field\":\"_bigint\"},"
449+
+ "{\"type\":\"float\",\"optional\":true,\"default\":1.5,\"field\":\"_float\"},"
450+
+ "{\"type\":\"double\",\"optional\":true,\"default\":3.14,\"field\":\"_double\"},"
451+
+ "{\"type\":\"bytes\",\"optional\":true,\"name\":\"org.apache.kafka.connect.data.Decimal\",\"version\":1,\"parameters\":{\"scale\":\"2\",\"connect.decimal.precision\":\"10\"},\"default\":\"Jw8=\",\"field\":\"_decimal\"},"
452+
+ "{\"type\":\"string\",\"optional\":true,\"default\":\"abc\",\"field\":\"_char\"},"
453+
+ "{\"type\":\"string\",\"optional\":true,\"default\":\"hello\",\"field\":\"_varchar\"},"
454+
+ "{\"type\":\"string\",\"optional\":true,\"default\":\"unknown\",\"field\":\"_string\"},"
455+
+ "{\"type\":\"int32\",\"optional\":true,\"name\":\"io.debezium.time.Date\",\"version\":1,\"default\":100,\"field\":\"_date\"},"
456+
+ "{\"type\":\"int64\",\"optional\":true,\"name\":\"io.debezium.time.MicroTime\",\"version\":1,\"default\":200000,\"field\":\"_time\"},"
457+
+ "{\"type\":\"int64\",\"optional\":true,\"name\":\"io.debezium.time.MicroTimestamp\",\"version\":1,\"default\":1672531200000000,\"field\":\"_timestamp\"},"
458+
+ "{\"type\":\"int64\",\"optional\":true,\"name\":\"io.debezium.time.Timestamp\",\"version\":1,\"default\":1672531200000,\"field\":\"_timestamp_3\"},"
459+
+ "{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.time.ZonedTimestamp\",\"version\":1,\"default\":\"2023-01-01T00:00:00Z\",\"field\":\"_timestamp_ltz\"},"
460+
+ "{\"type\":\"bytes\",\"optional\":true,\"name\":\"io.debezium.data.Bits\",\"version\":1,\"parameters\":{\"length\":\"3\"},\"default\":\"Ymlu\",\"field\":\"_binary\"}";
461+
JsonNode expected =
462+
mapper.readTree(
463+
"{\"schema\":{\"type\":\"struct\",\"fields\":["
464+
+ "{\"type\":\"struct\",\"fields\":["
465+
+ fieldsSchema
466+
+ "],\"optional\":true,\"field\":\"before\"},"
467+
+ "{\"type\":\"struct\",\"fields\":["
468+
+ fieldsSchema
469+
+ "],\"optional\":true,\"field\":\"after\"}"
470+
+ "],\"optional\":false},"
471+
+ "\"payload\":{\"before\":null,\"after\":"
472+
+ "{\"_boolean\":true,\"_tinyint\":1,\"_smallint\":7,\"_int\":42,\"_bigint\":1,"
473+
+ "\"_float\":2.5,\"_double\":9.99,\"_decimal\":123.45,"
474+
+ "\"_char\":\"test1\",\"_varchar\":\"test2\",\"_string\":\"test3\","
475+
+ "\"_date\":\"1970-04-11\",\"_time\":\"00:00:00\","
476+
+ "\"_timestamp\":\"2023-01-01 00:00:00\",\"_timestamp_3\":\"2023-01-01 00:00:00\","
477+
+ "\"_timestamp_ltz\":\"2023-01-01 00:00:00Z\",\"_binary\":\"AQID\"},"
478+
+ "\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}}");
479+
assertThat(actual).isEqualTo(expected);
480+
}
481+
344482
@Test
345483
void testSerializeWithSchemaComplexTypes() throws Exception {
346484
ObjectMapper mapper =

0 commit comments

Comments
 (0)