Skip to content

Commit 8828eea

Browse files
refactor: Clean NO_CURSOR_STATE_KEY from ConcurrentCursor, add tests for FinalStateCursor and parent retention
Co-Authored-By: gl_anatolii.yatsuk@airbyte.io <gl_anatolii.yatsuk@airbyte.io>
1 parent bddc671 commit 8828eea

4 files changed

Lines changed: 225 additions & 3 deletions

File tree

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,7 @@
618618
NoopMessageRepository,
619619
)
620620
from airbyte_cdk.sources.message.repository import StateFilteringMessageRepository
621+
from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
621622
from airbyte_cdk.sources.streams.call_rate import (
622623
APIBudget,
623624
FixedWindowCallRatePolicy,
@@ -3611,6 +3612,9 @@ def _is_cursor_older_than_retention_period(
36113612
Returns True if the cursor is older than the retention period (should use full refresh).
36123613
Returns False if the cursor is within the retention period (safe to use incremental).
36133614
"""
3615+
if stream_state.get(NO_CURSOR_STATE_KEY):
3616+
return False
3617+
36143618
cursor_datetime: datetime.datetime | None = None
36153619

36163620
for cursor in cursors:

airbyte_cdk/sources/streams/concurrent/cursor.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -608,9 +608,6 @@ def get_cursor_datetime_from_state(
608608
609609
Returns the cursor datetime if present and parseable, otherwise returns None.
610610
"""
611-
if stream_state.get(NO_CURSOR_STATE_KEY):
612-
return datetime.datetime.now(datetime.timezone.utc)
613-
614611
# Check if state is in concurrent format (need to convert to dict for type compatibility)
615612
mutable_state: MutableMapping[str, Any] = dict(stream_state)
616613
if self._connector_state_converter.is_state_message_compatible(mutable_state):

unit_tests/sources/declarative/test_state_delegating_stream.py

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,3 +603,215 @@ def test_final_state_cursor_skips_retention_check_and_uses_incremental():
603603

604604
records = get_records(source, _CONFIG, configured_catalog, state)
605605
assert len(records) == 1
606+
607+
608+
_PARENT_CHILD_MANIFEST: dict = {
609+
"version": "6.0.0",
610+
"type": "DeclarativeSource",
611+
"check": {"type": "CheckStream", "stream_names": ["ChildStream"]},
612+
"definitions": {
613+
"ParentStream": {
614+
"type": "StateDelegatingStream",
615+
"name": "ParentStream",
616+
"full_refresh_stream": {
617+
"type": "DeclarativeStream",
618+
"name": "ParentStream",
619+
"primary_key": [],
620+
"schema_loader": {
621+
"type": "InlineSchemaLoader",
622+
"schema": {
623+
"$schema": "http://json-schema.org/schema#",
624+
"properties": {},
625+
"type": "object",
626+
},
627+
},
628+
"retriever": {
629+
"type": "SimpleRetriever",
630+
"requester": {
631+
"type": "HttpRequester",
632+
"url_base": "https://api.test.com",
633+
"path": "/parents",
634+
"http_method": "GET",
635+
},
636+
"record_selector": {
637+
"type": "RecordSelector",
638+
"extractor": {"type": "DpathExtractor", "field_path": []},
639+
},
640+
},
641+
"incremental_sync": {
642+
"type": "DatetimeBasedCursor",
643+
"start_datetime": {
644+
"datetime": "{{ format_datetime(config['start_date'], '%Y-%m-%d') }}"
645+
},
646+
"end_datetime": {"datetime": "{{ now_utc().strftime('%Y-%m-%d') }}"},
647+
"datetime_format": "%Y-%m-%d",
648+
"cursor_datetime_formats": ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"],
649+
"cursor_field": "updated_at",
650+
},
651+
},
652+
"incremental_stream": {
653+
"type": "DeclarativeStream",
654+
"name": "ParentStream",
655+
"primary_key": [],
656+
"schema_loader": {
657+
"type": "InlineSchemaLoader",
658+
"schema": {
659+
"$schema": "http://json-schema.org/schema#",
660+
"properties": {},
661+
"type": "object",
662+
},
663+
},
664+
"retriever": {
665+
"type": "SimpleRetriever",
666+
"requester": {
667+
"type": "HttpRequester",
668+
"url_base": "https://api.test.com",
669+
"path": "/parents_incremental",
670+
"http_method": "GET",
671+
},
672+
"record_selector": {
673+
"type": "RecordSelector",
674+
"extractor": {"type": "DpathExtractor", "field_path": []},
675+
},
676+
},
677+
"incremental_sync": {
678+
"type": "DatetimeBasedCursor",
679+
"start_datetime": {
680+
"datetime": "{{ format_datetime(config['start_date'], '%Y-%m-%d') }}"
681+
},
682+
"end_datetime": {"datetime": "{{ now_utc().strftime('%Y-%m-%d') }}"},
683+
"datetime_format": "%Y-%m-%d",
684+
"cursor_datetime_formats": ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"],
685+
"cursor_granularity": "P1D",
686+
"step": "P15D",
687+
"cursor_field": "updated_at",
688+
"start_time_option": {
689+
"type": "RequestOption",
690+
"field_name": "start",
691+
"inject_into": "request_parameter",
692+
},
693+
"end_time_option": {
694+
"type": "RequestOption",
695+
"field_name": "end",
696+
"inject_into": "request_parameter",
697+
},
698+
},
699+
},
700+
},
701+
"ChildStream": {
702+
"type": "DeclarativeStream",
703+
"name": "ChildStream",
704+
"primary_key": [],
705+
"schema_loader": {
706+
"type": "InlineSchemaLoader",
707+
"schema": {
708+
"$schema": "http://json-schema.org/schema#",
709+
"properties": {},
710+
"type": "object",
711+
},
712+
},
713+
"retriever": {
714+
"type": "SimpleRetriever",
715+
"requester": {
716+
"type": "HttpRequester",
717+
"url_base": "https://api.test.com",
718+
"path": "/children/{{ stream_slice.parent_id }}",
719+
"http_method": "GET",
720+
},
721+
"record_selector": {
722+
"type": "RecordSelector",
723+
"extractor": {"type": "DpathExtractor", "field_path": []},
724+
},
725+
"partition_router": {
726+
"type": "SubstreamPartitionRouter",
727+
"parent_stream_configs": [
728+
{
729+
"stream": "#/definitions/ParentStream",
730+
"parent_key": "id",
731+
"partition_field": "parent_id",
732+
"incremental_dependency": True,
733+
}
734+
],
735+
},
736+
},
737+
"incremental_sync": {
738+
"type": "DatetimeBasedCursor",
739+
"start_datetime": {
740+
"datetime": "{{ format_datetime(config['start_date'], '%Y-%m-%d') }}"
741+
},
742+
"end_datetime": {"datetime": "{{ now_utc().strftime('%Y-%m-%d') }}"},
743+
"datetime_format": "%Y-%m-%d",
744+
"cursor_datetime_formats": ["%Y-%m-%d"],
745+
"cursor_field": "updated_at",
746+
},
747+
},
748+
},
749+
"streams": [{"$ref": "#/definitions/ChildStream"}],
750+
"spec": {
751+
"connection_specification": {
752+
"$schema": "http://json-schema.org/draft-07/schema#",
753+
"type": "object",
754+
"required": [],
755+
"properties": {},
756+
"additionalProperties": True,
757+
},
758+
"documentation_url": "https://example.org",
759+
"type": "Spec",
760+
},
761+
}
762+
763+
764+
def _create_parent_child_manifest_with_retention_period(
765+
api_retention_period: str,
766+
) -> dict:
767+
manifest = copy.deepcopy(_PARENT_CHILD_MANIFEST)
768+
manifest["definitions"]["ParentStream"]["api_retention_period"] = api_retention_period
769+
return manifest
770+
771+
772+
@freezegun.freeze_time("2024-07-15")
773+
def test_parent_state_delegating_stream_retention_falls_back_to_full_refresh():
774+
"""When parent StateDelegatingStream has old cursor in child state, retention triggers full refresh for parent."""
775+
manifest = _create_parent_child_manifest_with_retention_period("P7D")
776+
777+
with HttpMocker() as http_mocker:
778+
http_mocker.get(
779+
HttpRequest(url="https://api.test.com/parents"),
780+
HttpResponse(
781+
body=json.dumps(
782+
[{"id": 1, "name": "parent_1", "updated_at": "2024-07-14"}]
783+
)
784+
),
785+
)
786+
http_mocker.get(
787+
HttpRequest(url="https://api.test.com/children/1"),
788+
HttpResponse(
789+
body=json.dumps(
790+
[{"id": 10, "name": "child_1", "updated_at": "2024-07-14"}]
791+
)
792+
),
793+
)
794+
795+
state = [
796+
AirbyteStateMessage(
797+
type=AirbyteStateType.STREAM,
798+
stream=AirbyteStreamState(
799+
stream_descriptor=StreamDescriptor(
800+
name="ChildStream", namespace=None
801+
),
802+
stream_state=AirbyteStateBlob(
803+
use_global_cursor=False,
804+
state={"updated_at": "2024-07-14"},
805+
states=[],
806+
parent_state={"ParentStream": {"updated_at": "2024-06-01"}},
807+
lookback_window=0,
808+
),
809+
),
810+
)
811+
]
812+
source = ConcurrentDeclarativeSource(
813+
source_config=manifest, config=_CONFIG, catalog=None, state=state
814+
)
815+
configured_catalog = create_configured_catalog(source, _CONFIG)
816+
records = get_records(source, _CONFIG, configured_catalog, state)
817+
assert len(records) == 1

unit_tests/sources/streams/concurrent/test_cursor.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
1515
from airbyte_cdk.sources.message import MessageRepository
16+
from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
1617
from airbyte_cdk.sources.streams.concurrent.clamping import (
1718
ClampingEndProvider,
1819
ClampingStrategy,
@@ -24,6 +25,7 @@
2425
ConcurrentCursor,
2526
CursorField,
2627
CursorValueType,
28+
FinalStateCursor,
2729
)
2830
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
2931
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
@@ -1387,3 +1389,10 @@ def test_given_partitioned_state_with_multiple_slices_when_should_be_synced_then
13871389
)
13881390
== True
13891391
)
1392+
1393+
1394+
@freezegun.freeze_time("2024-07-15")
1395+
def test_final_state_cursor_get_cursor_datetime_from_state_returns_current_datetime():
1396+
cursor = FinalStateCursor("test_stream", None, Mock(spec=MessageRepository))
1397+
result = cursor.get_cursor_datetime_from_state({NO_CURSOR_STATE_KEY: True})
1398+
assert result == datetime(2024, 7, 15, tzinfo=timezone.utc)

0 commit comments

Comments
 (0)