Skip to content

Commit 3876b4d

Browse files
fix: make ResponseToFileExtractor respect configured delimiter instead of hardcoding comma
ResponseToFileExtractor was using pd.read_csv(dialect='unix'), which hardcodes a comma delimiter, ignoring the configured download_decoder's CsvDecoder delimiter setting. This caused tab-separated (TSV) files to be parsed as a single column.

Changes:
- Add 'delimiter' parameter to ResponseToFileExtractor (default: ',')
- Replace dialect='unix' with explicit CSV options + configurable delimiter
- Add delimiter property to declarative component schema
- Pass delimiter from model to component in factory
- Add tests for tab-delimited CSV parsing

Related: #909 (gzip decompression fix for ResponseToFileExtractor)

Co-Authored-By: Ilja Herdt <ilja.herdt@airbyte.io>
1 parent 7f41401 commit 3876b4d

File tree

5 files changed

+163
-64
lines changed

5 files changed

+163
-64
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1940,6 +1940,11 @@ definitions:
19401940
type:
19411941
type: string
19421942
enum: [ResponseToFileExtractor]
1943+
delimiter:
1944+
title: Delimiter
1945+
description: The delimiter used to separate values in the CSV data. Defaults to comma (',').
1946+
type: string
1947+
default: ","
19431948
$parameters:
19441949
type: object
19451950
additionalProperties: true

airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4+
import csv
45
import logging
56
import os
67
import uuid
@@ -30,9 +31,12 @@ class ResponseToFileExtractor(RecordExtractor):
3031
"""
3132

3233
parameters: InitVar[Mapping[str, Any]]
34+
delimiter: str = ","
3335

3436
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
3537
self.logger = logging.getLogger("airbyte")
38+
if self.delimiter.startswith("\\"):
39+
self.delimiter = self.delimiter.encode("utf-8").decode("unicode_escape")
3640

3741
def _get_response_encoding(self, headers: Dict[str, Any]) -> str:
3842
"""
@@ -137,7 +141,14 @@ def _read_with_chunks(
137141
try:
138142
with open(path, "r", encoding=file_encoding) as data:
139143
chunks = pd.read_csv(
140-
data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object
144+
data,
145+
chunksize=chunk_size,
146+
iterator=True,
147+
dtype=object,
148+
delimiter=self.delimiter,
149+
quoting=csv.QUOTE_ALL,
150+
doublequote=True,
151+
lineterminator="\n",
141152
)
142153
for chunk in chunks:
143154
chunk = chunk.replace({nan: None}).to_dict(orient="records")

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 91 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2-
31
# generated by datamodel-codegen:
42
# filename: declarative_component_schema.yaml
53

@@ -502,6 +500,11 @@ class DpathExtractor(BaseModel):
502500

503501
class ResponseToFileExtractor(BaseModel):
504502
type: Literal["ResponseToFileExtractor"]
503+
delimiter: Optional[str] = Field(
504+
",",
505+
description="The delimiter used to separate values in the CSV data. Defaults to comma (',').",
506+
title="Delimiter",
507+
)
505508
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
506509

507510

@@ -930,24 +933,28 @@ class OAuthConfigSpecification(BaseModel):
930933
class Config:
931934
extra = Extra.allow
932935

933-
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field(
934-
None,
935-
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
936-
examples=[
937-
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
938-
{
939-
"app_id": {
940-
"type": "string",
941-
"path_in_connector_config": ["info", "app_id"],
942-
}
943-
},
944-
],
945-
title="OAuth user input",
936+
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = (
937+
Field(
938+
None,
939+
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
940+
examples=[
941+
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
942+
{
943+
"app_id": {
944+
"type": "string",
945+
"path_in_connector_config": ["info", "app_id"],
946+
}
947+
},
948+
],
949+
title="OAuth user input",
950+
)
946951
)
947-
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field(
948-
None,
949-
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
950-
title="DeclarativeOAuth Connector Specification",
952+
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = (
953+
Field(
954+
None,
955+
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
956+
title="DeclarativeOAuth Connector Specification",
957+
)
951958
)
952959
complete_oauth_output_specification: Optional[Dict[str, Any]] = Field(
953960
None,
@@ -965,7 +972,9 @@ class Config:
965972
complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field(
966973
None,
967974
description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }",
968-
examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}],
975+
examples=[
976+
{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}
977+
],
969978
title="OAuth input specification",
970979
)
971980
complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field(
@@ -1469,7 +1478,9 @@ class CustomConfigTransformation(BaseModel):
14691478
class_name: str = Field(
14701479
...,
14711480
description="Fully-qualified name of the class that will be implementing the custom config transformation. The format is `source_<name>.<package>.<class_name>`.",
1472-
examples=["source_declarative_manifest.components.MyCustomConfigTransformation"],
1481+
examples=[
1482+
"source_declarative_manifest.components.MyCustomConfigTransformation"
1483+
],
14731484
)
14741485
parameters: Optional[Dict[str, Any]] = Field(
14751486
None,
@@ -1897,7 +1908,9 @@ class OAuthAuthenticator(BaseModel):
18971908
scopes: Optional[List[str]] = Field(
18981909
None,
18991910
description="List of scopes that should be granted to the access token.",
1900-
examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]],
1911+
examples=[
1912+
["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]
1913+
],
19011914
title="Scopes",
19021915
)
19031916
token_expiry_date: Optional[str] = Field(
@@ -2124,7 +2137,9 @@ class RecordSelector(BaseModel):
21242137
description="Responsible for filtering records to be emitted by the Source.",
21252138
title="Record Filter",
21262139
)
2127-
schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field(
2140+
schema_normalization: Optional[
2141+
Union[SchemaNormalization, CustomSchemaNormalization]
2142+
] = Field(
21282143
None,
21292144
description="Responsible for normalization according to the schema.",
21302145
title="Schema Normalization",
@@ -2166,10 +2181,12 @@ class DpathValidator(BaseModel):
21662181
],
21672182
title="Field Path",
21682183
)
2169-
validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field(
2170-
...,
2171-
description="The condition that the specified config value will be evaluated against",
2172-
title="Validation Strategy",
2184+
validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = (
2185+
Field(
2186+
...,
2187+
description="The condition that the specified config value will be evaluated against",
2188+
title="Validation Strategy",
2189+
)
21732190
)
21742191

21752192

@@ -2186,10 +2203,12 @@ class PredicateValidator(BaseModel):
21862203
],
21872204
title="Value",
21882205
)
2189-
validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field(
2190-
...,
2191-
description="The validation strategy to apply to the value.",
2192-
title="Validation Strategy",
2206+
validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = (
2207+
Field(
2208+
...,
2209+
description="The validation strategy to apply to the value.",
2210+
title="Validation Strategy",
2211+
)
21932212
)
21942213

21952214

@@ -2214,12 +2233,12 @@ class ConfigAddFields(BaseModel):
22142233

22152234
class CompositeErrorHandler(BaseModel):
22162235
type: Literal["CompositeErrorHandler"]
2217-
error_handlers: List[Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler]] = (
2218-
Field(
2219-
...,
2220-
description="List of error handlers to iterate on to determine how to handle a failed response.",
2221-
title="Error Handlers",
2222-
)
2236+
error_handlers: List[
2237+
Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler]
2238+
] = Field(
2239+
...,
2240+
description="List of error handlers to iterate on to determine how to handle a failed response.",
2241+
title="Error Handlers",
22232242
)
22242243
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
22252244

@@ -2381,9 +2400,9 @@ class Config:
23812400

23822401
type: Literal["DeclarativeSource"]
23832402
check: Union[CheckStream, CheckDynamicStream]
2384-
streams: Optional[List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]] = (
2385-
None
2386-
)
2403+
streams: Optional[
2404+
List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]
2405+
] = None
23872406
dynamic_streams: List[DynamicDeclarativeStream]
23882407
version: str = Field(
23892408
...,
@@ -2508,16 +2527,20 @@ class Config:
25082527
extra = Extra.allow
25092528

25102529
type: Literal["DeclarativeStream"]
2511-
name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name")
2530+
name: Optional[str] = Field(
2531+
"", description="The stream name.", example=["Users"], title="Name"
2532+
)
25122533
retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field(
25132534
...,
25142535
description="Component used to coordinate how records are extracted across stream slices and request pages.",
25152536
title="Retriever",
25162537
)
2517-
incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = Field(
2518-
None,
2519-
description="Component used to fetch data incrementally based on a time field in the data.",
2520-
title="Incremental Sync",
2538+
incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = (
2539+
Field(
2540+
None,
2541+
description="Component used to fetch data incrementally based on a time field in the data.",
2542+
title="Incremental Sync",
2543+
)
25212544
)
25222545
primary_key: Optional[PrimaryKey] = Field("", title="Primary Key")
25232546
schema_loader: Optional[
@@ -2691,18 +2714,20 @@ class HttpRequester(BaseModelWithDeprecations):
26912714
description="For APIs that require explicit specification of the properties to query for, this component will take a static or dynamic set of properties (which can be optionally split into chunks) and allow them to be injected into an outbound request by accessing stream_partition.extra_fields.",
26922715
title="Query Properties",
26932716
)
2694-
request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field(
2695-
None,
2696-
description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.",
2697-
examples=[
2698-
{"unit": "day"},
2699-
{
2700-
"query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"'
2701-
},
2702-
{"searchIn": "{{ ','.join(config.get('search_in', [])) }}"},
2703-
{"sort_by[asc]": "updated_at"},
2704-
],
2705-
title="Query Parameters",
2717+
request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = (
2718+
Field(
2719+
None,
2720+
description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.",
2721+
examples=[
2722+
{"unit": "day"},
2723+
{
2724+
"query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"'
2725+
},
2726+
{"searchIn": "{{ ','.join(config.get('search_in', [])) }}"},
2727+
{"sort_by[asc]": "updated_at"},
2728+
],
2729+
title="Query Parameters",
2730+
)
27062731
)
27072732
request_headers: Optional[Union[Dict[str, str], str]] = Field(
27082733
None,
@@ -2874,7 +2899,9 @@ class QueryProperties(BaseModel):
28742899

28752900
class StateDelegatingStream(BaseModel):
28762901
type: Literal["StateDelegatingStream"]
2877-
name: str = Field(..., description="The stream name.", example=["Users"], title="Name")
2902+
name: str = Field(
2903+
..., description="The stream name.", example=["Users"], title="Name"
2904+
)
28782905
full_refresh_stream: DeclarativeStream = Field(
28792906
...,
28802907
description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.",
@@ -2961,13 +2988,17 @@ class AsyncRetriever(BaseModel):
29612988
status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field(
29622989
..., description="Responsible for fetching the actual status of the async job."
29632990
)
2964-
download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field(
2991+
download_target_extractor: Optional[
2992+
Union[DpathExtractor, CustomRecordExtractor]
2993+
] = Field(
29652994
None,
29662995
description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.",
29672996
)
29682997
download_extractor: Optional[
29692998
Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor]
2970-
] = Field(None, description="Responsible for fetching the records from provided urls.")
2999+
] = Field(
3000+
None, description="Responsible for fetching the records from provided urls."
3001+
)
29713002
creation_requester: Union[HttpRequester, CustomRequester] = Field(
29723003
...,
29733004
description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.",

0 commit comments

Comments
 (0)