Skip to content

Commit 43dc47e

Browse files
fix: Raise ValueError for unparseable cursor datetime when api_retention_period is set
Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>
1 parent 653022b commit 43dc47e

2 files changed

Lines changed: 30 additions & 3 deletions

File tree

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3609,9 +3609,9 @@ def _is_cursor_older_than_retention_period(
36093609
Delegates cursor datetime extraction to cursor class instances via
36103610
get_cursor_datetime_from_state, which handles format-specific parsing.
36113611
3612-
Returns True if the cursor is older than the retention period or if the cursor is
3613-
invalid/unparseable (should use full refresh).
3612+
Returns True if the cursor is older than the retention period (should use full refresh).
36143613
Returns False if the cursor is within the retention period (safe to use incremental).
3614+
Raises ValueError if the cursor datetime could not be parsed from state.
36153615
"""
36163616
# Skip retention check for concurrent state format (e.g. {"state_type": "date-range", "slices": [...]}).
36173617
# The DatetimeBasedCursor used for the age check only handles sequential state format.
@@ -3640,7 +3640,11 @@ def _is_cursor_older_than_retention_period(
36403640
break
36413641

36423642
if cursor_datetime is None:
3643-
return True
3643+
raise ValueError(
3644+
f"Stream '{stream_name}' has api_retention_period set to '{api_retention_period}' "
3645+
f"but the cursor datetime could not be parsed from state. Check that cursor_field "
3646+
f"and datetime_format match the state format."
3647+
)
36443648

36453649
retention_duration = parse_duration(api_retention_period)
36463650
retention_cutoff = datetime.datetime.now(datetime.timezone.utc) - retention_duration

unit_tests/sources/declarative/test_state_delegating_stream.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from unittest.mock import MagicMock
99

1010
import freezegun
11+
import pytest
1112

1213
from airbyte_cdk.models import (
1314
AirbyteStateBlob,
@@ -553,3 +554,25 @@ def test_cursor_age_validation_skips_incrementing_count_cursor():
553554

554555
records = get_records(source, _CONFIG, configured_catalog, state)
555556
assert len(records) >= 0
557+
558+
559+
def test_cursor_age_validation_raises_error_for_unparseable_cursor():
560+
"""Test that unparseable cursor datetime raises ValueError when api_retention_period is set."""
561+
manifest = _create_manifest_with_retention_period("P7D")
562+
563+
state = [
564+
AirbyteStateMessage(
565+
type=AirbyteStateType.STREAM,
566+
stream=AirbyteStreamState(
567+
stream_descriptor=StreamDescriptor(name="TestStream", namespace=None),
568+
stream_state=AirbyteStateBlob(updated_at="not-a-date"),
569+
),
570+
)
571+
]
572+
573+
source = ConcurrentDeclarativeSource(
574+
source_config=manifest, config=_CONFIG, catalog=None, state=state
575+
)
576+
577+
with pytest.raises(ValueError, match="could not be parsed from state"):
578+
source.discover(logger=MagicMock(), config=_CONFIG)

0 commit comments

Comments
 (0)