Skip to content

Commit eaf03ff

Browse files
committed
claude WIP
1 parent 0f0cfa0 commit eaf03ff

4 files changed

Lines changed: 127 additions & 12 deletions

File tree

airbyte-integrations/connectors/source-postgres/src/main/kotlin/io/airbyte/integrations/source/postgres/cdc/PostgresCustomConverter.kt

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,13 @@ class PostgresCustomConverter : CustomConverter<SchemaBuilder?, RelationalColumn
5252
"PG_LSN",
5353
)
5454
private val NUMERIC_TYPES = arrayOf<String?>("NUMERIC", "DECIMAL")
55+
private val FLOATING_POINT_TYPES = arrayOf<String?>("FLOAT4", "FLOAT8")
5556
private val ARRAY_TYPES =
5657
arrayOf<String?>(
5758
"_NAME",
5859
"_NUMERIC",
60+
"_FLOAT4",
61+
"_FLOAT8",
5962
"_BYTEA",
6063
"_MONEY",
6164
"_BIT",
@@ -111,6 +114,12 @@ class PostgresCustomConverter : CustomConverter<SchemaBuilder?, RelationalColumn
111114
}
112115
) {
113116
registerNumber(field, registration)
117+
} else if (
118+
Arrays.stream<String?>(FLOATING_POINT_TYPES).anyMatch { s: String? ->
119+
s.equals(field.typeName(), ignoreCase = true)
120+
}
121+
) {
122+
registerFloatingPoint(field, registration)
114123
} else if (
115124
Arrays.stream<String?>(ARRAY_TYPES).anyMatch { s: String? ->
116125
s.equals(field.typeName(), ignoreCase = true)
@@ -139,6 +148,8 @@ class PostgresCustomConverter : CustomConverter<SchemaBuilder?, RelationalColumn
139148
SchemaBuilder.array(Schema.OPTIONAL_FLOAT64_SCHEMA).optional()
140149
}
141150
}
151+
"_FLOAT4" -> SchemaBuilder.array(Schema.OPTIONAL_FLOAT32_SCHEMA).optional()
152+
"_FLOAT8",
142153
"_MONEY" -> SchemaBuilder.array(Schema.OPTIONAL_FLOAT64_SCHEMA).optional()
143154
"_NAME",
144155
"_DATE",
@@ -180,6 +191,42 @@ class PostgresCustomConverter : CustomConverter<SchemaBuilder?, RelationalColumn
180191
)
181192
}
182193

