diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 84aaa6c53..8fb7a62c2 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1940,6 +1940,11 @@ definitions: type: type: string enum: [ResponseToFileExtractor] + delimiter: + title: Delimiter + description: The delimiter used to separate values in the CSV data. Defaults to comma (','). + type: string + default: "," $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py index eea4b80b2..da71500f2 100644 --- a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py +++ b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py @@ -1,6 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import csv import logging import os import uuid @@ -30,9 +31,12 @@ class ResponseToFileExtractor(RecordExtractor): """ parameters: InitVar[Mapping[str, Any]] + delimiter: str = "," def __post_init__(self, parameters: Mapping[str, Any]) -> None: self.logger = logging.getLogger("airbyte") + if self.delimiter.startswith("\\"): + self.delimiter = self.delimiter.encode("utf-8").decode("unicode_escape") def _get_response_encoding(self, headers: Dict[str, Any]) -> str: """ @@ -137,7 +141,14 @@ def _read_with_chunks( try: with open(path, "r", encoding=file_encoding) as data: chunks = pd.read_csv( - data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object + data, + chunksize=chunk_size, + iterator=True, + dtype=object, + delimiter=self.delimiter, + quoting=csv.QUOTE_ALL, + doublequote=True, + lineterminator="\n", ) for chunk in chunks: chunk = chunk.replace({nan: None}).to_dict(orient="records") diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 5d2f0521f..614ceb5a4 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,5 +1,3 @@ -# Copyright (c) 2025 Airbyte, Inc., all rights reserved. - # generated by datamodel-codegen: # filename: declarative_component_schema.yaml @@ -502,6 +500,11 @@ class DpathExtractor(BaseModel): class ResponseToFileExtractor(BaseModel): type: Literal["ResponseToFileExtractor"] + delimiter: Optional[str] = Field( + ",", + description="The delimiter used to separate values in the CSV data. Defaults to comma (',').", + title="Delimiter", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 2bd7d268d..295d79a97 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2383,7 +2383,10 @@ def create_response_to_file_extractor( model: ResponseToFileExtractorModel, **kwargs: Any, ) -> ResponseToFileExtractor: - return ResponseToFileExtractor(parameters=model.parameters or {}) + return ResponseToFileExtractor( + parameters=model.parameters or {}, + delimiter=model.delimiter or ",", + ) @staticmethod def create_exponential_backoff_strategy( diff --git a/unit_tests/sources/declarative/extractors/test_response_to_file_extractor.py b/unit_tests/sources/declarative/extractors/test_response_to_file_extractor.py index eeefe0817..4cff1ccb8 100644 --- a/unit_tests/sources/declarative/extractors/test_response_to_file_extractor.py +++ b/unit_tests/sources/declarative/extractors/test_response_to_file_extractor.py @@ -14,7 +14,7 @@ class ResponseToFileExtractorTest(TestCase): def setUp(self) -> None: - self._extractor = ResponseToFileExtractor({}) + self._extractor = ResponseToFileExtractor(parameters={}) self._http_mocker = requests_mock.Mocker() self._http_mocker.__enter__() @@ -39,6 +39,55 @@ def test_text_response_with_null_bytes(self) -> None: assert extracted_records == [{"FIRST_NAME": "a first name", "LAST_NAME": "a last name"}] + def test_tab_delimited_response(self) -> None: + """Test that a tab-delimited (TSV) response is correctly parsed when delimiter is set to tab.""" + extractor = ResponseToFileExtractor(parameters={}, delimiter="\t") + tsv_data = "Date\tApp Name\tApp Apple Identifier\tEvent\n2026-02-28\tBullseye Blast\t1632779218\tImpression\n" + response = self._mock_streamed_response(BytesIO(tsv_data.encode("utf-8"))) + + extracted_records = list(extractor.extract_records(response)) + + assert len(extracted_records) == 1 + assert extracted_records[0] == { + "Date": "2026-02-28", + "App Name": "Bullseye Blast", + "App Apple Identifier": "1632779218", + "Event": "Impression", + } + + def test_escaped_tab_delimiter(self) -> None: + """Test that escaped tab delimiter (\\t from YAML) is correctly decoded.""" + extractor = ResponseToFileExtractor(parameters={}, delimiter="\\t") + tsv_data = "col1\tcol2\tcol3\nval1\tval2\tval3\n" + response = self._mock_streamed_response(BytesIO(tsv_data.encode("utf-8"))) + + extracted_records = list(extractor.extract_records(response)) + + assert len(extracted_records) == 1 + assert extracted_records[0] == {"col1": "val1", "col2": "val2", "col3": "val3"} + + def test_default_comma_delimiter(self) -> None: + """Test that the default comma delimiter still works correctly.""" + extractor = ResponseToFileExtractor(parameters={}) + csv_data = "col1,col2,col3\nval1,val2,val3\n" + response = self._mock_streamed_response(BytesIO(csv_data.encode("utf-8"))) + + extracted_records = list(extractor.extract_records(response)) + + assert len(extracted_records) == 1 + assert extracted_records[0] == {"col1": "val1", "col2": "val2", "col3": "val3"} + + def test_tab_delimiter_with_comma_in_values(self) -> None: + """Test that commas in field values are preserved when using tab delimiter.""" + extractor = ResponseToFileExtractor(parameters={}, delimiter="\t") + tsv_data = "name\taddress\nJohn Doe\t123 Main St, Apt 4\n" + response = self._mock_streamed_response(BytesIO(tsv_data.encode("utf-8"))) + + extracted_records = list(extractor.extract_records(response)) + + assert len(extracted_records) == 1 + assert extracted_records[0] == {"name": "John Doe", "address": "123 Main St, Apt 4"} + def _test_folder_path(self) -> Path: return Path(__file__).parent.resolve() @@ -76,7 +125,7 @@ def large_event_response_fixture(): @pytest.mark.limit_memory("20 MB") def test_response_to_file_extractor_memory_usage(requests_mock, large_events_response): lines_in_response, file_path = large_events_response - extractor = ResponseToFileExtractor({}) + extractor = ResponseToFileExtractor(parameters={}) url = "https://for-all-mankind.nasa.com/api/v1/users/users1" requests_mock.get(url, body=open(file_path, "rb"))