Skip to content
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.8.0-rc.3
dockerImageTag: 3.8.0-rc.4
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,22 @@ class PostgresSourceJdbcUnsplittableSnapshotWithCursorPartition(
val cursorUpperBoundQuerySpec = SelectQuerySpec(SelectColumnMaxValue(cursor), from)
}

class PostgresSourceJdbcUnsplittableSnapshotWithXminPartition(
selectQueryGenerator: SelectQueryGenerator,
streamState: PostgresSourceJdbcStreamState,
) :
PostgresSourceJdbcUnsplittablePartition(selectQueryGenerator, streamState),
JdbcCursorPartition<PostgresSourceJdbcStreamState> {
override val completeState: OpaqueStateValue
get() =
PostgresSourceJdbcStreamStateValue.xminIncrementalCheckpoint(
streamState.cursorUpperBound ?: Jsons.nullNode(),
)

override val cursorUpperBoundQuery: SelectQuery
get() = xminCursorUpperBoundQuery
}

class PostgresSourceJdbcUnsplittableCursorIncrementalPartition(
selectQueryGenerator: SelectQueryGenerator,
streamState: PostgresSourceJdbcStreamState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ open class PostgresSourceJdbcPartitionFactory(
true
)
}
?: error(
"Unexpected incremental sync for a table ${stream.id} with no filenode."
?: PostgresSourceJdbcUnsplittableSnapshotWithXminPartition(
selectQueryGenerator,
streamState,
)
}
is UserDefinedCursorIncrementalConfiguration -> {
Expand Down Expand Up @@ -297,18 +298,14 @@ open class PostgresSourceJdbcPartitionFactory(
// Incremental done
null
} else {
filenode?.let { // Incremental ongoing
PostgresSourceJdbcXminIncrementalPartition(
selectQueryGenerator,
streamState,
xminLowerBound = sv.xmin,
isLowerBoundIncluded = true,
xminUpperBound = streamState.cursorUpperBound,
)
}
?: error(
"Unexpected incremental sync for a table ${stream.id} with no filenode."
)
// Incremental ongoing
PostgresSourceJdbcXminIncrementalPartition(
selectQueryGenerator,
streamState,
xminLowerBound = sv.xmin,
isLowerBoundIncluded = true,
xminUpperBound = streamState.cursorUpperBound,
)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,26 @@
package io.airbyte.integrations.source.postgres

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.StreamIdentifier
import io.airbyte.cdk.output.CatalogValidationFailureHandler
import io.airbyte.cdk.read.ConfiguredSyncMode
import io.airbyte.cdk.read.DefaultJdbcSharedState
import io.airbyte.cdk.read.Stream
import io.airbyte.cdk.read.StreamFeedBootstrap
import io.airbyte.cdk.util.Jsons
import io.airbyte.integrations.source.postgres.config.PostgresSourceConfiguration
import io.airbyte.integrations.source.postgres.config.XminIncrementalConfiguration
import io.airbyte.integrations.source.postgres.ctid.Ctid
import io.airbyte.integrations.source.postgres.operations.PostgresSourceSelectQueryGenerator
import io.airbyte.protocol.models.v0.StreamDescriptor
import io.mockk.every
import io.mockk.mockk
import io.mockk.spyk
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test

class PostgresSourceJdbcPartitionFactoryTest {
Expand Down Expand Up @@ -321,4 +331,133 @@ class PostgresSourceJdbcPartitionFactoryTest {
assertEquals(Ctid(18, 1), bounds[3].first)
assertNull(bounds[3].second)
}

@Nested
inner class XminNullFilenodeTests {

/**
* Tests that when filenode is null (Postgres < 14, no TID range scan support), the Xmin
* cold start path falls back to an unsplittable snapshot partition instead of throwing an
* error.
*
* This is a regression test for https://github.com/airbytehq/oncall/issues/11886
*/
@Test
fun `xmin cold start with null filenode should fall back to unsplittable snapshot`() {
val stream =
Stream(
id =
StreamIdentifier.from(
StreamDescriptor()
.withName("test_table")
.withNamespace("test_namespace")
),
schema = emptySet(),
configuredSyncMode = ConfiguredSyncMode.INCREMENTAL,
configuredPrimaryKey = null,
configuredCursor = null,
)

val config =
mockk<PostgresSourceConfiguration> {
every { incrementalConfiguration } returns XminIncrementalConfiguration
every { global } returns false
}

val sharedState = mockk<DefaultJdbcSharedState>(relaxed = true)
val selectQueryGenerator = mockk<PostgresSourceSelectQueryGenerator>()

val streamFeedBootstrap =
mockk<StreamFeedBootstrap> {
every { feed } returns stream
every { currentState } returns null
}

val testFactory =
spyk(
PostgresSourceJdbcPartitionFactory(
sharedState = sharedState,
selectQueryGenerator = selectQueryGenerator,
config = config,
handler = mockk<CatalogValidationFailureHandler>(),
connectionFactory = mockk<PostgresSourceJdbcConnectionFactory>()
)
) { every { tidRangeScanCapableDBServer } returns false }

val partition = testFactory.create(streamFeedBootstrap)

assertTrue(
partition is PostgresSourceJdbcUnsplittableSnapshotWithXminPartition,
"Expected PostgresSourceJdbcUnsplittableSnapshotWithXminPartition " +
"but got ${partition?.javaClass?.simpleName}"
)
}

/**
* Tests that when filenode is null (Postgres < 14) and there is ongoing xmin incremental
* state, the factory creates an XminIncrementalPartition directly without guarding on
* filenode.
*
* This is a regression test for https://github.com/airbytehq/oncall/issues/11886
*/
@Test
fun `xmin incremental ongoing with null filenode should create partition without error`() {
val stream =
Stream(
id =
StreamIdentifier.from(
StreamDescriptor()
.withName("test_table")
.withNamespace("test_namespace")
),
schema = emptySet(),
configuredSyncMode = ConfiguredSyncMode.INCREMENTAL,
configuredPrimaryKey = null,
configuredCursor = null,
)

val config =
mockk<PostgresSourceConfiguration> {
every { incrementalConfiguration } returns XminIncrementalConfiguration
every { global } returns false
}

val sharedState = mockk<DefaultJdbcSharedState>(relaxed = true)
val selectQueryGenerator = mockk<PostgresSourceSelectQueryGenerator>()

// State with xmin set and no ctid (snapshot complete, incremental ongoing)
val xminCheckpoint = Jsons.numberNode(12345L)
val stateValue =
PostgresSourceJdbcStreamStateValue(
stateType = "xmin_based",
xmin = xminCheckpoint,
)
val opaqueState = Jsons.valueToTree<JsonNode>(stateValue)

val streamFeedBootstrap =
mockk<StreamFeedBootstrap> {
every { feed } returns stream
every { currentState } returns opaqueState
}

val testFactory =
spyk(
PostgresSourceJdbcPartitionFactory(
sharedState = sharedState,
selectQueryGenerator = selectQueryGenerator,
config = config,
handler = mockk<CatalogValidationFailureHandler>(),
connectionFactory = mockk<PostgresSourceJdbcConnectionFactory>()
)
) { every { tidRangeScanCapableDBServer } returns false }

val partition = testFactory.create(streamFeedBootstrap)

assertTrue(
partition is PostgresSourceJdbcXminIncrementalPartition,
"Expected PostgresSourceJdbcXminIncrementalPartition " +
"but got ${partition?.javaClass?.simpleName}"
)
}
}
}
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|------------|------------|-----------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.8.0-rc.4 | 2026-04-04 | [76088](https://github.com/airbytehq/airbyte/pull/76088) | Fix Xmin incremental sync failure on Postgres < 14 when filenode is null |
| 3.8.0-rc.3 | 2026-04-02 | [76055](https://github.com/airbytehq/airbyte/pull/76055) | Use CDK 1.1.1 |
| 3.8.0-rc.2 | 2026-04-01 | [75957](https://github.com/airbytehq/airbyte/pull/75957) | Fix non-CDC connections failing due to unconditional CDC bean initialization |
| 3.8.0-rc.1 | 2026-03-31 | [75637](https://github.com/airbytehq/airbyte/pull/75637) | Initial release candidate for rewritten connector on the bulk CDK |
Expand Down
Loading