194+
// real / double precision can carry Infinity, -Infinity, and NaN. Downstream destinations
195+
// parse these as numbers, so non-finite values are nulled here to mirror the JDBC path
196+
// (PostgresFiniteFloatFieldType / PostgresFiniteDoubleFieldType).
197+
private fun registerFloatingPoint(
198+
field: RelationalColumn,
199+
registration: CustomConverter.ConverterRegistration<SchemaBuilder?>
200+
) {
201+
val isFloat4 = field.typeName().equals("float4", ignoreCase = true)
202+
val schema = if (isFloat4) SchemaBuilder.float32() else SchemaBuilder.float64()
203+
registration.register(
204+
schema.optional(),
205+
CustomConverter.Converter { x: Any? ->
206+
val value = x ?: convertDefaultValue(field) ?: return@Converter null
207+
if (isFloat4) finiteFloatOrNull(value) else finiteDoubleOrNull(value)
208+
},
209+
)
210+
}
211+
212+
private fun finiteFloatOrNull(x: Any): Float? =
213+
when (x) {
214+
is Float -> x.takeIf { it.isFinite() }
215+
is Double -> x.takeIf { it.isFinite() }?.toFloat()
216+
is Number -> x.toFloat().takeIf { it.isFinite() }
217+
is String -> x.toFloatOrNull()?.takeIf { it.isFinite() }
218+
else -> null
219+
}
220+
221+
private fun finiteDoubleOrNull(x: Any): Double? =
222+
when (x) {
223+
is Double -> x.takeIf { it.isFinite() }
224+
is Float -> x.takeIf { it.isFinite() }?.toDouble()
225+
is Number -> x.toDouble().takeIf { it.isFinite() }
226+
is String -> x.toDoubleOrNull()?.takeIf { it.isFinite() }
227+
else -> null
228+
}
229+
183230
private fun getNumberConvertedValue(x: Any): String {
184231
// Bad solution
185232
// We applied a solution like this for several reasons:
@@ -310,6 +357,18 @@ class PostgresCustomConverter : CustomConverter<SchemaBuilder?, RelationalColumn
310357
}
311358
}
312359
.collect(Collectors.toList())
360+
"_FLOAT4" ->
361+
return Arrays.stream<Any?>(getArray(x))
362+
.map<Float?> { value: Any? ->
363+
if (value == null) null else finiteFloatOrNull(value)
364+
}
365+
.collect(Collectors.toList())
366+
"_FLOAT8" ->
367+
return Arrays.stream<Any?>(getArray(x))
368+
.map<Double?> { value: Any? ->
369+
if (value == null) null else finiteDoubleOrNull(value)
370+
}
371+
.collect(Collectors.toList())
313372
"_TIME" ->
314373
return Arrays.stream<Any?>(getArray(x))
315374
.map<String> { value: Any? ->

airbyte-integrations/connectors/source-postgres/src/main/kotlin/io/airbyte/integrations/source/postgres/operations/PostgresSourceFieldTypeMapper.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import io.airbyte.cdk.jdbc.BigDecimalFieldType
99
import io.airbyte.cdk.jdbc.BigIntegerFieldType
1010
import io.airbyte.cdk.jdbc.BinaryStreamFieldType
1111
import io.airbyte.cdk.jdbc.BooleanFieldType
12-
import io.airbyte.cdk.jdbc.DoubleFieldType
13-
import io.airbyte.cdk.jdbc.FloatFieldType
1412
import io.airbyte.cdk.jdbc.IntFieldType
1513
import io.airbyte.cdk.jdbc.JdbcFieldType
1614
import io.airbyte.cdk.jdbc.LongFieldType
@@ -23,6 +21,8 @@ import io.airbyte.integrations.source.postgres.operations.types.HstoreFieldType
2321
import io.airbyte.integrations.source.postgres.operations.types.LegacyBooleanBitsFieldType
2422
import io.airbyte.integrations.source.postgres.operations.types.PostgresByteaFieldType
2523
import io.airbyte.integrations.source.postgres.operations.types.PostgresDateFieldType
24+
import io.airbyte.integrations.source.postgres.operations.types.PostgresFiniteDoubleFieldType
25+
import io.airbyte.integrations.source.postgres.operations.types.PostgresFiniteFloatFieldType
2626
import io.airbyte.integrations.source.postgres.operations.types.PostgresMoneyArrayElementFieldType
2727
import io.airbyte.integrations.source.postgres.operations.types.PostgresMoneyFieldType
2828
import io.airbyte.integrations.source.postgres.operations.types.PostgresTimeFieldType
@@ -67,14 +67,14 @@ class PostgresSourceFieldTypeMapper : JdbcMetadataQuerier.FieldTypeMapper {
6767
// oid type is unsigned and must be cast to Long to avoid truncation
6868
if (type.scalarTypeName == "oid") LongFieldType else IntFieldType
6969
JDBCType.BIGINT -> LongFieldType
70-
JDBCType.REAL -> FloatFieldType
70+
JDBCType.REAL -> PostgresFiniteFloatFieldType
7171
JDBCType.FLOAT,
7272
JDBCType.DOUBLE ->
7373
// TODO (https://github.com/airbytehq/airbyte-internal-issues/issues/15879):
7474
// Fix type handling for numeric arrays. Arrays and scalars of money are handled
7575
// differently by the PostgresCustomConverter.
7676
when {
77-
type.scalarTypeName != "money" -> DoubleFieldType
77+
type.scalarTypeName != "money" -> PostgresFiniteDoubleFieldType
7878
type.isArray -> PostgresMoneyArrayElementFieldType
7979
else -> PostgresMoneyFieldType
8080
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/* Copyright (c) 2026 Airbyte, Inc., all rights reserved. */
2+
package io.airbyte.integrations.source.postgres.operations.types
3+
4+
import io.airbyte.cdk.data.DoubleCodec
5+
import io.airbyte.cdk.data.FloatCodec
6+
import io.airbyte.cdk.data.LeafAirbyteSchemaType
7+
import io.airbyte.cdk.jdbc.JdbcAccessor
8+
import io.airbyte.cdk.jdbc.SymmetricJdbcFieldType
9+
import java.sql.PreparedStatement
10+
import java.sql.ResultSet
11+
12+
// Postgres real/double precision columns permit Infinity, -Infinity, and NaN. Downstream
13+
// destinations expect JSON numbers, but Jackson serializes IEEE-754 non-finite values as
14+
// the string tokens "Infinity"/"-Infinity"/"NaN" — which then fail to parse as numbers
15+
// (NumberFormatException: Character I is neither a decimal digit...). Match the legacy
16+
// connector's behavior and the existing numeric/decimal handling by emitting null for
17+
// non-finite values.
18+
object PostgresFiniteFloatFieldType :
19+
SymmetricJdbcFieldType<Float>(
20+
LeafAirbyteSchemaType.NUMBER,
21+
PgFiniteFloatAccessor,
22+
FloatCodec,
23+
)
24+
25+
object PostgresFiniteDoubleFieldType :
26+
SymmetricJdbcFieldType<Double>(
27+
LeafAirbyteSchemaType.NUMBER,
28+
PgFiniteDoubleAccessor,
29+
DoubleCodec,
30+
)
31+
32+
private object PgFiniteFloatAccessor : JdbcAccessor<Float> {
33+
override fun get(rs: ResultSet, colIdx: Int): Float? =
34+
rs.getFloat(colIdx).takeUnless { rs.wasNull() }?.takeIf { it.isFinite() }
35+
36+
override fun set(stmt: PreparedStatement, paramIdx: Int, value: Float) {
37+
stmt.setFloat(paramIdx, value)
38+
}
39+
}
40+
41+
private object PgFiniteDoubleAccessor : JdbcAccessor<Double> {
42+
override fun get(rs: ResultSet, colIdx: Int): Double? =
43+
rs.getDouble(colIdx).takeUnless { rs.wasNull() }?.takeIf { it.isFinite() }
44+
45+
override fun set(stmt: PreparedStatement, paramIdx: Int, value: Double) {
46+
stmt.setDouble(paramIdx, value)
47+
}
48+
}

airbyte-integrations/connectors/source-postgres/src/test/kotlin/io/airbyte/integrations/source/postgres/PostgresSourceFieldTypeMapperTest.kt

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,26 @@ class PostgresSourceFieldTypeMapperTest : FieldTypeMapperTest() {
109109
scalarAndArray(
110110
"REAL",
111111
LeafAirbyteSchemaType.NUMBER,
112-
AnsiSql.realValues.plus(preservedInfinities).plus(preservedNaN),
112+
AnsiSql.realValues,
113+
baseTestName = "REAL SUPPORTED VALS",
114+
)
115+
scalarAndArray(
116+
"REAL",
117+
LeafAirbyteSchemaType.NUMBER,
118+
nulledInfinities.plus(nulledNaN),
119+
baseTestName = "REAL UNSUPPORTED VALS",
120+
)
121+
scalarAndArray(
122+
"DOUBLE PRECISION",
123+
LeafAirbyteSchemaType.NUMBER,
124+
AnsiSql.doubleValues,
125+
baseTestName = "DOUBLE PRECISION SUPPORTED VALS",
113126
)
114127
scalarAndArray(
115128
"DOUBLE PRECISION",
116129
LeafAirbyteSchemaType.NUMBER,
117-
AnsiSql.doubleValues.plus(preservedInfinities).plus(preservedNaN)
130+
nulledInfinities.plus(nulledNaN),
131+
baseTestName = "DOUBLE PRECISION UNSUPPORTED VALS",
118132
)
119133

120134
// Character types
@@ -401,17 +415,11 @@ class PostgresSourceFieldTypeMapperTest : FieldTypeMapperTest() {
401415
val executor = JdbcTestDbExecutor(schema, jdbcConfig)
402416
executor.executeUpdate("CREATE EXTENSION IF NOT EXISTS hstore;")
403417
}
404-
private val preservedInfinities =
405-
mapOf(
406-
"'Infinity'" to "\"Infinity\"",
407-
"'-Infinity'" to "\"-Infinity\"",
408-
)
409418
private val nulledInfinities =
410419
mapOf(
411420
"'Infinity'" to "null",
412421
"'-Infinity'" to "null",
413422
)
414-
private val preservedNaN = mapOf("'NaN'" to "\"NaN\"")
415423
private val nulledNaN = mapOf("'NaN'" to "null")
416424
}
417425

0 commit comments

Comments
 (0)