diff --git a/airbyte_cdk/sources/declarative/async_job/job_tracker.py b/airbyte_cdk/sources/declarative/async_job/job_tracker.py index 62d4feab9..7d0ebba75 100644 --- a/airbyte_cdk/sources/declarative/async_job/job_tracker.py +++ b/airbyte_cdk/sources/declarative/async_job/job_tracker.py @@ -3,9 +3,11 @@ import logging import threading import uuid -from typing import Set +from dataclasses import dataclass, field +from typing import Any, Mapping, Set, Union from airbyte_cdk.logger import lazy_log +from airbyte_cdk.sources.declarative.interpolation import InterpolatedString LOGGER = logging.getLogger("airbyte") @@ -14,15 +16,29 @@ class ConcurrentJobLimitReached(Exception): pass +@dataclass class JobTracker: - def __init__(self, limit: int): + limit: Union[int, str] + config: Mapping[str, Any] = field(default_factory=dict) + + def __post_init__(self) -> None: self._jobs: Set[str] = set() - if limit < 1: + self._lock = threading.Lock() + if isinstance(self.limit, str): + try: + self.limit = int( + InterpolatedString(self.limit, parameters={}).eval(config=self.config) + ) + except Exception as e: + LOGGER.warning( + f"Error interpolating max job count: {self.limit}. Setting to 1. {e}" + ) + self.limit = 1 + if self.limit < 1: LOGGER.warning( - f"The `max_concurrent_async_job_count` property is less than 1: {limit}. Setting to 1. Please update the source manifest to set a valid value." + f"The `max_concurrent_async_job_count` property is less than 1: {self.limit}. Setting to 1. Please update the source manifest to set a valid value." ) - self._limit = 1 if limit < 1 else limit - self._lock = threading.Lock() + self._limit = self.limit if self.limit >= 1 else 1 def try_to_get_intent(self) -> str: lazy_log( diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 967a71ccd..e3e0425a5 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -47,7 +47,12 @@ properties: max_concurrent_async_job_count: title: Maximum Concurrent Asynchronous Jobs description: Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information. - type: integer + type: + - integer + - string + examples: + - 3 + - "{{ config['max_concurrent_async_job_count'] }}" metadata: type: object description: For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata. @@ -2894,7 +2899,7 @@ definitions: title: Lazy Read Pointer description: If set, this will enable lazy reading, using the initial read of parent records to extract child records. type: array - default: [ ] + default: [] items: - type: string interpolation_context: @@ -3199,7 +3204,7 @@ definitions: properties: type: type: string - enum: [ StateDelegatingStream ] + enum: [StateDelegatingStream] name: title: Name description: The stream name. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 480fa51c7..c7331d2c6 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1890,9 +1890,10 @@ class Config: spec: Optional[Spec] = None concurrency_level: Optional[ConcurrencyLevel] = None api_budget: Optional[HTTPAPIBudget] = None - max_concurrent_async_job_count: Optional[int] = Field( + max_concurrent_async_job_count: Optional[Union[int, str]] = Field( None, description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.", + examples=[3, "{{ config['max_concurrent_async_job_count'] }}"], title="Maximum Concurrent Asynchronous Jobs", ) metadata: Optional[Dict[str, Any]] = Field( @@ -1922,9 +1923,10 @@ class Config: spec: Optional[Spec] = None concurrency_level: Optional[ConcurrencyLevel] = None api_budget: Optional[HTTPAPIBudget] = None - max_concurrent_async_job_count: Optional[int] = Field( + max_concurrent_async_job_count: Optional[Union[int, str]] = Field( None, description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.", + examples=[3, "{{ config['max_concurrent_async_job_count'] }}"], title="Maximum Concurrent Asynchronous Jobs", ) metadata: Optional[Dict[str, Any]] = Field( diff --git a/unit_tests/sources/declarative/async_job/test_job_tracker.py b/unit_tests/sources/declarative/async_job/test_job_tracker.py index e20304621..4ddba1112 100644 --- a/unit_tests/sources/declarative/async_job/test_job_tracker.py +++ b/unit_tests/sources/declarative/async_job/test_job_tracker.py @@ -45,3 +45,27 @@ def _reach_limit(self) -> List[str]: def test_given_limit_is_less_than_1_when_init_then_set_to_1(limit: int): tracker = JobTracker(limit) assert tracker._limit == 1 + + +@pytest.mark.parametrize( + ("limit", "config", "expected_limit"), + [ + ("2", {}, 2), + ( + "{{ config['max_concurrent_async_job_count'] }}", + {"max_concurrent_async_job_count": 2}, + 2, + ), + ], +) +def test_given_limit_as_string_when_init_then_interpolate_correctly(limit, config, expected_limit): + tracker = JobTracker(limit, config) + assert tracker._limit == expected_limit + + +def test_given_interpolated_limit_and_empty_config_when_init_then_set_to_1(): + tracker = JobTracker( + "{{ config['max_concurrent_async_job_count'] }}", + {"max_concurrent_async_job_count": "hello"}, + ) + assert tracker._limit == 1