Skip to content

Commit 6bb6c97

Browse files
feat(declarative): Add interpolation support for CursorPagination page_size (#878)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: Alfredo Garcia <alfredo.garcia@hallmark.edu>
1 parent 9b23860 commit 6bb6c97

File tree

5 files changed

+95
-18
lines changed

5 files changed

+95
-18
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -512,9 +512,16 @@ definitions:
512512
page_size:
513513
title: Page Size
514514
description: The number of records to include in each page.
515-
type: integer
515+
anyOf:
516+
- type: integer
517+
title: Number of Records
518+
- type: string
519+
title: Interpolated Value
520+
interpolation_context:
521+
- config
516522
examples:
517523
- 100
524+
- "{{ config['page_size'] }}"
518525
stop_condition:
519526
title: Stop Condition
520527
description: Template string evaluating when to stop paginating.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,10 @@ class CursorPagination(BaseModel):
114114
],
115115
title="Cursor Value",
116116
)
117-
page_size: Optional[int] = Field(
117+
page_size: Optional[Union[int, str]] = Field(
118118
None,
119119
description="The number of records to include in each page.",
120-
examples=[100],
120+
examples=[100, "{{ config['page_size'] }}"],
121121
title="Page Size",
122122
)
123123
stop_condition: Optional[str] = Field(
@@ -2741,7 +2741,7 @@ class HttpRequester(BaseModelWithDeprecations):
27412741
)
27422742
use_cache: Optional[bool] = Field(
27432743
False,
2744-
description="Enables stream requests caching. This field is automatically set by the CDK.",
2744+
description="Enables stream requests caching. When set to true, repeated requests to the same URL will return cached responses. Parent streams automatically have caching enabled. Only set this to false if you are certain that caching should be disabled (e.g., for scroll-based pagination APIs where caching causes duplicate records), as disabling caching may negatively impact performance when the same data is needed multiple times.",
27452745
title="Use Cache",
27462746
)
27472747
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1554,14 +1554,18 @@ def create_concurrent_cursor_from_incrementing_count_cursor(
15541554
f"Expected {model_type.__name__} component, but received {incrementing_count_cursor_model.__class__.__name__}"
15551555
)
15561556

1557-
interpolated_start_value = (
1558-
InterpolatedString.create(
1559-
incrementing_count_cursor_model.start_value, # type: ignore
1557+
start_value: Union[int, str, None] = incrementing_count_cursor_model.start_value
1558+
# Pydantic Union type coercion can convert int 0 to string '0' depending on Union order.
1559+
# We need to handle both int and str representations of numeric values.
1560+
# Evaluate the InterpolatedString and convert to int for the ConcurrentCursor.
1561+
if start_value is not None:
1562+
interpolated_start_value = InterpolatedString.create(
1563+
str(start_value), # Ensure we pass a string to InterpolatedString.create
15601564
parameters=incrementing_count_cursor_model.parameters or {},
15611565
)
1562-
if incrementing_count_cursor_model.start_value
1563-
else 0
1564-
)
1566+
evaluated_start_value: int = int(interpolated_start_value.eval(config=config))
1567+
else:
1568+
evaluated_start_value = 0
15651569

15661570
cursor_field = self._get_catalog_defined_cursor_field(
15671571
stream_name=stream_name,
@@ -1593,7 +1597,7 @@ def create_concurrent_cursor_from_incrementing_count_cursor(
15931597
connector_state_converter=connector_state_converter,
15941598
cursor_field=cursor_field,
15951599
slice_boundary_fields=None,
1596-
start=interpolated_start_value, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
1600+
start=evaluated_start_value, # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
15971601
end_provider=connector_state_converter.get_end_provider(), # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
15981602
)
15991603

@@ -1745,10 +1749,16 @@ def create_cursor_pagination(
17451749
self._UNSUPPORTED_DECODER_ERROR.format(decoder_type=type(inner_decoder))
17461750
)
17471751

1752+
# Pydantic v1 Union type coercion can convert int to string depending on Union order.
1753+
# If page_size is a string that represents an integer (not an interpolation), convert it back.
1754+
page_size = model.page_size
1755+
if isinstance(page_size, str) and page_size.isdigit():
1756+
page_size = int(page_size)
1757+
17481758
return CursorPaginationStrategy(
17491759
cursor_value=model.cursor_value,
17501760
decoder=decoder_to_use,
1751-
page_size=model.page_size,
1761+
page_size=page_size,
17521762
stop_condition=model.stop_condition,
17531763
config=config,
17541764
parameters=model.parameters or {},
@@ -2917,8 +2927,14 @@ def create_offset_increment(
29172927
else None
29182928
)
29192929

2930+
# Pydantic v1 Union type coercion can convert int to string depending on Union order.
2931+
# If page_size is a string that represents an integer (not an interpolation), convert it back.
2932+
page_size = model.page_size
2933+
if isinstance(page_size, str) and page_size.isdigit():
2934+
page_size = int(page_size)
2935+
29202936
return OffsetIncrement(
2921-
page_size=model.page_size,
2937+
page_size=page_size,
29222938
config=config,
29232939
decoder=decoder_to_use,
29242940
extractor=extractor,
@@ -2930,8 +2946,14 @@ def create_offset_increment(
29302946
def create_page_increment(
29312947
model: PageIncrementModel, config: Config, **kwargs: Any
29322948
) -> PageIncrement:
2949+
# Pydantic v1 Union type coercion can convert int to string depending on Union order.
2950+
# If page_size is a string that represents an integer (not an interpolation), convert it back.
2951+
page_size = model.page_size
2952+
if isinstance(page_size, str) and page_size.isdigit():
2953+
page_size = int(page_size)
2954+
29332955
return PageIncrement(
2934-
page_size=model.page_size,
2956+
page_size=page_size,
29352957
config=config,
29362958
start_from_page=model.start_from_page or 0,
29372959
inject_on_first_request=model.inject_on_first_request or False,

airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@
2323
@dataclass
2424
class CursorPaginationStrategy(PaginationStrategy):
2525
"""
26-
Pagination strategy that evaluates an interpolated string to define the next page token
26+
Pagination strategy that evaluates an interpolated string to define the next page token.
2727
2828
Attributes:
29-
page_size (Optional[int]): the number of records to request
29+
page_size (Optional[Union[str, int]]): the number of records to request
3030
cursor_value (Union[InterpolatedString, str]): template string evaluating to the cursor value
3131
config (Config): connection config
3232
stop_condition (Optional[InterpolatedBoolean]): template string evaluating when to stop paginating
@@ -36,7 +36,7 @@ class CursorPaginationStrategy(PaginationStrategy):
3636
cursor_value: Union[InterpolatedString, str]
3737
config: Config
3838
parameters: InitVar[Mapping[str, Any]]
39-
page_size: Optional[int] = None
39+
page_size: Optional[Union[str, int]] = None
4040
stop_condition: Optional[Union[InterpolatedBoolean, str]] = None
4141
decoder: Decoder = field(
4242
default_factory=lambda: PaginationDecoderDecorator(decoder=JsonDecoder(parameters={}))
@@ -54,6 +54,14 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
5454
else:
5555
self._stop_condition = self.stop_condition
5656

57+
if isinstance(self.page_size, int) or (self.page_size is None):
58+
self._page_size = self.page_size
59+
else:
60+
page_size = InterpolatedString(self.page_size, parameters=parameters).eval(self.config)
61+
if not isinstance(page_size, int):
62+
raise Exception(f"{page_size} is of type {type(page_size)}. Expected {int}")
63+
self._page_size = page_size
64+
5765
@property
5866
def initial_token(self) -> Optional[Any]:
5967
"""
@@ -95,4 +103,4 @@ def next_page_token(
95103
return token if token else None
96104

97105
def get_page_size(self) -> Optional[int]:
98-
return self.page_size
106+
return self._page_size

unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,3 +112,43 @@ def test_last_record_is_node_if_no_records():
112112
response = requests.Response()
113113
next_page_token = strategy.next_page_token(response, 0, None)
114114
assert next_page_token is None
115+
116+
117+
@pytest.mark.parametrize(
118+
"page_size_input, config, expected_page_size",
119+
[
120+
pytest.param(100, {}, 100, id="static_integer"),
121+
pytest.param("100", {}, 100, id="static_string"),
122+
pytest.param(
123+
"{{ config['page_size'] }}", {"page_size": 50}, 50, id="interpolated_from_config"
124+
),
125+
pytest.param("{{ config.get('page_size', 100) }}", {}, 100, id="interpolated_with_default"),
126+
pytest.param(
127+
"{{ config.get('page_size', 100) }}",
128+
{"page_size": 200},
129+
200,
130+
id="interpolated_override_default",
131+
),
132+
pytest.param(None, {}, None, id="none_page_size"),
133+
],
134+
)
135+
def test_interpolated_page_size(page_size_input, config, expected_page_size):
136+
"""Test that page_size supports interpolation from config."""
137+
strategy = CursorPaginationStrategy(
138+
page_size=page_size_input,
139+
cursor_value="token",
140+
config=config,
141+
parameters={},
142+
)
143+
assert strategy.get_page_size() == expected_page_size
144+
145+
146+
def test_interpolated_page_size_raises_on_non_integer():
147+
"""Test that initialization raises an exception when interpolation resolves to a non-integer."""
148+
with pytest.raises(Exception, match="is of type .* Expected"):
149+
CursorPaginationStrategy(
150+
page_size="{{ config['page_size'] }}",
151+
cursor_value="token",
152+
config={"page_size": "invalid"},
153+
parameters={},
154+
)

0 commit comments

Comments
 (0)