
feat(cdk): Add cursor age validation to StateDelegatingStream#890

Merged
Alfredo Garcia (agarctfi) merged 54 commits into main from
devin/1770066385-state-delegating-stream-cursor-age-validation
Mar 12, 2026

Conversation

@devin-ai-integration

@devin-ai-integration devin-ai-integration bot commented Feb 2, 2026

Summary

Adds an optional api_retention_period field to StateDelegatingStream that validates whether a cursor is within an API's data retention window before using incremental sync. When the cursor is older than the retention period, the connector automatically falls back to full refresh to avoid data loss.

This addresses the issue where APIs like Stripe Events only retain data for 30 days - if a sync fails mid-way and resumes after the retention window, incremental sync would miss data.

Key changes:

  • Added api_retention_period field to StateDelegatingStream schema (ISO8601 duration format: P30D, P1D, PT1H, etc.)
  • Added get_cursor_datetime_from_state to concurrent cursor hierarchy (Cursor base, ConcurrentCursor, ConcurrentPerPartitionCursor, FinalStateCursor)
  • Raises ValueError at model level when IncrementingCountCursor is used with api_retention_period
  • Emits warning log when falling back to full refresh due to stale cursor
  • Clears stream state and emits an empty state message to the platform when falling back to full refresh, so the platform does not retain stale state across sync attempts
  • No breaking changes — the field is optional with no default

Example usage:

type: StateDelegatingStream
name: events
api_retention_period: P30D  # 30 days
full_refresh_stream: ...
incremental_stream: ...
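
The retention check described in the summary can be sketched roughly as follows. This is a minimal illustration, not the CDK's implementation: `parse_retention_period` and `is_cursor_stale` are hypothetical helpers, and the regex covers only the day/hour/minute/second subset of ISO 8601 durations (P30D, P1D, PT1H), not months or years.

```python
import re
from datetime import datetime, timedelta, timezone

# Hypothetical helper: handles only the D/H/M/S subset of ISO 8601 durations.
_DURATION_RE = re.compile(
    r"^P(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)


def parse_retention_period(value: str) -> timedelta:
    """Convert a duration such as 'P30D' or 'PT1H' into a timedelta."""
    match = _DURATION_RE.match(value)
    if not match or not any(match.groupdict().values()):
        raise ValueError(f"Unsupported ISO 8601 duration: {value!r}")
    parts = {name: int(number) for name, number in match.groupdict().items() if number}
    return timedelta(**parts)


def is_cursor_stale(cursor: datetime, retention: str, now: datetime) -> bool:
    """Return True when the cursor falls before the retention cutoff."""
    return cursor < now - parse_retention_period(retention)


# Illustrative values only: a cursor from January checked against a 30-day window.
now = datetime(2026, 3, 1, tzinfo=timezone.utc)
stale = is_cursor_stale(datetime(2026, 1, 1, tzinfo=timezone.utc), "P30D", now)
```

When a check like this reports a stale cursor, the stream would fall back to full refresh instead of resuming incrementally.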

Updates since last revision

Latest changes (fix temporal ordering of state clearing — per Alfredo Garcia (@agarctfi)'s review):

Previously, full_refresh_stream was constructed before clearing state, so its cursor inherited the old stale cursor position — making the "full refresh" fallback start from the stale cursor instead of start_date. This fix restructures create_state_delegating_stream so that:

  1. Only the incremental_stream cursor is used for the retention check (no need to construct full_refresh_stream first)
  2. State is cleared before constructing full_refresh_stream, so its cursor starts from start_date
  3. NO_CURSOR_STATE_KEY is checked directly on the raw state dict — if present, the previous sync was a completed full refresh and the stream uses incremental (not full refresh again)
  4. _is_cursor_older_than_retention_period simplified: removed full_refresh_cursor parameter, only uses incremental_cursor
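
The ordering in the steps above can be sketched as below. This is an assumption-laden outline rather than the real `create_state_delegating_stream`: `select_stream`, the builder callables, and `NO_CURSOR_PLACEHOLDER` (a stand-in for the value of `NO_CURSOR_STATE_KEY`) are all hypothetical, and the empty-state branch assumes a first sync runs the full refresh stream.

```python
from typing import Any, Callable, Dict, Tuple

State = Dict[str, Any]
Builder = Callable[[State], Tuple[str, State]]

# Stand-in for the CDK's NO_CURSOR_STATE_KEY constant (the real value is not shown here).
NO_CURSOR_PLACEHOLDER = "no_cursor_placeholder"


def select_stream(
    state: State,
    is_stale: Callable[[State], bool],
    build_incremental: Builder,
    build_full_refresh: Builder,
) -> Tuple[str, State]:
    # Assumed: with no prior state, the first sync uses the full refresh stream.
    if not state:
        return build_full_refresh(state)
    # A completed full refresh left this marker, so continue incrementally.
    if NO_CURSOR_PLACEHOLDER in state:
        return build_incremental(state)
    # Only the incremental cursor's state is consulted for the retention check.
    if is_stale(state):
        state.clear()  # clear first ...
        return build_full_refresh(state)  # ... then construct, so no stale cursor leaks in
    return build_incremental(state)


# Builders that snapshot the state they were constructed with, for illustration.
def build_incremental(state: State) -> Tuple[str, State]:
    return ("incremental", dict(state))


def build_full_refresh(state: State) -> Tuple[str, State]:
    return ("full_refresh", dict(state))
```

Because the builders here capture state at construction time, swapping the `clear()` call and the `build_full_refresh` call would reproduce the bug described above: the fallback stream would start from the stale cursor.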

Previous changes (removed early return for NO_CURSOR_STATE_KEY — per tolik0's feedback):

  • Removed the explicit early return for NO_CURSOR_STATE_KEY in _is_cursor_older_than_retention_period
  • FinalStateCursor.get_cursor_datetime_from_state now handles this case by returning now() for NO_CURSOR_STATE_KEY: True state, which is always within any retention period
  • This simplifies the code by letting the cursor class handle its own state format
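
As a rough illustration of that earlier revision, a cursor class that interprets its own state format might look like the sketch below. `FinalStateCursorSketch` and `NO_CURSOR_PLACEHOLDER` are hypothetical stand-ins for the real class and constant, and returning `None` for any other state shape is an assumption made for this example.

```python
from datetime import datetime, timezone
from typing import Any, Mapping, Optional

# Stand-in for the CDK's NO_CURSOR_STATE_KEY constant.
NO_CURSOR_PLACEHOLDER = "no_cursor_placeholder"


class FinalStateCursorSketch:
    """Hypothetical cursor that knows how to read its own state format."""

    def get_cursor_datetime_from_state(
        self, stream_state: Mapping[str, Any]
    ) -> Optional[datetime]:
        # A completed full refresh is treated as "current": now() is always
        # inside any retention window.
        if stream_state.get(NO_CURSOR_PLACEHOLDER) is True:
            return datetime.now(timezone.utc)
        # Assumed behavior for this sketch: no datetime can be derived.
        return None
```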

Earlier changes (state clearing on fallback to full refresh — per brianjlai):

  • When cursor age validation triggers a fallback to full refresh, the stream's state is now cleared via ConnectorStateManager.update_state_for_stream and an empty state message is emitted through the message repository.

Review & Testing Checklist for Human

  • Verify the temporal ordering fix: confirm that full_refresh_stream is constructed after clearing state (lines 3605-3610 in model_to_component_factory.py), so its cursor starts from start_date instead of the old stale cursor position. This is the core bug fix.
  • Verify NO_CURSOR_STATE_KEY behavior — When state contains {NO_CURSOR_STATE_KEY: True} (completed full refresh), verify the stream correctly uses incremental sync instead of falling back to full refresh again (test: test_no_cursor_state_key_uses_incremental_not_full_refresh).
  • Verify parent stream edge case — When a StateDelegatingStream is used as a parent stream (not in the configured catalog), verify it does not emit state messages that would cause "Stream not found" errors in the destination (test: test_unconfigured_parent_stream_does_not_emit_state_on_retention_fallback).
  • Verify state-clearing message ordering — The empty state message is emitted during stream creation (in the factory) before the read starts. Verify that the message repository correctly delivers this empty state message to the platform before any record messages.

Recommended test plan:

  1. Apply this change to the Stripe connector's events stream with api_retention_period: P30D, set a cursor state older than 30 days, and verify it falls back to full refresh with the expected warning message and that the entity endpoint is called with created[gte]=<start_date> (not the old cursor)
  2. Test with NO_CURSOR_STATE_KEY: True state (e.g., after a completed full refresh) to verify it correctly uses incremental sync without falling back to full refresh
  3. Test with IncrementingCountCursor to confirm it raises ValueError during discover() with a clear error message
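
For the third item, the kind of model-level guard being tested could be approximated as below. This is only a sketch: `StreamConfigSketch` is a hypothetical dataclass standing in for the generated model, and the error message wording is invented for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StreamConfigSketch:
    """Hypothetical stand-in for the StateDelegatingStream model."""

    cursor_type: str
    api_retention_period: Optional[str] = None

    def __post_init__(self) -> None:
        # A count-based cursor has no datetime to compare against a retention
        # window, so the combination is rejected when the model is built.
        if self.api_retention_period and self.cursor_type == "IncrementingCountCursor":
            raise ValueError(
                "api_retention_period is not supported with IncrementingCountCursor"
            )
```

Failing at construction time means a misconfigured manifest surfaces early rather than partway through a sync.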

Notes

Fixes: airbytehq/oncall#11103

Link to Devin run: https://app.devin.ai/sessions/782b85a317204e3c833c3ecc3bc02f1e
Previous Devin sessions: https://app.devin.ai/sessions/c6b25a1216c547139ef8242062f7f135, https://app.devin.ai/sessions/443d1d1a2f524eb1bc378aa48f24aa9c, https://app.devin.ai/sessions/5e2b4ff66c50407789e500d6e25e5d5c
Requested by: Alfredo Garcia (@agarctfi)

This adds an optional api_retention_period field to StateDelegatingStream
that validates whether a cursor is within an API's data retention window
before switching from full refresh to incremental sync.

When the cursor value is older than the retention period, the connector
automatically falls back to a full refresh to avoid data loss. This is
useful for APIs like the Stripe Events API, which retains data for only 30 days.

Key changes:
- Add api_retention_period field to StateDelegatingStream schema (ISO8601 duration)
- Implement cursor age validation in model_to_component_factory.py
- Emit warning log when falling back to full refresh due to stale cursor
- Add unit tests for cursor age validation

Fixes: airbytehq/oncall#11103
Co-Authored-By: unknown <>
@devin-ai-integration

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.


@github-actions

github-actions bot commented Feb 3, 2026

PyTest Results (Fast)

3 906 tests  +51   3 894 ✅ +51   7m 1s ⏱️ +34s
    1 suites ± 0      12 💤 ± 0 
    1 files   ± 0       0 ❌ ± 0 

Results for commit 99119a9. ± Comparison against base commit 15542de.

♻️ This comment has been updated with latest results.

@github-actions

github-actions bot commented Feb 3, 2026

PyTest Results (Full)

3 909 tests  +51   3 897 ✅ +51   10m 45s ⏱️ -16s
    1 suites ± 0      12 💤 ± 0 
    1 files   ± 0       0 ❌ ± 0 

Results for commit 99119a9. ± Comparison against base commit 15542de.

♻️ This comment has been updated with latest results.

@agarctfi

Alfredo Garcia (agarctfi) commented Feb 3, 2026

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

octavia-squidington-iii and others added 2 commits February 3, 2026 16:03
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
@agarctfi Alfredo Garcia (agarctfi) marked this pull request as ready for review February 3, 2026 17:34
Copilot AI review requested due to automatic review settings February 3, 2026 17:34

Copilot AI left a comment


Pull request overview

This pull request adds cursor age validation to StateDelegatingStream to automatically fall back to full refresh when a cursor is older than an API's data retention period. This prevents data loss scenarios where APIs (like Stripe Events) only retain data for a limited time window (e.g., 30 days), and a sync resumes after that window has passed.

Changes:

  • Added optional api_retention_period field to StateDelegatingStream schema (ISO8601 duration format)
  • Implemented cursor age validation logic that compares cursor datetime against retention cutoff
  • Added warning log emission when falling back to full refresh due to stale cursor

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| airbyte_cdk/sources/declarative/declarative_component_schema.yaml | Added api_retention_period field definition with ISO8601 duration examples |
| airbyte_cdk/sources/declarative/models/declarative_component_schema.py | Auto-generated model updates reflecting the new field in StateDelegatingStream |
| airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py | Core implementation of cursor age validation with three new helper methods |
| unit_tests/sources/declarative/test_state_delegating_stream.py | Added three test cases covering cursor age validation scenarios (too old, within retention, edge case) |


- Fix YAML bullet point indentation for consistency (Comment 8)
- Add type guard for cursor_value to handle unexpected types (Comment 9)
- Add test for warning log emission when cursor is too old (Comment 10)

Co-Authored-By: unknown <>
devin-ai-integration bot and others added 3 commits February 3, 2026 18:24
Co-Authored-By: unknown <>
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
…_model

Addresses review comment from tolik0 - the incremental_sync check is now
performed in the calling method before invoking _is_cursor_older_than_retention_period.

Co-Authored-By: unknown <>
Addresses review comment from tolik0 - if the cursor value or format is
incorrect, we should use full_refresh_stream instead, as it indicates
that the stream_state is unusable.

Co-Authored-By: unknown <>
Address tolik0's review comment: During the first sync, the state will be
produced by full_refresh_stream, and during subsequent syncs, by
incremental_stream. We need to correctly parse the state for both cases.

Changes:
- Extract incremental_sync from both full_refresh_stream and incremental_stream
- Update _is_cursor_older_than_retention_period to accept list of sources
- Update _parse_cursor_datetime to collect and try formats from all sources
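
A minimal sketch of the "try formats from all sources" idea at this point in the PR history, also covering the earlier commits' handling of unusable cursor values. `parse_cursor_datetime` here is a hypothetical free function, and the use of `strptime` format strings is an assumption for illustration; the real `_parse_cursor_datetime` may differ.

```python
from datetime import datetime
from typing import Iterable, Optional


def parse_cursor_datetime(value: object, formats: Iterable[str]) -> Optional[datetime]:
    """Try each candidate format in turn; return None if the value is unusable."""
    # Type guard: state may hold something other than a string.
    if not isinstance(value, str):
        return None
    for fmt in formats:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue  # this format did not match, try the next source's format
    return None


# Formats collected from both streams' incremental_sync definitions (illustrative).
candidate_formats = ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%SZ"]
parsed = parse_cursor_datetime("2026-01-15T00:00:00Z", candidate_formats)
```

A `None` result would be treated by the caller as unusable state, which per the commit above means using full_refresh_stream.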

Co-Authored-By: unknown <>
When cursor age validation detects a stale cursor on a StateDelegatingStream,
it clears the stream state and emits an empty state message. However, if the
stream is not in the user's configured catalog (e.g. a parent stream created
as a dependency), the destination does not know about it and crashes with
'Stream not found'.

This fix checks whether the stream is in the configured catalog before emitting
the state-clearing message. If no catalog is provided (e.g. during discover),
state is emitted as before for backward compatibility.
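
The catalog guard described in this commit could be sketched as follows. `should_emit_state_reset` is a hypothetical helper, and representing the configured catalog as an optional set of stream names is an assumption made to keep the example self-contained.

```python
from typing import Optional, Set


def should_emit_state_reset(
    stream_name: str, configured_stream_names: Optional[Set[str]]
) -> bool:
    """Decide whether the state-clearing message may be emitted for a stream."""
    # No catalog provided (e.g. during discover): emit as before.
    if configured_stream_names is None:
        return True
    # Parent streams built only as dependencies are unknown to the destination.
    return stream_name in configured_stream_names
```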

Co-Authored-By: unknown <>
…unconfigured streams

Instead of running cursor age validation and then suppressing just the state
message, skip the entire api_retention_period block for streams not in the
configured catalog. This avoids unnecessary work (creating both stream
components, comparing cursor age) for parent-stream dependencies that the
destination doesn't know about.

Co-Authored-By: unknown <>
…sor fallback

When cursor age validation detects a stale cursor, the state must be
cleared before constructing the full_refresh_stream so its cursor starts
from start_date instead of inheriting the old stale cursor position.

Changes:
- Reorder create_state_delegating_stream to clear state before stream construction
- Simplify _is_cursor_older_than_retention_period to only use incremental_cursor
- Handle NO_CURSOR_STATE_KEY directly on raw state dict (completed full refresh = current)
- Update test to verify NO_CURSOR_STATE_KEY uses incremental (not full refresh again)

Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>
devin-ai-integration bot added a commit to airbytehq/airbyte that referenced this pull request Mar 2, 2026
Bumps the source-declarative-manifest base image to pick up the
StateDelegatingStream cursor age validation fix from
airbytehq/airbyte-python-cdk#890.

Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>
devin-ai-integration bot added a commit to airbytehq/airbyte that referenced this pull request Mar 3, 2026
Bumps the source-declarative-manifest base image to pick up the
StateDelegatingStream cursor age validation fix from
airbytehq/airbyte-python-cdk#890. Bumps dockerImageTag to 5.15.21-rc.3.

Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>
@agarctfi Alfredo Garcia (agarctfi) merged commit 0e57414 into main Mar 12, 2026
32 checks passed
@agarctfi Alfredo Garcia (agarctfi) deleted the devin/1770066385-state-delegating-stream-cursor-age-validation branch March 12, 2026 17:25

4 participants