Skip to content

Commit 057f8dc

Browse files
feat: add ClientSideIncrementalRetrieverDecorator for custom retrievers
This change automatically applies client-side incremental filtering to custom retrievers when is_client_side_incremental is enabled in the manifest. Previously, custom retrievers bypassed the ClientSideIncrementalRecordFilterDecorator that SimpleRetriever uses, causing all records to be emitted on every sync even when client_side_incremental was configured. Changes: - Add ClientSideIncrementalRetrieverDecorator class that wraps any Retriever and filters records using cursor.should_be_synced() - Update model_to_component_factory.create_default_stream() to wrap custom retrievers with the decorator when is_client_side_incremental is enabled - Add unit tests for the new decorator Co-Authored-By: Daryna Ishchenko <darina.ishchenko17@gmail.com>
1 parent 15542de commit 057f8dc

4 files changed

Lines changed: 275 additions & 1 deletion

File tree

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,7 @@
535535
)
536536
from airbyte_cdk.sources.declarative.retrievers import (
537537
AsyncRetriever,
538+
ClientSideIncrementalRetrieverDecorator,
538539
LazySimpleRetriever,
539540
SimpleRetriever,
540541
)
@@ -2077,6 +2078,7 @@ def create_default_stream(
20772078
else concurrent_cursor
20782079
)
20792080

2081+
is_client_side_incremental = self._is_client_side_filtering_enabled(model)
20802082
retriever = self._create_component_from_model(
20812083
model=model.retriever,
20822084
config=config,
@@ -2086,14 +2088,23 @@ def create_default_stream(
20862088
stream_slicer=stream_slicer,
20872089
partition_router=partition_router,
20882090
has_stop_condition_cursor=self._is_stop_condition_on_cursor(model),
2089-
is_client_side_incremental_sync=self._is_client_side_filtering_enabled(model),
2091+
is_client_side_incremental_sync=is_client_side_incremental,
20902092
cursor=concurrent_cursor,
20912093
transformations=transformations,
20922094
file_uploader=file_uploader,
20932095
incremental_sync=model.incremental_sync,
20942096
)
20952097
if isinstance(retriever, AsyncRetriever):
20962098
stream_slicer = retriever.stream_slicer
2099+
elif (
2100+
is_client_side_incremental
2101+
and not isinstance(retriever, SimpleRetriever)
2102+
and not isinstance(concurrent_cursor, FinalStateCursor)
2103+
):
2104+
retriever = ClientSideIncrementalRetrieverDecorator(
2105+
retriever=retriever,
2106+
cursor=concurrent_cursor,
2107+
)
20972108

20982109
schema_loader: SchemaLoader
20992110
if model.schema_loader and isinstance(model.schema_loader, list):

airbyte_cdk/sources/declarative/retrievers/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,17 @@
33
#
44

55
from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
6+
from airbyte_cdk.sources.declarative.retrievers.client_side_incremental_retriever_decorator import (
7+
ClientSideIncrementalRetrieverDecorator,
8+
)
69
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
710
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
811
LazySimpleRetriever,
912
SimpleRetriever,
1013
)
1114

