Skip to content

Commit 31f7216

Browse files
authored
Merge branch 'main' into devin/1763137629-excel-parser-openpyxl-fallback
2 parents d2b0255 + a67a6cf commit 31f7216

10 files changed

Lines changed: 327 additions & 35 deletions

File tree

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
name: Label Community PRs
2+
3+
# This workflow automatically adds the "community" label to PRs from forks.
4+
# This enables automatic tracking on the Community PRs project board.
5+
6+
on:
7+
pull_request_target:
8+
types:
9+
- opened
10+
- reopened
11+
12+
jobs:
13+
label-community-pr:
14+
name: Add "Community" Label to PR
15+
# Only run for PRs from forks
16+
if: github.event.pull_request.head.repo.fork == true
17+
runs-on: ubuntu-24.04
18+
permissions:
19+
issues: write
20+
pull-requests: write
21+
steps:
22+
- name: Add community label
23+
# This action uses GitHub's addLabels API, which is idempotent.
24+
# If the label already exists, the API call succeeds without error.
25+
uses: actions-ecosystem/action-add-labels@bd52874380e3909a1ac983768df6976535ece7f8 # v1.1.3
26+
with:
27+
github_token: ${{ secrets.GITHUB_TOKEN }}
28+
labels: community

.github/workflows/release_drafter.yml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ on:
1111
# pull_request_target:
1212
# types: [opened, reopened, synchronize]
1313

14+
concurrency:
15+
group: release-drafter-${{ github.ref }}
16+
cancel-in-progress: false
17+
1418
jobs:
1519
update_release_draft:
1620
permissions:
@@ -19,7 +23,8 @@ jobs:
1923
runs-on: ubuntu-24.04
2024
steps:
2125
# Drafts the next Release notes as Pull Requests are merged into "main"
22-
- uses: release-drafter/release-drafter@v6
26+
# Pinned to v6.0.0 to avoid v6.1.0 bug: https://github.com/release-drafter/release-drafter/issues/1425
27+
- uses: release-drafter/release-drafter@v6.0.0
2328
with:
2429
config-name: release-drafter.yml
2530
env:

airbyte_cdk/legacy/sources/declarative/manifest_declarative_source.py

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -341,29 +341,41 @@ def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStrea
341341
def _initialize_cache_for_parent_streams(
342342
stream_configs: List[Dict[str, Any]],
343343
) -> List[Dict[str, Any]]:
344+
"""Enable caching for parent streams unless explicitly disabled.
345+
346+
Caching is enabled by default for parent streams to optimize performance when the same
347+
parent data is needed by multiple child streams. However, explicit `use_cache: false`
348+
settings are respected for streams that cannot use caching (e.g., scroll-based pagination
349+
APIs where caching causes duplicate records).
350+
"""
344351
parent_streams = set()
345352

353+
def _set_cache_if_not_disabled(requester: Dict[str, Any]) -> None:
354+
"""Set use_cache to True only if not explicitly disabled."""
355+
if requester.get("use_cache") is not False:
356+
requester["use_cache"] = True
357+
346358
def update_with_cache_parent_configs(
347359
parent_configs: list[dict[str, Any]],
348360
) -> None:
349361
for parent_config in parent_configs:
350362
parent_streams.add(parent_config["stream"]["name"])
351363
if parent_config["stream"]["type"] == "StateDelegatingStream":
352-
parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
353-
"use_cache"
354-
] = True
355-
parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
356-
"use_cache"
357-
] = True
364+
_set_cache_if_not_disabled(
365+
parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"]
366+
)
367+
_set_cache_if_not_disabled(
368+
parent_config["stream"]["incremental_stream"]["retriever"]["requester"]
369+
)
358370
else:
359-
parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
371+
_set_cache_if_not_disabled(parent_config["stream"]["retriever"]["requester"])
360372

361373
for stream_config in stream_configs:
362374
if stream_config.get("incremental_sync", {}).get("parent_stream"):
363375
parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
364-
stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
365-
"use_cache"
366-
] = True
376+
_set_cache_if_not_disabled(
377+
stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"]
378+
)
367379

