feat(cdk): Add cursor age validation to StateDelegatingStream #890
This adds an optional api_retention_period field to StateDelegatingStream that validates whether a cursor is within an API's data retention window before switching from full refresh to incremental sync. When the cursor value is older than the retention period, the connector automatically falls back to a full refresh to avoid data loss. This is useful for APIs such as the Stripe Events API, which only retains data for 30 days.

Key changes:
- Add api_retention_period field to StateDelegatingStream schema (ISO8601 duration)
- Implement cursor age validation in model_to_component_factory.py
- Emit warning log when falling back to full refresh due to stale cursor
- Add unit tests for cursor age validation

Fixes: airbytehq/oncall#11103

Co-Authored-By: unknown <>
Co-Authored-By: unknown <>
…sor-age-validation
/autofix
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
Pull request overview
This pull request adds cursor age validation to StateDelegatingStream to automatically fall back to full refresh when a cursor is older than an API's data retention period. This prevents data loss scenarios where APIs (like Stripe Events) only retain data for a limited time window (e.g., 30 days), and a sync resumes after that window has passed.
Changes:
- Added optional `api_retention_period` field to `StateDelegatingStream` schema (ISO8601 duration format)
- Implemented cursor age validation logic that compares cursor datetime against retention cutoff
- Added warning log emission when falling back to full refresh due to stale cursor
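The comparison against a retention cutoff described above can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: `parse_retention_period` and `is_cursor_older_than_retention` are hypothetical helper names, the parser only covers the day/hour/minute/second subset of ISO 8601 durations, and the strict `<` comparison at the boundary is an assumption.

```python
import re
from datetime import datetime, timedelta, timezone

# Hypothetical helper: supports only durations such as P30D, P1D, PT1H, P1DT12H.
# Month and year components are deliberately rejected in this sketch.
_DURATION_RE = re.compile(
    r"^P(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)


def parse_retention_period(value: str) -> timedelta:
    """Convert a simple ISO 8601 duration string into a timedelta."""
    match = _DURATION_RE.match(value)
    if not match or not any(match.groupdict().values()):
        raise ValueError(f"Unsupported ISO 8601 duration: {value!r}")
    parts = {name: int(number) for name, number in match.groupdict().items() if number}
    return timedelta(**parts)


def is_cursor_older_than_retention(cursor: datetime, retention: str, now: datetime) -> bool:
    """Return True when the cursor falls before the retention cutoff."""
    cutoff = now - parse_retention_period(retention)
    # Assumption: a cursor exactly at the cutoff still counts as within retention.
    return cursor < cutoff
```

With a 30-day retention period, a cursor 45 days behind `now` would be reported as stale, while one 10 days behind would not.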
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| airbyte_cdk/sources/declarative/declarative_component_schema.yaml | Added api_retention_period field definition with ISO8601 duration examples |
| airbyte_cdk/sources/declarative/models/declarative_component_schema.py | Auto-generated model updates reflecting the new field in StateDelegatingStream |
| airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py | Core implementation of cursor age validation with three new helper methods |
| unit_tests/sources/declarative/test_state_delegating_stream.py | Added three test cases covering cursor age validation scenarios (too old, within retention, edge case) |
- Fix YAML bullet point indentation for consistency (Comment 8)
- Add type guard for cursor_value to handle unexpected types (Comment 9)
- Add test for warning log emission when cursor is too old (Comment 10)

Co-Authored-By: unknown <>
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
…_model Addresses review comment from tolik0 - the incremental_sync check is now performed in the calling method before invoking _is_cursor_older_than_retention_period. Co-Authored-By: unknown <>
Addresses review comment from tolik0 - if the cursor value or format is incorrect, we should use full_refresh_stream instead, as it indicates that the stream_state is unusable. Co-Authored-By: unknown <>
Address tolik0's review comment: During the first sync, the state will be produced by full_refresh_stream, and during subsequent syncs, by incremental_stream. We need to correctly parse the state for both cases.

Changes:
- Extract incremental_sync from both full_refresh_stream and incremental_stream
- Update _is_cursor_older_than_retention_period to accept list of sources
- Update _parse_cursor_datetime to collect and try formats from all sources

Co-Authored-By: unknown <>
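The two commits above describe trying datetime formats collected from several sources and treating an unparseable cursor as unusable state. A rough sketch of that idea, under stated assumptions: `parse_cursor_datetime` and `choose_stream` are illustrative names, cursor values are assumed to be strings, and formats are plain `strptime` patterns rather than whatever the CDK uses internally.

```python
from datetime import datetime, timezone
from typing import Any, Iterable, Optional


def parse_cursor_datetime(cursor_value: Any, formats: Iterable[str]) -> Optional[datetime]:
    """Try each format in turn; return None when the value cannot be parsed."""
    # Type guard: anything that is not a string is treated as unparseable here.
    if not isinstance(cursor_value, str):
        return None
    for fmt in formats:
        try:
            parsed = datetime.strptime(cursor_value, fmt)
        except ValueError:
            continue
        # Assumption: naive datetimes are interpreted as UTC.
        return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)
    return None


def choose_stream(cursor_value: Any, formats: Iterable[str]) -> str:
    """An unparseable cursor means the state is unusable, so fall back to full refresh."""
    if parse_cursor_datetime(cursor_value, formats) is None:
        return "full_refresh"
    return "incremental"
```

Collecting the formats from both stream definitions into one list lets the same parser handle state written by either stream.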
When cursor age validation detects a stale cursor on a StateDelegatingStream, it clears the stream state and emits an empty state message. However, if the stream is not in the user's configured catalog (e.g. a parent stream created as a dependency), the destination does not know about it and crashes with 'Stream not found'. This fix checks whether the stream is in the configured catalog before emitting the state-clearing message. If no catalog is provided (e.g. during discover), state is emitted as before for backward compatibility. Co-Authored-By: unknown <>
…unconfigured streams Instead of running cursor age validation and then suppressing just the state message, skip the entire api_retention_period block for streams not in the configured catalog. This avoids unnecessary work (creating both stream components, comparing cursor age) for parent-stream dependencies that the destination doesn't know about. Co-Authored-By: unknown <>
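The catalog guard described in the two commits above might look something like this. It is only a sketch: `should_run_retention_check` is a hypothetical name, and representing the configured catalog as an optional set of stream names is an assumption made for brevity.

```python
from typing import Optional, Set


def should_run_retention_check(
    stream_name: str, configured_stream_names: Optional[Set[str]]
) -> bool:
    """Decide whether the api_retention_period block should run for a stream."""
    # No catalog available (e.g. during discover): keep the previous behaviour.
    if configured_stream_names is None:
        return True
    # Parent-stream dependencies that the destination does not know about are skipped.
    return stream_name in configured_stream_names
```

Skipping the whole block, rather than only the state message, also avoids building both stream components for streams that are never synced directly.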
…sor fallback

When cursor age validation detects a stale cursor, the state must be cleared before constructing the full_refresh_stream so its cursor starts from start_date instead of inheriting the old stale cursor position.

Changes:
- Reorder create_state_delegating_stream to clear state before stream construction
- Simplify _is_cursor_older_than_retention_period to only use incremental_cursor
- Handle NO_CURSOR_STATE_KEY directly on raw state dict (completed full refresh = current)
- Update test to verify NO_CURSOR_STATE_KEY uses incremental (not full refresh again)

Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>
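The ordering this commit describes can be illustrated with a small sketch. Everything here is hypothetical scaffolding rather than the factory code: `select_stream` and the builder callables are invented for illustration, the state is a plain dict, and the value given to `NO_CURSOR_STATE_KEY` is an assumption.

```python
from typing import Any, Callable, Dict, Tuple

NO_CURSOR_STATE_KEY = "__ab_no_cursor_state_message"  # assumed value, for illustration only

StreamBuilder = Callable[[Dict[str, Any]], Any]


def select_stream(
    state: Dict[str, Any],
    cursor_is_stale: Callable[[Dict[str, Any]], bool],
    build_full_refresh: StreamBuilder,
    build_incremental: StreamBuilder,
) -> Tuple[str, Any]:
    """Pick a stream, clearing stale state before the full refresh stream is built."""
    if not state:
        # First sync: no state yet, so the full refresh stream is used.
        return "full_refresh", build_full_refresh(state)
    if state.get(NO_CURSOR_STATE_KEY):
        # A completed full refresh counts as current, so continue incrementally.
        return "incremental", build_incremental(state)
    if cursor_is_stale(state):
        # Clear first (in place), then construct, so the new cursor
        # cannot inherit the stale position.
        state.clear()
        return "full_refresh", build_full_refresh(state)
    return "incremental", build_incremental(state)
```

The key point is that the builder for the full refresh stream only ever sees the already-cleared state.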
Bumps the source-declarative-manifest base image to pick up the StateDelegatingStream cursor age validation fix from airbytehq/airbyte-python-cdk#890. Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>
…tale cursor fallback" This reverts commit 39a2aee.
Bumps the source-declarative-manifest base image to pick up the StateDelegatingStream cursor age validation fix from airbytehq/airbyte-python-cdk#890. Bumps dockerImageTag to 5.15.21-rc.3. Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>
Summary
Adds an optional `api_retention_period` field to `StateDelegatingStream` that validates whether a cursor is within an API's data retention window before using incremental sync. When the cursor is older than the retention period, the connector automatically falls back to full refresh to avoid data loss.

This addresses the issue where APIs like Stripe Events only retain data for 30 days: if a sync fails midway and resumes after the retention window, incremental sync would miss data.

Key changes:
- `api_retention_period` field added to the StateDelegatingStream schema (ISO8601 duration format: P30D, P1D, PT1H, etc.)
- `get_cursor_datetime_from_state` added to the concurrent cursor hierarchy (`Cursor` base, `ConcurrentCursor`, `ConcurrentPerPartitionCursor`, `FinalStateCursor`)
- `ValueError` raised at model level when `IncrementingCountCursor` is used with `api_retention_period`

Example usage:
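A sketch of how the field might be set, written as a Python dict that mirrors the structure of a declarative manifest entry. The stream name and the `$ref` targets are placeholders chosen for illustration and are not taken from this PR.

```python
# Hypothetical manifest fragment expressed as a Python dict.
state_delegating_stream = {
    "type": "StateDelegatingStream",
    "name": "events",  # placeholder stream name
    # ISO8601 duration: fall back to full refresh when the cursor is older than 30 days.
    "api_retention_period": "P30D",
    "full_refresh_stream": {"$ref": "#/definitions/events_full_refresh"},  # placeholder
    "incremental_stream": {"$ref": "#/definitions/events_incremental"},  # placeholder
}
```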
Updates since last revision
Latest changes (fix temporal ordering of state clearing, per Alfredo Garcia (@agarctfi)'s review):

Previously, `full_refresh_stream` was constructed before clearing state, so its cursor inherited the old stale cursor position, making the "full refresh" fallback start from the stale cursor instead of `start_date`. This fix restructures `create_state_delegating_stream` so that:
- The `incremental_stream` cursor is used for the retention check (no need to construct `full_refresh_stream` first)
- State is cleared before constructing `full_refresh_stream`, so its cursor starts from `start_date`
- `NO_CURSOR_STATE_KEY` is checked directly on the raw state dict: if present, the previous sync was a completed full refresh and the stream uses incremental (not full refresh again)
- `_is_cursor_older_than_retention_period` is simplified: the `full_refresh_cursor` parameter is removed and only `incremental_cursor` is used

Previous changes (removed early return for NO_CURSOR_STATE_KEY, per tolik0's feedback):
- Removed the early return for `NO_CURSOR_STATE_KEY` in `_is_cursor_older_than_retention_period`
- `FinalStateCursor.get_cursor_datetime_from_state` now handles this case by returning `now()` for `NO_CURSOR_STATE_KEY: True` state, which is always within any retention period

Earlier changes (state clearing on fallback to full refresh, per brianjlai):
- On fallback, state is cleared via `ConnectorStateManager.update_state_for_stream` and an empty state message is emitted through the message repository.

Review & Testing Checklist for Human
- Verify that `full_refresh_stream` is constructed after clearing state (line 3605-3610 in factory), so its cursor starts from `start_date` instead of the old stale cursor position. This is the core bug fix.
- When the state is `{NO_CURSOR_STATE_KEY: True}` (completed full refresh), verify the stream correctly uses incremental sync instead of falling back to full refresh again (test: `test_no_cursor_state_key_uses_incremental_not_full_refresh`).
- When a `StateDelegatingStream` is used as a parent stream (not in the configured catalog), verify it does not emit state messages that would cause "Stream not found" errors in the destination (test: `test_unconfigured_parent_stream_does_not_emit_state_on_retention_fallback`).

Recommended test plan:
- With `api_retention_period: P30D`, set a cursor state older than 30 days, and verify it falls back to full refresh with the expected warning message and that the entity endpoint is called with `created[gte]=<start_date>` (not the old cursor)
- Test with `NO_CURSOR_STATE_KEY: True` state (e.g., after a completed full refresh) to verify it correctly uses incremental sync without falling back to full refresh
- Test with `IncrementingCountCursor` to confirm it raises `ValueError` during `discover()` with a clear error message

Notes
Fixes: airbytehq/oncall#11103
Link to Devin run: https://app.devin.ai/sessions/782b85a317204e3c833c3ecc3bc02f1e
Previous Devin sessions: https://app.devin.ai/sessions/c6b25a1216c547139ef8242062f7f135, https://app.devin.ai/sessions/443d1d1a2f524eb1bc378aa48f24aa9c, https://app.devin.ai/sessions/5e2b4ff66c50407789e500d6e25e5d5c
Requested by: Alfredo Garcia (@agarctfi)