From f2b612160a69b1a7be978d68235731df20929ac5 Mon Sep 17 00:00:00 2001 From: pnilan Date: Wed, 19 Mar 2025 13:48:15 -0700 Subject: [PATCH 1/4] update to enable config configurability --- .../declarative/async_job/job_tracker.py | 28 +++++++++++++++---- .../declarative_component_schema.yaml | 11 ++++++-- .../models/declarative_component_schema.py | 6 ++-- .../declarative/async_job/test_job_tracker.py | 20 +++++++++++++ 4 files changed, 54 insertions(+), 11 deletions(-) diff --git a/airbyte_cdk/sources/declarative/async_job/job_tracker.py b/airbyte_cdk/sources/declarative/async_job/job_tracker.py index 62d4feab9..86b03afb2 100644 --- a/airbyte_cdk/sources/declarative/async_job/job_tracker.py +++ b/airbyte_cdk/sources/declarative/async_job/job_tracker.py @@ -3,9 +3,11 @@ import logging import threading import uuid -from typing import Set +from dataclasses import dataclass, field +from typing import Set, Union, Mapping, Any from airbyte_cdk.logger import lazy_log +from airbyte_cdk.sources.declarative.interpolation import InterpolatedString LOGGER = logging.getLogger("airbyte") @@ -14,15 +16,29 @@ class ConcurrentJobLimitReached(Exception): pass +@dataclass class JobTracker: - def __init__(self, limit: int): + limit: Union[int, InterpolatedString] + config: Mapping[str, Any] = field(default_factory=dict) + + def __post_init__(self): self._jobs: Set[str] = set() - if limit < 1: + self._lock = threading.Lock() + if isinstance(self.limit, str): + try: + self.limit = int( + InterpolatedString(self.limit, parameters={}).eval(config=self.config) + ) + except Exception as e: + LOGGER.warning( + f"Error interpolating max job count: {self.limit}. Setting to 1. {e}" + ) + self.limit = 1 + if self.limit < 1: LOGGER.warning( - f"The `max_concurrent_async_job_count` property is less than 1: {limit}. Setting to 1. Please update the source manifest to set a valid value." + f"The `max_concurrent_async_job_count` property is less than 1: {self.limit}. Setting to 1. Please update the source manifest to set a valid value." ) - self._limit = 1 if limit < 1 else limit - self._lock = threading.Lock() + self._limit = self.limit if self.limit >= 1 else 1 def try_to_get_intent(self) -> str: lazy_log( diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 967a71ccd..e3e0425a5 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -47,7 +47,12 @@ properties: max_concurrent_async_job_count: title: Maximum Concurrent Asynchronous Jobs description: Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information. - type: integer + type: + - integer + - string + examples: + - 3 + - "{{ config['max_concurrent_async_job_count'] }}" metadata: type: object description: For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata. @@ -2894,7 +2899,7 @@ definitions: title: Lazy Read Pointer description: If set, this will enable lazy reading, using the initial read of parent records to extract child records. type: array - default: [ ] + default: [] items: - type: string interpolation_context: @@ -3199,7 +3204,7 @@ definitions: properties: type: type: string - enum: [ StateDelegatingStream ] + enum: [StateDelegatingStream] name: title: Name description: The stream name. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 480fa51c7..c7331d2c6 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1890,9 +1890,10 @@ class Config: spec: Optional[Spec] = None concurrency_level: Optional[ConcurrencyLevel] = None api_budget: Optional[HTTPAPIBudget] = None - max_concurrent_async_job_count: Optional[int] = Field( + max_concurrent_async_job_count: Optional[Union[int, str]] = Field( None, description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.", + examples=[3, "{{ config['max_concurrent_async_job_count'] }}"], title="Maximum Concurrent Asynchronous Jobs", ) metadata: Optional[Dict[str, Any]] = Field( @@ -1922,9 +1923,10 @@ class Config: spec: Optional[Spec] = None concurrency_level: Optional[ConcurrencyLevel] = None api_budget: Optional[HTTPAPIBudget] = None - max_concurrent_async_job_count: Optional[int] = Field( + max_concurrent_async_job_count: Optional[Union[int, str]] = Field( None, description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.", + examples=[3, "{{ config['max_concurrent_async_job_count'] }}"], title="Maximum Concurrent Asynchronous Jobs", ) metadata: Optional[Dict[str, Any]] = Field( diff --git a/unit_tests/sources/declarative/async_job/test_job_tracker.py b/unit_tests/sources/declarative/async_job/test_job_tracker.py index e20304621..30bc45ea3 100644 --- a/unit_tests/sources/declarative/async_job/test_job_tracker.py +++ b/unit_tests/sources/declarative/async_job/test_job_tracker.py @@ -45,3 +45,23 @@ def _reach_limit(self) -> List[str]: def test_given_limit_is_less_than_1_when_init_then_set_to_1(limit: int): tracker = JobTracker(limit) assert tracker._limit == 1 + + +@pytest.mark.parametrize( + ("limit", "config"), + [ + ("2", {}), + ("{{ config['max_concurrent_async_job_count'] }}", {"max_concurrent_async_job_count": 2}), + ], +) +def test_given_limit_is_string_when_init_then_set_to_1(limit, config): + tracker = JobTracker(limit, config) + assert tracker._limit == 1 + + +def test_given_interpolated_limit_and_empty_config_when_init_then_set_to_1(): + tracker = JobTracker( + "{{ config['max_concurrent_async_job_count'] }}", + {"max_concurrent_async_job_count": "hello"}, + ) + assert tracker._limit == 1 From 44775386256ad31be90b61d7f5109618b93b8eb1 Mon Sep 17 00:00:00 2001 From: pnilan Date: Wed, 19 Mar 2025 13:53:45 -0700 Subject: [PATCH 2/4] chore: format/type-check/lint --- airbyte_cdk/sources/declarative/async_job/job_tracker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/async_job/job_tracker.py b/airbyte_cdk/sources/declarative/async_job/job_tracker.py index 86b03afb2..7d0ebba75 100644 --- a/airbyte_cdk/sources/declarative/async_job/job_tracker.py +++ b/airbyte_cdk/sources/declarative/async_job/job_tracker.py @@ -4,7 +4,7 @@ import threading import uuid from dataclasses import dataclass, field -from typing import Set, Union, Mapping, Any +from typing import Any, Mapping, Set, Union from airbyte_cdk.logger import lazy_log from airbyte_cdk.sources.declarative.interpolation import InterpolatedString @@ -18,10 +18,10 @@ class ConcurrentJobLimitReached(Exception): @dataclass class JobTracker: - limit: Union[int, InterpolatedString] + limit: Union[int, str] config: Mapping[str, Any] = field(default_factory=dict) - def __post_init__(self): + def __post_init__(self) -> None: self._jobs: Set[str] = set() self._lock = threading.Lock() if isinstance(self.limit, str): From cc8ea411aa20a9f7298eacd2e3aa31ff2746da6d Mon Sep 17 00:00:00 2001 From: Patrick Nilan Date: Wed, 19 Mar 2025 13:58:14 -0700 Subject: [PATCH 3/4] Update unit_tests/sources/declarative/async_job/test_job_tracker.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- .../sources/declarative/async_job/test_job_tracker.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/unit_tests/sources/declarative/async_job/test_job_tracker.py b/unit_tests/sources/declarative/async_job/test_job_tracker.py index 30bc45ea3..ff434b008 100644 --- a/unit_tests/sources/declarative/async_job/test_job_tracker.py +++ b/unit_tests/sources/declarative/async_job/test_job_tracker.py @@ -48,15 +48,15 @@ def test_given_limit_is_less_than_1_when_init_then_set_to_1(limit: int): @pytest.mark.parametrize( - ("limit", "config"), + ("limit", "config", "expected_limit"), [ - ("2", {}), - ("{{ config['max_concurrent_async_job_count'] }}", {"max_concurrent_async_job_count": 2}), + ("2", {}, 2), + ("{{ config['max_concurrent_async_job_count'] }}", {"max_concurrent_async_job_count": 2}, 2), ], ) -def test_given_limit_is_string_when_init_then_set_to_1(limit, config): +def test_given_limit_as_string_when_init_then_interpolate_correctly(limit, config, expected_limit): tracker = JobTracker(limit, config) - assert tracker._limit == 1 + assert tracker._limit == expected_limit def test_given_interpolated_limit_and_empty_config_when_init_then_set_to_1(): From c12df2cdde8d37268ffad82f65263eb20e4aff49 Mon Sep 17 00:00:00 2001 From: pnilan Date: Wed, 19 Mar 2025 14:06:37 -0700 Subject: [PATCH 4/4] chore: format --- .../sources/declarative/async_job/test_job_tracker.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/unit_tests/sources/declarative/async_job/test_job_tracker.py b/unit_tests/sources/declarative/async_job/test_job_tracker.py index ff434b008..4ddba1112 100644 --- a/unit_tests/sources/declarative/async_job/test_job_tracker.py +++ b/unit_tests/sources/declarative/async_job/test_job_tracker.py @@ -51,7 +51,11 @@ def test_given_limit_is_less_than_1_when_init_then_set_to_1(limit: int): ("limit", "config", "expected_limit"), [ ("2", {}, 2), - ("{{ config['max_concurrent_async_job_count'] }}", {"max_concurrent_async_job_count": 2}, 2), + ( + "{{ config['max_concurrent_async_job_count'] }}", + {"max_concurrent_async_job_count": 2}, + 2, + ), ], ) def test_given_limit_as_string_when_init_then_interpolate_correctly(limit, config, expected_limit):