Skip to content

Commit 2bbff3b

Browse files
committed
WIP
1 parent eaf03ff commit 2bbff3b

4 files changed

Lines changed: 95 additions & 38 deletions

File tree

airbyte-integrations/connectors/source-postgres/src/main/kotlin/io/airbyte/integrations/source/postgres/cdc/PostgresCustomConverter.kt

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,10 @@ class PostgresCustomConverter : CustomConverter<SchemaBuilder?, RelationalColumn
191191
)
192192
}
193193

194-
// real / double precision can carry Infinity, -Infinity, and NaN. Downstream destinations
195-
// parse these as numbers, so non-finite values are nulled here to mirror the JDBC path
196-
// (PostgresFiniteFloatFieldType / PostgresFiniteDoubleFieldType).
194+
// real / double precision can carry Infinity, -Infinity, and NaN. The custom converter passes
195+
// these through unchanged; FiniteFloatCodec / FiniteDoubleCodec reject them during decode,
196+
// which the framework records as DESERIALIZATION_FAILURE_TOTAL in the record's metadata
197+
// before nulling the value — mirroring how PgTimestampCodec handles infinity dates.
197198
private fun registerFloatingPoint(
198199
field: RelationalColumn,
199200
registration: CustomConverter.ConverterRegistration<SchemaBuilder?>
@@ -204,26 +205,24 @@ class PostgresCustomConverter : CustomConverter<SchemaBuilder?, RelationalColumn
204205
schema.optional(),
205206
CustomConverter.Converter { x: Any? ->
206207
val value = x ?: convertDefaultValue(field) ?: return@Converter null
207-
if (isFloat4) finiteFloatOrNull(value) else finiteDoubleOrNull(value)
208+
if (isFloat4) toFloat(value) else toDouble(value)
208209
},
209210
)
210211
}
211212

212-
private fun finiteFloatOrNull(x: Any): Float? =
213+
private fun toFloat(x: Any): Float? =
213214
when (x) {
214-
is Float -> x.takeIf { it.isFinite() }
215-
is Double -> x.takeIf { it.isFinite() }?.toFloat()
216-
is Number -> x.toFloat().takeIf { it.isFinite() }
217-
is String -> x.toFloatOrNull()?.takeIf { it.isFinite() }
215+
is Float -> x
216+
is Number -> x.toFloat()
217+
is String -> x.toFloatOrNull()
218218
else -> null
219219
}
220220

221-
private fun finiteDoubleOrNull(x: Any): Double? =
221+
private fun toDouble(x: Any): Double? =
222222
when (x) {
223-
is Double -> x.takeIf { it.isFinite() }
224-
is Float -> x.takeIf { it.isFinite() }?.toDouble()
225-
is Number -> x.toDouble().takeIf { it.isFinite() }
226-
is String -> x.toDoubleOrNull()?.takeIf { it.isFinite() }
223+
is Double -> x
224+
is Number -> x.toDouble()
225+
is String -> x.toDoubleOrNull()
227226
else -> null
228227
}
229228

@@ -359,15 +358,11 @@ class PostgresCustomConverter : CustomConverter<SchemaBuilder?, RelationalColumn
359358
.collect(Collectors.toList())
360359
"_FLOAT4" ->
361360
return Arrays.stream<Any?>(getArray(x))
362-
.map<Float?> { value: Any? ->
363-
if (value == null) null else finiteFloatOrNull(value)
364-
}
361+
.map<Float?> { value: Any? -> if (value == null) null else toFloat(value) }
365362
.collect(Collectors.toList())
366363
"_FLOAT8" ->
367364
return Arrays.stream<Any?>(getArray(x))
368-
.map<Double?> { value: Any? ->
369-
if (value == null) null else finiteDoubleOrNull(value)
370-
}
365+
.map<Double?> { value: Any? -> if (value == null) null else toDouble(value) }
371366
.collect(Collectors.toList())
372367
"_TIME" ->
373368
return Arrays.stream<Any?>(getArray(x))

