From 2c908a6790ed77dfc850b19a5af1f4fb07b6ffc5 Mon Sep 17 00:00:00 2001 From: mike Date: Wed, 3 Jun 2026 06:40:29 +0000 Subject: [PATCH 01/12] [FLINK-39832] Fix Oracle NUMBER(p,0) with p>=19 incorrectly mapped to BIGINT OracleTypeUtils.fromDbzColumn mapped NUMBER(p,0) to BIGINT for all precisions, but Debezium encodes p>=19 as DECIMAL(BYTES). The mismatch caused the sink to read a BinaryRecordData pointer as a long, producing a constant garbage PK (171798691841) and collapsing all rows on upsert. Now returns DECIMAL(p,0) when p>=19, aligned with Debezium runtime behavior. --- .../oracle/utils/OracleTypeUtils.java | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtils.java index f54c14af525..91b41a8e632 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtils.java @@ -19,6 +19,7 @@ import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.types.DecimalType; import io.debezium.relational.Column; import oracle.jdbc.OracleTypes; @@ -72,11 +73,19 @@ private static DataType convertFromColumn(Column column) { return DataTypes.DOUBLE(); case Types.NUMERIC: case Types.DECIMAL: - return column.length() == 0 - || !column.scale().isPresent() - || column.scale().get() <= 0 - ? DataTypes.BIGINT() - : DataTypes.DECIMAL(column.length(), column.scale().orElse(0)); + { + int precision = column.length(); + boolean isIntegerFamily = + !column.scale().isPresent() || column.scale().get() <= 0; + if (isIntegerFamily) { + if (precision > 0 && precision <= 18) { + return DataTypes.BIGINT(); + } + return DataTypes.DECIMAL( + precision > 0 ? precision : DecimalType.MAX_PRECISION, 0); + } + return DataTypes.DECIMAL(precision, column.scale().get()); + } case Types.DATE: return DataTypes.DATE(); case Types.TIMESTAMP: @@ -128,4 +137,4 @@ private static String getFirstBracketContent(String input) { } private OracleTypeUtils() {} -} +} \ No newline at end of file From b00e051ee3b2084816e341ef61d1277587285a0d Mon Sep 17 00:00:00 2001 From: mike Date: Tue, 9 Jun 2026 21:12:35 +0800 Subject: [PATCH 02/12] Fix missing newline at end of OracleTypeUtils.java Add a newline at the end of the OracleTypeUtils.java file. --- .../flink/cdc/connectors/oracle/utils/OracleTypeUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtils.java index 91b41a8e632..20305a4c8ea 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtils.java @@ -137,4 +137,4 @@ private static String getFirstBracketContent(String input) { } private OracleTypeUtils() {} -} \ No newline at end of file +} From 7d0b44f1ea1dba352575dfef308bb6e538d4fd8d Mon Sep 17 00:00:00 2001 From: mike Date: Tue, 9 Jun 2026 13:40:00 +0000 Subject: [PATCH 03/12] Add unit tests for OracleTypeUtils NUMBER type mapping --- .../oracle/utils/OracleTypeUtilsTest.java | 144 ++++++++++++++++++ 1 file changed, 144 insertions(+) create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtilsTest.java diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtilsTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtilsTest.java new file mode 100644 index 00000000000..57e0949dc52 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtilsTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oracle.utils; + +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.types.DecimalType; + +import io.debezium.relational.Column; +import org.junit.jupiter.api.Test; + +import java.sql.Types; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link OracleTypeUtils}. */ +class OracleTypeUtilsTest { + + @Test + void testNumberPrecision18ShouldBeBigint() { + // NUMBER(18, 0) → BIGINT (precision <= 18) + Column column = + Column.editor() + .name("col") + .type("NUMBER") + .jdbcType(Types.NUMERIC) + .length(18) + .scale(0) + .optional(true) + .create(); + DataType result = OracleTypeUtils.fromDbzColumn(column); + assertThat(result).isEqualTo(DataTypes.BIGINT()); + } + + @Test + void testNumberPrecision19ShouldBeDecimal() { + // NUMBER(19, 0) → DECIMAL(19, 0) (precision > 18) + Column column = + Column.editor() + .name("col") + .type("NUMBER") + .jdbcType(Types.NUMERIC) + .length(19) + .scale(0) + .optional(true) + .create(); + DataType result = OracleTypeUtils.fromDbzColumn(column); + assertThat(result).isEqualTo(DataTypes.DECIMAL(19, 0)); + } + + @Test + void testNumberPrecision38ShouldBeDecimal() { + // NUMBER(38, 0) → DECIMAL(38, 0) + Column column = + Column.editor() + .name("col") + .type("NUMBER") + .jdbcType(Types.NUMERIC) + .length(38) + .scale(0) + .optional(true) + .create(); + DataType result = OracleTypeUtils.fromDbzColumn(column); + assertThat(result).isEqualTo(DataTypes.DECIMAL(38, 0)); + } + + @Test + void testNumberNoPrecisionShouldBeDecimalMaxPrecision() { + // NUMBER (no precision, length=0) → DECIMAL(MAX_PRECISION, 0) + Column column = + Column.editor() + .name("col") + .type("NUMBER") + .jdbcType(Types.NUMERIC) + .length(0) + .optional(true) + .create(); + DataType result = OracleTypeUtils.fromDbzColumn(column); + assertThat(result).isEqualTo(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 0)); + } + + @Test + void testNumberWithScaleShouldBeDecimal() { + // NUMBER(10, 6) → DECIMAL(10, 6) + Column column = + Column.editor() + .name("col") + .type("NUMBER") + .jdbcType(Types.NUMERIC) + .length(10) + .scale(6) + .optional(true) + .create(); + DataType result = OracleTypeUtils.fromDbzColumn(column); + assertThat(result).isEqualTo(DataTypes.DECIMAL(10, 6)); + } + + @Test + void testNumberPrecision1ShouldBeBigint() { + // NUMBER(1, 0) → BIGINT (precision <= 18) + Column column = + Column.editor() + .name("col") + .type("NUMBER") + .jdbcType(Types.NUMERIC) + .length(1) + .scale(0) + .optional(true) + .create(); + DataType result = OracleTypeUtils.fromDbzColumn(column); + assertThat(result).isEqualTo(DataTypes.BIGINT()); + } + + @Test + void testNumberNotNullPrecision19() { + // NOT NULL NUMBER(19, 0) → DECIMAL(19, 0).notNull() + Column column = + Column.editor() + .name("col") + .type("NUMBER") + .jdbcType(Types.NUMERIC) + .length(19) + .scale(0) + .optional(false) + .create(); + DataType result = OracleTypeUtils.fromDbzColumn(column); + assertThat(result).isEqualTo(DataTypes.DECIMAL(19, 0).notNull()); + } +} From 4377d2cfed044b2966f597d1a98d6c76602f4241 Mon Sep 17 00:00:00 2001 From: mike Date: Wed, 10 Jun 2026 05:10:22 +0000 Subject: [PATCH 04/12] Fix OraclePipelineITCase tests for bare NUMBER type mapping change --- .../oracle/source/OraclePipelineITCase.java | 34 ++++++++++++------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java index 497fa54ea1d..b296126f810 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.data.DecimalData; import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.event.AddColumnEvent; @@ -38,6 +39,7 @@ import org.apache.flink.cdc.common.source.FlinkSourceProvider; import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.types.DecimalType; import org.apache.flink.cdc.common.types.RowType; import org.apache.flink.cdc.common.utils.SchemaUtils; import org.apache.flink.cdc.connectors.base.options.StartupOptions; @@ -71,6 +73,7 @@ import org.junit.jupiter.api.TestMethodOrder; import org.testcontainers.lifecycle.Startables; +import java.math.BigDecimal; import java.nio.file.Files; import java.nio.file.Path; import java.sql.Connection; @@ -400,7 +403,10 @@ public void testAlterAddAllColumnTypeStatement() throws Exception { tableId, Collections.singletonList( new AddColumnEvent.ColumnWithPosition( - Column.physicalColumn("COLS27", DataTypes.BIGINT()))))); + Column.physicalColumn( + "COLS27", + DataTypes.DECIMAL( + DecimalType.MAX_PRECISION, 0)))))); statement.execute( String.format("ALTER TABLE %s.PRODUCTS ADD COLS28 TIMESTAMP(2)", "DEBEZIUM")); expected.add( @@ -983,7 +989,9 @@ public void testGeometryType() throws Exception { new CreateTableEvent( TableId.tableId("DEBEZIUM", "MYLAKE"), Schema.newBuilder() - .physicalColumn("FEATURE_ID", DataTypes.BIGINT().notNull()) + .physicalColumn( + "FEATURE_ID", + DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 0).notNull()) .physicalColumn("NAME", DataTypes.VARCHAR(32)) .physicalColumn("SHAPE", DataTypes.STRING()) .primaryKey(Arrays.asList("FEATURE_ID")) @@ -992,7 +1000,9 @@ public void testGeometryType() throws Exception { RowType rowType = RowType.of( new DataType[] { - DataTypes.BIGINT(), DataTypes.VARCHAR(32), DataTypes.STRING() + DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 0), + DataTypes.VARCHAR(32), + DataTypes.STRING() }, new String[] {"FEATURE_ID", "NAME", "SHAPE"}); BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType); @@ -1004,7 +1014,7 @@ public void testGeometryType() throws Exception { tableId, generator.generate( new Object[] { - 1L, + DecimalData.fromBigDecimal(new BigDecimal(1), 38, 0), BinaryStringData.fromString("center"), BinaryStringData.fromString( "{\"coordinates\":\"[[116.6,24.343]]\",\"type\":\"Point\",\"srid\":4326}") @@ -1014,7 +1024,7 @@ public void testGeometryType() throws Exception { tableId, generator.generate( new Object[] { - 2L, + DecimalData.fromBigDecimal(new BigDecimal(2), 38, 0), BinaryStringData.fromString("two-dimensional point"), BinaryStringData.fromString( "{\"coordinates\":\"[[10.0,5.0]]\",\"type\":\"Point\",\"srid\":4326}") @@ -1024,7 +1034,7 @@ public void testGeometryType() throws Exception { tableId, generator.generate( new Object[] { - 3L, + DecimalData.fromBigDecimal(new BigDecimal(3), 38, 0), BinaryStringData.fromString("straight line segment"), BinaryStringData.fromString( "{\"coordinates\":\"[[10.0,10.0],[20.0,20.0],[30.0,30.0]]\",\"type\":\"LineString\",\"srid\":4326}") @@ -1034,7 +1044,7 @@ public void testGeometryType() throws Exception { tableId, generator.generate( new Object[] { - 4L, + DecimalData.fromBigDecimal(new BigDecimal(4), 38, 0), BinaryStringData.fromString("polyline"), BinaryStringData.fromString( "{\"coordinates\":\"[[5.0,5.0],[15.0,10.0],[25.0,5.0],[35.0,10.0]]\",\"type\":\"LineString\",\"srid\":4326}") @@ -1044,7 +1054,7 @@ public void testGeometryType() throws Exception { tableId, generator.generate( new Object[] { - 5L, + DecimalData.fromBigDecimal(new BigDecimal(5), 38, 0), BinaryStringData.fromString("rectangle"), BinaryStringData.fromString( "{\"coordinates\":\"[[0.0,0.0],[10.0,0.0],[10.0,10.0],[0.0,10.0],[0.0,0.0]]\",\"type\":\"Polygon\",\"srid\":4326}") @@ -1054,7 +1064,7 @@ public void testGeometryType() throws Exception { tableId, generator.generate( new Object[] { - 6L, + DecimalData.fromBigDecimal(new BigDecimal(6), 38, 0), BinaryStringData.fromString("Multi-Point"), BinaryStringData.fromString( "{\"coordinates\":\"[[10.0,10.0],[20.0,20.0],[30.0,30.0]]\",\"type\":\"MultiPoint\",\"srid\":4326}") @@ -1064,7 +1074,7 @@ public void testGeometryType() throws Exception { tableId, generator.generate( new Object[] { - 7L, + DecimalData.fromBigDecimal(new BigDecimal(7), 38, 0), BinaryStringData.fromString("Multi line collection"), BinaryStringData.fromString( "{\"coordinates\":\"[[10.0,10.0],[20.0,10.0],[10.0,20.0],[20.0,20.0]]\",\"type\":\"MultiLineString\",\"srid\":4326}") @@ -1075,7 +1085,7 @@ public void testGeometryType() throws Exception { tableId, generator.generate( new Object[] { - 8L, + DecimalData.fromBigDecimal(new BigDecimal(8), 38, 0), BinaryStringData.fromString("Multi-Polygon"), BinaryStringData.fromString( "{\"coordinates\":\"[[0.0,0.0],[10.0,0.0],[10.0,10.0],[0.0,10.0],[0.0,0.0],[15.0,15.0],[25.0,15.0],[25.0,25.0],[15.0,25.0],[15.0,15.0]]\",\"type\":\"MultiPolygon\",\"srid\":4326}") @@ -1085,7 +1095,7 @@ public void testGeometryType() throws Exception { tableId, generator.generate( new Object[] { - 9L, + DecimalData.fromBigDecimal(new BigDecimal(9), 38, 0), BinaryStringData.fromString("Geometry Collection"), BinaryStringData.fromString( "{\"geometries\":\"GEOMETRYCOLLECTION (POINT (25 25), LINESTRING (30 30, 35 25), POLYGON ((0 0, 10 0, 10 10, 0 10, 0 0)))\",\"type\":\"GeometryCollection\",\"srid\":4326}") From 93db88cd6af7b4984f89ed9ad9550c5d0fec1172 Mon Sep 17 00:00:00 2001 From: mike Date: Mon, 15 Jun 2026 15:26:29 +0800 Subject: [PATCH 05/12] Fix OracleMetadataAccessorITCase test expectations for NUMBER type mapping change --- .../source/OracleMetadataAccessorITCase.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleMetadataAccessorITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleMetadataAccessorITCase.java index 2ade430849f..9cdae34963d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleMetadataAccessorITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleMetadataAccessorITCase.java @@ -126,6 +126,12 @@ private void testAccessCommonTypesSchema() throws Exception { DataTypes.FLOAT(), DataTypes.DECIMAL(10, 6), DataTypes.DECIMAL(10, 6), + DataTypes.DECIMAL(38, 0), + DataTypes.DECIMAL(38, 0), + DataTypes.DECIMAL(38, 0), + DataTypes.DECIMAL(38, 0), + DataTypes.DECIMAL(38, 0), + DataTypes.DECIMAL(38, 0), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), @@ -135,13 +141,7 @@ private void testAccessCommonTypesSchema() throws Exception { DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), - DataTypes.BIGINT(), - DataTypes.BIGINT(), - DataTypes.BIGINT(), - DataTypes.BIGINT(), - DataTypes.BIGINT(), - DataTypes.BIGINT(), - DataTypes.BIGINT(), + DataTypes.DECIMAL(36, 0), DataTypes.TIMESTAMP(6), DataTypes.TIMESTAMP(6), DataTypes.TIMESTAMP(2), From d89f45ff50c128b3052b9ba4ab7ac6c98309b9e7 Mon Sep 17 00:00:00 2001 From: mike Date: Thu, 18 Jun 2026 18:11:32 +0800 Subject: [PATCH 06/12] Oracle bare NUMBER uses VariableScaleDecimal encoding, and return DECIMAL(MAX_PRECISION, 0) to match OracleTypeUtils --- .../oracle/source/OracleSchemaDataTypeInference.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInference.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInference.java index e8bfeee668e..05306e7d9bb 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInference.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInference.java @@ -20,8 +20,10 @@ import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.types.DecimalType; import org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference; +import io.debezium.data.VariableScaleDecimal; import io.debezium.data.geometry.Geometry; import org.apache.kafka.connect.data.Schema; @@ -36,6 +38,10 @@ protected DataType inferStruct(Object value, Schema schema) { // a String with Json format if (Geometry.LOGICAL_NAME.equals(schema.name())) { return DataTypes.STRING(); + } else if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) { + // Oracle bare NUMBER uses VariableScaleDecimal encoding + // Return DECIMAL(MAX_PRECISION, 0) to match OracleTypeUtils + return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 0); } else { return super.inferStruct(value, schema); } From 32df06898788c1c2d3c3da07c08f1041f36403fa Mon Sep 17 00:00:00 2001 From: mike Date: Tue, 23 Jun 2026 10:46:46 +0000 Subject: [PATCH 07/12] update --- .../source/OracleSchemaDataTypeInference.java | 6 +- .../oracle/utils/OracleTypeUtils.java | 32 +++- .../oracle/source/OracleFullTypesITCase.java | 48 ++--- .../source/OracleMetadataAccessorITCase.java | 18 +- .../oracle/source/OraclePipelineITCase.java | 24 +-- .../oracle/utils/OracleTypeUtilsTest.java | 172 ++++++++++++++++-- 6 files changed, 226 insertions(+), 74 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInference.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInference.java index 05306e7d9bb..79fe8bbad6b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInference.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInference.java @@ -39,9 +39,9 @@ protected DataType inferStruct(Object value, Schema schema) { if (Geometry.LOGICAL_NAME.equals(schema.name())) { return DataTypes.STRING(); } else if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) { - // Oracle bare NUMBER uses VariableScaleDecimal encoding - // Return DECIMAL(MAX_PRECISION, 0) to match OracleTypeUtils - return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 0); + // Oracle bare NUMBER uses VariableScaleDecimal encoding. + // Return DECIMAL(38, 19) to match OracleTypeUtils. + return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 19); } else { return super.inferStruct(value, schema); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtils.java index 20305a4c8ea..1ea32d8aaac 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtils.java @@ -74,17 +74,31 @@ private static DataType convertFromColumn(Column column) { case Types.NUMERIC: case Types.DECIMAL: { + // Bare NUMBER (scale unspecified): floating-point semantic, + // can store both integers and decimals up to 38 significant digits. + // Use DECIMAL(38, 19) as a balanced universal numeric type. + if (!column.scale().isPresent()) { + return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 19); + } + int precision = column.length(); - boolean isIntegerFamily = - !column.scale().isPresent() || column.scale().get() <= 0; - if (isIntegerFamily) { - if (precision > 0 && precision <= 18) { - return DataTypes.BIGINT(); - } - return DataTypes.DECIMAL( - precision > 0 ? precision : DecimalType.MAX_PRECISION, 0); + int scale = column.scale().get(); + + // precision == 0 (e.g. NUMBER(*, s)) means unspecified. + int p = precision > 0 ? precision : DecimalType.MAX_PRECISION; + + // scale < 0 or > 36: not safely representable as DECIMAL, downgrade to STRING. + if (scale < 0 || scale > 36) { + return DataTypes.STRING(); } - return DataTypes.DECIMAL(precision, column.scale().get()); + + if (scale == 0) { + // Explicit integer: use BIGINT for p <= 18, DECIMAL otherwise. + return p <= 18 ? DataTypes.BIGINT() : DataTypes.DECIMAL(p, 0); + } + + // 1 <= scale <= 36: standard decimal with fractional part. + return DataTypes.DECIMAL(p, scale); } case Types.DATE: return DataTypes.DATE(); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleFullTypesITCase.java index 4e3ddfbc672..0b1a1f3ccfb 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleFullTypesITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleFullTypesITCase.java @@ -128,21 +128,21 @@ private void testCommonDataTypes() throws Exception { (float) 6.66, DecimalData.fromBigDecimal(new BigDecimal("1234.567891"), 10, 6), DecimalData.fromBigDecimal(new BigDecimal("1234.567891"), 10, 6), - DecimalData.fromBigDecimal(new BigDecimal("77.323"), 10, 3), - DecimalData.fromBigDecimal(new BigDecimal(1), 10, 0), - DecimalData.fromBigDecimal(new BigDecimal(22), 10, 0), - DecimalData.fromBigDecimal(new BigDecimal(333), 10, 0), - DecimalData.fromBigDecimal(new BigDecimal(4444), 10, 0), - DecimalData.fromBigDecimal(new BigDecimal(5555), 10, 0), - DecimalData.fromBigDecimal(new BigDecimal(1), 10, 0), - DecimalData.fromBigDecimal(new BigDecimal(99), 10, 0), - DecimalData.fromBigDecimal(new BigDecimal(9999), 10, 0), - DecimalData.fromBigDecimal(new BigDecimal(999999999), 38, 0), - DecimalData.fromBigDecimal(new BigDecimal(999999999999999999L), 38, 0), - 90L, - 9900L, - 999999990L, - 999999999999999900L, + DecimalData.fromBigDecimal(new BigDecimal("77.323"), 38, 19), + DecimalData.fromBigDecimal(new BigDecimal(1), 38, 19), + DecimalData.fromBigDecimal(new BigDecimal(22), 38, 19), + DecimalData.fromBigDecimal(new BigDecimal(333), 38, 19), + DecimalData.fromBigDecimal(new BigDecimal(4444), 38, 0), + DecimalData.fromBigDecimal(new BigDecimal(5555), 38, 0), + 1L, + 99L, + 9999L, + 999999999L, + 999999999999999999L, + BinaryStringData.fromString("90"), + BinaryStringData.fromString("9900"), + BinaryStringData.fromString("999999990"), + BinaryStringData.fromString("999999999999999900"), BinaryStringData.fromString("9.99999999999999999999999999999999999E+37"), TimestampData.fromLocalDateTime( LocalDateTime.parse( @@ -237,21 +237,21 @@ private FlinkSourceProvider getFlinkSourceProvider(String[] captureTables) { DataTypes.FLOAT(), DataTypes.DECIMAL(10, 6), DataTypes.DECIMAL(10, 6), - DataTypes.DECIMAL(10, 3), - DataTypes.DECIMAL(10, 0), - DataTypes.DECIMAL(10, 0), - DataTypes.DECIMAL(10, 0), + DataTypes.DECIMAL(38, 19), + DataTypes.DECIMAL(38, 19), + DataTypes.DECIMAL(38, 19), + DataTypes.DECIMAL(38, 19), DataTypes.DECIMAL(38, 0), DataTypes.DECIMAL(38, 0), - DataTypes.DECIMAL(1, 0), - DataTypes.DECIMAL(2, 0), - DataTypes.DECIMAL(4, 0), - DataTypes.DECIMAL(9, 0), - DataTypes.DECIMAL(18, 0), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), + DataTypes.BIGINT(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), DataTypes.STRING(), DataTypes.TIMESTAMP(), DataTypes.TIMESTAMP(6), diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleMetadataAccessorITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleMetadataAccessorITCase.java index 9cdae34963d..b90e90cf020 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleMetadataAccessorITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleMetadataAccessorITCase.java @@ -126,22 +126,22 @@ private void testAccessCommonTypesSchema() throws Exception { DataTypes.FLOAT(), DataTypes.DECIMAL(10, 6), DataTypes.DECIMAL(10, 6), + DataTypes.DECIMAL(38, 19), + DataTypes.DECIMAL(38, 19), + DataTypes.DECIMAL(38, 19), + DataTypes.DECIMAL(38, 19), DataTypes.DECIMAL(38, 0), DataTypes.DECIMAL(38, 0), - DataTypes.DECIMAL(38, 0), - DataTypes.DECIMAL(38, 0), - DataTypes.DECIMAL(38, 0), - DataTypes.DECIMAL(38, 0), - DataTypes.BIGINT(), - DataTypes.BIGINT(), - DataTypes.BIGINT(), - DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), - DataTypes.DECIMAL(36, 0), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), DataTypes.TIMESTAMP(6), DataTypes.TIMESTAMP(6), DataTypes.TIMESTAMP(2), diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java index b296126f810..700e87395e2 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java @@ -406,7 +406,7 @@ public void testAlterAddAllColumnTypeStatement() throws Exception { Column.physicalColumn( "COLS27", DataTypes.DECIMAL( - DecimalType.MAX_PRECISION, 0)))))); + DecimalType.MAX_PRECISION, 19)))))); statement.execute( String.format("ALTER TABLE %s.PRODUCTS ADD COLS28 TIMESTAMP(2)", "DEBEZIUM")); expected.add( @@ -991,7 +991,7 @@ public void testGeometryType() throws Exception { Schema.newBuilder() .physicalColumn( "FEATURE_ID", - DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 0).notNull()) + DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 19).notNull()) .physicalColumn("NAME", DataTypes.VARCHAR(32)) .physicalColumn("SHAPE", DataTypes.STRING()) .primaryKey(Arrays.asList("FEATURE_ID")) @@ -1000,7 +1000,7 @@ public void testGeometryType() throws Exception { RowType rowType = RowType.of( new DataType[] { - DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 0), + DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 19), DataTypes.VARCHAR(32), DataTypes.STRING() }, @@ -1014,7 +1014,7 @@ public void testGeometryType() throws Exception { tableId, generator.generate( new Object[] { - DecimalData.fromBigDecimal(new BigDecimal(1), 38, 0), + DecimalData.fromBigDecimal(new BigDecimal(1), 38, 19), BinaryStringData.fromString("center"), BinaryStringData.fromString( "{\"coordinates\":\"[[116.6,24.343]]\",\"type\":\"Point\",\"srid\":4326}") @@ -1024,7 +1024,7 @@ public void testGeometryType() throws Exception { tableId, generator.generate( new Object[] { - DecimalData.fromBigDecimal(new BigDecimal(2), 38, 0), + DecimalData.fromBigDecimal(new BigDecimal(2), 38, 19), BinaryStringData.fromString("two-dimensional point"), BinaryStringData.fromString( "{\"coordinates\":\"[[10.0,5.0]]\",\"type\":\"Point\",\"srid\":4326}") @@ -1034,7 +1034,7 @@ public void testGeometryType() throws Exception { tableId, generator.generate( new Object[] { - DecimalData.fromBigDecimal(new BigDecimal(3), 38, 0), + DecimalData.fromBigDecimal(new BigDecimal(3), 38, 19), BinaryStringData.fromString("straight line segment"), BinaryStringData.fromString( "{\"coordinates\":\"[[10.0,10.0],[20.0,20.0],[30.0,30.0]]\",\"type\":\"LineString\",\"srid\":4326}") @@ -1044,7 +1044,7 @@ public void testGeometryType() throws Exception { tableId, generator.generate( new Object[] { - DecimalData.fromBigDecimal(new BigDecimal(4), 38, 0), + DecimalData.fromBigDecimal(new BigDecimal(4), 38, 19), BinaryStringData.fromString("polyline"), BinaryStringData.fromString( "{\"coordinates\":\"[[5.0,5.0],[15.0,10.0],[25.0,5.0],[35.0,10.0]]\",\"type\":\"LineString\",\"srid\":4326}") @@ -1054,7 +1054,7 @@ public void testGeometryType() throws Exception { tableId, generator.generate( new Object[] { - DecimalData.fromBigDecimal(new BigDecimal(5), 38, 0), + DecimalData.fromBigDecimal(new BigDecimal(5), 38, 19), BinaryStringData.fromString("rectangle"), BinaryStringData.fromString( "{\"coordinates\":\"[[0.0,0.0],[10.0,0.0],[10.0,10.0],[0.0,10.0],[0.0,0.0]]\",\"type\":\"Polygon\",\"srid\":4326}") @@ -1064,7 +1064,7 @@ public void testGeometryType() throws Exception { tableId, generator.generate( new Object[] { - DecimalData.fromBigDecimal(new BigDecimal(6), 38, 0), + DecimalData.fromBigDecimal(new BigDecimal(6), 38, 19), BinaryStringData.fromString("Multi-Point"), BinaryStringData.fromString( "{\"coordinates\":\"[[10.0,10.0],[20.0,20.0],[30.0,30.0]]\",\"type\":\"MultiPoint\",\"srid\":4326}") @@ -1074,7 +1074,7 @@ public void testGeometryType() throws Exception { tableId, generator.generate( new Object[] { - DecimalData.fromBigDecimal(new BigDecimal(7), 38, 0), + DecimalData.fromBigDecimal(new BigDecimal(7), 38, 19), BinaryStringData.fromString("Multi line collection"), BinaryStringData.fromString( "{\"coordinates\":\"[[10.0,10.0],[20.0,10.0],[10.0,20.0],[20.0,20.0]]\",\"type\":\"MultiLineString\",\"srid\":4326}") @@ -1085,7 +1085,7 @@ public void testGeometryType() throws Exception { tableId, generator.generate( new Object[] { - DecimalData.fromBigDecimal(new BigDecimal(8), 38, 0), + DecimalData.fromBigDecimal(new BigDecimal(8), 38, 19), BinaryStringData.fromString("Multi-Polygon"), BinaryStringData.fromString( "{\"coordinates\":\"[[0.0,0.0],[10.0,0.0],[10.0,10.0],[0.0,10.0],[0.0,0.0],[15.0,15.0],[25.0,15.0],[25.0,25.0],[15.0,25.0],[15.0,15.0]]\",\"type\":\"MultiPolygon\",\"srid\":4326}") @@ -1095,7 +1095,7 @@ public void testGeometryType() throws Exception { tableId, generator.generate( new Object[] { - DecimalData.fromBigDecimal(new BigDecimal(9), 38, 0), + DecimalData.fromBigDecimal(new BigDecimal(9), 38, 19), BinaryStringData.fromString("Geometry Collection"), BinaryStringData.fromString( "{\"geometries\":\"GEOMETRYCOLLECTION (POINT (25 25), LINESTRING (30 30, 35 25), POLYGON ((0 0, 10 0, 10 10, 0 10, 0 0)))\",\"type\":\"GeometryCollection\",\"srid\":4326}") diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtilsTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtilsTest.java index 57e0949dc52..0acbd99b67e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtilsTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtilsTest.java @@ -31,9 +31,74 @@ /** Tests for {@link OracleTypeUtils}. */ class OracleTypeUtilsTest { + // --- Bare NUMBER (scale unspecified) tests --- + + @Test + void testBareNumberShouldBeDecimal3819() { + // NUMBER (scale unspecified) → DECIMAL(38, 19) + Column column = + Column.editor() + .name("col") + .type("NUMBER") + .jdbcType(Types.NUMERIC) + .length(0) + .optional(true) + .create(); + DataType result = OracleTypeUtils.fromDbzColumn(column); + assertThat(result).isEqualTo(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 19)); + } + + @Test + void testBareNumberNotNullShouldBeDecimal3819NotNull() { + // NOT NULL NUMBER → DECIMAL(38, 19).notNull() + Column column = + Column.editor() + .name("col") + .type("NUMBER") + .jdbcType(Types.NUMERIC) + .length(0) + .optional(false) + .create(); + DataType result = OracleTypeUtils.fromDbzColumn(column); + assertThat(result).isEqualTo(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 19).notNull()); + } + + @Test + void testNumberPrecision10ScaleAbsentShouldBeDecimal3819() { + // NUMBER(10) with no scale info → DECIMAL(38, 19) (treated as bare NUMBER) + Column column = + Column.editor() + .name("col") + .type("NUMBER") + .jdbcType(Types.NUMERIC) + .length(10) + .optional(true) + .create(); + DataType result = OracleTypeUtils.fromDbzColumn(column); + assertThat(result).isEqualTo(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 19)); + } + + // --- Explicit integer (scale = 0) tests --- + + @Test + void testNumberPrecision1ShouldBeBigint() { + // NUMBER(1, 0) → BIGINT (precision <= 18) + Column column = + Column.editor() + .name("col") + .type("NUMBER") + .jdbcType(Types.NUMERIC) + .length(1) + .scale(0) + .optional(true) + .create(); + DataType result = OracleTypeUtils.fromDbzColumn(column); + assertThat(result).isEqualTo(DataTypes.BIGINT()); + } + @Test void testNumberPrecision18ShouldBeBigint() { - // NUMBER(18, 0) → BIGINT (precision <= 18) + // NUMBER(18, 0) → BIGINT (boundary: precision = 18) Column column = Column.editor() .name("col") @@ -49,7 +114,7 @@ void testNumberPrecision18ShouldBeBigint() { @Test void testNumberPrecision19ShouldBeDecimal() { - // NUMBER(19, 0) → DECIMAL(19, 0) (precision > 18) + // NUMBER(19, 0) → DECIMAL(19, 0) (boundary: precision = 19) Column column = Column.editor() .name("col") @@ -80,20 +145,23 @@ void testNumberPrecision38ShouldBeDecimal() { } @Test - void testNumberNoPrecisionShouldBeDecimalMaxPrecision() { - // NUMBER (no precision, length=0) → DECIMAL(MAX_PRECISION, 0) + void testNumberPrecision19NotNullShouldBeDecimalNotNull() { + // NOT NULL NUMBER(19, 0) → DECIMAL(19, 0).notNull() Column column = Column.editor() .name("col") .type("NUMBER") .jdbcType(Types.NUMERIC) - .length(0) - .optional(true) + .length(19) + .scale(0) + .optional(false) .create(); DataType result = OracleTypeUtils.fromDbzColumn(column); - assertThat(result).isEqualTo(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 0)); + assertThat(result).isEqualTo(DataTypes.DECIMAL(19, 0).notNull()); } + // --- Decimal (scale > 0) tests --- + @Test void testNumberWithScaleShouldBeDecimal() { // NUMBER(10, 6) → DECIMAL(10, 6) @@ -111,34 +179,104 @@ void testNumberWithScaleShouldBeDecimal() { } @Test - void testNumberPrecision1ShouldBeBigint() { - // NUMBER(1, 0) → BIGINT (precision <= 18) + void testNumberWithScaleBoundaryShouldBeDecimal() { + // NUMBER(38, 36) → DECIMAL(38, 36) (scale at boundary) Column column = Column.editor() .name("col") .type("NUMBER") .jdbcType(Types.NUMERIC) - .length(1) - .scale(0) + .length(38) + .scale(36) .optional(true) .create(); DataType result = OracleTypeUtils.fromDbzColumn(column); - assertThat(result).isEqualTo(DataTypes.BIGINT()); + assertThat(result).isEqualTo(DataTypes.DECIMAL(38, 36)); } + // --- NUMBER(*, s) (precision == 0, scale > 0) tests --- + @Test - void testNumberNotNullPrecision19() { - // NOT NULL NUMBER(19, 0) → DECIMAL(19, 0).notNull() + void testNumberStarPrecisionShouldBeDecimal38() { + // NUMBER(*, 2) → DECIMAL(38, 2) (precision == 0 defaults to MAX_PRECISION) Column column = Column.editor() .name("col") .type("NUMBER") .jdbcType(Types.NUMERIC) - .length(19) - .scale(0) + .length(0) + .scale(2) + .optional(true) + .create(); + DataType result = OracleTypeUtils.fromDbzColumn(column); + assertThat(result).isEqualTo(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 2)); + } + + @Test + void testNumberStarPrecisionNotNullShouldBeDecimal38NotNull() { + // NOT NULL NUMBER(*, 4) → DECIMAL(38, 4).notNull() + Column column = + Column.editor() + .name("col") + .type("NUMBER") + .jdbcType(Types.NUMERIC) + .length(0) + .scale(4) .optional(false) .create(); DataType result = OracleTypeUtils.fromDbzColumn(column); - assertThat(result).isEqualTo(DataTypes.DECIMAL(19, 0).notNull()); + assertThat(result).isEqualTo(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 4).notNull()); + } + + // --- Scale < 0 (negative scale) downgrade tests --- + + @Test + void testNegativeScaleShouldBeString() { + // NUMBER(10, -2) → STRING (negative scale downgraded) + Column column = + Column.editor() + .name("col") + .type("NUMBER") + .jdbcType(Types.NUMERIC) + .length(10) + .scale(-2) + .optional(true) + .create(); + DataType result = OracleTypeUtils.fromDbzColumn(column); + assertThat(result).isEqualTo(DataTypes.STRING()); + } + + @Test + void testNegativeScaleStarPrecisionShouldBeString() { + // NUMBER(*, -3) → STRING (negative scale downgraded) + Column column = + Column.editor() + .name("col") + .type("NUMBER") + .jdbcType(Types.NUMERIC) + .length(0) + .scale(-3) + .optional(true) + .create(); + DataType result = OracleTypeUtils.fromDbzColumn(column); + assertThat(result).isEqualTo(DataTypes.STRING()); + } + + // --- Scale > 36 downgrade tests --- + + @Test + void testScaleGreaterThan36ShouldBeString() { + // NUMBER(10, 37) → STRING (scale > 36 downgraded) + Column column = + Column.editor() + .name("col") + .type("NUMBER") + .jdbcType(Types.NUMERIC) + .length(10) + .scale(37) + .optional(true) + .create(); + DataType result = OracleTypeUtils.fromDbzColumn(column); + assertThat(result).isEqualTo(DataTypes.STRING()); } } From ecec88b369ee3f6a924be1afc13498594d1dd2b0 Mon Sep 17 00:00:00 2001 From: mike Date: Wed, 24 Jun 2026 03:54:30 +0000 Subject: [PATCH 08/12] Fix VAL_NUMBER_36_NEGATIVE_SCALE expected value in OracleFullTypesITCase --- .../cdc/connectors/oracle/source/OracleFullTypesITCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleFullTypesITCase.java index 0b1a1f3ccfb..cbfed863e8f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleFullTypesITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleFullTypesITCase.java @@ -143,7 +143,7 @@ private void testCommonDataTypes() throws Exception { BinaryStringData.fromString("9900"), BinaryStringData.fromString("999999990"), BinaryStringData.fromString("999999999999999900"), - BinaryStringData.fromString("9.99999999999999999999999999999999999E+37"), + BinaryStringData.fromString("9.9999999999999999999999999999999999900E+37"), TimestampData.fromLocalDateTime( LocalDateTime.parse( "2022-10-30 00:00:00", From a9a3698719cc5e498ba7529497a17864ce5482c9 Mon Sep 17 00:00:00 2001 From: mike Date: Fri, 26 Jun 2026 11:20:42 +0800 Subject: [PATCH 09/12] Fix schema/runtime type inference mismatch for Oracle NUMBER and DATE --- .../source/OracleSchemaDataTypeInference.java | 52 ++++++++- .../oracle/utils/OracleTypeUtils.java | 43 ++++++-- .../oracle/source/OracleFullTypesITCase.java | 30 +++-- .../OracleSchemaDataTypeInferenceTest.java | 104 ++++++++++++++++++ .../oracle/utils/OracleTypeUtilsTest.java | 6 +- 5 files changed, 204 insertions(+), 31 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInferenceTest.java diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInference.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInference.java index 79fe8bbad6b..4366ed62df6 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInference.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInference.java @@ -26,6 +26,7 @@ import io.debezium.data.VariableScaleDecimal; import io.debezium.data.geometry.Geometry; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; /** {@link DataType} inference for oracle debezium {@link Schema}. */ @Internal @@ -33,14 +34,61 @@ public class OracleSchemaDataTypeInference extends DebeziumSchemaDataTypeInferen private static final long serialVersionUID = 1L; + /** + * Inclusive millis value for 1900-01-01T00:00:00Z. Lower bound for the heuristic that + * recognises Oracle DATE columns encoded as INT64 epoch millis. + */ + private static final long ORACLE_DATE_MIN_MILLIS = -2208988800000L; + + /** Inclusive millis value for 2100-01-01T00:00:00Z. Upper bound for the heuristic. */ + private static final long ORACLE_DATE_MAX_MILLIS = 4133980800000L; + + @Override + protected DataType inferInt64(Object value, Schema schema) { + // Oracle DATE columns are read out of LogMiner / SQL redo logs as epoch + // milliseconds (INT64) with the time-of-day portion always being + // midnight. Debezium encodes them with the same schema names as + // TIMESTAMP (e.g. io.debezium.time.Timestamp), so the default inference + // here would return TIMESTAMP(3) and corrupt the runtime conversion + // path. Detect this special case and return DATE so the value round-trips + // as a LocalDate in the BinaryRecordData. + if (value instanceof Long) { + long millis = (Long) value; + if (millis >= ORACLE_DATE_MIN_MILLIS + && millis <= ORACLE_DATE_MAX_MILLIS + && millis % 86_400_000L == 0L) { + return DataTypes.DATE(); + } + } + return super.inferInt64(value, schema); + } + + @Override protected DataType inferStruct(Object value, Schema schema) { // the Geometry datatype in oracle will be converted to // a String with Json format if (Geometry.LOGICAL_NAME.equals(schema.name())) { return DataTypes.STRING(); } else if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) { - // Oracle bare NUMBER uses VariableScaleDecimal encoding. - // Return DECIMAL(38, 19) to match OracleTypeUtils. + // For Oracle bare NUMBER and explicit-precision positive-scale + // NUMBER with p - s > 18, Debezium encodes the value as a + // VariableScaleDecimal struct. We return DECIMAL(38, 19) to + // match OracleTypeUtils' schema-level choice (16-byte + // BinaryRecordData layout is the same for any DECIMAL). + // + // For negative-scale NUMBER with (p - s) > 18 (e.g. NUMBER(36, -2)), + // Debezium also encodes as VariableScaleDecimal but the integer + // range exceeds BIGINT. OracleTypeUtils returns STRING for these + // to avoid LONG overflow. We must mirror that STRING choice here + // at runtime, otherwise the schema/runtime type mismatch would + // corrupt the BinaryRecordData layout. + if (value instanceof Struct) { + Struct struct = (Struct) value; + Integer dbzScale = struct.getInt32(VariableScaleDecimal.SCALE_FIELD); + if (dbzScale != null && dbzScale < 0) { + return DataTypes.STRING(); + } + } return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 19); } else { return super.inferStruct(value, schema); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtils.java index 1ea32d8aaac..6a334e207b9 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtils.java @@ -75,28 +75,51 @@ private static DataType convertFromColumn(Column column) { case Types.DECIMAL: { // Bare NUMBER (scale unspecified): floating-point semantic, - // can store both integers and decimals up to 38 significant digits. - // Use DECIMAL(38, 19) as a balanced universal numeric type. + // can store both integers and decimals up to 38 significant + // digits. Use DECIMAL(38, 19) as a balanced universal + // numeric type, matching Debezium's VariableScaleDecimal + // encoding and OracleSchemaDataTypeInference.inferStruct. if (!column.scale().isPresent()) { return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 19); } - int precision = column.length(); - int scale = column.scale().get(); - // precision == 0 (e.g. NUMBER(*, s)) means unspecified. int p = precision > 0 ? precision : DecimalType.MAX_PRECISION; + int scale = column.scale().get(); - // scale < 0 or > 36: not safely representable as DECIMAL, downgrade to STRING. - if (scale < 0 || scale > 36) { + // scale < 0: value = unscaled * 10^|scale|. Debezium + // encodes these as INT8 / INT16 / INT32 / INT64 based on + // (precision - scale), which is the number of integer + // digits. The default Debezium runtime + // (DebeziumSchemaDataTypeInference) maps those to + // TINYINT / SMALLINT / INT / BIGINT — all of which live + // in the same 8-byte compact slot of BinaryRecordData's + // fixed area. So we mirror that family here to keep the + // schema and runtime layers in sync. When (precision - + // scale) exceeds 18 the value can no longer fit in a + // BIGINT and Debezium falls back to VariableScaleDecimal; + // we map that case to STRING because the integer range + // is genuinely beyond any native Flink CDC type. + if (scale < 0) { + int intDigits = p - scale; // p + |scale| + if (intDigits <= 18) { + return DataTypes.BIGINT(); + } + return DataTypes.STRING(); + } + // scale > 36: not safely representable as DECIMAL. + if (scale > 36) { return DataTypes.STRING(); } - if (scale == 0) { - // Explicit integer: use BIGINT for p <= 18, DECIMAL otherwise. + // Explicit integer: BIGINT for p <= 18 (Debezium encodes + // as INT8/16/32/64, parent class runtime returns + // TINYINT/SMALLINT/INT/BIGINT — same 8-byte-or-less + // family), DECIMAL(p, 0) for p > 18 (Debezium encodes + // as VariableScaleDecimal, runtime returns DECIMAL, + // BinaryRecordData layout matches 16-byte DECIMAL). return p <= 18 ? DataTypes.BIGINT() : DataTypes.DECIMAL(p, 0); } - // 1 <= scale <= 36: standard decimal with fractional part. return DataTypes.DECIMAL(p, scale); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleFullTypesITCase.java index cbfed863e8f..d60b3742ca4 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleFullTypesITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleFullTypesITCase.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.data.DateData; import org.apache.flink.cdc.common.data.DecimalData; import org.apache.flink.cdc.common.data.LocalZonedTimestampData; import org.apache.flink.cdc.common.data.RecordData; @@ -113,7 +114,7 @@ private void testCommonDataTypes() throws Exception { Object[] expectedSnapshot = new Object[] { - DecimalData.fromBigDecimal(new BigDecimal("1"), 9, 0), + 1L, BinaryStringData.fromString("vc2"), BinaryStringData.fromString("vc2"), BinaryStringData.fromString("nvc2"), @@ -139,15 +140,12 @@ private void testCommonDataTypes() throws Exception { 9999L, 999999999L, 999999999999999999L, - BinaryStringData.fromString("90"), - BinaryStringData.fromString("9900"), - BinaryStringData.fromString("999999990"), - BinaryStringData.fromString("999999999999999900"), - BinaryStringData.fromString("9.9999999999999999999999999999999999900E+37"), - TimestampData.fromLocalDateTime( - LocalDateTime.parse( - "2022-10-30 00:00:00", - DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))), + 90L, + 9900L, + 999999990L, + 999999999999999900L, + BinaryStringData.fromString("9.99999999999999999999999999999999999E+37"), + DateData.fromIsoLocalDateString("2022-10-30"), TimestampData.fromLocalDateTime( LocalDateTime.parse( "2022-10-30 12:34:56.00789", @@ -222,7 +220,7 @@ private FlinkSourceProvider getFlinkSourceProvider(String[] captureTables) { private static final RowType COMMON_TYPES = RowType.of( - DataTypes.DECIMAL(9, 0).notNull(), + DataTypes.BIGINT().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), @@ -248,12 +246,12 @@ private FlinkSourceProvider getFlinkSourceProvider(String[] captureTables) { DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), + DataTypes.BIGINT(), + DataTypes.BIGINT(), + DataTypes.BIGINT(), + DataTypes.BIGINT(), DataTypes.STRING(), - DataTypes.STRING(), - DataTypes.STRING(), - DataTypes.STRING(), - DataTypes.STRING(), - DataTypes.TIMESTAMP(), + DataTypes.DATE(), DataTypes.TIMESTAMP(6), DataTypes.TIMESTAMP(2), DataTypes.TIMESTAMP(4), diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInferenceTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInferenceTest.java new file mode 100644 index 00000000000..443585cd713 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInferenceTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.oracle.source; + +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; + +import io.debezium.time.MicroTimestamp; +import io.debezium.time.Timestamp; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.junit.jupiter.api.Test; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link OracleSchemaDataTypeInference}. */ +class OracleSchemaDataTypeInferenceTest { + + private static final long ONE_DAY_IN_MILLIS = 86_400_000L; + + private final OracleSchemaDataTypeInference inference = new OracleSchemaDataTypeInference(); + + private static Schema int64Schema(String name) { + return SchemaBuilder.int64().name(name).optional().build(); + } + + @Test + void testOracleDateMidnightIsRecognisedAsDate() { + // Oracle DATE columns arrive from LogMiner as INT64 epoch millis with + // midnight as the time-of-day portion. The schema name matches the + // regular Timestamp semantic type, but the value round-trips as a + // LocalDate. + Schema schema = int64Schema(Timestamp.SCHEMA_NAME); + long epochMillisForDate = LocalDate.of(2022, 10, 30).toEpochDay() * ONE_DAY_IN_MILLIS; + DataType result = inference.infer(epochMillisForDate, schema); + assertThat(result).isEqualTo(DataTypes.DATE()); + } + + @Test + void testOracleDateAtUnixEpochIsRecognisedAsDate() { + Schema schema = int64Schema(Timestamp.SCHEMA_NAME); + DataType result = inference.infer(0L, schema); + assertThat(result).isEqualTo(DataTypes.DATE()); + } + + @Test + void testOracleDateBeforeLowerBoundIsNotRecognisedAsDate() { + Schema schema = int64Schema(Timestamp.SCHEMA_NAME); + // 1899-12-31T00:00:00Z is below the 1900 lower bound. + long millis = -2208988800000L - ONE_DAY_IN_MILLIS; + DataType result = inference.infer(millis, schema); + assertThat(result).isEqualTo(DataTypes.TIMESTAMP(3)); + } + + @Test + void testOracleTimestampWithNonMidnightTimeIsNotRecognisedAsDate() { + Schema schema = int64Schema(Timestamp.SCHEMA_NAME); + // 2022-10-30T00:00:01Z is a valid TIMESTAMP value but not a DATE. + long millis = LocalDate.of(2022, 10, 30).toEpochDay() * ONE_DAY_IN_MILLIS + 1000L; + DataType result = inference.infer(millis, schema); + assertThat(result).isEqualTo(DataTypes.TIMESTAMP(3)); + } + + @Test + void testMicrosecondTimestampIsStillRecognisedAsMicroTimestamp() { + Schema schema = int64Schema(MicroTimestamp.SCHEMA_NAME); + // 2022-10-30T12:34:56.00789Z in microseconds. Value is not a midnight + // boundary so it must still be treated as TIMESTAMP(6). + long micros = + LocalDateTime.of(2022, 10, 30, 12, 34, 56, 7_890_000) + .toInstant(ZoneOffset.UTC) + .toEpochMilli() + * 1000L; + DataType result = inference.infer(micros, schema); + assertThat(result).isEqualTo(DataTypes.TIMESTAMP(6)); + } + + @Test + void testNonLongValueKeepsDefaultInference() { + Schema schema = int64Schema(Timestamp.SCHEMA_NAME); + // value is not a Long so the DATE heuristic must not fire. + DataType result = inference.infer("not a number", schema); + assertThat(result).isEqualTo(DataTypes.TIMESTAMP(3)); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtilsTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtilsTest.java index 0acbd99b67e..459f423f8c4 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtilsTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/utils/OracleTypeUtilsTest.java @@ -231,8 +231,8 @@ void testNumberStarPrecisionNotNullShouldBeDecimal38NotNull() { // --- Scale < 0 (negative scale) downgrade tests --- @Test - void testNegativeScaleShouldBeString() { - // NUMBER(10, -2) → STRING (negative scale downgraded) + void testNegativeScaleShouldBeBigint() { + // NUMBER(10, -2) → BIGINT (negative scale, p+|s|=12 fits in BIGINT) Column column = Column.editor() .name("col") @@ -243,7 +243,7 @@ void testNegativeScaleShouldBeString() { .optional(true) .create(); DataType result = OracleTypeUtils.fromDbzColumn(column); - assertThat(result).isEqualTo(DataTypes.STRING()); + assertThat(result).isEqualTo(DataTypes.BIGINT()); } @Test From 6ca3a7f74ebd69cbf6f95142311339c3e30aaf03 Mon Sep 17 00:00:00 2001 From: mike Date: Fri, 26 Jun 2026 14:25:55 +0800 Subject: [PATCH 10/12] Remove broken DATE heuristic and fix negative-scale NUMBER test expectations --- .../source/OracleSchemaDataTypeInference.java | 29 ----- .../oracle/source/OracleFullTypesITCase.java | 8 +- .../source/OracleMetadataAccessorITCase.java | 8 +- .../OracleSchemaDataTypeInferenceTest.java | 104 ------------------ 4 files changed, 9 insertions(+), 140 deletions(-) delete mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInferenceTest.java diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInference.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInference.java index 4366ed62df6..df0ec5f0c19 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInference.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInference.java @@ -34,35 +34,6 @@ public class OracleSchemaDataTypeInference extends DebeziumSchemaDataTypeInferen private static final long serialVersionUID = 1L; - /** - * Inclusive millis value for 1900-01-01T00:00:00Z. Lower bound for the heuristic that - * recognises Oracle DATE columns encoded as INT64 epoch millis. - */ - private static final long ORACLE_DATE_MIN_MILLIS = -2208988800000L; - - /** Inclusive millis value for 2100-01-01T00:00:00Z. Upper bound for the heuristic. */ - private static final long ORACLE_DATE_MAX_MILLIS = 4133980800000L; - - @Override - protected DataType inferInt64(Object value, Schema schema) { - // Oracle DATE columns are read out of LogMiner / SQL redo logs as epoch - // milliseconds (INT64) with the time-of-day portion always being - // midnight. Debezium encodes them with the same schema names as - // TIMESTAMP (e.g. io.debezium.time.Timestamp), so the default inference - // here would return TIMESTAMP(3) and corrupt the runtime conversion - // path. Detect this special case and return DATE so the value round-trips - // as a LocalDate in the BinaryRecordData. - if (value instanceof Long) { - long millis = (Long) value; - if (millis >= ORACLE_DATE_MIN_MILLIS - && millis <= ORACLE_DATE_MAX_MILLIS - && millis % 86_400_000L == 0L) { - return DataTypes.DATE(); - } - } - return super.inferInt64(value, schema); - } - @Override protected DataType inferStruct(Object value, Schema schema) { // the Geometry datatype in oracle will be converted to diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleFullTypesITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleFullTypesITCase.java index d60b3742ca4..e8b234c2445 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleFullTypesITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleFullTypesITCase.java @@ -20,7 +20,6 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.configuration.Configuration; -import org.apache.flink.cdc.common.data.DateData; import org.apache.flink.cdc.common.data.DecimalData; import org.apache.flink.cdc.common.data.LocalZonedTimestampData; import org.apache.flink.cdc.common.data.RecordData; @@ -145,7 +144,10 @@ private void testCommonDataTypes() throws Exception { 999999990L, 999999999999999900L, BinaryStringData.fromString("9.99999999999999999999999999999999999E+37"), - DateData.fromIsoLocalDateString("2022-10-30"), + TimestampData.fromLocalDateTime( + LocalDateTime.parse( + "2022-10-30 00:00:00", + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))), TimestampData.fromLocalDateTime( LocalDateTime.parse( "2022-10-30 12:34:56.00789", @@ -251,7 +253,7 @@ private FlinkSourceProvider getFlinkSourceProvider(String[] captureTables) { DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.STRING(), - DataTypes.DATE(), + DataTypes.TIMESTAMP(), DataTypes.TIMESTAMP(6), DataTypes.TIMESTAMP(2), DataTypes.TIMESTAMP(4), diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleMetadataAccessorITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleMetadataAccessorITCase.java index b90e90cf020..8ccfea1675e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleMetadataAccessorITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleMetadataAccessorITCase.java @@ -137,10 +137,10 @@ private void testAccessCommonTypesSchema() throws Exception { DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), - DataTypes.STRING(), - DataTypes.STRING(), - DataTypes.STRING(), - DataTypes.STRING(), + DataTypes.BIGINT(), + DataTypes.BIGINT(), + DataTypes.BIGINT(), + DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.TIMESTAMP(6), DataTypes.TIMESTAMP(6), diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInferenceTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInferenceTest.java deleted file mode 100644 index 443585cd713..00000000000 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OracleSchemaDataTypeInferenceTest.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cdc.connectors.oracle.source; - -import org.apache.flink.cdc.common.types.DataType; -import org.apache.flink.cdc.common.types.DataTypes; - -import io.debezium.time.MicroTimestamp; -import io.debezium.time.Timestamp; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; -import org.junit.jupiter.api.Test; - -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.ZoneOffset; - -import static org.assertj.core.api.Assertions.assertThat; - -/** Tests for {@link OracleSchemaDataTypeInference}. */ -class OracleSchemaDataTypeInferenceTest { - - private static final long ONE_DAY_IN_MILLIS = 86_400_000L; - - private final OracleSchemaDataTypeInference inference = new OracleSchemaDataTypeInference(); - - private static Schema int64Schema(String name) { - return SchemaBuilder.int64().name(name).optional().build(); - } - - @Test - void testOracleDateMidnightIsRecognisedAsDate() { - // Oracle DATE columns arrive from LogMiner as INT64 epoch millis with - // midnight as the time-of-day portion. The schema name matches the - // regular Timestamp semantic type, but the value round-trips as a - // LocalDate. - Schema schema = int64Schema(Timestamp.SCHEMA_NAME); - long epochMillisForDate = LocalDate.of(2022, 10, 30).toEpochDay() * ONE_DAY_IN_MILLIS; - DataType result = inference.infer(epochMillisForDate, schema); - assertThat(result).isEqualTo(DataTypes.DATE()); - } - - @Test - void testOracleDateAtUnixEpochIsRecognisedAsDate() { - Schema schema = int64Schema(Timestamp.SCHEMA_NAME); - DataType result = inference.infer(0L, schema); - assertThat(result).isEqualTo(DataTypes.DATE()); - } - - @Test - void testOracleDateBeforeLowerBoundIsNotRecognisedAsDate() { - Schema schema = int64Schema(Timestamp.SCHEMA_NAME); - // 1899-12-31T00:00:00Z is below the 1900 lower bound. - long millis = -2208988800000L - ONE_DAY_IN_MILLIS; - DataType result = inference.infer(millis, schema); - assertThat(result).isEqualTo(DataTypes.TIMESTAMP(3)); - } - - @Test - void testOracleTimestampWithNonMidnightTimeIsNotRecognisedAsDate() { - Schema schema = int64Schema(Timestamp.SCHEMA_NAME); - // 2022-10-30T00:00:01Z is a valid TIMESTAMP value but not a DATE. - long millis = LocalDate.of(2022, 10, 30).toEpochDay() * ONE_DAY_IN_MILLIS + 1000L; - DataType result = inference.infer(millis, schema); - assertThat(result).isEqualTo(DataTypes.TIMESTAMP(3)); - } - - @Test - void testMicrosecondTimestampIsStillRecognisedAsMicroTimestamp() { - Schema schema = int64Schema(MicroTimestamp.SCHEMA_NAME); - // 2022-10-30T12:34:56.00789Z in microseconds. Value is not a midnight - // boundary so it must still be treated as TIMESTAMP(6). - long micros = - LocalDateTime.of(2022, 10, 30, 12, 34, 56, 7_890_000) - .toInstant(ZoneOffset.UTC) - .toEpochMilli() - * 1000L; - DataType result = inference.infer(micros, schema); - assertThat(result).isEqualTo(DataTypes.TIMESTAMP(6)); - } - - @Test - void testNonLongValueKeepsDefaultInference() { - Schema schema = int64Schema(Timestamp.SCHEMA_NAME); - // value is not a Long so the DATE heuristic must not fire. - DataType result = inference.infer("not a number", schema); - assertThat(result).isEqualTo(DataTypes.TIMESTAMP(3)); - } -} From 82a3dd094e8f4518996825731872626659a9c7a8 Mon Sep 17 00:00:00 2001 From: mike Date: Fri, 26 Jun 2026 17:50:27 +0800 Subject: [PATCH 11/12] Update Oracle E2E test expectations for NUMBER(38,0) columns --- .../org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java index e6b5387a224..45759fd99bb 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java @@ -171,7 +171,7 @@ void testSyncWholeDatabase() throws Exception { "DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[], after=[105, hammer, 14oz carpenters hammer, 0.875], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}"); + "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS, schema=columns={`ID` DECIMAL(38, 0) NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}"); waitUntilSpecificEvent( "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691841, user_1, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( @@ -225,7 +225,7 @@ void testSyncWholeDatabase() throws Exception { waitUntilSpecificEvent( "DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[107, rocks, box of assorted rocks, 5.3], after=[107, rocks, box of assorted rocks, 5.1], op=UPDATE, meta=()}"); waitUntilSpecificEvent( - "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}"); + "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1, schema=columns={`ID` DECIMAL(38, 0) NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}"); waitUntilSpecificEvent( "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691842, user_10, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( From 1f38f5ce1b69db30738eb3b1529e535829dbb054 Mon Sep 17 00:00:00 2001 From: mike Date: Fri, 26 Jun 2026 18:06:34 +0800 Subject: [PATCH 12/12] Fix Oracle E2E test to expect correct ID values after NUMBER type mapping fix --- .../cdc/pipeline/tests/OracleE2eITCase.java | 84 +++++++++---------- 1 file changed, 42 insertions(+), 42 deletions(-) diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java index 45759fd99bb..498012e54d8 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java @@ -173,47 +173,47 @@ void testSyncWholeDatabase() throws Exception { waitUntilSpecificEvent( "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS, schema=columns={`ID` DECIMAL(38, 0) NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691841, user_1, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[101, user_1, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691841, user_4, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[109, user_4, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691841, user_3, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691841, user_2, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[102, user_2, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691842, user_13, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[1012, user_13, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691842, user_14, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[1013, user_14, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691842, user_11, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[1010, user_11, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691842, user_12, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[1011, user_12, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691842, user_21, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[2000, user_21, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691842, user_10, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[1009, user_10, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691841, user_6, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[111, user_6, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691841, user_5, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[110, user_5, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691841, user_9, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[123, user_9, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691842, user_19, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[1018, user_19, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691842, user_20, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[1019, user_20, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691841, user_8, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[121, user_8, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691842, user_17, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[1016, user_17, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691842, user_18, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[1017, user_18, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691842, user_15, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[1014, user_15, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691841, user_7, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[118, user_7, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691842, user_16, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[1015, user_16, Shanghai, 123567891234], op=INSERT, meta=()}"); stat.execute( "UPDATE DEBEZIUM.PRODUCTS SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106 "); @@ -227,47 +227,47 @@ void testSyncWholeDatabase() throws Exception { waitUntilSpecificEvent( "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1, schema=columns={`ID` DECIMAL(38, 0) NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691842, user_10, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[1009, user_10, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691842, user_21, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[2000, user_21, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691841, user_5, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[110, user_5, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691842, user_14, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[1013, user_14, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691841, user_6, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[111, user_6, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691842, user_13, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[1012, user_13, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691842, user_12, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[1011, user_12, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691842, user_11, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[1010, user_11, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691841, user_4, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[109, user_4, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691841, user_2, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[102, user_2, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691841, user_3, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691841, user_1, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[101, user_1, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691842, user_18, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[1017, user_18, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691841, user_9, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[123, user_9, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691842, user_17, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[1016, user_17, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691842, user_16, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[1015, user_16, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691842, user_15, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[1014, user_15, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691841, user_7, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[118, user_7, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691842, user_20, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[1019, user_20, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691841, user_8, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[121, user_8, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691842, user_19, Shanghai, 123567891234], op=INSERT, meta=()}"); + "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[1018, user_19, Shanghai, 123567891234], op=INSERT, meta=()}"); } catch (Exception e) { LOG.error("Update table for CDC failed.", e); throw new RuntimeException(e);