feat: pagination reset #781
Merged: Maxime Carbonneau-Leclerc (maxi297) merged 15 commits into main from maxi297/pagination_reset on Oct 9, 2025
Commits
a2bc95d
pagination reset
f90b110
fix case with custom retriever
3fb450c
mypy
37698f5
fix warnings
2a50950
fixes while testing
2029415
mypy + format
ab5d315
mypy
26c2735
mypy
cc4da11
format
c8e5c1e
Merge branch 'main' into maxi297/pagination_reset
maxi297 f247e93
One cursor per PaginationTracker
71065f0
coderabbitai
2ae3f57
code review
fbf907c
fix test
92b9dff
format/lint
61 changes: 61 additions & 0 deletions
airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| from typing import Optional | ||
|
|
||
| from airbyte_cdk.sources.declarative.models import FailureType | ||
| from airbyte_cdk.sources.declarative.types import Record, StreamSlice | ||
| from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor | ||
| from airbyte_cdk.utils.traced_exception import AirbyteTracedException | ||
|
|
||
|
|
||
| class PaginationTracker: | ||
| _record_count: int | ||
| _number_of_attempt_with_same_slice: int | ||
|
|
||
| def __init__( | ||
| self, cursor: Optional[ConcurrentCursor] = None, max_number_of_records: Optional[int] = None | ||
| ) -> None: | ||
| """ | ||
| Ideally, we would have passed the `Cursor` interface here instead of `ConcurrentCursor` but not all | ||
| implementations of `Cursor` can support this use case. For example, if the `ConcurrentPerPartitionCursor` | ||
| switches to global state, we stop keeping track of the state per partition and therefore can't get an accurate | ||
| view for a specific stream_slice. In order to solve that, we decided to scope this feature to use only | ||
| ConcurrentCursor which is the only "leaf" cursor that actually emits stream slices with `cursor_partition`. | ||
| """ | ||
| self._cursor = cursor | ||
| self._limit = max_number_of_records | ||
| self.reset() | ||
|
|
||
| """ | ||
| Given we have a cursor, we do not allow the same slice to be processed twice because we assume we would | ||
| always process the same slice again. | ||
|
|
||
| Given no cursor, we assume that the pagination reset is for retrying purposes and we allow one retry. | ||
| """ | ||
| self._allowed_number_of_attempt_with_same_slice = 1 if self._cursor else 2 | ||
| self._number_of_attempt_with_same_slice = 0 | ||
|
|
||
| def observe(self, record: Record) -> None: | ||
| self._record_count += 1 | ||
| if self._cursor: | ||
| self._cursor.observe(record) | ||
|
|
||
| def has_reached_limit(self) -> bool: | ||
| return self._limit is not None and self._record_count >= self._limit | ||
|
|
||
| def reset(self) -> None: | ||
| self._record_count = 0 | ||
|
|
||
| def reduce_slice_range_if_possible(self, stream_slice: StreamSlice) -> StreamSlice: | ||
| new_slice = self._cursor.reduce_slice_range(stream_slice) if self._cursor else stream_slice | ||
|
|
||
| self._number_of_attempt_with_same_slice += 1 | ||
| if new_slice == stream_slice: | ||
| if ( | ||
| self._number_of_attempt_with_same_slice | ||
| >= self._allowed_number_of_attempt_with_same_slice | ||
| ): | ||
| raise AirbyteTracedException( | ||
| internal_message=f"There were {self._number_of_attempt_with_same_slice} attempts with the same slice already while the max allowed is {self._allowed_number_of_attempt_with_same_slice}", | ||
| failure_type=FailureType.system_error, | ||
| ) | ||
|
|
||
| return new_slice | ||
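
To make the attempt-counting rules in the diff easier to follow, here is a minimal, self-contained sketch that restates the tracker logic without importing `airbyte_cdk`. The names `SketchTracker`, `FakeCursor`, `SameSliceError`, and `raises_on_reset` are hypothetical stand-ins, and the way `FakeCursor.reduce_slice_range` shrinks a slice is an assumption made only for illustration; the real `ConcurrentCursor` behavior is not shown in this diff.

```python
from typing import Optional


class SameSliceError(Exception):
    """Hypothetical stand-in for AirbyteTracedException."""


class FakeCursor:
    """Hypothetical cursor: moves a slice's start to the last observed value (assumption)."""

    def __init__(self) -> None:
        self.latest: Optional[int] = None

    def observe(self, record: dict) -> None:
        self.latest = record["updated_at"]

    def reduce_slice_range(self, stream_slice: dict) -> dict:
        if self.latest is None:
            return stream_slice  # no progress was made, so the slice is unchanged
        return {**stream_slice, "start": self.latest}


class SketchTracker:
    """Simplified restatement of the PaginationTracker logic from the diff."""

    def __init__(
        self, cursor: Optional[FakeCursor] = None, max_number_of_records: Optional[int] = None
    ) -> None:
        self._cursor = cursor
        self._limit = max_number_of_records
        self._record_count = 0
        # With a cursor, an unchanged slice is rejected at once; without one, one retry is allowed.
        self._allowed_attempts = 1 if cursor else 2
        self._attempts = 0

    def observe(self, record: dict) -> None:
        self._record_count += 1
        if self._cursor:
            self._cursor.observe(record)

    def has_reached_limit(self) -> bool:
        return self._limit is not None and self._record_count >= self._limit

    def reset(self) -> None:
        self._record_count = 0

    def reduce_slice_range_if_possible(self, stream_slice: dict) -> dict:
        new_slice = (
            self._cursor.reduce_slice_range(stream_slice) if self._cursor else stream_slice
        )
        self._attempts += 1
        if new_slice == stream_slice and self._attempts >= self._allowed_attempts:
            raise SameSliceError(
                f"{self._attempts} attempts with the same slice; max is {self._allowed_attempts}"
            )
        return new_slice


def raises_on_reset(tracker: SketchTracker, stream_slice: dict) -> bool:
    """Return True if asking for a reduced slice raises SameSliceError."""
    try:
        tracker.reduce_slice_range_if_possible(stream_slice)
    except SameSliceError:
        return True
    return False
```

In this sketch, a tracker without a cursor tolerates one reset on an unchanged slice and fails on the second, while a tracker whose cursor has observed no records fails on the first reset because the slice cannot be narrowed. Once the cursor has observed a record, the slice shrinks and the reset goes through.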