Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.DecimalType;
import org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference;

import io.debezium.data.VariableScaleDecimal;
import io.debezium.data.geometry.Geometry;
import org.apache.kafka.connect.data.Schema;

Expand All @@ -36,6 +38,10 @@ protected DataType inferStruct(Object value, Schema schema) {
// a String with Json format
if (Geometry.LOGICAL_NAME.equals(schema.name())) {
return DataTypes.STRING();
} else if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
// Oracle bare NUMBER uses VariableScaleDecimal encoding.
// Return DECIMAL(38, 19) to match OracleTypeUtils.
return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 19);
} else {
return super.inferStruct(value, schema);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.DecimalType;

import io.debezium.relational.Column;
import oracle.jdbc.OracleTypes;
Expand Down Expand Up @@ -72,11 +73,33 @@ private static DataType convertFromColumn(Column column) {
return DataTypes.DOUBLE();
case Types.NUMERIC:
case Types.DECIMAL:
return column.length() == 0
|| !column.scale().isPresent()
|| column.scale().get() <= 0
? DataTypes.BIGINT()
: DataTypes.DECIMAL(column.length(), column.scale().orElse(0));
{
// Bare NUMBER (scale unspecified): floating-point semantic,
// can store both integers and decimals up to 38 significant digits.
// Use DECIMAL(38, 19) as a balanced universal numeric type.
if (!column.scale().isPresent()) {
return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 19);
}

int precision = column.length();
int scale = column.scale().get();

// precision == 0 (e.g. NUMBER(*, s)) means unspecified.
int p = precision > 0 ? precision : DecimalType.MAX_PRECISION;

// scale < 0 or > 36: not safely representable as DECIMAL, downgrade to STRING.
if (scale < 0 || scale > 36) {
return DataTypes.STRING();
}

if (scale == 0) {
// Explicit integer: use BIGINT for p <= 18, DECIMAL otherwise.
return p <= 18 ? DataTypes.BIGINT() : DataTypes.DECIMAL(p, 0);
}

// 1 <= scale <= 36: standard decimal with fractional part.
return DataTypes.DECIMAL(p, scale);
}
case Types.DATE:
return DataTypes.DATE();
case Types.TIMESTAMP:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,22 +128,22 @@ private void testCommonDataTypes() throws Exception {
(float) 6.66,
DecimalData.fromBigDecimal(new BigDecimal("1234.567891"), 10, 6),
DecimalData.fromBigDecimal(new BigDecimal("1234.567891"), 10, 6),
DecimalData.fromBigDecimal(new BigDecimal("77.323"), 10, 3),
DecimalData.fromBigDecimal(new BigDecimal(1), 10, 0),
DecimalData.fromBigDecimal(new BigDecimal(22), 10, 0),
DecimalData.fromBigDecimal(new BigDecimal(333), 10, 0),
DecimalData.fromBigDecimal(new BigDecimal(4444), 10, 0),
DecimalData.fromBigDecimal(new BigDecimal(5555), 10, 0),
DecimalData.fromBigDecimal(new BigDecimal(1), 10, 0),
DecimalData.fromBigDecimal(new BigDecimal(99), 10, 0),
DecimalData.fromBigDecimal(new BigDecimal(9999), 10, 0),
DecimalData.fromBigDecimal(new BigDecimal(999999999), 38, 0),
DecimalData.fromBigDecimal(new BigDecimal(999999999999999999L), 38, 0),
90L,
9900L,
999999990L,
999999999999999900L,
BinaryStringData.fromString("9.99999999999999999999999999999999999E+37"),
DecimalData.fromBigDecimal(new BigDecimal("77.323"), 38, 19),
DecimalData.fromBigDecimal(new BigDecimal(1), 38, 19),
DecimalData.fromBigDecimal(new BigDecimal(22), 38, 19),
DecimalData.fromBigDecimal(new BigDecimal(333), 38, 19),
DecimalData.fromBigDecimal(new BigDecimal(4444), 38, 0),
DecimalData.fromBigDecimal(new BigDecimal(5555), 38, 0),
1L,
99L,
9999L,
999999999L,
999999999999999999L,
BinaryStringData.fromString("90"),
BinaryStringData.fromString("9900"),
BinaryStringData.fromString("999999990"),
BinaryStringData.fromString("999999999999999900"),
BinaryStringData.fromString("9.9999999999999999999999999999999999900E+37"),
TimestampData.fromLocalDateTime(
LocalDateTime.parse(
"2022-10-30 00:00:00",
Expand Down Expand Up @@ -237,21 +237,21 @@ private FlinkSourceProvider getFlinkSourceProvider(String[] captureTables) {
DataTypes.FLOAT(),
DataTypes.DECIMAL(10, 6),
DataTypes.DECIMAL(10, 6),
DataTypes.DECIMAL(10, 3),
DataTypes.DECIMAL(10, 0),
DataTypes.DECIMAL(10, 0),
DataTypes.DECIMAL(10, 0),
DataTypes.DECIMAL(38, 19),
DataTypes.DECIMAL(38, 19),
DataTypes.DECIMAL(38, 19),
DataTypes.DECIMAL(38, 19),
DataTypes.DECIMAL(38, 0),
DataTypes.DECIMAL(38, 0),
DataTypes.DECIMAL(1, 0),
DataTypes.DECIMAL(2, 0),
DataTypes.DECIMAL(4, 0),
DataTypes.DECIMAL(9, 0),
DataTypes.DECIMAL(18, 0),
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.TIMESTAMP(),
DataTypes.TIMESTAMP(6),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,22 +126,22 @@ private void testAccessCommonTypesSchema() throws Exception {
DataTypes.FLOAT(),
DataTypes.DECIMAL(10, 6),
DataTypes.DECIMAL(10, 6),
DataTypes.DECIMAL(38, 19),
DataTypes.DECIMAL(38, 19),
DataTypes.DECIMAL(38, 19),
DataTypes.DECIMAL(38, 19),
DataTypes.DECIMAL(38, 0),
DataTypes.DECIMAL(38, 0),
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.TIMESTAMP(6),
DataTypes.TIMESTAMP(6),
DataTypes.TIMESTAMP(2),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
Expand All @@ -38,6 +39,7 @@
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.DecimalType;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
Expand Down Expand Up @@ -71,6 +73,7 @@
import org.junit.jupiter.api.TestMethodOrder;
import org.testcontainers.lifecycle.Startables;

import java.math.BigDecimal;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Connection;
Expand Down Expand Up @@ -400,7 +403,10 @@ public void testAlterAddAllColumnTypeStatement() throws Exception {
tableId,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("COLS27", DataTypes.BIGINT())))));
Column.physicalColumn(
"COLS27",
DataTypes.DECIMAL(
DecimalType.MAX_PRECISION, 19))))));
statement.execute(
String.format("ALTER TABLE %s.PRODUCTS ADD COLS28 TIMESTAMP(2)", "DEBEZIUM"));
expected.add(
Expand Down Expand Up @@ -983,7 +989,9 @@ public void testGeometryType() throws Exception {
new CreateTableEvent(
TableId.tableId("DEBEZIUM", "MYLAKE"),
Schema.newBuilder()
.physicalColumn("FEATURE_ID", DataTypes.BIGINT().notNull())
.physicalColumn(
"FEATURE_ID",
DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 19).notNull())
.physicalColumn("NAME", DataTypes.VARCHAR(32))
.physicalColumn("SHAPE", DataTypes.STRING())
.primaryKey(Arrays.asList("FEATURE_ID"))
Expand All @@ -992,7 +1000,9 @@ public void testGeometryType() throws Exception {
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.BIGINT(), DataTypes.VARCHAR(32), DataTypes.STRING()
DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 19),
DataTypes.VARCHAR(32),
DataTypes.STRING()
},
new String[] {"FEATURE_ID", "NAME", "SHAPE"});
BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
Expand All @@ -1004,7 +1014,7 @@ public void testGeometryType() throws Exception {
tableId,
generator.generate(
new Object[] {
1L,
DecimalData.fromBigDecimal(new BigDecimal(1), 38, 19),
BinaryStringData.fromString("center"),
BinaryStringData.fromString(
"{\"coordinates\":\"[[116.6,24.343]]\",\"type\":\"Point\",\"srid\":4326}")
Expand All @@ -1014,7 +1024,7 @@ public void testGeometryType() throws Exception {
tableId,
generator.generate(
new Object[] {
2L,
DecimalData.fromBigDecimal(new BigDecimal(2), 38, 19),
BinaryStringData.fromString("two-dimensional point"),
BinaryStringData.fromString(
"{\"coordinates\":\"[[10.0,5.0]]\",\"type\":\"Point\",\"srid\":4326}")
Expand All @@ -1024,7 +1034,7 @@ public void testGeometryType() throws Exception {
tableId,
generator.generate(
new Object[] {
3L,
DecimalData.fromBigDecimal(new BigDecimal(3), 38, 19),
BinaryStringData.fromString("straight line segment"),
BinaryStringData.fromString(
"{\"coordinates\":\"[[10.0,10.0],[20.0,20.0],[30.0,30.0]]\",\"type\":\"LineString\",\"srid\":4326}")
Expand All @@ -1034,7 +1044,7 @@ public void testGeometryType() throws Exception {
tableId,
generator.generate(
new Object[] {
4L,
DecimalData.fromBigDecimal(new BigDecimal(4), 38, 19),
BinaryStringData.fromString("polyline"),
BinaryStringData.fromString(
"{\"coordinates\":\"[[5.0,5.0],[15.0,10.0],[25.0,5.0],[35.0,10.0]]\",\"type\":\"LineString\",\"srid\":4326}")
Expand All @@ -1044,7 +1054,7 @@ public void testGeometryType() throws Exception {
tableId,
generator.generate(
new Object[] {
5L,
DecimalData.fromBigDecimal(new BigDecimal(5), 38, 19),
BinaryStringData.fromString("rectangle"),
BinaryStringData.fromString(
"{\"coordinates\":\"[[0.0,0.0],[10.0,0.0],[10.0,10.0],[0.0,10.0],[0.0,0.0]]\",\"type\":\"Polygon\",\"srid\":4326}")
Expand All @@ -1054,7 +1064,7 @@ public void testGeometryType() throws Exception {
tableId,
generator.generate(
new Object[] {
6L,
DecimalData.fromBigDecimal(new BigDecimal(6), 38, 19),
BinaryStringData.fromString("Multi-Point"),
BinaryStringData.fromString(
"{\"coordinates\":\"[[10.0,10.0],[20.0,20.0],[30.0,30.0]]\",\"type\":\"MultiPoint\",\"srid\":4326}")
Expand All @@ -1064,7 +1074,7 @@ public void testGeometryType() throws Exception {
tableId,
generator.generate(
new Object[] {
7L,
DecimalData.fromBigDecimal(new BigDecimal(7), 38, 19),
BinaryStringData.fromString("Multi line collection"),
BinaryStringData.fromString(
"{\"coordinates\":\"[[10.0,10.0],[20.0,10.0],[10.0,20.0],[20.0,20.0]]\",\"type\":\"MultiLineString\",\"srid\":4326}")
Expand All @@ -1075,7 +1085,7 @@ public void testGeometryType() throws Exception {
tableId,
generator.generate(
new Object[] {
8L,
DecimalData.fromBigDecimal(new BigDecimal(8), 38, 19),
BinaryStringData.fromString("Multi-Polygon"),
BinaryStringData.fromString(
"{\"coordinates\":\"[[0.0,0.0],[10.0,0.0],[10.0,10.0],[0.0,10.0],[0.0,0.0],[15.0,15.0],[25.0,15.0],[25.0,25.0],[15.0,25.0],[15.0,15.0]]\",\"type\":\"MultiPolygon\",\"srid\":4326}")
Expand All @@ -1085,7 +1095,7 @@ public void testGeometryType() throws Exception {
tableId,
generator.generate(
new Object[] {
9L,
DecimalData.fromBigDecimal(new BigDecimal(9), 38, 19),
BinaryStringData.fromString("Geometry Collection"),
BinaryStringData.fromString(
"{\"geometries\":\"GEOMETRYCOLLECTION (POINT (25 25), LINESTRING (30 30, 35 25), POLYGON ((0 0, 10 0, 10 10, 0 10, 0 0)))\",\"type\":\"GeometryCollection\",\"srid\":4326}")
Expand Down
Loading