368380
elif stream_config.get("retriever", {}).get("partition_router", {}):
369381
partition_router = stream_config["retriever"]["partition_router"]
@@ -380,14 +392,14 @@ def update_with_cache_parent_configs(
380392
for stream_config in stream_configs:
381393
if stream_config["name"] in parent_streams:
382394
if stream_config["type"] == "StateDelegatingStream":
383-
stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
384-
True
395+
_set_cache_if_not_disabled(
396+
stream_config["full_refresh_stream"]["retriever"]["requester"]
385397
)
386-
stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
387-
True
398+
_set_cache_if_not_disabled(
399+
stream_config["incremental_stream"]["retriever"]["requester"]
388400
)
389401
else:
390-
stream_config["retriever"]["requester"]["use_cache"] = True
402+
_set_cache_if_not_disabled(stream_config["retriever"]["requester"])
391403
return stream_configs
392404

393405
def spec(self, logger: logging.Logger) -> ConnectorSpecification:

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -424,29 +424,41 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i
424424
def _initialize_cache_for_parent_streams(
425425
stream_configs: List[Dict[str, Any]],
426426
) -> List[Dict[str, Any]]:
427+
"""Enable caching for parent streams unless explicitly disabled.
428+
429+
Caching is enabled by default for parent streams to optimize performance when the same
430+
parent data is needed by multiple child streams. However, explicit `use_cache: false`
431+
settings are respected for streams that cannot use caching (e.g., scroll-based pagination
432+
APIs where caching causes duplicate records).
433+
"""
427434
parent_streams = set()
428435

436+
def _set_cache_if_not_disabled(requester: Dict[str, Any]) -> None:
437+
"""Set use_cache to True only if not explicitly disabled."""
438+
if requester.get("use_cache") is not False:
439+
requester["use_cache"] = True
440+
429441
def update_with_cache_parent_configs(
430442
parent_configs: list[dict[str, Any]],
431443
) -> None:
432444
for parent_config in parent_configs:
433445
parent_streams.add(parent_config["stream"]["name"])
434446
if parent_config["stream"]["type"] == "StateDelegatingStream":
435-
parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
436-
"use_cache"
437-
] = True
438-
parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
439-
"use_cache"
440-
] = True
447+
_set_cache_if_not_disabled(
448+
parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"]
449+
)
450+
_set_cache_if_not_disabled(
451+
parent_config["stream"]["incremental_stream"]["retriever"]["requester"]
452+
)
441453
else:
442-
parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
454+
_set_cache_if_not_disabled(parent_config["stream"]["retriever"]["requester"])
443455

444456
for stream_config in stream_configs:
445457
if stream_config.get("incremental_sync", {}).get("parent_stream"):
446458
parent_streams.add(stream_config["incremental_sync"]["parent_stream"]["name"])
447-
stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"][
448-
"use_cache"
449-
] = True
459+
_set_cache_if_not_disabled(
460+
stream_config["incremental_sync"]["parent_stream"]["retriever"]["requester"]
461+
)
450462

