Patch summary (source-postgres 3.8.0-rc.3 -> 3.8.0-rc.4)

Xmin incremental sync no longer calls error() when the table's filenode is null:
  * cold start falls back to the new PostgresSourceJdbcUnsplittableSnapshotWithXminPartition;
  * an ongoing incremental builds PostgresSourceJdbcXminIncrementalPartition without the
    filenode guard.
Two regression tests cover both paths; metadata.yaml and the changelog are bumped to match.

NOTE(review): this text was recovered from a whitespace-collapsed copy. Indentation of
context lines and the generic type arguments (mockk<...>, JdbcCursorPartition<...>) are
reconstructed - confirm against the upstream pull request before applying.

diff --git a/airbyte-integrations/connectors/source-postgres/metadata.yaml b/airbyte-integrations/connectors/source-postgres/metadata.yaml
index e5b6e4c4a24c..b8953a81682f 100644
--- a/airbyte-integrations/connectors/source-postgres/metadata.yaml
+++ b/airbyte-integrations/connectors/source-postgres/metadata.yaml
@@ -9,7 +9,7 @@ data:
   connectorSubtype: database
   connectorType: source
   definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
-  dockerImageTag: 3.8.0-rc.3
+  dockerImageTag: 3.8.0-rc.4
   dockerRepository: airbyte/source-postgres
   documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
   githubIssueLabel: source-postgres
diff --git a/airbyte-integrations/connectors/source-postgres/src/main/kotlin/io/airbyte/integrations/source/postgres/PostgresSourceJdbcPartition.kt b/airbyte-integrations/connectors/source-postgres/src/main/kotlin/io/airbyte/integrations/source/postgres/PostgresSourceJdbcPartition.kt
index 6e9f9fc26bf4..d820f0ee1183 100644
--- a/airbyte-integrations/connectors/source-postgres/src/main/kotlin/io/airbyte/integrations/source/postgres/PostgresSourceJdbcPartition.kt
+++ b/airbyte-integrations/connectors/source-postgres/src/main/kotlin/io/airbyte/integrations/source/postgres/PostgresSourceJdbcPartition.kt
@@ -112,6 +112,22 @@ class PostgresSourceJdbcUnsplittableSnapshotWithCursorPartition(
     val cursorUpperBoundQuerySpec = SelectQuerySpec(SelectColumnMaxValue(cursor), from)
 }
 
+class PostgresSourceJdbcUnsplittableSnapshotWithXminPartition(
+    selectQueryGenerator: SelectQueryGenerator,
+    streamState: PostgresSourceJdbcStreamState,
+) :
+    PostgresSourceJdbcUnsplittablePartition(selectQueryGenerator, streamState),
+    JdbcCursorPartition<PostgresSourceJdbcStreamState> {
+    override val completeState: OpaqueStateValue
+        get() =
+            PostgresSourceJdbcStreamStateValue.xminIncrementalCheckpoint(
+                streamState.cursorUpperBound ?: Jsons.nullNode(),
+            )
+
+    override val cursorUpperBoundQuery: SelectQuery
+        get() = xminCursorUpperBoundQuery
+}
+
 class PostgresSourceJdbcUnsplittableCursorIncrementalPartition(
     selectQueryGenerator: SelectQueryGenerator,
     streamState: PostgresSourceJdbcStreamState,
diff --git a/airbyte-integrations/connectors/source-postgres/src/main/kotlin/io/airbyte/integrations/source/postgres/PostgresSourceJdbcPartitionFactory.kt b/airbyte-integrations/connectors/source-postgres/src/main/kotlin/io/airbyte/integrations/source/postgres/PostgresSourceJdbcPartitionFactory.kt
index 6f9bfee685b2..81862fd0277d 100644
--- a/airbyte-integrations/connectors/source-postgres/src/main/kotlin/io/airbyte/integrations/source/postgres/PostgresSourceJdbcPartitionFactory.kt
+++ b/airbyte-integrations/connectors/source-postgres/src/main/kotlin/io/airbyte/integrations/source/postgres/PostgresSourceJdbcPartitionFactory.kt
@@ -103,8 +103,9 @@ open class PostgresSourceJdbcPartitionFactory(
                         true
                     )
                 }
