feat(cloud): add pagination for cloud list operations #1024
📝 Walkthrough
Adds `PAGE_SIZE` and pagination helpers; `list_connections`/`list_workspaces`/`list_sources`/`list_destinations` and `get_job_logs` now accept and validate `limit`, and auto-paginate using `PAGE_SIZE` with client-side per-page filtering. Cloud wrappers forward `limit`; MCP tools adopt `limit` semantics. Unit tests validate paginated request sequences and `limit` behavior.
Changes: Pagination Support for List and Job Log Functions
Sequence Diagram
sequenceDiagram
participant Caller
participant ListFunction as api_util.list_functions
participant PaginationUtils
participant API as AirbyteAPI
participant Result
Caller->>ListFunction: call with limit (or no limit)
ListFunction->>PaginationUtils: _validate_pagination_params(limit)
alt invalid limit
ListFunction->>Result: raise PyAirbyteInputError
else
loop while API indicates next_page and remaining>0
ListFunction->>PaginationUtils: _get_page_limit(remaining)
ListFunction->>API: request list endpoint with page_limit and current_offset
API-->>ListFunction: returns page (data + next_page)
ListFunction->>ListFunction: filter page items, accumulate matches, decrement remaining, advance offset
end
ListFunction->>Result: return accumulated results truncated to limit
end
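The loop in the diagram above can be sketched in plain Python as follows. This is a minimal, self-contained sketch under stated assumptions, not the PR's implementation: `fetch_page`, `list_with_limit`, the local `PyAirbyteInputError` stand-in, and the `PAGE_SIZE` value are all hypothetical. It follows the behavior this PR describes: non-positive limits are rejected, unfiltered calls request at most the remaining count, and filtered calls keep requesting full pages so early non-matching items cannot under-fill the result.

```python
from __future__ import annotations

from collections.abc import Callable
from typing import TypeVar

T = TypeVar("T")

PAGE_SIZE = 100  # Hypothetical value; the PR's actual PAGE_SIZE may differ.


class PyAirbyteInputError(ValueError):
    """Local stand-in for airbyte.exceptions.PyAirbyteInputError."""


def validate_limit(limit: int | None) -> None:
    """Allow None (fetch everything) or a positive int; reject anything else."""
    if limit is not None and limit <= 0:
        raise PyAirbyteInputError(f"limit must be a positive integer, got {limit!r}")


def list_with_limit(
    fetch_page: Callable[[int, int], tuple[list[T], bool]],
    *,
    limit: int | None = None,
    predicate: Callable[[T], bool] | None = None,
) -> list[T]:
    """Auto-paginate until `limit` matching items are collected or pages run out.

    `fetch_page(offset, page_limit)` is assumed to return `(items, has_next_page)`.
    """
    validate_limit(limit)
    results: list[T] = []
    offset = 0
    while limit is None or len(results) < limit:
        if limit is None or predicate is not None:
            # Filtering happens client-side, so always request a full page.
            page_limit = PAGE_SIZE
        else:
            # Unfiltered: never request more items than we still need.
            page_limit = min(PAGE_SIZE, limit - len(results))
        items, has_next = fetch_page(offset, page_limit)
        results.extend(item for item in items if predicate is None or predicate(item))
        if not has_next or not items:
            break
        offset += len(items)
    # A filtered page may overshoot the limit, so truncate at the end.
    return results if limit is None else results[:limit]
```

Advancing `offset` by the number of items actually returned, rather than by `page_limit`, is a defensive choice in case a page comes back shorter than requested.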
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Want extra tests for very large limits or non-matching name_filters to ensure loops stop correctly and the remaining logic is robust, wdyt?
🚥 Pre-merge checks | ✅ Passed checks (5 passed)
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.
Testing This PyAirbyte Version
You can test this version of PyAirbyte using the following:
# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1779121915-cloud-pagination' pyairbyte --help
# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1779121915-cloud-pagination'
PR Slash Commands
Airbyte Maintainers can execute the following slash commands on your PR:
📚 Helpful Resources
Community Support: Questions? Join the #pyairbyte channel in our Slack workspace.
Aaron ("AJ") Steers (aaronsteers) left a comment:
Let's simplify and remove support for offset. So we'll support limit only in our Python API. Also add limit support on the relevant MCP tools if it does not already exist.
Addressed AJ's latest review request to simplify Cloud pagination to public … Changes pushed in …
Local validation passed:
Caution: Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
airbyte/mcp/cloud.py (2)
1172-1205: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Could we avoid forwarding `limit` before the local connection filters run?
`workspace.list_connections(limit=limit)` truncates the candidate set before `name_contains` and `failing_connections_only` are applied, so this can miss later matching/failing connections and return fewer than `limit` results. The existing break on `len(results) >= limit` already gives you the final cap once matching rows are appended. wdyt?
Possible adjustment:
 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
-connections = workspace.list_connections(limit=limit)
+list_limit = None if name_contains or failing_connections_only else limit
+connections = workspace.list_connections(limit=list_limit)
Also applies to: 1264-1265
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@airbyte/mcp/cloud.py` around lines 1172 - 1205, The call workspace.list_connections(limit=limit) prematurely truncates candidates before local filters (name_contains, failing_connections_only) are applied; change it to fetch without the upstream limit (pass None or omit limit) and then apply the local filtering loop that inspects name_contains, with_connection_status and failing_connections_only, appending to results and breaking when len(results) >= limit (reusing the existing break logic); update both occurrences that pass limit through to workspace.list_connections to ensure the final cap is enforced only after local filters run.
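To make the suggested loop shape concrete, here is a minimal sketch using a hypothetical in-memory `FakeWorkspace` in place of `CloudWorkspace` and a simplified, hypothetical `list_connections_tool` in place of the MCP tool. The upstream limit is forwarded only when no local filter is active, and the `len(results) >= limit` break applies the final cap after filtering.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class FakeConnection:
    """Hypothetical stand-in for CloudConnection with only the fields we filter on."""

    name: str
    is_failing: bool


class FakeWorkspace:
    """Hypothetical in-memory workspace that records the limit it receives."""

    def __init__(self, connections: list[FakeConnection]) -> None:
        self._connections = connections
        self.received_limit: int | None = None

    def list_connections(self, limit: int | None = None) -> list[FakeConnection]:
        self.received_limit = limit
        return list(self._connections) if limit is None else self._connections[:limit]


def list_connections_tool(
    workspace: FakeWorkspace,
    *,
    limit: int | None = None,
    name_contains: str | None = None,
    failing_connections_only: bool = False,
) -> list[FakeConnection]:
    # Only push the limit upstream when no client-side filter could drop items.
    list_limit = None if name_contains or failing_connections_only else limit
    results: list[FakeConnection] = []
    for conn in workspace.list_connections(limit=list_limit):
        if name_contains and name_contains.lower() not in conn.name.lower():
            continue
        if failing_connections_only and not conn.is_failing:
            continue
        results.append(conn)
        # The final cap is applied here, after filtering.
        if limit is not None and len(results) >= limit:
            break
    return results
```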
812-827: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Could we apply `limit` after the local `name_contains` filter instead here?
Passing `limit` into `workspace.list_sources()` / `workspace.list_destinations()` before the substring filter can under-fill the response even when later items match. If the first `limit` records miss `name_contains`, this returns too few results. wdyt?
Possible adjustment:
 def list_deployed_cloud_source_connectors(...):
     workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
-    sources = workspace.list_sources(limit=limit)
+    sources = workspace.list_sources(limit=None if name_contains else limit)
     if name_contains:
         needle = name_contains.lower()
         sources = [s for s in sources if s.name is not None and needle in s.name.lower()]
+    if limit is not None:
+        sources = sources[:limit]

 def list_deployed_cloud_destination_connectors(...):
     workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
-    destinations = workspace.list_destinations(limit=limit)
+    destinations = workspace.list_destinations(limit=None if name_contains else limit)
     if name_contains:
         needle = name_contains.lower()
         destinations = [d for d in destinations if d.name is not None and needle in d.name.lower()]
+    if limit is not None:
+        destinations = destinations[:limit]
Also applies to: 863-878
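The under-fill is easy to reproduce with plain lists. In this sketch, `Named` is a hypothetical stand-in for a deployed source or destination with an optional `name`; `truncate_then_filter` mirrors the current ordering and `filter_then_truncate` mirrors the suggested one. Both helper names are illustrative only.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Named:
    """Hypothetical stand-in for a deployed source/destination."""

    name: str | None


def _matches(item: Named, needle: str) -> bool:
    # Same rule as the review's list comprehension: skip unnamed items.
    return item.name is not None and needle in item.name.lower()


def truncate_then_filter(items: list[Named], name_contains: str | None, limit: int | None) -> list[Named]:
    """Current ordering: the limit is applied upstream, before the filter."""
    candidates = items if limit is None else items[:limit]
    if name_contains:
        needle = name_contains.lower()
        candidates = [i for i in candidates if _matches(i, needle)]
    return candidates


def filter_then_truncate(items: list[Named], name_contains: str | None, limit: int | None) -> list[Named]:
    """Suggested ordering: fetch everything when filtering, then slice."""
    candidates = items if (name_contains or limit is None) else items[:limit]
    if name_contains:
        needle = name_contains.lower()
        candidates = [i for i in candidates if _matches(i, needle)]
    if limit is not None:
        candidates = candidates[:limit]
    return candidates
```

With `limit=1` and `name_contains="prod"`, the first ordering returns an empty list whenever the first item is not a match, while the second returns the first matching item.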
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@airbyte/mcp/cloud.py` around lines 812 - 827, The current code passes limit into workspace.list_sources(...) which can cause under-filled results when name_contains is applied afterwards; change the logic so list_sources(...) is called without the limit parameter, perform the name_contains filtering on the full list (using the existing needle check in the list comprehension), and only then apply the limit by slicing the filtered list (e.g., sources = sources[:limit] if limit is not None). Make the same change for the analogous destinations flow (workspace.list_destinations, the name_contains filter block) so filtering happens before truncation.
airbyte/_util/api_util.py (1)
353-370: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Remove the `workspace_ids` filter to list all accessible workspaces, wdyt?
Line 353 marks `workspace_id` as intentionally unused, but line 368 passes it as a filter (`workspace_ids=[workspace_id]`). According to the Airbyte API, providing `workspace_ids` restricts results to only those exact workspace IDs. Since the function's docstring says "List workspaces" without qualification, this suggests it should return all accessible workspaces, not just the one provided. This contradiction means the pagination loop will only ever return a single workspace, defeating the purpose of the function.
💡 Minimal fix
-_ = workspace_id  # Not used (yet)
 airbyte_instance: airbyte_api.AirbyteAPI = get_airbyte_server_instance(
     client_id=client_id,
     client_secret=client_secret,
     bearer_token=bearer_token,
     api_root=api_root,
 )
 result: list[models.WorkspaceResponse] = []
 current_offset = 0
 remaining = limit
 base_context = {"workspace_id": workspace_id, "api_root": api_root}
 while remaining is None or remaining > 0:
     try:
         response: api.ListWorkspacesResponse = airbyte_instance.workspaces.list_workspaces(
             api.ListWorkspacesRequest(
-                workspace_ids=[workspace_id],
                 offset=current_offset,
                 limit=PAGE_SIZE,
             ),
         )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@airbyte/_util/api_util.py` around lines 353 - 370, The loop is incorrectly filtering results by passing workspace_ids=[workspace_id] to airbyte_instance.workspaces.list_workspaces (despite workspace_id being marked unused), so remove the workspace_ids parameter from the api.ListWorkspacesRequest call in that function (leave offset/current_offset and limit/PAGE_SIZE intact) so the request lists all accessible workspaces; also remove or update the dummy assignment "_ = workspace_id" (or drop the workspace_id param from the function signature) to avoid the unused-variable confusion.
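A minimal sketch of why the filter defeats pagination. `FakeWorkspacesAPI` and the local `ListWorkspacesRequest` dataclass are hypothetical stand-ins; the real `airbyte_api` request and response models have different shapes (for example, the real response carries pagination links rather than a boolean). With `workspace_ids` set, the fake server can only ever return that one workspace; without it, the offset loop walks every page.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ListWorkspacesRequest:
    """Local stand-in for api.ListWorkspacesRequest (only the fields used here)."""

    offset: int = 0
    limit: int = 20
    workspace_ids: list[str] | None = None


@dataclass
class FakeWorkspacesAPI:
    """Hypothetical server holding workspace IDs and recording each request."""

    workspace_ids: list[str]
    requests: list[ListWorkspacesRequest] = field(default_factory=list)

    def list_workspaces(self, request: ListWorkspacesRequest) -> tuple[list[str], bool]:
        self.requests.append(request)
        # An ID filter narrows the pool before paging, as the review describes.
        pool = [w for w in self.workspace_ids if request.workspace_ids is None or w in request.workspace_ids]
        page = pool[request.offset : request.offset + request.limit]
        has_next = request.offset + request.limit < len(pool)
        return page, has_next


def list_workspaces(api: FakeWorkspacesAPI, *, limit: int | None = None, page_size: int = 2) -> list[str]:
    """Page through all accessible workspaces; note that no workspace_ids filter is sent."""
    results: list[str] = []
    offset = 0
    while limit is None or len(results) < limit:
        page, has_next = api.list_workspaces(ListWorkspacesRequest(offset=offset, limit=page_size))
        results.extend(page)
        if not has_next or not page:
            break
        offset += len(page)
    return results if limit is None else results[:limit]
```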
🧹 Nitpick comments (2)
airbyte/cloud/workspaces.py (1)
761-837: ⚡ Quick win
Consider enriching the docstrings for list methods, wdyt?
The implementation looks solid! The `limit` parameter is cleanly forwarded to `api_util`, and the list comprehensions are straightforward. However, the docstrings for `list_connections`, `list_sources`, and `list_destinations` are quite minimal.
For a public API, it might help users to understand:
- What happens when `limit=None` (auto-pagination through all results)
- How `limit` interacts with `name` and `name_filter` (e.g., does filtering happen before or after limiting?)
- Maybe a quick usage example?
This would make the pagination feature more discoverable and reduce potential confusion.
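As an illustration of the kind of docstring this comment asks for, here is a sketch on a hypothetical `CloudWorkspaceSketch` that keeps connection names in memory. The parameter names `name`, `name_filter`, and `limit` follow this PR's discussion; the `Callable[[str], bool]` type for `name_filter` and the docstring wording are assumptions. The documented behavior (exact `name` match including the empty string, filters applied before the limit, `None` meaning fetch all pages) is taken from the PR summary, not from the merged code.

```python
from __future__ import annotations

from collections.abc import Callable


class CloudWorkspaceSketch:
    """Hypothetical stand-in for CloudWorkspace; holds connection names in memory."""

    def __init__(self, connection_names: list[str]) -> None:
        self._names = connection_names

    def list_connections(
        self,
        name: str | None = None,
        *,
        name_filter: Callable[[str], bool] | None = None,
        limit: int | None = None,
    ) -> list[str]:
        """List connections in this workspace.

        Args:
            name: If set, return only connections whose name equals this value
                exactly. An empty string is treated as a real name, not as
                "no filter".
            name_filter: Optional predicate applied to each connection name.
            limit: Maximum number of connections to return. `None` (the
                default) pages through all results automatically. Must be a
                positive integer when set. Filters are applied first, so up to
                `limit` *matching* connections are returned.

        Examples:
            All connections:        `workspace.list_connections()`
            First two connections:  `workspace.list_connections(limit=2)`
            Up to five "prod" matches:
                `workspace.list_connections(name_filter=lambda n: "prod" in n, limit=5)`
        """
        # The real code raises PyAirbyteInputError; ValueError keeps this self-contained.
        if limit is not None and limit <= 0:
            raise ValueError("limit must be a positive integer")
        matches = [
            n
            for n in self._names
            if (name is None or n == name) and (name_filter is None or name_filter(n))
        ]
        return matches if limit is None else matches[:limit]
```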
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@airbyte/cloud/workspaces.py` around lines 761 - 837, Docstrings for list_connections, list_sources, and list_destinations are too minimal; update each method's docstring to explicitly state that limit=None triggers automatic pagination to return all results, that filtering (name and name_filter) is applied server-side/when calling api_util before limiting (i.e., filters narrow the result set then limit caps results), and show a short usage example demonstrating limit vs. no-limit; reference the methods list_connections, list_sources, list_destinations and the underlying api_util.list_connections/list_sources/list_destinations calls so readers know where pagination/filtering behavior comes from.
tests/unit_tests/test_mcp_cloud.py (1)
118-141: ⚡ Quick win
Could we add a filtered-path case here too?
This verifies that `limit` is forwarded, but it would not catch the regression where early non-matching items cause the MCP tool to return fewer than `limit` results. A case with `name_contains` (and one for `failing_connections_only`) would pin that contract down. wdyt?
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/unit_tests/test_mcp_cloud.py` around lines 118 - 141, Add a filtered-path test so the MCP tool forwards `limit` when filtering is applied: extend or add a test alongside test_mcp_cloud_list_tools_pass_limit_to_workspace that calls the same `tool(...)` with `name_contains` (and a separate case with `failing_connections_only`) set so some workspace items are non-matching, assert workspace.limits[limit_key] == 1, and assert len(results) == 1 to ensure the tool returns `limit` matching items (use the same `_CloudWorkspace` stub and `monkeypatch` of cloud_mcp._get_cloud_workspace to control listed items).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 6de2af50-6642-46fe-a500-a178eb4999fc
📒 Files selected for processing (6)
- airbyte/_util/api_util.py
- airbyte/cloud/connections.py
- airbyte/cloud/workspaces.py
- airbyte/mcp/cloud.py
- tests/unit_tests/test_cloud_api_util.py
- tests/unit_tests/test_mcp_cloud.py
💤 Files with no reviewable changes (1)
- airbyte/cloud/connections.py
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/unit_tests/test_cloud_api_util.py
Summary
Adds optional `limit` and `offset` pagination support to PyAirbyte Cloud list operations for connections, workspaces, sources, and destinations, while preserving the existing auto-pagination behavior when no limit is provided.
Also adds pagination for `get_job_logs()` so sync history requests fetch additional pages when the requested limit exceeds one API page. The list operation implementation preserves exact empty-string `name` filtering, keeps filtered list fetches at full API page size before applying client-side result limits, rejects non-positive `limit` values, and distinguishes true job-log 404s from auth/service failures.
Removed stale pagination TODOs from workspace list wrappers and added focused unit coverage for job log pagination plus resource-list pagination edge cases. Added docstrings for the new pagination test helpers/tests after CodeRabbit flagged docstring coverage.
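The job-log behavior described above (fetching further pages until the requested `limit` is met, and mapping 404 separately from other non-2xx statuses) can be sketched as follows. The exception classes are local stand-ins for the PyAirbyte exceptions named in this PR, `fetch_page(offset, page_limit)` returning `(status_code, jobs)` is a hypothetical simplification of the Airbyte API response, and treating a short page as the last page is an assumption.

```python
from __future__ import annotations

from collections.abc import Callable


class AirbyteError(Exception):
    """Local stand-in for airbyte.exceptions.AirbyteError."""


class AirbyteMissingResourceError(AirbyteError):
    """Local stand-in for airbyte.exceptions.AirbyteMissingResourceError."""


def check_job_logs_status(status_code: int, connection_id: str) -> None:
    """Map a non-2xx status to an exception; only 404 means 'missing resource'."""
    if 200 <= status_code < 300:
        return
    if status_code == 404:
        raise AirbyteMissingResourceError(f"Connection {connection_id!r} not found.")
    # Auth failures (401/403) and server errors (5xx) must not look like a 404.
    raise AirbyteError(f"Job log request failed with HTTP {status_code}.")


def get_job_logs(
    fetch_page: Callable[[int, int], tuple[int, list[dict]]],
    connection_id: str,
    *,
    limit: int = 20,
    page_size: int = 100,
) -> list[dict]:
    """Collect up to `limit` jobs, requesting more pages when one page is not enough."""
    jobs: list[dict] = []
    offset = 0
    while len(jobs) < limit:
        page_limit = min(page_size, limit - len(jobs))
        status_code, page = fetch_page(offset, page_limit)
        check_job_logs_status(status_code, connection_id)
        jobs.extend(page)
        if len(page) < page_limit:
            break  # A short page is taken to mean there are no more jobs.
        offset += len(page)
    return jobs[:limit]
```

Because `AirbyteMissingResourceError` subclasses `AirbyteError` in this sketch, callers that only care about "any failure" can catch the base class, while callers that treat a missing connection specially can catch the subclass first.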
Requested by AJ Steers from Slack thread: https://airbytehq-team.slack.com/archives/C08BHPUMEPJ/p1779121915926609?thread_ts=1779121915.926609&cid=C08BHPUMEPJ
Review & Testing Checklist for Human
- `limit` and `offset` parameters on `CloudWorkspace.list_connections()`, `list_sources()`, and `list_destinations()` match expected public API semantics.
- Calls filtered by `name` or `name_filter` should continue scanning full API pages until enough matching results are found or the API is exhausted.
- `limit=0` should raise `PyAirbyteInputError`, while `offset=0` remains valid.
- `get_job_logs()` non-2xx handling matches caller expectations: 404 maps to `AirbyteMissingResourceError`, other non-2xx responses map to `AirbyteError`.
Notes
Local validation completed:
uv run --project /home/ubuntu/repos/PyAirbyte ruff format --check /home/ubuntu/repos/PyAirbyte/airbyte/_util/api_util.py /home/ubuntu/repos/PyAirbyte/airbyte/cloud/workspaces.py /home/ubuntu/repos/PyAirbyte/tests/unit_tests/test_cloud_api_util.py
uv run --project /home/ubuntu/repos/PyAirbyte ruff check /home/ubuntu/repos/PyAirbyte/airbyte/_util/api_util.py /home/ubuntu/repos/PyAirbyte/airbyte/cloud/workspaces.py /home/ubuntu/repos/PyAirbyte/tests/unit_tests/test_cloud_api_util.py
uv run --project /home/ubuntu/repos/PyAirbyte ruff check /home/ubuntu/repos/PyAirbyte/tests/unit_tests/test_cloud_api_util.py --select D
uv run --project /home/ubuntu/repos/PyAirbyte pyrefly check /home/ubuntu/repos/PyAirbyte/airbyte/_util/api_util.py /home/ubuntu/repos/PyAirbyte/airbyte/cloud/workspaces.py /home/ubuntu/repos/PyAirbyte/tests/unit_tests/test_cloud_api_util.py --output-format=json
uv run --project /home/ubuntu/repos/PyAirbyte pytest /home/ubuntu/repos/PyAirbyte/tests/unit_tests/test_cloud_api_util.py -q
… `list_connections(limit=2)`, and `get_previous_sync_logs(limit=1)` completed successfully.
Devin session
Requested by: Aaron ("AJ") Steers (@aaronsteers)
Summary by CodeRabbit
New Features
Improvements
Tests