1215
__all__ = [
16+
"ClientSideIncrementalRetrieverDecorator",
1317
"Retriever",
1418
"SimpleRetriever",
1519
"AsyncRetriever",
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from typing import Any, Iterable, Mapping, Optional
6+
7+
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
8+
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
9+
from airbyte_cdk.sources.streams.core import StreamData
10+
from airbyte_cdk.sources.types import Record, StreamSlice
11+
12+
13+
class ClientSideIncrementalRetrieverDecorator(Retriever):
14+
"""
15+
Decorator that wraps a Retriever and applies client-side incremental filtering.
16+
17+
This decorator filters out records that are older than the cursor state,
18+
enabling client-side incremental sync for custom retrievers that don't
19+
natively support the ClientSideIncrementalRecordFilterDecorator.
20+
21+
When a stream uses `is_client_side_incremental: true` with a custom retriever,
22+
this decorator ensures that only records newer than the cursor state are emitted.
23+
24+
Attributes:
25+
retriever: The underlying retriever to wrap
26+
cursor: The cursor used to determine if records should be synced
27+
"""
28+
29+
def __init__(
30+
self,
31+
retriever: Retriever,
32+
cursor: Cursor,
33+
):
34+
self._retriever = retriever
35+
self._cursor = cursor
36+
37+
def read_records(
38+
self,
39+
records_schema: Mapping[str, Any],
40+
stream_slice: Optional[StreamSlice] = None,
41+
) -> Iterable[StreamData]:
42+
for record in self._retriever.read_records(
43+
records_schema=records_schema,
44+
stream_slice=stream_slice,
45+
):
46+
if isinstance(record, Record):
47+
if self._cursor.should_be_synced(record):
48+
yield record
49+
elif isinstance(record, Mapping):
50+
record_obj = Record(
51+
data=record, associated_slice=stream_slice, stream_name=""
52+
)
53+
if self._cursor.should_be_synced(record_obj):
54+
yield record
55+
else:
56+
yield record
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from datetime import datetime, timedelta, timezone
6+
from typing import Any
7+
from unittest.mock import Mock
8+
9+
import pytest
10+
11+
from airbyte_cdk.sources.declarative.retrievers import (
12+
ClientSideIncrementalRetrieverDecorator,
13+
Retriever,
14+
)
15+
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField
16+
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
17+
CustomFormatConcurrentStreamStateConverter,
18+
)
19+
from airbyte_cdk.sources.types import Record, StreamSlice
20+
21+
DATE_FORMAT = "%Y-%m-%d"
22+
23+
24+
class MockRetriever(Retriever):
25+
"""Mock retriever that yields predefined records."""
26+
27+
def __init__(self, records: list[dict[str, Any]]):
28+
self._records = records
29+
30+
def read_records(
31+
self,
32+
records_schema: dict[str, Any],
33+
stream_slice: StreamSlice | None = None,
34+
):
35+
for record in self._records:
36+
yield record
37+
38+
39+
@pytest.fixture
40+
def cursor_with_state():
41+
"""Create a cursor with state set to 2021-01-03."""
42+
return ConcurrentCursor(
43+
stream_name="test_stream",
44+
stream_namespace=None,
45+
stream_state={"created_at": "2021-01-03"},
46+
message_repository=Mock(),
47+
connector_state_manager=Mock(),
48+
connector_state_converter=CustomFormatConcurrentStreamStateConverter(
49+
datetime_format=DATE_FORMAT
50+
),
51+
cursor_field=CursorField("created_at"),
52+
slice_boundary_fields=("start", "end"),
53+
start=datetime(2021, 1, 1, tzinfo=timezone.utc),
54+
end_provider=lambda: datetime(2021, 1, 10, tzinfo=timezone.utc),
55+
slice_range=timedelta(days=365 * 10),
56+
)
57+
58+
59+
@pytest.fixture
60+
def cursor_without_state():
61+
"""Create a cursor without state."""
62+
return ConcurrentCursor(
63+
stream_name="test_stream",
64+
stream_namespace=None,
65+
stream_state={},
66+
message_repository=Mock(),
67+
connector_state_manager=Mock(),
68+
connector_state_converter=CustomFormatConcurrentStreamStateConverter(
69+
datetime_format=DATE_FORMAT
70+
),
71+
cursor_field=CursorField("created_at"),
72+
slice_boundary_fields=("start", "end"),
73+
start=datetime(2021, 1, 1, tzinfo=timezone.utc),
74+
end_provider=lambda: datetime(2021, 1, 10, tzinfo=timezone.utc),
75+
slice_range=timedelta(days=365 * 10),
76+
)
77+
78+
79+
@pytest.mark.parametrize(
80+
"records,cursor_state,expected_ids",
81+
[
82+
pytest.param(
83+
[
84+
{"id": 1, "created_at": "2020-01-03"},
85+
{"id": 2, "created_at": "2021-01-03"},
86+
{"id": 3, "created_at": "2021-01-04"},
87+
{"id": 4, "created_at": "2021-02-01"},
88+
],
89+
{"created_at": "2021-01-03"},
90+
[2, 3, 4],
91+
id="filters_records_older_than_cursor_state",
92+
),
93+
pytest.param(
94+
[
95+
{"id": 1, "created_at": "2020-01-03"},
96+
{"id": 2, "created_at": "2021-01-03"},
97+
{"id": 3, "created_at": "2021-01-04"},
98+
],
99+
{},
100+
[2, 3],
101+
id="no_state_uses_start_date_for_filtering",
102+
),
103+
pytest.param(
104+
[],
105+
{"created_at": "2021-01-03"},
106+
[],
107+
id="empty_records_returns_empty",
108+
),
109+
],
110+
)
111+
def test_client_side_incremental_retriever_decorator_with_dict_records(
112+
records: list[dict[str, Any]],
113+
cursor_state: dict[str, Any],
114+
expected_ids: list[int],
115+
):
116+
"""Test filtering with dict records."""
117+
cursor = ConcurrentCursor(
118+
stream_name="test_stream",
119+
stream_namespace=None,
120+
stream_state=cursor_state,
121+
message_repository=Mock(),
122+
connector_state_manager=Mock(),
123+
connector_state_converter=CustomFormatConcurrentStreamStateConverter(
124+
datetime_format=DATE_FORMAT
125+
),
126+
cursor_field=CursorField("created_at"),
127+
slice_boundary_fields=("start", "end"),
128+
start=datetime(2021, 1, 1, tzinfo=timezone.utc),
129+
end_provider=lambda: datetime(2021, 12, 31, tzinfo=timezone.utc),
130+
slice_range=timedelta(days=365 * 10),
131+
)
132+
133+
mock_retriever = MockRetriever(records)
134+
decorator = ClientSideIncrementalRetrieverDecorator(
135+
retriever=mock_retriever,
136+
cursor=cursor,
137+
)
138+
139+
stream_slice = StreamSlice(partition={}, cursor_slice={})
140+
result = list(decorator.read_records(records_schema={}, stream_slice=stream_slice))
141+
142+
assert [r["id"] for r in result] == expected_ids
143+
144+
145+
def test_client_side_incremental_retriever_decorator_with_record_objects(
146+
cursor_with_state,
147+
):
148+
"""Test filtering with Record objects."""
149+
stream_slice = StreamSlice(partition={}, cursor_slice={})
150+
records = [
151+
Record(
152+
data={"id": 1, "created_at": "2020-01-03"},
153+
associated_slice=stream_slice,
154+
stream_name="test_stream",
155+
),
156+
Record(
157+
data={"id": 2, "created_at": "2021-01-03"},
158+
associated_slice=stream_slice,
159+
stream_name="test_stream",
160+
),
161+
Record(
162+
data={"id": 3, "created_at": "2021-01-04"},
163+
associated_slice=stream_slice,
164+
stream_name="test_stream",
165+
),
166+
]
167+
168+
class MockRetrieverWithRecords(Retriever):
169+
def read_records(self, records_schema, stream_slice=None):
170+
yield from records
171+
172+
mock_retriever = MockRetrieverWithRecords()
173+
decorator = ClientSideIncrementalRetrieverDecorator(
174+
retriever=mock_retriever,
175+
cursor=cursor_with_state,
176+
)
177+
178+
result = list(decorator.read_records(records_schema={}, stream_slice=stream_slice))
179+
180+
assert [r["id"] for r in result] == [2, 3]
181+
182+
183+
def test_client_side_incremental_retriever_decorator_passes_through_non_record_data(
184+
cursor_with_state,
185+
):
186+
"""Test that non-dict/non-Record data is passed through unchanged."""
187+
stream_slice = StreamSlice(partition={}, cursor_slice={})
188+
189+
class MockRetrieverWithMixedData(Retriever):
190+
def read_records(self, records_schema, stream_slice=None):
191+
yield "some_string"
192+
yield 123
193+
yield {"id": 1, "created_at": "2021-01-04"}
194+
195+
mock_retriever = MockRetrieverWithMixedData()
196+
decorator = ClientSideIncrementalRetrieverDecorator(
197+
retriever=mock_retriever,
198+
cursor=cursor_with_state,
199+
)
200+
201+
result = list(decorator.read_records(records_schema={}, stream_slice=stream_slice))
202+
203+
assert result == ["some_string", 123, {"id": 1, "created_at": "2021-01-04"}]

0 commit comments

Comments
 (0)