Skip to content

Commit e0ef3eb

Browse files
feat: add pages_per_checkpoint_interval to declarative schema for incremental sync cursors
Co-Authored-By: gl_anatolii.yatsuk@airbyte.io <gl_anatolii.yatsuk@airbyte.io>
1 parent fed6100 commit e0ef3eb

3 files changed

Lines changed: 28 additions & 9 deletions

File tree

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -876,6 +876,10 @@ definitions:
876876
title: Inject Start Value Into Outgoing HTTP Request
877877
description: Optionally configures how the start value will be sent in requests to the source API.
878878
"$ref": "#/definitions/RequestOption"
879+
pages_per_checkpoint_interval:
880+
title: Pages Per Checkpoint Interval
881+
description: The number of pages to fetch before emitting an intermediate state checkpoint during pagination. This enables resuming long-running syncs closer to where they failed rather than from the beginning of the current slice. Only effective when records are returned in ascending cursor order. Defaults to disabled.
882+
type: integer
879883
$parameters:
880884
type: object
881885
additionalProperties: true
@@ -1160,6 +1164,10 @@ definitions:
11601164
- "P1W"
11611165
- "P1M"
11621166
- "P1Y"
1167+
pages_per_checkpoint_interval:
1168+
title: Pages Per Checkpoint Interval
1169+
description: The number of pages to fetch before emitting an intermediate state checkpoint during pagination. This enables resuming long-running syncs closer to where they failed rather than from the beginning of the current slice. Only effective when records are returned in ascending cursor order. Defaults to disabled.
1170+
type: integer
11631171
$parameters:
11641172
type: object
11651173
additionalProperties: true

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1604,6 +1604,11 @@ class IncrementingCountCursor(BaseModel):
16041604
description="Optionally configures how the start value will be sent in requests to the source API.",
16051605
title="Inject Start Value Into Outgoing HTTP Request",
16061606
)
1607+
pages_per_checkpoint_interval: Optional[int] = Field(
1608+
None,
1609+
description="The number of pages to fetch before emitting an intermediate state checkpoint during pagination. This enables resuming long-running syncs closer to where they failed rather than from the beginning of the current slice. Only effective when records are returned in ascending cursor order. Defaults to disabled.",
1610+
title="Pages Per Checkpoint Interval",
1611+
)
16071612
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
16081613

16091614

@@ -1720,6 +1725,11 @@ class DatetimeBasedCursor(BaseModel):
17201725
examples=["P1W", "{{ config['step_increment'] }}"],
17211726
title="Step",
17221727
)
1728+
pages_per_checkpoint_interval: Optional[int] = Field(
1729+
None,
1730+
description="The number of pages to fetch before emitting an intermediate state checkpoint during pagination. This enables resuming long-running syncs closer to where they failed rather than from the beginning of the current slice. Only effective when records are returned in ascending cursor order. Defaults to disabled.",
1731+
title="Pages Per Checkpoint Interval",
1732+
)
17231733
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
17241734

17251735

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 10 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -546,7 +546,6 @@
546546
NoopFileWriter,
547547
)
548548
from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import (
549-
DEFAULT_PAGES_PER_CHECKPOINT_INTERVAL,
550549
PaginationTracker,
551550
)
552551
from airbyte_cdk.sources.declarative.schema import (
@@ -3466,24 +3465,28 @@ def _get_url(req: Requester) -> str:
34663465
additional_query_properties=query_properties,
34673466
log_formatter=self._get_log_formatter(log_formatter, name),
34683467
pagination_tracker_factory=self._create_pagination_tracker_factory(
3469-
model.pagination_reset, cursor
3468+
model.pagination_reset,
3469+
cursor,
3470+
incremental_sync.pages_per_checkpoint_interval if incremental_sync else None,
34703471
),
34713472
parameters=model.parameters or {},
34723473
)
34733474

34743475
def _create_pagination_tracker_factory(
3475-
self, model: Optional[PaginationResetModel], cursor: Cursor
3476+
self,
3477+
model: Optional[PaginationResetModel],
3478+
cursor: Cursor,
3479+
pages_per_checkpoint_interval: int | None = None,
34763480
) -> Callable[[], PaginationTracker]:
34773481
checkpoint_cursor: Optional[ConcurrentCursor] = (
34783482
cursor if isinstance(cursor, ConcurrentCursor) else None
34793483
)
3484+
effective_interval = pages_per_checkpoint_interval if checkpoint_cursor else None
34803485

34813486
if model is None:
34823487
return lambda: PaginationTracker(
34833488
checkpoint_cursor=checkpoint_cursor,
3484-
pages_per_checkpoint_interval=DEFAULT_PAGES_PER_CHECKPOINT_INTERVAL
3485-
if checkpoint_cursor
3486-
else None,
3489+
pages_per_checkpoint_interval=effective_interval,
34873490
)
34883491

34893492
# Until we figure out a way to use any cursor for PaginationTracker, we will have to have this cursor selector logic
@@ -3510,9 +3513,7 @@ def _create_pagination_tracker_factory(
35103513
cursor_factory(),
35113514
limit,
35123515
checkpoint_cursor=checkpoint_cursor,
3513-
pages_per_checkpoint_interval=DEFAULT_PAGES_PER_CHECKPOINT_INTERVAL
3514-
if checkpoint_cursor
3515-
else None,
3516+
pages_per_checkpoint_interval=effective_interval,
35163517
)
35173518

35183519
def _get_log_formatter(

0 commit comments

Comments
 (0)