451463
elif stream_config.get("retriever", {}).get("partition_router", {}):
452464
partition_router = stream_config["retriever"]["partition_router"]
@@ -463,14 +475,14 @@ def update_with_cache_parent_configs(
463475
for stream_config in stream_configs:
464476
if stream_config["name"] in parent_streams:
465477
if stream_config["type"] == "StateDelegatingStream":
466-
stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
467-
True
478+
_set_cache_if_not_disabled(
479+
stream_config["full_refresh_stream"]["retriever"]["requester"]
468480
)
469-
stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
470-
True
481+
_set_cache_if_not_disabled(
482+
stream_config["incremental_stream"]["retriever"]["requester"]
471483
)
472484
else:
473-
stream_config["retriever"]["requester"]["use_cache"] = True
485+
_set_cache_if_not_disabled(stream_config["retriever"]["requester"])
474486
return stream_configs
475487

476488
def spec(self, logger: logging.Logger) -> ConnectorSpecification:

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2286,7 +2286,7 @@ definitions:
22862286
- "$ref": "#/definitions/CustomErrorHandler"
22872287
use_cache:
22882288
title: Use Cache
2289-
description: Enables stream requests caching. This field is automatically set by the CDK.
2289+
description: Enables stream requests caching. When set to true, repeated requests to the same URL will return cached responses. Parent streams automatically have caching enabled unless this field is explicitly set to false. Only set this to false if you are certain that caching should be disabled (e.g., for scroll-based pagination APIs where caching causes duplicate records), as disabling it may negatively impact performance when the same data is needed multiple times.
22902290
type: boolean
22912291
default: false
22922292
$parameters:

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4339,8 +4339,14 @@ def _get_catalog_defined_cursor_field(
43394339
configured_stream = self._stream_name_to_configured_stream.get(stream_name)
43404340

43414341
# Depending on the operation being performed, there may not be a configured stream yet. In this
4342-
# case we return None which will then use the default cursor field defined on the cursor model
4343-
if not configured_stream or not configured_stream.cursor_field:
4342+
# case we return None which will then use the default cursor field defined on the cursor model.
4343+
# We also treat cursor_field: [""] (list with empty string) as no cursor field, since this can
4344+
# occur when the platform serializes "no cursor configured" streams incorrectly.
4345+
if (
4346+
not configured_stream
4347+
or not configured_stream.cursor_field
4348+
or not configured_stream.cursor_field[0]
4349+
):
43444350
return None
43454351
elif len(configured_stream.cursor_field) > 1:
43464352
raise ValueError(

airbyte_cdk/sources/streams/concurrent/helpers.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ def get_cursor_field_from_stream(stream: Stream) -> Optional[str]:
3434
raise ValueError(
3535
f"Nested cursor fields are not supported. Got {stream.cursor_field} for {stream.name}"
3636
)
37-
elif len(stream.cursor_field) == 0:
37+
elif len(stream.cursor_field) == 0 or not stream.cursor_field[0]:
38+
# Treat cursor_field: [""] (list with empty string) as no cursor field
3839
return None
3940
else:
4041
return stream.cursor_field[0]

unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1652,6 +1652,112 @@ def test_only_parent_streams_use_cache():
16521652
assert not get_retriever(streams[2]).requester.use_cache
16531653

16541654

1655+
def test_parent_stream_respects_explicit_use_cache_false():
1656+
"""Test that explicit use_cache: false is respected for parent streams.
1657+
1658+
This is important for APIs that use scroll-based pagination (like Intercom's /companies/scroll
1659+
endpoint), where caching must be disabled because the same scroll_param is returned in
1660+
pagination responses, causing duplicate records and infinite pagination loops.
1661+
"""
1662+
# Parent stream with explicit use_cache: false
1663+
companies_stream = {
1664+
"type": "DeclarativeStream",
1665+
"$parameters": {
1666+
"name": "companies",
1667+
"primary_key": "id",
1668+
"url_base": "https://api.intercom.io/",
1669+
},
1670+
"schema_loader": {
1671+
"name": "{{ parameters.stream_name }}",
1672+
"file_path": "./source_intercom/schemas/{{ parameters.name }}.yaml",
1673+
},
1674+
"retriever": {
1675+
"paginator": {
1676+
"type": "DefaultPaginator",
1677+
"page_token_option": {"type": "RequestPath"},
1678+
"pagination_strategy": {
1679+
"type": "CursorPagination",
1680+
"cursor_value": "{{ response.get('scroll_param') }}",
1681+
"page_size": 100,
1682+
},
1683+
},
1684+
"requester": {
1685+
"path": "companies/scroll",
1686+
"use_cache": False, # Explicitly disabled for scroll-based pagination
1687+
"authenticator": {
1688+
"type": "BearerAuthenticator",
1689+
"api_token": "{{ config['api_key'] }}",
1690+
},
1691+
},
1692+
"record_selector": {"extractor": {"type": "DpathExtractor", "field_path": ["data"]}},
1693+
},
1694+
}
1695+
1696+
manifest = {
1697+
"version": "0.29.3",
1698+
"definitions": {},
1699+
"streams": [
1700+
deepcopy(companies_stream),
1701+
{
1702+
"type": "DeclarativeStream",
1703+
"$parameters": {
1704+
"name": "company_segments",
1705+
"primary_key": "id",
1706+
"url_base": "https://api.intercom.io/",
1707+
},
1708+
"schema_loader": {
1709+
"name": "{{ parameters.stream_name }}",
1710+
"file_path": "./source_intercom/schemas/{{ parameters.name }}.yaml",
1711+
},
1712+
"retriever": {
1713+
"paginator": {"type": "NoPagination"},
1714+
"requester": {
1715+
"path": "companies/{{ stream_partition.parent_id }}/segments",
1716+
"authenticator": {
1717+
"type": "BearerAuthenticator",
1718+
"api_token": "{{ config['api_key'] }}",
1719+
},
1720+
},
1721+
"record_selector": {
1722+
"extractor": {"type": "DpathExtractor", "field_path": ["data"]}
1723+
},
1724+
"partition_router": {
1725+
"parent_stream_configs": [
1726+
{
1727+
"parent_key": "id",
1728+
"partition_field": "parent_id",
1729+
"stream": deepcopy(companies_stream),
1730+
}
1731+
],
1732+
"type": "SubstreamPartitionRouter",
1733+
},
1734+
},
1735+
},
1736+
],
1737+
"check": {"type": "CheckStream", "stream_names": ["companies"]},
1738+
}
1739+
source = ManifestDeclarativeSource(source_config=manifest)
1740+
1741+
streams = source.streams({})
1742+
assert len(streams) == 2
1743+
1744+
# Main stream with explicit use_cache: false should remain false (parent for substream)
1745+
assert streams[0].name == "companies"
1746+
# use_cache should remain False because it was explicitly set to False
1747+
assert not get_retriever(streams[0]).requester.use_cache
1748+
1749+
# Substream
1750+
assert streams[1].name == "company_segments"
1751+
1752+
# Parent stream created for substream should also respect use_cache: false
1753+
stream_slicer = streams[1]._stream_partition_generator._stream_slicer
1754+
assert stream_slicer.parent_stream_configs[0].stream.name == "companies"
1755+
# The parent stream in the substream config should also have use_cache: false
1756+
assert not stream_slicer.parent_stream_configs[
1757+
0
1758+
].stream._stream_partition_generator._partition_factory._retriever.requester.use_cache
1759+
1760+
16551761
def _run_read(manifest: Mapping[str, Any], stream_name: str) -> List[AirbyteMessage]:
16561762
catalog = ConfiguredAirbyteCatalog(
16571763
streams=[

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5051,6 +5051,12 @@ def test_create_stream_with_multiple_schema_loaders():
50515051
"updated_at",
50525052
id="test_allow_catalog_defined_cursor_field_false_defaults_to_stream_defined_cursor_field",
50535053
),
5054+
pytest.param(
5055+
True,
5056+
"",
5057+
"updated_at",
5058+
id="test_empty_string_catalog_cursor_field_defaults_to_stream_defined_cursor_field",
5059+
),
50545060
],
50555061
)
50565062
def test_catalog_defined_cursor_field(

0 commit comments

Comments
 (0)