airbyte-integrations/connectors/source-postgres/src/main/kotlin/io/airbyte/integrations/source/postgres/cdc/PostgresSourceDebeziumOperations.kt

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ import io.airbyte.cdk.discover.FieldType
2020
import io.airbyte.cdk.jdbc.ArrayFieldType
2121
import io.airbyte.cdk.jdbc.BigDecimalFieldType
2222
import io.airbyte.cdk.jdbc.BigIntegerFieldType
23-
import io.airbyte.cdk.jdbc.DoubleFieldType
24-
import io.airbyte.cdk.jdbc.FloatFieldType
2523
import io.airbyte.cdk.jdbc.StringFieldType
2624
import io.airbyte.cdk.output.sockets.FieldValueEncoder
2725
import io.airbyte.cdk.output.sockets.NativeRecordPayload
@@ -43,6 +41,8 @@ import io.airbyte.cdk.util.Jsons
4341
import io.airbyte.integrations.source.postgres.PostgresSourceJdbcConnectionFactory
4442
import io.airbyte.integrations.source.postgres.config.CdcIncrementalConfiguration
4543
import io.airbyte.integrations.source.postgres.config.PostgresSourceConfiguration
44+
import io.airbyte.integrations.source.postgres.operations.types.PostgresFiniteDoubleFieldType
45+
import io.airbyte.integrations.source.postgres.operations.types.PostgresFiniteFloatFieldType
4646
import io.debezium.connector.postgresql.PostgresConnector
4747
import io.debezium.connector.postgresql.connection.Lsn
4848
import io.debezium.time.Conversions
@@ -340,8 +340,15 @@ class PostgresSourceDebeziumOperations(
340340
try {
341341
val mappedValue =
342342
when (fieldType) {
343-
FloatFieldType -> Jsons.numberNode(input.floatValue())
344-
DoubleFieldType -> Jsons.numberNode(input.asDouble())
343+
// Narrow numeric nodes to the right precision so the codec's
344+
// IEEE-754 roundtrip check passes (Debezium emits float4 values as
345+
// DoubleNode in JSON). For non-numeric forms — e.g. TextNode("Infinity")
346+
// from non-finite values — pass through so the codec rejects them and
347+
// the framework records DESERIALIZATION_FAILURE_TOTAL.
348+
PostgresFiniteFloatFieldType ->
349+
if (input.isNumber) Jsons.numberNode(input.floatValue()) else input
350+
PostgresFiniteDoubleFieldType ->
351+
if (input.isNumber) Jsons.numberNode(input.asDouble()) else input
345352
BigDecimalFieldType -> {
346353
if (input.isNumber) input
347354
else Jsons.numberNode(BigDecimal(input.textValue()).stripTrailingZeros())

airbyte-integrations/connectors/source-postgres/src/main/kotlin/io/airbyte/integrations/source/postgres/operations/types/PostgresFloatingPointFieldType.kt

Lines changed: 67 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,99 @@
11
/* Copyright (c) 2026 Airbyte, Inc., all rights reserved. */
22
package io.airbyte.integrations.source.postgres.operations.types
33

4-
import io.airbyte.cdk.data.DoubleCodec
5-
import io.airbyte.cdk.data.FloatCodec
4+
import com.fasterxml.jackson.databind.JsonNode
5+
import io.airbyte.cdk.data.JsonCodec
66
import io.airbyte.cdk.data.LeafAirbyteSchemaType
77
import io.airbyte.cdk.jdbc.JdbcAccessor
88
import io.airbyte.cdk.jdbc.SymmetricJdbcFieldType
9+
import io.airbyte.cdk.util.Jsons
910
import java.sql.PreparedStatement
1011
import java.sql.ResultSet
1112

12-
// Postgres real/double precision columns permit Infinity, -Infinity, and NaN. Downstream
13-
// destinations expect JSON numbers, but Jackson serializes IEEE-754 non-finite values as
14-
// the string tokens "Infinity"/"-Infinity"/"NaN" — which then fail to parse as numbers
15-
// (NumberFormatException: Character I is neither a decimal digit...). Match the legacy
16-
// connector's behavior and the existing numeric/decimal handling by emitting null for
17-
// non-finite values.
13+
// Postgres real/double precision permit Infinity, -Infinity, and NaN. Downstream destinations
14+
// expect JSON numbers, but Jackson serializes IEEE-754 non-finite values as the string tokens
15+
// "Infinity"/"-Infinity"/"NaN" — which then fail to parse as numbers (NumberFormatException:
16+
// Character I is neither a decimal digit...). The accessor throws on non-finite during JDBC
17+
// reads (recording RETRIEVAL_FAILURE_TOTAL in record metadata), and the codec throws during
18+
// CDC decode (recording DESERIALIZATION_FAILURE_TOTAL). The value is then nulled in the
19+
// emitted record, but the change is noted — mirroring how PgTimestampFieldType handles
20+
// infinity dates.
1821
object PostgresFiniteFloatFieldType :
1922
SymmetricJdbcFieldType<Float>(
2023
LeafAirbyteSchemaType.NUMBER,
2124
PgFiniteFloatAccessor,
22-
FloatCodec,
25+
FiniteFloatCodec,
2326
)
2427

2528
object PostgresFiniteDoubleFieldType :
2629
SymmetricJdbcFieldType<Double>(
2730
LeafAirbyteSchemaType.NUMBER,
2831
PgFiniteDoubleAccessor,
29-
DoubleCodec,
32+
FiniteDoubleCodec,
3033
)
3134

35+
object FiniteFloatCodec : JsonCodec<Float> {
36+
override fun encode(decoded: Float): JsonNode = Jsons.numberNode(decoded)
37+
38+
override fun decode(encoded: JsonNode): Float {
39+
if (!encoded.isNumber) {
40+
throw IllegalArgumentException("non-numeric value $encoded is unsupported")
41+
}
42+
val decoded: Float = encoded.floatValue()
43+
if (!decoded.isFinite()) {
44+
throw IllegalArgumentException("non-finite value $decoded is unsupported")
45+
}
46+
if (encode(decoded).doubleValue().compareTo(encoded.doubleValue()) != 0) {
47+
throw IllegalArgumentException(
48+
"invalid IEEE-754 32-bit floating point value $encoded (type ${encoded.javaClass.canonicalName})"
49+
)
50+
}
51+
return decoded
52+
}
53+
}
54+
55+
object FiniteDoubleCodec : JsonCodec<Double> {
56+
override fun encode(decoded: Double): JsonNode = Jsons.numberNode(decoded)
57+
58+
override fun decode(encoded: JsonNode): Double {
59+
if (!encoded.isNumber) {
60+
throw IllegalArgumentException("non-numeric value $encoded is unsupported")
61+
}
62+
val decoded: Double = encoded.doubleValue()
63+
if (!decoded.isFinite()) {
64+
throw IllegalArgumentException("non-finite value $decoded is unsupported")
65+
}
66+
if (encode(decoded).decimalValue().compareTo(encoded.decimalValue()) != 0) {
67+
throw IllegalArgumentException("invalid IEEE-754 64-bit floating point value $encoded")
68+
}
69+
return decoded
70+
}
71+
}
72+
3273
private object PgFiniteFloatAccessor : JdbcAccessor<Float> {
33-
override fun get(rs: ResultSet, colIdx: Int): Float? =
34-
rs.getFloat(colIdx).takeUnless { rs.wasNull() }?.takeIf { it.isFinite() }
74+
override fun get(rs: ResultSet, colIdx: Int): Float? {
75+
val value = rs.getFloat(colIdx)
76+
if (rs.wasNull()) return null
77+
if (!value.isFinite()) {
78+
throw IllegalStateException("non-finite value $value is unsupported")
79+
}
80+
return value
81+
}
3582

3683
override fun set(stmt: PreparedStatement, paramIdx: Int, value: Float) {
3784
stmt.setFloat(paramIdx, value)
3885
}
3986
}
4087

4188
private object PgFiniteDoubleAccessor : JdbcAccessor<Double> {
42-
override fun get(rs: ResultSet, colIdx: Int): Double? =
43-
rs.getDouble(colIdx).takeUnless { rs.wasNull() }?.takeIf { it.isFinite() }
89+
override fun get(rs: ResultSet, colIdx: Int): Double? {
90+
val value = rs.getDouble(colIdx)
91+
if (rs.wasNull()) return null
92+
if (!value.isFinite()) {
93+
throw IllegalStateException("non-finite value $value is unsupported")
94+
}
95+
return value
96+
}
4497

4598
override fun set(stmt: PreparedStatement, paramIdx: Int, value: Double) {
4699
stmt.setDouble(paramIdx, value)

airbyte-integrations/connectors/source-postgres/src/test/kotlin/io/airbyte/integrations/source/postgres/PostgresSourceFieldTypeMapperTest.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ class PostgresSourceFieldTypeMapperTest : FieldTypeMapperTest() {
116116
"REAL",
117117
LeafAirbyteSchemaType.NUMBER,
118118
nulledInfinities.plus(nulledNaN),
119+
arrayIsNulled = true,
119120
baseTestName = "REAL UNSUPPORTED VALS",
120121
)
121122
scalarAndArray(
@@ -128,6 +129,7 @@ class PostgresSourceFieldTypeMapperTest : FieldTypeMapperTest() {
128129
"DOUBLE PRECISION",
129130
LeafAirbyteSchemaType.NUMBER,
130131
nulledInfinities.plus(nulledNaN),
132+
arrayIsNulled = true,
131133
baseTestName = "DOUBLE PRECISION UNSUPPORTED VALS",
132134
)
133135

0 commit comments

Comments
 (0)