-                    ?: error(
-                        "Unexpected incremental sync for a table ${stream.id} with no filenode."
+                    ?: PostgresSourceJdbcUnsplittableSnapshotWithXminPartition(
+                        selectQueryGenerator,
+                        streamState,
                     )
             }
             is UserDefinedCursorIncrementalConfiguration -> {
@@ -297,18 +298,14 @@ open class PostgresSourceJdbcPartitionFactory(
                 // Incremental done
                 null
             } else {
-                filenode?.let { // Incremental ongoing
-                    PostgresSourceJdbcXminIncrementalPartition(
-                        selectQueryGenerator,
-                        streamState,
-                        xminLowerBound = sv.xmin,
-                        isLowerBoundIncluded = true,
-                        xminUpperBound = streamState.cursorUpperBound,
-                    )
-                }
-                    ?: error(
-                        "Unexpected incremental sync for a table ${stream.id} with no filenode."
-                    )
+                // Incremental ongoing
+                PostgresSourceJdbcXminIncrementalPartition(
+                    selectQueryGenerator,
+                    streamState,
+                    xminLowerBound = sv.xmin,
+                    isLowerBoundIncluded = true,
+                    xminUpperBound = streamState.cursorUpperBound,
+                )
             }
         }
     }
diff --git a/airbyte-integrations/connectors/source-postgres/src/test/kotlin/io/airbyte/integrations/source/postgres/PostgresSourceJdbcPartitionFactoryTest.kt b/airbyte-integrations/connectors/source-postgres/src/test/kotlin/io/airbyte/integrations/source/postgres/PostgresSourceJdbcPartitionFactoryTest.kt
index 593c36bb0127..063e804570ef 100644
--- a/airbyte-integrations/connectors/source-postgres/src/test/kotlin/io/airbyte/integrations/source/postgres/PostgresSourceJdbcPartitionFactoryTest.kt
+++ b/airbyte-integrations/connectors/source-postgres/src/test/kotlin/io/airbyte/integrations/source/postgres/PostgresSourceJdbcPartitionFactoryTest.kt
@@ -5,16 +5,26 @@
 package io.airbyte.integrations.source.postgres
 
 import com.fasterxml.jackson.databind.JsonNode
+import io.airbyte.cdk.StreamIdentifier
 import io.airbyte.cdk.output.CatalogValidationFailureHandler
+import io.airbyte.cdk.read.ConfiguredSyncMode
 import io.airbyte.cdk.read.DefaultJdbcSharedState
+import io.airbyte.cdk.read.Stream
+import io.airbyte.cdk.read.StreamFeedBootstrap
 import io.airbyte.cdk.util.Jsons
 import io.airbyte.integrations.source.postgres.config.PostgresSourceConfiguration
+import io.airbyte.integrations.source.postgres.config.XminIncrementalConfiguration
 import io.airbyte.integrations.source.postgres.ctid.Ctid
 import io.airbyte.integrations.source.postgres.operations.PostgresSourceSelectQueryGenerator
+import io.airbyte.protocol.models.v0.StreamDescriptor
+import io.mockk.every
 import io.mockk.mockk
