Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,7 @@
NoopMessageRepository,
)
from airbyte_cdk.sources.message.repository import StateFilteringMessageRepository
from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY
from airbyte_cdk.sources.streams.call_rate import (
APIBudget,
FixedWindowCallRatePolicy,
Expand Down Expand Up @@ -3578,7 +3579,14 @@ def create_state_delegating_stream(
has_parent = False if has_parent_state is None else has_parent_state

if not stream_state and not has_parent:
return self._create_component_from_model(model.full_refresh_stream, config=config, **kwargs) # type: ignore[no-any-return]
return self._create_component_from_model(
model.full_refresh_stream, config=config, **kwargs
) # type: ignore[no-any-return]

if stream_state and stream_state.get(NO_CURSOR_STATE_KEY):
return self._create_component_from_model(
model.incremental_stream, config=config, **kwargs
) # type: ignore[no-any-return]

incremental_stream: DefaultStream = self._create_component_from_model(
model.incremental_stream, config=config, **kwargs
Expand Down Expand Up @@ -3615,20 +3623,12 @@ def _is_cursor_older_than_retention_period(

for cursor in cursors:
if not hasattr(cursor, "get_cursor_datetime_from_state"):
raise SystemError(
f"Stream '{stream_name}' cursor type '{type(cursor).__name__}' does not have "
f"get_cursor_datetime_from_state method. Cursor age validation with "
f"api_retention_period is not supported for this cursor type."
)
continue

try:
cursor_datetime = cursor.get_cursor_datetime_from_state(stream_state)
except NotImplementedError:
raise SystemError(
f"Stream '{stream_name}' cursor type '{type(cursor).__name__}' does not implement "
f"get_cursor_datetime_from_state. Cursor age validation with "
f"api_retention_period is not supported for this cursor type."
)
continue

if cursor_datetime is not None:
break
Expand Down
37 changes: 37 additions & 0 deletions unit_tests/sources/declarative/test_state_delegating_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,3 +566,40 @@ def test_cursor_age_validation_raises_error_for_unparseable_cursor():

with pytest.raises(ValueError, match="not-a-date"):
source.discover(logger=MagicMock(), config=_CONFIG)


@freezegun.freeze_time("2024-07-15")
def test_final_state_cursor_skips_retention_check_and_uses_incremental():
"""When state is a final state from FinalStateCursor, skip retention check and use incremental."""
manifest = _create_manifest_with_retention_period("P7D")

with HttpMocker() as http_mocker:
http_mocker.get(
HttpRequest(
url="https://api.test.com/items_with_filtration?start=2024-07-01&end=2024-07-15"
),
HttpResponse(
body=json.dumps(
[
{"id": 1, "name": "item_1", "updated_at": "2024-07-14"},
]
)
),
)

state = [
AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name="TestStream", namespace=None),
stream_state=AirbyteStateBlob(__ab_no_cursor_state_message=True),
),
)
]
source = ConcurrentDeclarativeSource(
source_config=manifest, config=_CONFIG, catalog=None, state=state
)
configured_catalog = create_configured_catalog(source, _CONFIG)

records = get_records(source, _CONFIG, configured_catalog, state)
assert len(records) == 1
Loading