From c74c37a8b4f4760eeae369d9fb1bcbefbc564a6f Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Fri, 7 Nov 2025 10:18:40 -0500 Subject: [PATCH 1/4] Support refresh_token_error on the OAuthAuthenticatorModel level and add sensible defaults --- airbyte_cdk/sources/declarative/auth/oauth.py | 16 ++++- .../declarative_component_schema.yaml | 31 ++++++++- .../models/declarative_component_schema.py | 26 ++++++-- .../parsers/model_to_component_factory.py | 64 ++++++++++++++++++- 4 files changed, 124 insertions(+), 13 deletions(-) diff --git a/airbyte_cdk/sources/declarative/auth/oauth.py b/airbyte_cdk/sources/declarative/auth/oauth.py index a6320a28f..7d889fdec 100644 --- a/airbyte_cdk/sources/declarative/auth/oauth.py +++ b/airbyte_cdk/sources/declarative/auth/oauth.py @@ -5,7 +5,7 @@ import logging from dataclasses import InitVar, dataclass, field from datetime import datetime, timedelta -from typing import Any, List, Mapping, Optional, Union +from typing import Any, List, Mapping, Optional, Tuple, Union from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean @@ -46,6 +46,9 @@ class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAut refresh_request_headers (Optional[Mapping[str, Any]]): The request headers to send in the refresh request grant_type: The grant_type to request for access_token. If set to refresh_token, the refresh_token parameter has to be provided message_repository (MessageRepository): the message repository used to emit logs on HTTP requests + refresh_token_error_status_codes (Tuple[int, ...]): Status codes to identify refresh token errors in response + refresh_token_error_key (str): Key to identify refresh token error in response + refresh_token_error_values (Tuple[str, ...]): List of values to check for exception during token refresh process """ config: Mapping[str, Any] @@ -72,9 +75,18 @@ class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAut message_repository: MessageRepository = NoopMessageRepository() profile_assertion: Optional[DeclarativeAuthenticator] = None use_profile_assertion: Optional[Union[InterpolatedBoolean, str, bool]] = False + refresh_token_error_status_codes: Tuple[int, ...] = () + refresh_token_error_key: str = "" + refresh_token_error_values: Tuple[str, ...] = () def __post_init__(self, parameters: Mapping[str, Any]) -> None: - super().__init__() + # Convert lists to tuples for parent class compatibility + + super().__init__( + refresh_token_error_status_codes=self.refresh_token_error_status_codes, + refresh_token_error_key=self.refresh_token_error_key, + refresh_token_error_values=self.refresh_token_error_values, + ) if self.token_refresh_endpoint is not None: self._token_refresh_endpoint: Optional[InterpolatedString] = InterpolatedString.create( self.token_refresh_endpoint, parameters=parameters diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index f5e9a8548..373f2494b 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1427,6 +1427,31 @@ definitions: type: string examples: - "%Y-%m-%d %H:%M:%S.%f+00:00" + refresh_token_error_status_codes: + title: Refresh Token Error Status Codes + description: Status Codes to Identify refresh token error in response (Refresh Token Error Key and Refresh Token Error Values should be also specified). Responses with one of the error status code and containing an error value will be flagged as a config error + type: array + items: + type: integer + default: [400] + examples: + - [400, 500] + refresh_token_error_key: + title: Refresh Token Error Key + description: Key to Identify refresh token error in response (Refresh Token Error Status Codes and Refresh Token Error Values should be also specified). + type: string + default: "error" + examples: + - "error" + refresh_token_error_values: + title: Refresh Token Error Values + description: 'List of values to check for exception during token refresh process. Used to check if the error found in the response matches the key from the Refresh Token Error Key field (e.g. response={"error": "invalid_grant"}). Only responses with one of the error status code and containing an error value will be flagged as a config error' + type: array + items: + type: string + default: ["invalid_grant", "invalid_permissions"] + examples: + - ["invalid_grant", "invalid_permissions"] refresh_token_updater: title: Refresh Token Updater description: When the refresh token updater is defined, new refresh tokens, access tokens and the access token expiry date are written back from the authentication response to the config object. This is important if the refresh token can only used once. @@ -1468,7 +1493,7 @@ definitions: examples: - ["credentials", "token_expiry_date"] refresh_token_error_status_codes: - title: Refresh Token Error Status Codes + title: (Deprecated - Use the same field on the OAuthAuthenticator level) Refresh Token Error Status Codes description: Status Codes to Identify refresh token error in response (Refresh Token Error Key and Refresh Token Error Values should be also specified). Responses with one of the error status code and containing an error value will be flagged as a config error type: array items: @@ -1477,14 +1502,14 @@ definitions: examples: - [400, 500] refresh_token_error_key: - title: Refresh Token Error Key + title: (Deprecated - Use the same field on the OAuthAuthenticator level) Refresh Token Error Key description: Key to Identify refresh token error in response (Refresh Token Error Status Codes and Refresh Token Error Values should be also specified). type: string default: "" examples: - "error" refresh_token_error_values: - title: Refresh Token Error Values + title: (Deprecated - Use the same field on the OAuthAuthenticator level) Refresh Token Error Values description: 'List of values to check for exception during token refresh process. Used to check if the error found in the response matches the key from the Refresh Token Error Key field (e.g. response={"error": "invalid_grant"}). Only responses with one of the error status code and containing an error value will be flagged as a config error' type: array items: diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 35186ef71..e108af64f 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,5 +1,3 @@ -# Copyright (c) 2025 Airbyte, Inc., all rights reserved. - # generated by datamodel-codegen: # filename: declarative_component_schema.yaml @@ -426,19 +424,19 @@ class RefreshTokenUpdater(BaseModel): [], description="Status Codes to Identify refresh token error in response (Refresh Token Error Key and Refresh Token Error Values should be also specified). Responses with one of the error status code and containing an error value will be flagged as a config error", examples=[[400, 500]], - title="Refresh Token Error Status Codes", + title="(Deprecated - Use the same field on the OAuthAuthenticator level) Refresh Token Error Status Codes", ) refresh_token_error_key: Optional[str] = Field( "", description="Key to Identify refresh token error in response (Refresh Token Error Status Codes and Refresh Token Error Values should be also specified).", examples=["error"], - title="Refresh Token Error Key", + title="(Deprecated - Use the same field on the OAuthAuthenticator level) Refresh Token Error Key", ) refresh_token_error_values: Optional[List[str]] = Field( [], description='List of values to check for exception during token refresh process. Used to check if the error found in the response matches the key from the Refresh Token Error Key field (e.g. response={"error": "invalid_grant"}). Only responses with one of the error status code and containing an error value will be flagged as a config error', examples=[["invalid_grant", "invalid_permissions"]], - title="Refresh Token Error Values", + title="(Deprecated - Use the same field on the OAuthAuthenticator level) Refresh Token Error Values", ) @@ -1900,6 +1898,24 @@ class OAuthAuthenticator(BaseModel): examples=["%Y-%m-%d %H:%M:%S.%f+00:00"], title="Token Expiry Date Format", ) + refresh_token_error_status_codes: Optional[List[int]] = Field( + [400], + description="Status Codes to Identify refresh token error in response (Refresh Token Error Key and Refresh Token Error Values should be also specified). Responses with one of the error status code and containing an error value will be flagged as a config error", + examples=[[400, 500]], + title="Refresh Token Error Status Codes", + ) + refresh_token_error_key: Optional[str] = Field( + "error", + description="Key to Identify refresh token error in response (Refresh Token Error Status Codes and Refresh Token Error Values should be also specified).", + examples=["error"], + title="Refresh Token Error Key", + ) + refresh_token_error_values: Optional[List[str]] = Field( + ["invalid_grant", "invalid_permissions"], + description='List of values to check for exception during token refresh process. Used to check if the error found in the response matches the key from the Refresh Token Error Key field (e.g. response={"error": "invalid_grant"}). Only responses with one of the error status code and containing an error value will be flagged as a config error', + examples=[["invalid_grant", "invalid_permissions"]], + title="Refresh Token Error Values", + ) refresh_token_updater: Optional[RefreshTokenUpdater] = Field( None, description="When the refresh token updater is defined, new refresh tokens, access tokens and the access token expiry date are written back from the authentication response to the config object. This is important if the refresh token can only used once.", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index fdaf26bba..6bf1d44a2 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -18,6 +18,7 @@ Mapping, MutableMapping, Optional, + Tuple, Type, Union, cast, @@ -400,6 +401,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( RecordSelector as RecordSelectorModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + RefreshTokenUpdater as RefreshTokenUpdaterModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( RemoveFields as RemoveFieldsModel, ) @@ -2789,6 +2793,9 @@ def create_oauth_authenticator( else None ) + refresh_token_error_status_codes, refresh_token_error_key, refresh_token_error_values = ( + self._get_refresh_token_error_information(model) + ) if model.refresh_token_updater: # ignore type error because fixing it would have a lot of dependencies, revisit later return DeclarativeSingleUseRefreshTokenOauth2Authenticator( # type: ignore @@ -2839,9 +2846,9 @@ def create_oauth_authenticator( token_expiry_date_format=model.token_expiry_date_format, token_expiry_is_time_of_expiration=bool(model.token_expiry_date_format), message_repository=self._message_repository, - refresh_token_error_status_codes=model.refresh_token_updater.refresh_token_error_status_codes, - refresh_token_error_key=model.refresh_token_updater.refresh_token_error_key, - refresh_token_error_values=model.refresh_token_updater.refresh_token_error_values, + refresh_token_error_status_codes=refresh_token_error_status_codes, + refresh_token_error_key=refresh_token_error_key, + refresh_token_error_values=refresh_token_error_values, ) # ignore type error because fixing it would have a lot of dependencies, revisit later return DeclarativeOauth2Authenticator( # type: ignore @@ -2868,8 +2875,59 @@ def create_oauth_authenticator( message_repository=self._message_repository, profile_assertion=profile_assertion, use_profile_assertion=model.use_profile_assertion, + refresh_token_error_status_codes=refresh_token_error_status_codes, + refresh_token_error_key=refresh_token_error_key, + refresh_token_error_values=refresh_token_error_values, ) + @staticmethod + def _get_refresh_token_error_information( + model: OAuthAuthenticatorModel, + ) -> Tuple[Tuple[int, ...], str, Tuple[str, ...]]: + """ + In a previous version of the CDK, the auth error as config_error was only done if a refresh token updater was + defined. As a transition, we added those fields on the OAuthAuthenticatorModel. This method ensures that the + information is defined only once and return the right fields. + """ + refresh_token_updater = model.refresh_token_updater + is_defined_on_refresh_token_updated = refresh_token_updater and ( + refresh_token_updater.refresh_token_error_status_codes + or refresh_token_updater.refresh_token_error_key + or refresh_token_updater.refresh_token_error_values + ) + is_defined_on_oauth_authenticator = ( + model.refresh_token_error_status_codes + or model.refresh_token_error_key + or model.refresh_token_error_values + ) + if is_defined_on_refresh_token_updated and is_defined_on_oauth_authenticator: + raise ValueError( + "refresh_token_error should either be defined on the OAuthAuthenticatorModel or the RefreshTokenUpdaterModel, not both" + ) + + if is_defined_on_refresh_token_updated: + not_optional_refresh_token_updater: RefreshTokenUpdaterModel = refresh_token_updater # type: ignore # we know from the condition that this is not None + return ( + tuple(not_optional_refresh_token_updater.refresh_token_error_status_codes) + if not_optional_refresh_token_updater.refresh_token_error_status_codes + else (), + not_optional_refresh_token_updater.refresh_token_error_key or "", + tuple(not_optional_refresh_token_updater.refresh_token_error_values) + if not_optional_refresh_token_updater.refresh_token_error_values + else (), + ) + elif is_defined_on_oauth_authenticator: + return ( + tuple(model.refresh_token_error_status_codes) + if model.refresh_token_error_status_codes + else (), + model.refresh_token_error_key or "", + tuple(model.refresh_token_error_values) if model.refresh_token_error_values else (), + ) + + # returning default values we think cover most cases + return (400,), "error", ("invalid_grant", "invalid_permissions") + def create_offset_increment( self, model: OffsetIncrementModel, From b5530041a658e0fdb5cc1c155d603441c543844c Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Fri, 7 Nov 2025 10:28:28 -0500 Subject: [PATCH 2/4] self code review --- airbyte_cdk/sources/declarative/auth/oauth.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/auth/oauth.py b/airbyte_cdk/sources/declarative/auth/oauth.py index 7d889fdec..e1ad84e09 100644 --- a/airbyte_cdk/sources/declarative/auth/oauth.py +++ b/airbyte_cdk/sources/declarative/auth/oauth.py @@ -80,8 +80,6 @@ class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAut refresh_token_error_values: Tuple[str, ...] = () def __post_init__(self, parameters: Mapping[str, Any]) -> None: - # Convert lists to tuples for parent class compatibility - super().__init__( refresh_token_error_status_codes=self.refresh_token_error_status_codes, refresh_token_error_key=self.refresh_token_error_key, From 245e2142c78ab3ffef02d3ba9ea069132a7d3d46 Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Mon, 10 Nov 2025 09:15:57 -0500 Subject: [PATCH 3/4] fix test --- .../declarative_component_schema.yaml | 3 - .../models/declarative_component_schema.py | 150 +++++++++++------- .../test_model_to_component_factory.py | 4 +- 3 files changed, 91 insertions(+), 66 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 373f2494b..153a5d105 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1433,14 +1433,12 @@ definitions: type: array items: type: integer - default: [400] examples: - [400, 500] refresh_token_error_key: title: Refresh Token Error Key description: Key to Identify refresh token error in response (Refresh Token Error Status Codes and Refresh Token Error Values should be also specified). type: string - default: "error" examples: - "error" refresh_token_error_values: @@ -1449,7 +1447,6 @@ definitions: type: array items: type: string - default: ["invalid_grant", "invalid_permissions"] examples: - ["invalid_grant", "invalid_permissions"] refresh_token_updater: diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index e108af64f..81df131c5 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -926,24 +926,28 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( + Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", + ) ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( + Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", + ) ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -961,7 +965,9 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], + examples=[ + {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} + ], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1465,7 +1471,9 @@ class CustomConfigTransformation(BaseModel): class_name: str = Field( ..., description="Fully-qualified name of the class that will be implementing the custom config transformation. The format is `source_..`.", - examples=["source_declarative_manifest.components.MyCustomConfigTransformation"], + examples=[ + "source_declarative_manifest.components.MyCustomConfigTransformation" + ], ) parameters: Optional[Dict[str, Any]] = Field( None, @@ -1883,7 +1891,9 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], + examples=[ + ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] + ], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -1899,19 +1909,19 @@ class OAuthAuthenticator(BaseModel): title="Token Expiry Date Format", ) refresh_token_error_status_codes: Optional[List[int]] = Field( - [400], + None, description="Status Codes to Identify refresh token error in response (Refresh Token Error Key and Refresh Token Error Values should be also specified). Responses with one of the error status code and containing an error value will be flagged as a config error", examples=[[400, 500]], title="Refresh Token Error Status Codes", ) refresh_token_error_key: Optional[str] = Field( - "error", + None, description="Key to Identify refresh token error in response (Refresh Token Error Status Codes and Refresh Token Error Values should be also specified).", examples=["error"], title="Refresh Token Error Key", ) refresh_token_error_values: Optional[List[str]] = Field( - ["invalid_grant", "invalid_permissions"], + None, description='List of values to check for exception during token refresh process. Used to check if the error found in the response matches the key from the Refresh Token Error Key field (e.g. response={"error": "invalid_grant"}). Only responses with one of the error status code and containing an error value will be flagged as a config error', examples=[["invalid_grant", "invalid_permissions"]], title="Refresh Token Error Values", @@ -2100,7 +2110,9 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( + schema_normalization: Optional[ + Union[SchemaNormalization, CustomSchemaNormalization] + ] = Field( None, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -2142,10 +2154,12 @@ class DpathValidator(BaseModel): ], title="Field Path", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( - ..., - description="The condition that the specified config value will be evaluated against", - title="Validation Strategy", + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( + Field( + ..., + description="The condition that the specified config value will be evaluated against", + title="Validation Strategy", + ) ) @@ -2162,10 +2176,12 @@ class PredicateValidator(BaseModel): ], title="Value", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( - ..., - description="The validation strategy to apply to the value.", - title="Validation Strategy", + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( + Field( + ..., + description="The validation strategy to apply to the value.", + title="Validation Strategy", + ) ) @@ -2190,12 +2206,12 @@ class ConfigAddFields(BaseModel): class CompositeErrorHandler(BaseModel): type: Literal["CompositeErrorHandler"] - error_handlers: List[Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler]] = ( - Field( - ..., - description="List of error handlers to iterate on to determine how to handle a failed response.", - title="Error Handlers", - ) + error_handlers: List[ + Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler] + ] = Field( + ..., + description="List of error handlers to iterate on to determine how to handle a failed response.", + title="Error Handlers", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2357,9 +2373,9 @@ class Config: type: Literal["DeclarativeSource"] check: Union[CheckStream, CheckDynamicStream] - streams: Optional[List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]] = ( - None - ) + streams: Optional[ + List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]] + ] = None dynamic_streams: List[DynamicDeclarativeStream] version: str = Field( ..., @@ -2484,16 +2500,20 @@ class Config: extra = Extra.allow type: Literal["DeclarativeStream"] - name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") + name: Optional[str] = Field( + "", description="The stream name.", example=["Users"], title="Name" + ) retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", + incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = ( + Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", + ) ) primary_key: Optional[PrimaryKey] = Field("", title="Primary Key") schema_loader: Optional[ @@ -2667,18 +2687,20 @@ class HttpRequester(BaseModelWithDeprecations): description="For APIs that require explicit specification of the properties to query for, this component will take a static or dynamic set of properties (which can be optionally split into chunks) and allow them to be injected into an outbound request by accessing stream_partition.extra_fields.", title="Query Properties", ) - request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field( - None, - description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", - examples=[ - {"unit": "day"}, - { - "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' - }, - {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, - {"sort_by[asc]": "updated_at"}, - ], - title="Query Parameters", + request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = ( + Field( + None, + description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", + examples=[ + {"unit": "day"}, + { + "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' + }, + {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, + {"sort_by[asc]": "updated_at"}, + ], + title="Query Parameters", + ) ) request_headers: Optional[Union[Dict[str, str], str]] = Field( None, @@ -2850,7 +2872,9 @@ class QueryProperties(BaseModel): class StateDelegatingStream(BaseModel): type: Literal["StateDelegatingStream"] - name: str = Field(..., description="The stream name.", example=["Users"], title="Name") + name: str = Field( + ..., description="The stream name.", example=["Users"], title="Name" + ) full_refresh_stream: DeclarativeStream = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.", @@ -2937,13 +2961,17 @@ class AsyncRetriever(BaseModel): status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field( ..., description="Responsible for fetching the actual status of the async job." ) - download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field( + download_target_extractor: Optional[ + Union[DpathExtractor, CustomRecordExtractor] + ] = Field( None, description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.", ) download_extractor: Optional[ Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor] - ] = Field(None, description="Responsible for fetching the records from provided urls.") + ] = Field( + None, description="Responsible for fetching the records from provided urls." + ) creation_requester: Union[HttpRequester, CustomRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 591d47ae6..7dd819944 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -636,9 +636,9 @@ def test_single_use_oauth_branch(): # default values assert authenticator._access_token_config_path == ["credentials", "access_token"] assert authenticator._token_expiry_date_config_path == ["credentials", "token_expiry_date"] - assert authenticator._refresh_token_error_status_codes == [400] + assert authenticator._refresh_token_error_status_codes == (400,) assert authenticator._refresh_token_error_key == "error" - assert authenticator._refresh_token_error_values == ["invalid_grant"] + assert authenticator._refresh_token_error_values == ("invalid_grant",) def test_list_based_stream_slicer_with_values_refd(): From 3336a1f08cf1f7a24f0a0c3aafb3c747db880281 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Mon, 10 Nov 2025 16:08:35 +0000 Subject: [PATCH 4/4] Auto-fix lint and format issues --- .../models/declarative_component_schema.py | 144 +++++++----------- 1 file changed, 58 insertions(+), 86 deletions(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 81df131c5..0f5c0f1f9 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -926,28 +926,24 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( - Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", - ) + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( - Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", - ) + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -965,9 +961,7 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[ - {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} - ], + examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1471,9 +1465,7 @@ class CustomConfigTransformation(BaseModel): class_name: str = Field( ..., description="Fully-qualified name of the class that will be implementing the custom config transformation. The format is `source_..`.", - examples=[ - "source_declarative_manifest.components.MyCustomConfigTransformation" - ], + examples=["source_declarative_manifest.components.MyCustomConfigTransformation"], ) parameters: Optional[Dict[str, Any]] = Field( None, @@ -1891,9 +1883,7 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[ - ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] - ], + examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -2110,9 +2100,7 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[ - Union[SchemaNormalization, CustomSchemaNormalization] - ] = Field( + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( None, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -2154,12 +2142,10 @@ class DpathValidator(BaseModel): ], title="Field Path", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( - Field( - ..., - description="The condition that the specified config value will be evaluated against", - title="Validation Strategy", - ) + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( + ..., + description="The condition that the specified config value will be evaluated against", + title="Validation Strategy", ) @@ -2176,12 +2162,10 @@ class PredicateValidator(BaseModel): ], title="Value", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( - Field( - ..., - description="The validation strategy to apply to the value.", - title="Validation Strategy", - ) + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( + ..., + description="The validation strategy to apply to the value.", + title="Validation Strategy", ) @@ -2206,12 +2190,12 @@ class ConfigAddFields(BaseModel): class CompositeErrorHandler(BaseModel): type: Literal["CompositeErrorHandler"] - error_handlers: List[ - Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler] - ] = Field( - ..., - description="List of error handlers to iterate on to determine how to handle a failed response.", - title="Error Handlers", + error_handlers: List[Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler]] = ( + Field( + ..., + description="List of error handlers to iterate on to determine how to handle a failed response.", + title="Error Handlers", + ) ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2373,9 +2357,9 @@ class Config: type: Literal["DeclarativeSource"] check: Union[CheckStream, CheckDynamicStream] - streams: Optional[ - List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]] - ] = None + streams: Optional[List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]] = ( + None + ) dynamic_streams: List[DynamicDeclarativeStream] version: str = Field( ..., @@ -2500,20 +2484,16 @@ class Config: extra = Extra.allow type: Literal["DeclarativeStream"] - name: Optional[str] = Field( - "", description="The stream name.", example=["Users"], title="Name" - ) + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = ( - Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", - ) + incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", ) primary_key: Optional[PrimaryKey] = Field("", title="Primary Key") schema_loader: Optional[ @@ -2687,20 +2667,18 @@ class HttpRequester(BaseModelWithDeprecations): description="For APIs that require explicit specification of the properties to query for, this component will take a static or dynamic set of properties (which can be optionally split into chunks) and allow them to be injected into an outbound request by accessing stream_partition.extra_fields.", title="Query Properties", ) - request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = ( - Field( - None, - description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", - examples=[ - {"unit": "day"}, - { - "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' - }, - {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, - {"sort_by[asc]": "updated_at"}, - ], - title="Query Parameters", - ) + request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field( + None, + description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", + examples=[ + {"unit": "day"}, + { + "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' + }, + {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, + {"sort_by[asc]": "updated_at"}, + ], + title="Query Parameters", ) request_headers: Optional[Union[Dict[str, str], str]] = Field( None, @@ -2872,9 +2850,7 @@ class QueryProperties(BaseModel): class StateDelegatingStream(BaseModel): type: Literal["StateDelegatingStream"] - name: str = Field( - ..., description="The stream name.", example=["Users"], title="Name" - ) + name: str = Field(..., description="The stream name.", example=["Users"], title="Name") full_refresh_stream: DeclarativeStream = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.", @@ -2961,17 +2937,13 @@ class AsyncRetriever(BaseModel): status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field( ..., description="Responsible for fetching the actual status of the async job." ) - download_target_extractor: Optional[ - Union[DpathExtractor, CustomRecordExtractor] - ] = Field( + download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field( None, description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.", ) download_extractor: Optional[ Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor] - ] = Field( - None, description="Responsible for fetching the records from provided urls." - ) + ] = Field(None, description="Responsible for fetching the records from provided urls.") creation_requester: Union[HttpRequester, CustomRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.",