+import io.mockk.spyk
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Assertions.assertNull
+import org.junit.jupiter.api.Assertions.assertTrue
 import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Nested
 import org.junit.jupiter.api.Test
 
 class PostgresSourceJdbcPartitionFactoryTest {
@@ -321,4 +331,133 @@ class PostgresSourceJdbcPartitionFactoryTest {
         assertEquals(Ctid(18, 1), bounds[3].first)
         assertNull(bounds[3].second)
     }
+
+    @Nested
+    inner class XminNullFilenodeTests {
+
+        /**
+         * Tests that when filenode is null (Postgres < 14, no TID range scan support), the Xmin
+         * cold start path falls back to an unsplittable snapshot partition instead of throwing an
+         * error.
+         *
+         * This is a regression test for https://github.com/airbytehq/oncall/issues/11886
+         */
+        @Test
+        fun `xmin cold start with null filenode should fall back to unsplittable snapshot`() {
+            val stream =
+                Stream(
+                    id =
+                        StreamIdentifier.from(
+                            StreamDescriptor()
+                                .withName("test_table")
+                                .withNamespace("test_namespace")
+                        ),
+                    schema = emptySet(),
+                    configuredSyncMode = ConfiguredSyncMode.INCREMENTAL,
+                    configuredPrimaryKey = null,
+                    configuredCursor = null,
+                )
+
+            val config =
+                mockk<PostgresSourceConfiguration> {
+                    every { incrementalConfiguration } returns XminIncrementalConfiguration
+                    every { global } returns false
+                }
+
+            val sharedState = mockk<DefaultJdbcSharedState>(relaxed = true)
+            val selectQueryGenerator = mockk<PostgresSourceSelectQueryGenerator>()
+
+            val streamFeedBootstrap =
+                mockk<StreamFeedBootstrap> {
+                    every { feed } returns stream
+                    every { currentState } returns null
+                }
+
+            val testFactory =
+                spyk(
+                    PostgresSourceJdbcPartitionFactory(
+                        sharedState = sharedState,
+                        selectQueryGenerator = selectQueryGenerator,
+                        config = config,
+                        handler = mockk(),
+                        connectionFactory = mockk()
+                    )
+                ) { every { tidRangeScanCapableDBServer } returns false }
+
+            val partition = testFactory.create(streamFeedBootstrap)
+
+            assertTrue(
+                partition is PostgresSourceJdbcUnsplittableSnapshotWithXminPartition,
+                "Expected PostgresSourceJdbcUnsplittableSnapshotWithXminPartition " +
+                    "but got ${partition?.javaClass?.simpleName}"
+            )
+        }
+
+        /**
+         * Tests that when filenode is null (Postgres < 14) and there is ongoing xmin incremental
+         * state, the factory creates an XminIncrementalPartition directly without guarding on
+         * filenode.
+         *
+         * This is a regression test for https://github.com/airbytehq/oncall/issues/11886
+         */
+        @Test
+        fun `xmin incremental ongoing with null filenode should create partition without error`() {
+            val stream =
+                Stream(
+                    id =
+                        StreamIdentifier.from(
+                            StreamDescriptor()
+                                .withName("test_table")
+                                .withNamespace("test_namespace")
+                        ),
+                    schema = emptySet(),
+                    configuredSyncMode = ConfiguredSyncMode.INCREMENTAL,
+                    configuredPrimaryKey = null,
+                    configuredCursor = null,
+                )
+
+            val config =
+                mockk<PostgresSourceConfiguration> {
+                    every { incrementalConfiguration } returns XminIncrementalConfiguration
+                    every { global } returns false
+                }
+
+            val sharedState = mockk<DefaultJdbcSharedState>(relaxed = true)
+            val selectQueryGenerator = mockk<PostgresSourceSelectQueryGenerator>()
+
+            // State with xmin set and no ctid (snapshot complete, incremental ongoing)
+            val xminCheckpoint = Jsons.numberNode(12345L)
+            val stateValue =
+                PostgresSourceJdbcStreamStateValue(
+                    stateType = "xmin_based",
+                    xmin = xminCheckpoint,
+                )
+            val opaqueState = Jsons.valueToTree(stateValue)
+
+            val streamFeedBootstrap =
+                mockk<StreamFeedBootstrap> {
+                    every { feed } returns stream
+                    every { currentState } returns opaqueState
+                }
+
+            val testFactory =
+                spyk(
+                    PostgresSourceJdbcPartitionFactory(
+                        sharedState = sharedState,
+                        selectQueryGenerator = selectQueryGenerator,
+                        config = config,
+                        handler = mockk(),
+                        connectionFactory = mockk()
+                    )
+                ) { every { tidRangeScanCapableDBServer } returns false }
+
+            val partition = testFactory.create(streamFeedBootstrap)
+
+            assertTrue(
+                partition is PostgresSourceJdbcXminIncrementalPartition,
+                "Expected PostgresSourceJdbcXminIncrementalPartition " +
+                    "but got ${partition?.javaClass?.simpleName}"
+            )
+        }
+    }
 }
diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md
index 76b39607df0f..36a4b7b8ef79 100644
--- a/docs/integrations/sources/postgres.md
+++ b/docs/integrations/sources/postgres.md
@@ -357,6 +357,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp
 
 | Version | Date | Pull Request | Subject |
 |------------|------------|-----------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| 3.8.0-rc.4 | 2026-04-04 | [76088](https://github.com/airbytehq/airbyte/pull/76088) | Fix Xmin incremental sync failure on Postgres < 14 when filenode is null |
 | 3.8.0-rc.3 | 2026-04-02 | [76055](https://github.com/airbytehq/airbyte/pull/76055) | Use CDK 1.1.1 |
 | 3.8.0-rc.2 | 2026-04-01 | [75957](https://github.com/airbytehq/airbyte/pull/75957) | Fix non-CDC connections failing due to unconditional CDC bean initialization |
 | 3.8.0-rc.1 | 2026-03-31 | [75637](https://github.com/airbytehq/airbyte/pull/75637) | Initial release candidate for rewritten connector on the bulk CDK |