Skip to content

Commit 0e33418

Browse files
fix: Address Copilot review comments
- Fix YAML bullet point indentation for consistency (Comment 8) - Add type guard for cursor_value to handle unexpected types (Comment 9) - Add test for warning log emission when cursor is too old (Comment 10) Co-Authored-By: unknown <>
1 parent 21da112 commit 0e33418

3 files changed

Lines changed: 45 additions & 6 deletions

File tree

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3757,12 +3757,12 @@ definitions:
37573757
description: |
37583758
The data retention period of the incremental API (ISO8601 duration). If the cursor value is older than this retention period, the connector will automatically fall back to a full refresh to avoid data loss.
37593759
This is useful for APIs like Stripe Events API which only retain data for 30 days.
3760-
* **PT1H**: 1 hour
3761-
* **P1D**: 1 day
3762-
* **P1W**: 1 week
3763-
* **P1M**: 1 month
3764-
* **P1Y**: 1 year
3765-
* **P30D**: 30 days
3760+
* **PT1H**: 1 hour
3761+
* **P1D**: 1 day
3762+
* **P1W**: 1 week
3763+
* **P1M**: 1 month
3764+
* **P1Y**: 1 year
3765+
* **P30D**: 30 days
37663766
type: string
37673767
examples:
37683768
- "P30D"

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3609,6 +3609,9 @@ def _is_cursor_older_than_retention_period(
36093609
if not cursor_value:
36103610
return False
36113611

3612+
if not isinstance(cursor_value, (str, int)):
3613+
return False
3614+
36123615
retention_duration = parse_duration(api_retention_period)
36133616
retention_cutoff = datetime.datetime.now(datetime.timezone.utc) - retention_duration
36143617

unit_tests/sources/declarative/test_state_delegating_stream.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44

55
import copy
66
import json
7+
import logging
78
from unittest.mock import MagicMock
89

910
import freezegun
11+
import pytest
1012

1113
from airbyte_cdk.models import (
1214
AirbyteStateBlob,
@@ -370,3 +372,37 @@ def test_cursor_age_validation_with_1_day_retention_falls_back():
370372

371373
records = get_records(source, _CONFIG, configured_catalog, state)
372374
assert len(records) == 1
375+
376+
377+
@freezegun.freeze_time("2024-07-15")
378+
def test_cursor_age_validation_emits_warning_when_falling_back(caplog):
379+
"""Test that a warning is emitted when cursor is older than retention period."""
380+
manifest = _create_manifest_with_retention_period("P7D")
381+
382+
with HttpMocker() as http_mocker:
383+
http_mocker.get(
384+
HttpRequest(url="https://api.test.com/items"),
385+
HttpResponse(body=json.dumps([{"id": 1, "updated_at": "2024-07-14"}])),
386+
)
387+
388+
state = [
389+
AirbyteStateMessage(
390+
type=AirbyteStateType.STREAM,
391+
stream=AirbyteStreamState(
392+
stream_descriptor=StreamDescriptor(name="TestStream", namespace=None),
393+
stream_state=AirbyteStateBlob(updated_at="2024-07-01"),
394+
),
395+
)
396+
]
397+
398+
with caplog.at_level(logging.WARNING):
399+
source = ConcurrentDeclarativeSource(
400+
source_config=manifest, config=_CONFIG, catalog=None, state=state
401+
)
402+
configured_catalog = create_configured_catalog(source, _CONFIG)
403+
get_records(source, _CONFIG, configured_catalog, state)
404+
405+
warning_messages = [r.message for r in caplog.records if r.levelno == logging.WARNING]
406+
assert any("TestStream" in msg and "older than" in msg and "P7D" in msg for msg in warning_messages), (
407+
f"Expected warning about stale cursor not found. Warnings: {warning_messages}"
408+
)

0 commit comments

Comments
 (0)