Skip to content

Commit be72c5c

Browse files
feat: Add support for per-partition state and IncrementingCountCursor validation
- Add _extract_cursor_value_from_state helper to handle different state structures - For per-partition state, use global cursor value from 'state' key - Raise ValueError when IncrementingCountCursor is used with api_retention_period - Add unit tests for per-partition state (cursor too old and within retention) - Add unit test for IncrementingCountCursor error handling Co-Authored-By: unknown <>
1 parent 567ca7a commit be72c5c

2 files changed

Lines changed: 181 additions & 3 deletions

File tree

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3599,13 +3599,22 @@ def _is_cursor_older_than_retention_period(
35993599
If the cursor is too old, the incremental API may not have data going back that far,
36003600
so we should fall back to a full refresh to avoid data loss.
36013601
3602-
The state could have been produced by either full_refresh_stream (first sync) or
3603-
incremental_stream (subsequent syncs), so we try parsing with formats from both.
3602+
This method handles different state structures:
3603+
- Simple cursor state: {"cursor_field": "value"}
3604+
- Per-partition state: {"state": {"cursor_field": "value"}, "states": [...]}
36043605
36053606
Returns True if the cursor is older than the retention period or if the cursor is
36063607
invalid/unparseable (should use full refresh).
36073608
Returns False if the cursor is within the retention period (safe to use incremental).
36083609
"""
3610+
for incremental_sync in incremental_sync_sources:
3611+
if isinstance(incremental_sync, IncrementingCountCursorModel):
3612+
raise ValueError(
3613+
f"Stream '{stream_name}' uses IncrementingCountCursor which is not supported "
3614+
f"with api_retention_period. IncrementingCountCursor does not use datetime-based "
3615+
f"cursors, so cursor age validation cannot be performed."
3616+
)
3617+
36093618
cursor_field = None
36103619
for incremental_sync in incremental_sync_sources:
36113620
cursor_field = getattr(incremental_sync, "cursor_field", None)
@@ -3615,7 +3624,7 @@ def _is_cursor_older_than_retention_period(
36153624
if not cursor_field:
36163625
return True
36173626

3618-
cursor_value = stream_state.get(cursor_field)
3627+
cursor_value = self._extract_cursor_value_from_state(stream_state, cursor_field)
36193628
if not cursor_value:
36203629
return True
36213630

@@ -3641,6 +3650,29 @@ def _is_cursor_older_than_retention_period(
36413650

36423651
return False
36433652

3653+
def _extract_cursor_value_from_state(
3654+
self,
3655+
stream_state: Mapping[str, Any],
3656+
cursor_field: str,
3657+
) -> Any:
3658+
"""Extract cursor value from state, handling different state structures.
3659+
3660+
Supports:
3661+
- Simple cursor state: {"cursor_field": "value"} -> returns "value"
3662+
- Per-partition state: {"state": {"cursor_field": "value"}, ...} -> returns "value"
3663+
(uses global cursor from "state" key)
3664+
3665+
Returns None if cursor value cannot be extracted.
3666+
"""
3667+
if cursor_field in stream_state:
3668+
return stream_state.get(cursor_field)
3669+
3670+
global_state = stream_state.get("state")
3671+
if isinstance(global_state, dict) and cursor_field in global_state:
3672+
return global_state.get(cursor_field)
3673+
3674+
return None
3675+
36443676
def _parse_cursor_datetime(
36453677
self,
36463678
cursor_value: str,

unit_tests/sources/declarative/test_state_delegating_stream.py

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from unittest.mock import MagicMock
99

1010
import freezegun
11+
import pytest
1112

1213
from airbyte_cdk.models import (
1314
AirbyteStateBlob,
@@ -405,3 +406,148 @@ def test_cursor_age_validation_emits_warning_when_falling_back(caplog):
405406
assert any(
406407
"TestStream" in msg and "older than" in msg and "P7D" in msg for msg in warning_messages
407408
), f"Expected warning about stale cursor not found. Warnings: {warning_messages}"
409+
410+
411+
@freezegun.freeze_time("2024-07-15")
412+
def test_cursor_age_validation_with_per_partition_state_uses_global_cursor():
413+
"""Test that per-partition state structure uses global cursor for age validation."""
414+
manifest = _create_manifest_with_retention_period("P7D")
415+
416+
with HttpMocker() as http_mocker:
417+
http_mocker.get(
418+
HttpRequest(url="https://api.test.com/items"),
419+
HttpResponse(
420+
body=json.dumps(
421+
[
422+
{"id": 1, "name": "item_1", "updated_at": "2024-07-13"},
423+
]
424+
)
425+
),
426+
)
427+
428+
state = [
429+
AirbyteStateMessage(
430+
type=AirbyteStateType.STREAM,
431+
stream=AirbyteStreamState(
432+
stream_descriptor=StreamDescriptor(name="TestStream", namespace=None),
433+
stream_state=AirbyteStateBlob(
434+
state={"updated_at": "2024-07-01"},
435+
states=[
436+
{
437+
"partition": {"parent_id": "1"},
438+
"cursor": {"updated_at": "2024-07-10"},
439+
},
440+
{
441+
"partition": {"parent_id": "2"},
442+
"cursor": {"updated_at": "2024-07-05"},
443+
},
444+
],
445+
use_global_cursor=False,
446+
),
447+
),
448+
)
449+
]
450+
source = ConcurrentDeclarativeSource(
451+
source_config=manifest, config=_CONFIG, catalog=None, state=state
452+
)
453+
configured_catalog = create_configured_catalog(source, _CONFIG)
454+
455+
records = get_records(source, _CONFIG, configured_catalog, state)
456+
assert len(records) == 1
457+
458+
459+
@freezegun.freeze_time("2024-07-15")
460+
def test_cursor_age_validation_with_per_partition_state_within_retention():
461+
"""Test per-partition state with global cursor within retention uses incremental.
462+
463+
This test verifies that when the global cursor in a per-partition state structure
464+
is within the retention period, the incremental stream is selected (not full refresh).
465+
We verify this by checking that the incremental endpoint is called, not the full refresh one.
466+
"""
467+
manifest = _create_manifest_with_retention_period("P30D")
468+
469+
with HttpMocker() as http_mocker:
470+
http_mocker.get(
471+
HttpRequest(
472+
url="https://api.test.com/items_with_filtration",
473+
query_params={"start": "2024-07-01", "end": "2024-07-15"},
474+
),
475+
HttpResponse(
476+
body=json.dumps(
477+
[
478+
{"id": 3, "name": "item_3", "updated_at": "2024-07-14"},
479+
]
480+
)
481+
),
482+
)
483+
484+
state = [
485+
AirbyteStateMessage(
486+
type=AirbyteStateType.STREAM,
487+
stream=AirbyteStreamState(
488+
stream_descriptor=StreamDescriptor(name="TestStream", namespace=None),
489+
stream_state=AirbyteStateBlob(
490+
state={"updated_at": "2024-07-10"},
491+
states=[
492+
{
493+
"partition": {"parent_id": "1"},
494+
"cursor": {"updated_at": "2024-07-10"},
495+
},
496+
],
497+
use_global_cursor=False,
498+
),
499+
),
500+
)
501+
]
502+
source = ConcurrentDeclarativeSource(
503+
source_config=manifest, config=_CONFIG, catalog=None, state=state
504+
)
505+
configured_catalog = create_configured_catalog(source, _CONFIG)
506+
507+
records = get_records(source, _CONFIG, configured_catalog, state)
508+
assert len(records) == 1
509+
510+
511+
def _create_manifest_with_incrementing_count_cursor(api_retention_period: str) -> dict:
512+
"""Create a manifest with IncrementingCountCursor and api_retention_period."""
513+
manifest = copy.deepcopy(_MANIFEST)
514+
manifest["definitions"]["TestStream"]["api_retention_period"] = api_retention_period
515+
516+
incrementing_cursor = {
517+
"type": "IncrementingCountCursor",
518+
"cursor_field": "id",
519+
"start_value": 0,
520+
}
521+
manifest["definitions"]["TestStream"]["full_refresh_stream"]["incremental_sync"] = (
522+
incrementing_cursor
523+
)
524+
manifest["definitions"]["TestStream"]["incremental_stream"]["incremental_sync"] = (
525+
incrementing_cursor
526+
)
527+
return manifest
528+
529+
530+
def test_cursor_age_validation_raises_error_for_incrementing_count_cursor():
531+
"""Test that IncrementingCountCursor with api_retention_period raises an error."""
532+
manifest = _create_manifest_with_incrementing_count_cursor("P7D")
533+
534+
state = [
535+
AirbyteStateMessage(
536+
type=AirbyteStateType.STREAM,
537+
stream=AirbyteStreamState(
538+
stream_descriptor=StreamDescriptor(name="TestStream", namespace=None),
539+
stream_state=AirbyteStateBlob(id=100),
540+
),
541+
)
542+
]
543+
544+
source = ConcurrentDeclarativeSource(
545+
source_config=manifest, config=_CONFIG, catalog=None, state=state
546+
)
547+
548+
with pytest.raises(ValueError) as exc_info:
549+
source.discover(logger=MagicMock(), config=_CONFIG)
550+
551+
assert "IncrementingCountCursor" in str(exc_info.value)
552+
assert "not supported" in str(exc_info.value)
553+
assert "api_retention_period" in str(exc_info.value)

0 commit comments

Comments
 (0)