Skip to content

Commit 19b5d54

Browse files
committed
avoid outputting infinite numeric values
1 parent 0f0cfa0 commit 19b5d54

7 files changed

Lines changed: 191 additions & 64 deletions

File tree

airbyte-integrations/connectors/source-postgres/src/main/kotlin/io/airbyte/integrations/source/postgres/cdc/PostgresCustomConverter.kt

Lines changed: 16 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,13 @@ import org.postgresql.jdbc.PgArray
3131
import org.postgresql.util.PGInterval
3232

3333
class PostgresCustomConverter : CustomConverter<SchemaBuilder?, RelationalColumn?> {
34-
private val DATE_TYPES =
35-
arrayOf<String?>("DATE", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP", "TIMESTAMPTZ")
36-
private val BIT_TYPES = arrayOf<String?>("BIT", "VARBIT")
37-
private val MONEY_ITEM_TYPE = arrayOf<String?>("MONEY")
34+
private val DATE_TYPES = setOf("DATE", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP", "TIMESTAMPTZ")
35+
private val BIT_TYPES = setOf("BIT", "VARBIT")
36+
private val MONEY_ITEM_TYPE = "MONEY"
3837
private val GEOMETRICS_TYPES =
39-
arrayOf<String?>("BOX", "CIRCLE", "LINE", "LSEG", "POINT", "POLYGON", "PATH")
38+
setOf("BOX", "CIRCLE", "LINE", "LSEG", "POINT", "POLYGON", "PATH")
4039
private val TEXT_TYPES =
41-
arrayOf<String?>(
40+
setOf(
4241
"VARCHAR",
4342
"VARBINARY",
4443
"BLOB",
@@ -51,9 +50,9 @@ class PostgresCustomConverter : CustomConverter<SchemaBuilder?, RelationalColumn
5150
"TSQUERY",
5251
"PG_LSN",
5352
)
54-
private val NUMERIC_TYPES = arrayOf<String?>("NUMERIC", "DECIMAL")
53+
private val NUMERIC_TYPES = setOf("NUMERIC", "DECIMAL")
5554
private val ARRAY_TYPES =
56-
arrayOf<String?>(
55+
setOf(
5756
"_NAME",
5857
"_NUMERIC",
5958
"_BYTEA",
@@ -79,43 +78,22 @@ class PostgresCustomConverter : CustomConverter<SchemaBuilder?, RelationalColumn
7978
registration: CustomConverter.ConverterRegistration<SchemaBuilder?>?
8079
) {
8180
if (field == null || registration == null) return
82-
if (
83-
Arrays.stream<String?>(DATE_TYPES).anyMatch { s: String? ->
84-
s.equals(field.typeName(), ignoreCase = true)
85-
}
86-
) {
81+
val upperType = field.typeName().uppercase()
82+
if (DATE_TYPES.contains(upperType)) {
8783
registerDate(field, registration)
8884
} else if (
89-
Arrays.stream<String?>(TEXT_TYPES).anyMatch { s: String? ->
90-
s.equals(field.typeName(), ignoreCase = true)
91-
} ||
92-
Arrays.stream<String?>(GEOMETRICS_TYPES).anyMatch { s: String? ->
93-
s.equals(field.typeName(), ignoreCase = true)
94-
} ||
95-
Arrays.stream<String?>(BIT_TYPES).anyMatch { s: String? ->
96-
s.equals(field.typeName(), ignoreCase = true)
97-
}
85+
TEXT_TYPES.contains(upperType) ||
86+
GEOMETRICS_TYPES.contains(upperType) ||
87+
BIT_TYPES.contains(upperType)
9888
) {
9989
registerText(field, registration)
100-
} else if (
101-
Arrays.stream<String?>(MONEY_ITEM_TYPE).anyMatch { s: String? ->
102-
s.equals(field.typeName(), ignoreCase = true)
103-
}
104-
) {
90+
} else if (MONEY_ITEM_TYPE == upperType) {
10591
registerMoney(field, registration)
106-
} else if (BYTEA_TYPE.equals(field.typeName(), ignoreCase = true)) {
92+
} else if (BYTEA_TYPE == upperType) {
10793
registerBytea(field, registration)
108-
} else if (
109-
Arrays.stream<String?>(NUMERIC_TYPES).anyMatch { s: String? ->
110-
s.equals(field.typeName(), ignoreCase = true)
111-
}
112-
) {
94+
} else if (NUMERIC_TYPES.contains(upperType)) {
11395
registerNumber(field, registration)
114-
} else if (
115-
Arrays.stream<String?>(ARRAY_TYPES).anyMatch { s: String? ->
116-
s.equals(field.typeName(), ignoreCase = true)
117-
}
118-
) {
96+
} else if (ARRAY_TYPES.contains(upperType)) {
11997
registerArray(field, registration)
12098
}
12199
}

airbyte-integrations/connectors/source-postgres/src/main/kotlin/io/airbyte/integrations/source/postgres/cdc/PostgresSourceDebeziumOperations.kt

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ import io.airbyte.cdk.discover.FieldType
2020
import io.airbyte.cdk.jdbc.ArrayFieldType
2121
import io.airbyte.cdk.jdbc.BigDecimalFieldType
2222
import io.airbyte.cdk.jdbc.BigIntegerFieldType
23-
import io.airbyte.cdk.jdbc.DoubleFieldType
24-
import io.airbyte.cdk.jdbc.FloatFieldType
2523
import io.airbyte.cdk.jdbc.StringFieldType
2624
import io.airbyte.cdk.output.sockets.FieldValueEncoder
2725
import io.airbyte.cdk.output.sockets.NativeRecordPayload
@@ -43,6 +41,8 @@ import io.airbyte.cdk.util.Jsons
4341
import io.airbyte.integrations.source.postgres.PostgresSourceJdbcConnectionFactory
4442
import io.airbyte.integrations.source.postgres.config.CdcIncrementalConfiguration
4543
import io.airbyte.integrations.source.postgres.config.PostgresSourceConfiguration
44+
import io.airbyte.integrations.source.postgres.operations.types.PostgresDoubleFieldType
45+
import io.airbyte.integrations.source.postgres.operations.types.PostgresFloatFieldType
4646
import io.debezium.connector.postgresql.PostgresConnector
4747
import io.debezium.connector.postgresql.connection.Lsn
4848
import io.debezium.time.Conversions
@@ -340,8 +340,15 @@ class PostgresSourceDebeziumOperations(
340340
try {
341341
val mappedValue =
342342
when (fieldType) {
343-
FloatFieldType -> Jsons.numberNode(input.floatValue())
344-
DoubleFieldType -> Jsons.numberNode(input.asDouble())
343+
// Narrow numeric nodes to the right precision so the codec's
344+
// IEEE-754 roundtrip check passes (Debezium emits float4 values as
345+
// DoubleNode in JSON). For non-numeric forms — e.g. TextNode("Infinity")
346+
// from non-finite values — pass through so the codec rejects them and
347+
// the framework records DESERIALIZATION_FAILURE_TOTAL.
348+
PostgresFloatFieldType ->
349+
if (input.isNumber) Jsons.numberNode(input.floatValue()) else input
350+
PostgresDoubleFieldType ->
351+
if (input.isNumber) Jsons.numberNode(input.asDouble()) else input
345352
BigDecimalFieldType -> {
346353
if (input.isNumber) input
347354
else Jsons.numberNode(BigDecimal(input.textValue()).stripTrailingZeros())

airbyte-integrations/connectors/source-postgres/src/main/kotlin/io/airbyte/integrations/source/postgres/operations/PostgresSourceFieldTypeMapper.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import io.airbyte.cdk.jdbc.BigDecimalFieldType
99
import io.airbyte.cdk.jdbc.BigIntegerFieldType
1010
import io.airbyte.cdk.jdbc.BinaryStreamFieldType
1111
import io.airbyte.cdk.jdbc.BooleanFieldType
12-
import io.airbyte.cdk.jdbc.DoubleFieldType
13-
import io.airbyte.cdk.jdbc.FloatFieldType
1412
import io.airbyte.cdk.jdbc.IntFieldType
1513
import io.airbyte.cdk.jdbc.JdbcFieldType
1614
import io.airbyte.cdk.jdbc.LongFieldType
@@ -23,6 +21,8 @@ import io.airbyte.integrations.source.postgres.operations.types.HstoreFieldType
2321
import io.airbyte.integrations.source.postgres.operations.types.LegacyBooleanBitsFieldType
2422
import io.airbyte.integrations.source.postgres.operations.types.PostgresByteaFieldType
2523
import io.airbyte.integrations.source.postgres.operations.types.PostgresDateFieldType
24+
import io.airbyte.integrations.source.postgres.operations.types.PostgresDoubleFieldType
25+
import io.airbyte.integrations.source.postgres.operations.types.PostgresFloatFieldType
2626
import io.airbyte.integrations.source.postgres.operations.types.PostgresMoneyArrayElementFieldType
2727
import io.airbyte.integrations.source.postgres.operations.types.PostgresMoneyFieldType
2828
import io.airbyte.integrations.source.postgres.operations.types.PostgresTimeFieldType
@@ -67,14 +67,14 @@ class PostgresSourceFieldTypeMapper : JdbcMetadataQuerier.FieldTypeMapper {
6767
// oid type is unsigned and must be cast to Long to avoid truncation
6868
if (type.scalarTypeName == "oid") LongFieldType else IntFieldType
6969
JDBCType.BIGINT -> LongFieldType
70-
JDBCType.REAL -> FloatFieldType
70+
JDBCType.REAL -> PostgresFloatFieldType
7171
JDBCType.FLOAT,
7272
JDBCType.DOUBLE ->
7373
// TODO (https://github.com/airbytehq/airbyte-internal-issues/issues/15879):
7474
// Fix type handling for numeric arrays. Arrays and scalars of money are handled
7575
// differently by the PostgresCustomConverter.
7676
when {
77-
type.scalarTypeName != "money" -> DoubleFieldType
77+
type.scalarTypeName != "money" -> PostgresDoubleFieldType
7878
type.isArray -> PostgresMoneyArrayElementFieldType
7979
else -> PostgresMoneyFieldType
8080
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/* Copyright (c) 2026 Airbyte, Inc., all rights reserved. */
2+
package io.airbyte.integrations.source.postgres.operations.types
3+
4+
import com.fasterxml.jackson.databind.JsonNode
5+
import io.airbyte.cdk.data.JsonCodec
6+
import io.airbyte.cdk.data.LeafAirbyteSchemaType
7+
import io.airbyte.cdk.jdbc.JdbcAccessor
8+
import io.airbyte.cdk.jdbc.SymmetricJdbcFieldType
9+
import io.airbyte.cdk.util.Jsons
10+
import java.sql.PreparedStatement
11+
import java.sql.ResultSet
12+
13+
// Postgres double precision permits Infinity, -Infinity, and NaN. Downstream destinations expect
14+
// only JSON numbers. On these values, the accessor throws during JDBC read (recording
15+
// RETRIEVAL_FAILURE_TOTAL in record metadata), and the codec throws during CDC decode (recording
16+
// DESERIALIZATION_FAILURE_TOTAL). The value is then nulled in the emitted record.
17+
object PostgresDoubleFieldType :
18+
SymmetricJdbcFieldType<Double>(
19+
LeafAirbyteSchemaType.NUMBER,
20+
FiniteDoubleAccessor,
21+
FiniteDoubleCodec,
22+
)
23+
24+
object FiniteDoubleCodec : JsonCodec<Double> {
25+
override fun encode(decoded: Double): JsonNode = Jsons.numberNode(decoded)
26+
27+
override fun decode(encoded: JsonNode): Double {
28+
if (!encoded.isNumber) {
29+
throw IllegalArgumentException("non-numeric value $encoded is unsupported")
30+
}
31+
val decoded: Double = encoded.doubleValue()
32+
if (!decoded.isFinite()) {
33+
throw IllegalArgumentException("non-finite value $decoded is unsupported")
34+
}
35+
if (encode(decoded).decimalValue().compareTo(encoded.decimalValue()) != 0) {
36+
throw IllegalArgumentException("invalid IEEE-754 64-bit floating point value $encoded")
37+
}
38+
return decoded
39+
}
40+
}
41+
42+
private object FiniteDoubleAccessor : JdbcAccessor<Double> {
43+
override fun get(rs: ResultSet, colIdx: Int): Double? {
44+
val value = rs.getDouble(colIdx)
45+
if (rs.wasNull()) return null
46+
if (!value.isFinite()) {
47+
throw IllegalStateException("non-finite value $value is unsupported")
48+
}
49+
return value
50+
}
51+
52+
override fun set(stmt: PreparedStatement, paramIdx: Int, value: Double) {
53+
stmt.setDouble(paramIdx, value)
54+
}
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/* Copyright (c) 2026 Airbyte, Inc., all rights reserved. */
2+
package io.airbyte.integrations.source.postgres.operations.types
3+
4+
import com.fasterxml.jackson.databind.JsonNode
5+
import io.airbyte.cdk.data.JsonCodec
6+
import io.airbyte.cdk.data.LeafAirbyteSchemaType
7+
import io.airbyte.cdk.jdbc.JdbcAccessor
8+
import io.airbyte.cdk.jdbc.SymmetricJdbcFieldType
9+
import io.airbyte.cdk.util.Jsons
10+
import java.sql.PreparedStatement
11+
import java.sql.ResultSet
12+
13+
// Postgres real permits Infinity, -Infinity, and NaN. Downstream destinations expect only JSON
14+
// numbers. On these values, the accessor throws during JDBC read (recording
15+
// RETRIEVAL_FAILURE_TOTAL in record metadata), and the codec throws during CDC decode (recording
16+
// DESERIALIZATION_FAILURE_TOTAL). The value is then nulled in the emitted record.
17+
object PostgresFloatFieldType :
18+
SymmetricJdbcFieldType<Float>(
19+
LeafAirbyteSchemaType.NUMBER,
20+
FiniteFloatAccessor,
21+
FiniteFloatCodec,
22+
)
23+
24+
object FiniteFloatCodec : JsonCodec<Float> {
25+
override fun encode(decoded: Float): JsonNode = Jsons.numberNode(decoded)
26+
27+
override fun decode(encoded: JsonNode): Float {
28+
if (!encoded.isNumber) {
29+
throw IllegalArgumentException("non-numeric value $encoded is unsupported")
30+
}
31+
val decoded: Float = encoded.floatValue()
32+
if (!decoded.isFinite()) {
33+
throw IllegalArgumentException("non-finite value $decoded is unsupported")
34+
}
35+
if (encode(decoded).doubleValue().compareTo(encoded.doubleValue()) != 0) {
36+
throw IllegalArgumentException(
37+
"invalid IEEE-754 32-bit floating point value $encoded (type ${encoded.javaClass.canonicalName})"
38+
)
39+
}
40+
return decoded
41+
}
42+
}
43+
44+
private object FiniteFloatAccessor : JdbcAccessor<Float> {
45+
override fun get(rs: ResultSet, colIdx: Int): Float? {
46+
val value = rs.getFloat(colIdx)
47+
if (rs.wasNull()) return null
48+
if (!value.isFinite()) {
49+
throw IllegalStateException("non-finite value $value is unsupported")
50+
}
51+
return value
52+
}
53+
54+
override fun set(stmt: PreparedStatement, paramIdx: Int, value: Float) {
55+
stmt.setFloat(paramIdx, value)
56+
}
57+
}

airbyte-integrations/connectors/source-postgres/src/test-integration/kotlin/io/airbyte/integrations/source/postgres/legacy/AbstractPostgresSourceDatatypeTest.kt

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -400,18 +400,25 @@ abstract class AbstractPostgresSourceDatatypeTest : AbstractSourceDatabaseTypeTe
400400
TestDataHolder.builder()
401401
.sourceType(type)
402402
.airbyteType(JsonSchemaType.NUMBER)
403+
// Non-finite values (Infinity/-Infinity/NaN) are rejected at the source
404+
// and surface as null with RETRIEVAL_FAILURE_TOTAL (JDBC) or
405+
// DESERIALIZATION_FAILURE_TOTAL (CDC) in the record's change metadata.
403406
.addInsertValues(
404407
"'123'",
405408
"'1234567890.1234567'",
406409
"null",
407-
) // Postgres source does not support these special values yet
408-
// https://github.com/airbytehq/airbyte/issues/8902
409-
// "'-Infinity'", "'Infinity'", "'NaN'", "null")
410+
"'-Infinity'",
411+
"'Infinity'",
412+
"'NaN'",
413+
)
410414
.addExpectedValues(
411415
"123.0",
412416
"1.2345678901234567E9",
413417
null,
414-
) // "-Infinity", "Infinity", "NaN", null)
418+
null,
419+
null,
420+
null,
421+
)
415422
.build(),
416423
)
417424
}
@@ -467,8 +474,14 @@ abstract class AbstractPostgresSourceDatatypeTest : AbstractSourceDatabaseTypeTe
467474
TestDataHolder.builder()
468475
.sourceType(type)
469476
.airbyteType(JsonSchemaType.NUMBER)
470-
.addInsertValues("null", "3.4145")
471-
.addExpectedValues(null, "3.4145")
477+
.addInsertValues(
478+
"null",
479+
"3.4145",
480+
"'-Infinity'",
481+
"'Infinity'",
482+
"'NaN'",
483+
)
484+
.addExpectedValues(null, "3.4145", null, null, null)
472485
.build(),
473486
)
474487
}
@@ -1070,8 +1083,12 @@ abstract class AbstractPostgresSourceDatatypeTest : AbstractSourceDatabaseTypeTe
10701083
)
10711084
.build(),
10721085
)
1073-
.addInsertValues("'{131070.237689,231072.476596593}'")
1074-
.addExpectedValues("[131070.234,231072.48]")
1086+
// A row whose array contains any non-finite element nulls the entire array.
1087+
.addInsertValues(
1088+
"'{131070.237689,231072.476596593}'",
1089+
"'{1.0,Infinity,-Infinity,NaN}'",
1090+
)
1091+
.addExpectedValues("[131070.234,231072.48]", null)
10751092
.build(),
10761093
)
10771094

@@ -1089,8 +1106,11 @@ abstract class AbstractPostgresSourceDatatypeTest : AbstractSourceDatabaseTypeTe
10891106
)
10901107
.build(),
10911108
)
1092-
.addInsertValues("'{131070.237689,231072.476596593}'")
1093-
.addExpectedValues("[131070.237689,231072.476596593]")
1109+
.addInsertValues(
1110+
"'{131070.237689,231072.476596593}'",
1111+
"'{1.0,Infinity,-Infinity,NaN}'",
1112+
)
1113+
.addExpectedValues("[131070.237689,231072.476596593]", null)
10941114
.build(),
10951115
)
10961116

airbyte-integrations/connectors/source-postgres/src/test/kotlin/io/airbyte/integrations/source/postgres/PostgresSourceFieldTypeMapperTest.kt

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,28 @@ class PostgresSourceFieldTypeMapperTest : FieldTypeMapperTest() {
109109
scalarAndArray(
110110
"REAL",
111111
LeafAirbyteSchemaType.NUMBER,
112-
AnsiSql.realValues.plus(preservedInfinities).plus(preservedNaN),
112+
AnsiSql.realValues,
113+
baseTestName = "REAL SUPPORTED VALS",
114+
)
115+
scalarAndArray(
116+
"REAL",
117+
LeafAirbyteSchemaType.NUMBER,
118+
nulledInfinities.plus(nulledNaN),
119+
arrayIsNulled = true,
120+
baseTestName = "REAL UNSUPPORTED VALS",
121+
)
122+
scalarAndArray(
123+
"DOUBLE PRECISION",
124+
LeafAirbyteSchemaType.NUMBER,
125+
AnsiSql.doubleValues,
126+
baseTestName = "DOUBLE PRECISION SUPPORTED VALS",
113127
)
114128
scalarAndArray(
115129
"DOUBLE PRECISION",
116130
LeafAirbyteSchemaType.NUMBER,
117-
AnsiSql.doubleValues.plus(preservedInfinities).plus(preservedNaN)
131+
nulledInfinities.plus(nulledNaN),
132+
arrayIsNulled = true,
133+
baseTestName = "DOUBLE PRECISION UNSUPPORTED VALS",
118134
)
119135

120136
// Character types
@@ -401,17 +417,11 @@ class PostgresSourceFieldTypeMapperTest : FieldTypeMapperTest() {
401417
val executor = JdbcTestDbExecutor(schema, jdbcConfig)
402418
executor.executeUpdate("CREATE EXTENSION IF NOT EXISTS hstore;")
403419
}
404-
private val preservedInfinities =
405-
mapOf(
406-
"'Infinity'" to "\"Infinity\"",
407-
"'-Infinity'" to "\"-Infinity\"",
408-
)
409420
private val nulledInfinities =
410421
mapOf(
411422
"'Infinity'" to "null",
412423
"'-Infinity'" to "null",
413424
)
414-
private val preservedNaN = mapOf("'NaN'" to "\"NaN\"")
415425
private val nulledNaN = mapOf("'NaN'" to "null")
416426
}
417427

0 commit comments

Comments
 (0)