From 2120a18ba759297b0cd4a12fdcf09903ab42fd95 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Tue, 4 Mar 2025 19:00:26 +0200 Subject: [PATCH 01/14] add --- .../declarative_component_schema.yaml | 4 + .../extractors/response_to_file_extractor.py | 193 +++++++++++++----- .../models/declarative_component_schema.py | 4 + .../parsers/model_to_component_factory.py | 4 +- 4 files changed, 154 insertions(+), 51 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 6cd9998c7..3ff38b051 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1678,6 +1678,10 @@ definitions: type: type: string enum: [ResponseToFileExtractor] + file_type: + title: The file type in which the response data is storred. Supported types are [csv, jsonl]. + type: string + default: csv $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py index 76631ee6b..f0e2ccf94 100644 --- a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py +++ b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py @@ -7,6 +7,7 @@ import zlib from contextlib import closing from dataclasses import InitVar, dataclass +from enum import Enum from typing import Any, Dict, Iterable, Mapping, Optional, Tuple import pandas as pd @@ -20,21 +21,57 @@ DOWNLOAD_CHUNK_SIZE: int = 1024 * 10 +class FileTypes(Enum): + CSV = "csv" + JSONL = "jsonl" + + @dataclass class ResponseToFileExtractor(RecordExtractor): """ - This class is used when having very big HTTP responses (usually streamed) which would require too much memory so we use disk space as - a tradeoff. 
+ This class is used when having very big HTTP responses (usually streamed), + which would require too much memory so we use disk space as a tradeoff. + + The extractor does the following: + 1) Save the response to a temporary file + 2) Read from the temporary file by chunks to avoid OOM + 3) Remove the temporary file after reading + 4) Return the records + 5) If the response is not compressed, it will be filtered for null bytes + 6) If the response is compressed, it will be decompressed + 7) If the response is compressed and contains null bytes, it will be filtered for null bytes - Eventually, we want to support multiple file type by re-using the file based CDK parsers if possible. However, the lift is too high for - a first iteration so we will only support CSV parsing using pandas as salesforce and sendgrid were doing. """ parameters: InitVar[Mapping[str, Any]] + file_type: Optional[str] = "csv" def __post_init__(self, parameters: Mapping[str, Any]) -> None: self.logger = logging.getLogger("airbyte") + def extract_records( + self, response: Optional[requests.Response] = None + ) -> Iterable[Mapping[str, Any]]: + """ + Extracts records from the given response by: + 1) Saving the result to a tmp file + 2) Reading from saved file by chunks to avoid OOM + + Args: + response (Optional[requests.Response]): The response object containing the data. Defaults to None. + + Yields: + Iterable[Mapping[str, Any]]: An iterable of mappings representing the extracted records. + + Returns: + None + """ + if response: + file_path, encoding = self._save_to_file(response) + yield from self._read_with_chunks(file_path, encoding) + else: + yield from [] + def _get_response_encoding(self, headers: Dict[str, Any]) -> str: """ Get the encoding of the response based on the provided headers. This method is heavily inspired by the requests library @@ -42,6 +79,7 @@ def _get_response_encoding(self, headers: Dict[str, Any]) -> str: Args: headers (Dict[str, Any]): The headers of the response. 
+ Returns: str: The encoding of the response. """ @@ -73,11 +111,28 @@ def _filter_null_bytes(self, b: bytes) -> bytes: res = b.replace(b"\x00", b"") if len(res) < len(b): - self.logger.warning( - "Filter 'null' bytes from string, size reduced %d -> %d chars", len(b), len(res) - ) + message = "ResponseToFileExtractor._filter_null_bytes(): Filter 'null' bytes from string, size reduced %d -> %d chars" + self.logger.warning(message, len(b), len(res)) return res + def _get_file_path(self) -> str: + """ + Get a temporary file path with a unique name. + + Returns: + str: The path to the temporary file. + + Raises: + ValueError: If the file type is not supported. + """ + + if self.file_type not in [file_type.value for file_type in FileTypes]: + raise ValueError( + f"ResponseToFileExtractor._get_file_path(): File type {self.file_type} is not supported.", + ) + + return str(uuid.uuid4()) + "." + self.file_type + def _save_to_file(self, response: requests.Response) -> Tuple[str, str]: """ Saves the binary data from the given response to a temporary file and returns the filepath and response encoding. 
@@ -95,8 +150,9 @@ def _save_to_file(self, response: requests.Response) -> Tuple[str, str]: decompressor = zlib.decompressobj(zlib.MAX_WBITS | 32) needs_decompression = True # we will assume at first that the response is compressed and change the flag if not - tmp_file = str(uuid.uuid4()) - with closing(response) as response, open(tmp_file, "wb") as data_file: + file_path = self._get_file_path() + # save binary data to tmp file + with closing(response) as response, open(file_path, "wb") as data_file: response_encoding = self._get_response_encoding(dict(response.headers or {})) for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE): try: @@ -110,15 +166,76 @@ def _save_to_file(self, response: requests.Response) -> Tuple[str, str]: needs_decompression = False # check the file exists - if os.path.isfile(tmp_file): - return tmp_file, response_encoding + if os.path.isfile(file_path): + return file_path, response_encoding else: - raise ValueError( - f"The IO/Error occured while verifying binary data. Tmp file {tmp_file} doesn't exist." - ) + message = "ResponseToFileExtractor._save_to_file(): The IO/Error occured while verifying binary data." + raise ValueError(f"{message} Tmp file {file_path} doesn't exist.") + + def _read_csv( + self, + path: str, + file_encoding: str, + chunk_size: int = 100, + ) -> Iterable[Mapping[str, Any]]: + """ + Reads a CSV file and yields each row as a dictionary. + + Args: + path (str): The path to the CSV file to be read. + file_encoding (str): The encoding of the file. + + Yields: + Mapping[str, Any]: A dictionary representing each row of data. 
+ """ + + csv_read_params = { + "chunksize": chunk_size, + "iterator": True, + "dialect": "unix", + "dtype": object, + "encoding": file_encoding, + } + + for chunk in pd.read_csv(path, **csv_read_params): + # replace NaN with None + chunk = chunk.replace({nan: None}).to_dict(orient="records") + for record in chunk: + yield record + + def _read_json_lines( + self, + path: str, + file_encoding: str, + chunk_size: int = 100, + ) -> Iterable[Mapping[str, Any]]: + """ + Reads a JSON file and yields each row as a dictionary. + + Args: + path (str): The path to the JSON file to be read. + file_encoding (str): The encoding of the file. + + Yields: + Mapping[str, Any]: A dictionary representing each row of data. + """ + + json_read_params = { + "lines": True, + "chunksize": chunk_size, + "encoding": file_encoding, + "convert_dates": False, + } + + for chunk in pd.read_json(path, **json_read_params): + for record in chunk.to_dict(orient="records"): + yield record def _read_with_chunks( - self, path: str, file_encoding: str, chunk_size: int = 100 + self, + path: str, + file_encoding: str, + chunk_size: int = 100, ) -> Iterable[Mapping[str, Any]]: """ Reads data from a file in chunks and yields each row as a dictionary. @@ -132,47 +249,23 @@ def _read_with_chunks( Mapping[str, Any]: A dictionary representing each row of data. Raises: - ValueError: If an IO/Error occurs while reading the temporary data. + ValueError: If an error occurs while reading the data from the file. 
""" try: - # TODO: Add support for other file types, like `json`, with `pd.read_json()` - with open(path, "r", encoding=file_encoding) as data: - chunks = pd.read_csv( - data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object - ) - for chunk in chunks: - chunk = chunk.replace({nan: None}).to_dict(orient="records") - for row in chunk: - yield row + if self.file_type == FileTypes.CSV.value: + yield from self._read_csv(path, file_encoding, chunk_size) + + if self.file_type == FileTypes.JSONL.value: + yield from self._read_json_lines(path, file_encoding, chunk_size) + except pd.errors.EmptyDataError as e: - self.logger.info(f"Empty data received. {e}") + message = "ResponseToFileExtractor._read_with_chunks(): Empty data received." + self.logger.info(f"{message} {e}") yield from [] except IOError as ioe: - raise ValueError(f"The IO/Error occured while reading tmp data. Called: {path}", ioe) + message = "ResponseToFileExtractor._read_with_chunks(): The IO/Error occured while reading the data from file." + raise ValueError(f"{message} Called: {path}", ioe) finally: # remove binary tmp file, after data is read os.remove(path) - - def extract_records( - self, response: Optional[requests.Response] = None - ) -> Iterable[Mapping[str, Any]]: - """ - Extracts records from the given response by: - 1) Saving the result to a tmp file - 2) Reading from saved file by chunks to avoid OOM - - Args: - response (Optional[requests.Response]): The response object containing the data. Defaults to None. - - Yields: - Iterable[Mapping[str, Any]]: An iterable of mappings representing the extracted records. 
- - Returns: - None - """ - if response: - file_path, encoding = self._save_to_file(response) - yield from self._read_with_chunks(file_path, encoding) - else: - yield from [] diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index a49b66c03..1a114981d 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -702,6 +702,10 @@ class DpathExtractor(BaseModel): class ResponseToFileExtractor(BaseModel): type: Literal["ResponseToFileExtractor"] + file_type: Optional[str] = Field( + "csv", + title="The file type in which the response data is storred. Supported types are [csv, jsonl].", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 39058f834..39da868d7 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1992,7 +1992,9 @@ def create_response_to_file_extractor( model: ResponseToFileExtractorModel, **kwargs: Any, ) -> ResponseToFileExtractor: - return ResponseToFileExtractor(parameters=model.parameters or {}) + return ResponseToFileExtractor( + parameters=model.parameters or {}, file_type=model.file_type or "csv" + ) @staticmethod def create_exponential_backoff_strategy( From 5167da5fe990c9809a8afd37e18d39488e76ddb6 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Tue, 4 Mar 2025 19:34:41 +0200 Subject: [PATCH 02/14] fix linter issues --- .../declarative/extractors/response_to_file_extractor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py 
b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py index f0e2ccf94..266e10098 100644 --- a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py +++ b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py @@ -197,7 +197,7 @@ def _read_csv( "encoding": file_encoding, } - for chunk in pd.read_csv(path, **csv_read_params): + for chunk in pd.read_csv(path, **csv_read_params): # type: ignore # ignoring how args are passed # replace NaN with None chunk = chunk.replace({nan: None}).to_dict(orient="records") for record in chunk: @@ -227,7 +227,7 @@ def _read_json_lines( "convert_dates": False, } - for chunk in pd.read_json(path, **json_read_params): + for chunk in pd.read_json(path, **json_read_params): # type: ignore # ignoring how args are passed for record in chunk.to_dict(orient="records"): yield record From 030f1065ce78f13cbc95c3ebe49f6fd9aa63d7db Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Wed, 5 Mar 2025 18:20:52 +0200 Subject: [PATCH 03/14] updated after the review --- .../declarative_component_schema.yaml | 4 - .../decoders/composite_raw_decoder.py | 25 ++- .../extractors/response_to_file_extractor.py | 192 +++++------------- .../models/declarative_component_schema.py | 4 - .../parsers/model_to_component_factory.py | 38 ++-- 5 files changed, 94 insertions(+), 169 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 3ff38b051..6cd9998c7 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1678,10 +1678,6 @@ definitions: type: type: string enum: [ResponseToFileExtractor] - file_type: - title: The file type in which the response data is storred. Supported types are [csv, jsonl]. 
- type: string - default: csv $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index b8e8e3315..209afb1c2 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -35,15 +35,36 @@ def parse( class GzipParser(Parser): inner_parser: Parser + def _reset_reader_pointer(self, data: BufferedIOBase) -> None: + """ + Reset the reader pointer to the beginning of the data. + + Note: + - This is necessary because the gzip decompression will consume the data stream. + """ + data.seek(0) + def parse( self, data: BufferedIOBase, ) -> Generator[MutableMapping[str, Any], None, None]: """ Decompress gzipped bytes and pass decompressed data to the inner parser. + + IMPORTANT: + - If the data is not gzipped, reset the pointer and pass the data to the inner parser as is. + + Note: + - The data is not decoded by default. 
""" - with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj: - yield from self.inner_parser.parse(gzipobj) + + try: + with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj: + yield from self.inner_parser.parse(gzipobj) + except gzip.BadGzipFile: + logger.warning(f"GzipParser(): Received non-gzipped data, parsing the data as is.") + self._reset_reader_pointer(data) + yield from self.inner_parser.parse(data) @dataclass diff --git a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py index 266e10098..0215ddb45 100644 --- a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py +++ b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py @@ -7,7 +7,6 @@ import zlib from contextlib import closing from dataclasses import InitVar, dataclass -from enum import Enum from typing import Any, Dict, Iterable, Mapping, Optional, Tuple import pandas as pd @@ -21,57 +20,21 @@ DOWNLOAD_CHUNK_SIZE: int = 1024 * 10 -class FileTypes(Enum): - CSV = "csv" - JSONL = "jsonl" - - @dataclass class ResponseToFileExtractor(RecordExtractor): """ - This class is used when having very big HTTP responses (usually streamed), - which would require too much memory so we use disk space as a tradeoff. - - The extractor does the following: - 1) Save the response to a temporary file - 2) Read from the temporary file by chunks to avoid OOM - 3) Remove the temporary file after reading - 4) Return the records - 5) If the response is not compressed, it will be filtered for null bytes - 6) If the response is compressed, it will be decompressed - 7) If the response is compressed and contains null bytes, it will be filtered for null bytes + This class is used when having very big HTTP responses (usually streamed) which would require too much memory so we use disk space as + a tradeoff. 
+ Eventually, we want to support multiple file type by re-using the file based CDK parsers if possible. However, the lift is too high for + a first iteration so we will only support CSV parsing using pandas as salesforce and sendgrid were doing. """ parameters: InitVar[Mapping[str, Any]] - file_type: Optional[str] = "csv" def __post_init__(self, parameters: Mapping[str, Any]) -> None: self.logger = logging.getLogger("airbyte") - def extract_records( - self, response: Optional[requests.Response] = None - ) -> Iterable[Mapping[str, Any]]: - """ - Extracts records from the given response by: - 1) Saving the result to a tmp file - 2) Reading from saved file by chunks to avoid OOM - - Args: - response (Optional[requests.Response]): The response object containing the data. Defaults to None. - - Yields: - Iterable[Mapping[str, Any]]: An iterable of mappings representing the extracted records. - - Returns: - None - """ - if response: - file_path, encoding = self._save_to_file(response) - yield from self._read_with_chunks(file_path, encoding) - else: - yield from [] - def _get_response_encoding(self, headers: Dict[str, Any]) -> str: """ Get the encoding of the response based on the provided headers. This method is heavily inspired by the requests library @@ -79,7 +42,6 @@ def _get_response_encoding(self, headers: Dict[str, Any]) -> str: Args: headers (Dict[str, Any]): The headers of the response. - Returns: str: The encoding of the response. """ @@ -111,27 +73,10 @@ def _filter_null_bytes(self, b: bytes) -> bytes: res = b.replace(b"\x00", b"") if len(res) < len(b): - message = "ResponseToFileExtractor._filter_null_bytes(): Filter 'null' bytes from string, size reduced %d -> %d chars" - self.logger.warning(message, len(b), len(res)) - return res - - def _get_file_path(self) -> str: - """ - Get a temporary file path with a unique name. - - Returns: - str: The path to the temporary file. - - Raises: - ValueError: If the file type is not supported. 
- """ - - if self.file_type not in [file_type.value for file_type in FileTypes]: - raise ValueError( - f"ResponseToFileExtractor._get_file_path(): File type {self.file_type} is not supported.", + self.logger.warning( + "Filter 'null' bytes from string, size reduced %d -> %d chars", len(b), len(res) ) - - return str(uuid.uuid4()) + "." + self.file_type + return res def _save_to_file(self, response: requests.Response) -> Tuple[str, str]: """ @@ -150,9 +95,8 @@ def _save_to_file(self, response: requests.Response) -> Tuple[str, str]: decompressor = zlib.decompressobj(zlib.MAX_WBITS | 32) needs_decompression = True # we will assume at first that the response is compressed and change the flag if not - file_path = self._get_file_path() - # save binary data to tmp file - with closing(response) as response, open(file_path, "wb") as data_file: + tmp_file = str(uuid.uuid4()) + with closing(response) as response, open(tmp_file, "wb") as data_file: response_encoding = self._get_response_encoding(dict(response.headers or {})) for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE): try: @@ -166,76 +110,15 @@ def _save_to_file(self, response: requests.Response) -> Tuple[str, str]: needs_decompression = False # check the file exists - if os.path.isfile(file_path): - return file_path, response_encoding + if os.path.isfile(tmp_file): + return tmp_file, response_encoding else: - message = "ResponseToFileExtractor._save_to_file(): The IO/Error occured while verifying binary data." - raise ValueError(f"{message} Tmp file {file_path} doesn't exist.") - - def _read_csv( - self, - path: str, - file_encoding: str, - chunk_size: int = 100, - ) -> Iterable[Mapping[str, Any]]: - """ - Reads a CSV file and yields each row as a dictionary. - - Args: - path (str): The path to the CSV file to be read. - file_encoding (str): The encoding of the file. - - Yields: - Mapping[str, Any]: A dictionary representing each row of data. 
- """ - - csv_read_params = { - "chunksize": chunk_size, - "iterator": True, - "dialect": "unix", - "dtype": object, - "encoding": file_encoding, - } - - for chunk in pd.read_csv(path, **csv_read_params): # type: ignore # ignoring how args are passed - # replace NaN with None - chunk = chunk.replace({nan: None}).to_dict(orient="records") - for record in chunk: - yield record - - def _read_json_lines( - self, - path: str, - file_encoding: str, - chunk_size: int = 100, - ) -> Iterable[Mapping[str, Any]]: - """ - Reads a JSON file and yields each row as a dictionary. - - Args: - path (str): The path to the JSON file to be read. - file_encoding (str): The encoding of the file. - - Yields: - Mapping[str, Any]: A dictionary representing each row of data. - """ - - json_read_params = { - "lines": True, - "chunksize": chunk_size, - "encoding": file_encoding, - "convert_dates": False, - } - - for chunk in pd.read_json(path, **json_read_params): # type: ignore # ignoring how args are passed - for record in chunk.to_dict(orient="records"): - yield record + raise ValueError( + f"The IO/Error occured while verifying binary data. Tmp file {tmp_file} doesn't exist." + ) def _read_with_chunks( - self, - path: str, - file_encoding: str, - chunk_size: int = 100, + self, path: str, file_encoding: str, chunk_size: int = 100 ) -> Iterable[Mapping[str, Any]]: """ Reads data from a file in chunks and yields each row as a dictionary. @@ -249,23 +132,46 @@ def _read_with_chunks( Mapping[str, Any]: A dictionary representing each row of data. Raises: - ValueError: If an error occurs while reading the data from the file. + ValueError: If an IO/Error occurs while reading the temporary data. 
""" try: - if self.file_type == FileTypes.CSV.value: - yield from self._read_csv(path, file_encoding, chunk_size) - - if self.file_type == FileTypes.JSONL.value: - yield from self._read_json_lines(path, file_encoding, chunk_size) - + with open(path, "r", encoding=file_encoding) as data: + chunks = pd.read_csv( + data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object + ) + for chunk in chunks: + chunk = chunk.replace({nan: None}).to_dict(orient="records") + for row in chunk: + yield row except pd.errors.EmptyDataError as e: - message = "ResponseToFileExtractor._read_with_chunks(): Empty data received." - self.logger.info(f"{message} {e}") + self.logger.info(f"Empty data received. {e}") yield from [] except IOError as ioe: - message = "ResponseToFileExtractor._read_with_chunks(): The IO/Error occured while reading the data from file." - raise ValueError(f"{message} Called: {path}", ioe) + raise ValueError(f"The IO/Error occured while reading tmp data. Called: {path}", ioe) finally: # remove binary tmp file, after data is read os.remove(path) + + def extract_records( + self, response: Optional[requests.Response] = None + ) -> Iterable[Mapping[str, Any]]: + """ + Extracts records from the given response by: + 1) Saving the result to a tmp file + 2) Reading from saved file by chunks to avoid OOM + + Args: + response (Optional[requests.Response]): The response object containing the data. Defaults to None. + + Yields: + Iterable[Mapping[str, Any]]: An iterable of mappings representing the extracted records. 
+ + Returns: + None + """ + if response: + file_path, encoding = self._save_to_file(response) + yield from self._read_with_chunks(file_path, encoding) + else: + yield from [] diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 1a114981d..a49b66c03 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -702,10 +702,6 @@ class DpathExtractor(BaseModel): class ResponseToFileExtractor(BaseModel): type: Literal["ResponseToFileExtractor"] - file_type: Optional[str] = Field( - "csv", - title="The file type in which the response data is storred. Supported types are [csv, jsonl].", - ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 39da868d7..da9b018a3 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1992,9 +1992,7 @@ def create_response_to_file_extractor( model: ResponseToFileExtractorModel, **kwargs: Any, ) -> ResponseToFileExtractor: - return ResponseToFileExtractor( - parameters=model.parameters or {}, file_type=model.file_type or "csv" - ) + return ResponseToFileExtractor(parameters=model.parameters or {}) @staticmethod def create_exponential_backoff_strategy( @@ -2195,10 +2193,12 @@ def create_csv_decoder(self, model: CsvDecoderModel, config: Config, **kwargs: A stream_response=False if self._emit_connector_builder_messages else True, ) - @staticmethod - def create_jsonl_decoder(model: JsonlDecoderModel, config: Config, **kwargs: Any) -> Decoder: + def create_jsonl_decoder( + self, model: JsonlDecoderModel, config: Config, **kwargs: Any + ) -> Decoder: 
return CompositeRawDecoder( - parser=ModelToComponentFactory._get_parser(model, config), stream_response=True + parser=ModelToComponentFactory._get_parser(model, config), + stream_response=False if self._emit_connector_builder_messages else True, ) def create_gzip_decoder( @@ -2755,7 +2755,10 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie ) paginator = ( self._create_component_from_model( - model=model.download_paginator, decoder=decoder, config=config, url_base="" + model=model.download_paginator, + decoder=operational_decoder, + config=config, + url_base="", ) if model.download_paginator else NoPagination(parameters={}) @@ -2784,7 +2787,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie parameters={}, ) - decoder = ( + operational_decoder = ( self._create_component_from_model(model=model.decoder, config=config) if model.decoder else JsonDecoder(parameters={}) @@ -2792,7 +2795,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie record_selector = self._create_component_from_model( model=model.record_selector, config=config, - decoder=decoder, + decoder=operational_decoder, name=name, transformations=transformations, client_side_incremental_sync=client_side_incremental_sync, @@ -2800,13 +2803,13 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie stream_slicer = stream_slicer or SinglePartitionRouter(parameters={}) creation_requester = self._create_component_from_model( model=model.creation_requester, - decoder=decoder, + decoder=operational_decoder, config=config, name=f"job creation - {name}", ) polling_requester = self._create_component_from_model( model=model.polling_requester, - decoder=decoder, + decoder=operational_decoder, config=config, name=f"job polling - {name}", ) @@ -2841,7 +2844,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie abort_requester = ( 
self._create_component_from_model( model=model.abort_requester, - decoder=decoder, + decoder=operational_decoder, config=config, name=f"job abort - {name}", ) @@ -2851,7 +2854,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie delete_requester = ( self._create_component_from_model( model=model.delete_requester, - decoder=decoder, + decoder=operational_decoder, config=config, name=f"job delete - {name}", ) @@ -2861,7 +2864,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie download_target_requester = ( self._create_component_from_model( model=model.download_target_requester, - decoder=decoder, + decoder=operational_decoder, config=config, name=f"job extract_url - {name}", ) @@ -2869,10 +2872,13 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie else None ) status_extractor = self._create_component_from_model( - model=model.status_extractor, decoder=decoder, config=config, name=name + model=model.status_extractor, decoder=operational_decoder, config=config, name=name ) download_target_extractor = self._create_component_from_model( - model=model.download_target_extractor, decoder=decoder, config=config, name=name + model=model.download_target_extractor, + decoder=operational_decoder, + config=config, + name=name, ) job_repository: AsyncJobRepository = AsyncHttpJobRepository( creation_requester=creation_requester, From 0d85176c01998222af7ddc56e7965b33a025ac35 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Wed, 5 Mar 2025 18:31:22 +0200 Subject: [PATCH 04/14] removed non-used constant --- .../sources/declarative/extractors/response_to_file_extractor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py index 0215ddb45..c7fd98c17 100644 --- a/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py +++ 
b/airbyte_cdk/sources/declarative/extractors/response_to_file_extractor.py @@ -15,7 +15,6 @@ from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor -EMPTY_STR: str = "" DEFAULT_ENCODING: str = "utf-8" DOWNLOAD_CHUNK_SIZE: int = 1024 * 10 From f0be859b6018a7f52c7be61e33109c14833d90d5 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Thu, 6 Mar 2025 11:38:23 +0200 Subject: [PATCH 05/14] fixed stripping trailing slash when the path provides values --- .../declarative/requesters/http_requester.py | 40 +++++++++++++++---- .../requesters/test_http_requester.py | 2 +- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/airbyte_cdk/sources/declarative/requesters/http_requester.py b/airbyte_cdk/sources/declarative/requesters/http_requester.py index 8a64fae60..45671fc59 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_requester.py +++ b/airbyte_cdk/sources/declarative/requesters/http_requester.py @@ -16,7 +16,9 @@ ) from airbyte_cdk.sources.declarative.decoders import Decoder from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder -from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import ( + InterpolatedString, +) from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import ( InterpolatedRequestOptionsProvider, ) @@ -26,7 +28,10 @@ from airbyte_cdk.sources.streams.http import HttpClient from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler from airbyte_cdk.sources.types import Config, EmptyString, StreamSlice, StreamState -from airbyte_cdk.utils.mapping_helpers import combine_mappings, get_interpolation_context +from airbyte_cdk.utils.mapping_helpers import ( + combine_mappings, + get_interpolation_context, +) @dataclass @@ -155,7 +160,9 @@ def get_request_params( next_page_token: Optional[Mapping[str, Any]] = 
None, ) -> MutableMapping[str, Any]: return self._request_options_provider.get_request_params( - stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, ) def get_request_headers( @@ -166,7 +173,9 @@ def get_request_headers( next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: return self._request_options_provider.get_request_headers( - stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, ) # fixing request options provider types has a lot of dependencies @@ -195,7 +204,9 @@ def get_request_body_json( # type: ignore next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Mapping[str, Any]]: return self._request_options_provider.get_request_body_json( - stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token + stream_state=stream_state, + stream_slice=stream_slice, + next_page_token=next_page_token, ) @property @@ -350,9 +361,24 @@ def _join_url(cls, url_base: str, path: str) -> str: path (str): The path to join with the base URL. Returns: - str: The concatenated URL with the trailing slash (if any) removed. + str: The resulting joined URL. + + Note: + Related issue: https://github.com/airbytehq/airbyte-internal-issues/issues/11869 + - If the path is an empty string or None, the method returns the base URL with any trailing slash removed. 
+
+        Example:
+            1) _join_url("https://example.com/api/", "endpoint") >> 'https://example.com/api/endpoint'
+            2) _join_url("https://example.com/api", "/endpoint") >> 'https://example.com/endpoint'
+            3) _join_url("https://example.com/api/", "") >> 'https://example.com/api'
+            4) _join_url("https://example.com/api", None) >> 'https://example.com/api'
        """
-        return urljoin(url_base, path).rstrip("/")
+
+        # return a full-url if provided directly from interpolation context
+        if path == EmptyString or path is None:
+            return url_base.rstrip("/")
+
+        return urljoin(url_base, path)

     def send_request(
         self,
diff --git a/unit_tests/sources/declarative/requesters/test_http_requester.py b/unit_tests/sources/declarative/requesters/test_http_requester.py
index a1229579f..dfe78011a 100644
--- a/unit_tests/sources/declarative/requesters/test_http_requester.py
+++ b/unit_tests/sources/declarative/requesters/test_http_requester.py
@@ -825,7 +825,7 @@ def test_send_request_stream_slice_next_page_token():
             "test_trailing_slash_on_path",
             "https://airbyte.io",
             "/my_endpoint/",
-            "https://airbyte.io/my_endpoint",
+            "https://airbyte.io/my_endpoint/",
         ),
         (
             "test_nested_path_no_leading_slash",

From 559b280ebb9a4c46b115a503225acc3eb3409061 Mon Sep 17 00:00:00 2001
From: Oleksandr Bazarnov
Date: Thu, 6 Mar 2025 23:11:22 +0200
Subject: [PATCH 06/14] updated after the review

---
 .../decoders/composite_raw_decoder.py         | 59 +++++++++++++------
 .../decoders/test_composite_decoder.py        | 27 ++++++++-
 2 files changed, 66 insertions(+), 20 deletions(-)

diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
index 209afb1c2..857d2f0e1 100644
--- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
+++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
@@ -1,3 +1,7 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+# + import csv import gzip import io @@ -18,12 +22,21 @@ logger = logging.getLogger("airbyte") +COMPRESSION_TYPES = [ + "gzip", + "x-gzip", + "gzip, deflate", + "x-gzip, deflate", +] + + @dataclass class Parser(ABC): @abstractmethod def parse( self, data: BufferedIOBase, + compressed: Optional[bool] = False, ) -> Generator[MutableMapping[str, Any], None, None]: """ Parse data and yield dictionaries. @@ -35,18 +48,10 @@ def parse( class GzipParser(Parser): inner_parser: Parser - def _reset_reader_pointer(self, data: BufferedIOBase) -> None: - """ - Reset the reader pointer to the beginning of the data. - - Note: - - This is necessary because the gzip decompression will consume the data stream. - """ - data.seek(0) - def parse( self, data: BufferedIOBase, + compressed: Optional[bool] = False, ) -> Generator[MutableMapping[str, Any], None, None]: """ Decompress gzipped bytes and pass decompressed data to the inner parser. @@ -58,12 +63,10 @@ def parse( - The data is not decoded by default. """ - try: + if compressed: with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj: yield from self.inner_parser.parse(gzipobj) - except gzip.BadGzipFile: - logger.warning(f"GzipParser(): Received non-gzipped data, parsing the data as is.") - self._reset_reader_pointer(data) + else: yield from self.inner_parser.parse(data) @@ -71,7 +74,11 @@ def parse( class JsonParser(Parser): encoding: str = "utf-8" - def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], None, None]: + def parse( + self, + data: BufferedIOBase, + compressed: Optional[bool] = False, + ) -> Generator[MutableMapping[str, Any], None, None]: """ Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data. 
""" @@ -114,6 +121,7 @@ class JsonLineParser(Parser): def parse( self, data: BufferedIOBase, + compressed: Optional[bool] = False, ) -> Generator[MutableMapping[str, Any], None, None]: for line in data: try: @@ -141,6 +149,7 @@ def _get_delimiter(self) -> Optional[str]: def parse( self, data: BufferedIOBase, + compressed: Optional[bool] = False, ) -> Generator[MutableMapping[str, Any], None, None]: """ Parse CSV data from decompressed bytes. @@ -156,10 +165,15 @@ class CompositeRawDecoder(Decoder): """ Decoder strategy to transform a requests.Response into a Generator[MutableMapping[str, Any], None, None] passed response.raw to parser(s). - Note: response.raw is not decoded/decompressed by default. - parsers should be instantiated recursively. + + Note: response.raw is not decoded/decompressed by default. Parsers should be instantiated recursively. + Example: - composite_raw_decoder = CompositeRawDecoder(parser=GzipParser(inner_parser=JsonLineParser(encoding="iso-8859-1"))) + composite_raw_decoder = CompositeRawDecoder( + parser=GzipParser( + inner_parser=JsonLineParser(encoding="iso-8859-1") + ) + ) """ parser: Parser @@ -168,10 +182,17 @@ class CompositeRawDecoder(Decoder): def is_stream_response(self) -> bool: return self.stream_response + def is_compressed(self, response: requests.Response) -> bool: + """ + Check if the response is compressed based on the Content-Encoding header. 
+ """ + return response.headers.get("Content-Encoding") in COMPRESSION_TYPES + def decode( - self, response: requests.Response + self, + response: requests.Response, ) -> Generator[MutableMapping[str, Any], None, None]: if self.is_stream_response(): - yield from self.parser.parse(data=response.raw) # type: ignore[arg-type] + yield from self.parser.parse(data=response.raw, compressed=self.is_compressed(response)) # type: ignore[arg-type] else: yield from self.parser.parse(data=io.BytesIO(response.content)) diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 745113925..26cbae613 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -110,6 +110,30 @@ def test_composite_raw_decoder_gzip_jsonline_parser(requests_mock, encoding: str assert counter == 3 +@pytest.mark.parametrize("encoding", ["utf-8", "utf", "iso-8859-1"]) +def test_composite_raw_decoder_gzip_jsonline_parser_decodes_non_gzipped_raw_response( + requests_mock, encoding: str +) -> None: + """ + Test the GzipParser with a non-compressed response. 
+ """ + + requests_mock.register_uri( + "GET", + "https://airbyte.io/", + # we encode the jsonl content as bytes here + content="".join(generate_jsonlines()).encode(encoding), + ) + response = requests.get("https://airbyte.io/", stream=True) + + parser = GzipParser(inner_parser=JsonLineParser(encoding=encoding)) + composite_raw_decoder = CompositeRawDecoder(parser=parser) + counter = 0 + for _ in composite_raw_decoder.decode(response): + counter += 1 + assert counter == 3 + + @pytest.mark.parametrize("encoding", ["utf-8", "utf", "iso-8859-1"]) def test_composite_raw_decoder_jsonline_parser(requests_mock, encoding: str): response_content = "".join(generate_jsonlines()) @@ -224,7 +248,8 @@ def test_given_response_is_not_streamed_when_decode_then_can_be_called_multiple_ ) response = requests.get("https://airbyte.io/") composite_raw_decoder = CompositeRawDecoder( - parser=JsonParser(encoding="utf-8"), stream_response=False + parser=JsonParser(encoding="utf-8"), + stream_response=False, ) content = list(composite_raw_decoder.decode(response)) From dbed4d6b6f7a773a3e8d8a819fae5305cde3f7a3 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Fri, 7 Mar 2025 00:09:22 +0200 Subject: [PATCH 07/14] updated COMPRESSED_TYPES and fixed the tests --- .../declarative/decoders/composite_raw_decoder.py | 11 ++++++++--- .../declarative/decoders/zipfile_decoder.py | 15 +++++++++++---- .../decoders/test_composite_decoder.py | 11 ++++++++++- .../declarative/decoders/test_zipfile_decoder.py | 7 ++++++- 4 files changed, 35 insertions(+), 9 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 4bf38f869..1162afa09 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -27,6 +27,10 @@ "x-gzip", "gzip, deflate", "x-gzip, deflate", + "application/zip", + "application/gzip", + 
"application/x-gzip", + "application/x-zip-compressed", ] @@ -64,6 +68,7 @@ def parse( """ if compressed: + print(f"\n\nHERE\n\n") with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj: yield from self.inner_parser.parse(gzipobj) else: @@ -193,9 +198,9 @@ def decode( response: requests.Response, ) -> Generator[MutableMapping[str, Any], None, None]: if self.is_stream_response(): - # urllib mentions that some interfaces don't play nice with auto_close - # [here](https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content) - # We have indeed observed some issues with CSV parsing. + # urllib mentions that some interfaces don't play nice with auto_close + # More info here: https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content + # We have indeed observed some issues with CSV parsing. # Hence, we will manage the closing of the file ourselves until we find a better solution. response.raw.auto_close = False yield from self.parser.parse(data=response.raw, compressed=self.is_compressed(response)) # type: ignore[arg-type] diff --git a/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py b/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py index a937a1e4d..637f742b1 100644 --- a/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py @@ -13,9 +13,7 @@ from airbyte_cdk.models import FailureType from airbyte_cdk.sources.declarative.decoders import Decoder -from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import ( - Parser, -) +from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import COMPRESSION_TYPES, Parser from airbyte_cdk.utils import AirbyteTracedException logger = logging.getLogger("airbyte") @@ -28,6 +26,12 @@ class ZipfileDecoder(Decoder): def is_stream_response(self) -> bool: return False + def is_compressed(self, response: requests.Response) -> bool: + """ + Check if the 
response is compressed based on the Content-Encoding header. + """ + return response.headers.get("Content-Encoding") in COMPRESSION_TYPES + def decode( self, response: requests.Response ) -> Generator[MutableMapping[str, Any], None, None]: @@ -37,7 +41,10 @@ def decode( unzipped_content = zip_file.read(file_name) buffered_content = BytesIO(unzipped_content) try: - yield from self.parser.parse(buffered_content) + yield from self.parser.parse( + buffered_content, + compressed=self.is_compressed(response), + ) except Exception as e: logger.error( f"Failed to parse file: {file_name} from zip file: {response.request.url} with exception {e}." diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 5aba44dc2..98079686c 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -68,6 +68,7 @@ def test_composite_raw_decoder_gzip_csv_parser(requests_mock, encoding: str): "GET", "https://airbyte.io/", content=generate_csv(encoding=encoding, delimiter="\t", should_compress=True), + headers={"Content-Encoding": "gzip"}, ) response = requests.get("https://airbyte.io/", stream=True) @@ -107,7 +108,10 @@ def generate_compressed_jsonlines(encoding: str = "utf-8") -> bytes: @pytest.mark.parametrize("encoding", ["utf-8", "utf", "iso-8859-1"]) def test_composite_raw_decoder_gzip_jsonline_parser(requests_mock, encoding: str): requests_mock.register_uri( - "GET", "https://airbyte.io/", content=generate_compressed_jsonlines(encoding=encoding) + "GET", + "https://airbyte.io/", + content=generate_compressed_jsonlines(encoding=encoding), + headers={"Content-Encoding": "gzip"}, ) response = requests.get("https://airbyte.io/", stream=True) @@ -132,6 +136,11 @@ def test_composite_raw_decoder_gzip_jsonline_parser_decodes_non_gzipped_raw_resp "https://airbyte.io/", # we encode the jsonl content as bytes 
here content="".join(generate_jsonlines()).encode(encoding), + # we don't specify the `Content-Encoding` header here + # to simulate a non-compressed response + # but we still use the GzipParser to decode it + # to test the GzipParser's behavior with non-compressed data + # and to ensure it doesn't raise an error. ) response = requests.get("https://airbyte.io/", stream=True) diff --git a/unit_tests/sources/declarative/decoders/test_zipfile_decoder.py b/unit_tests/sources/declarative/decoders/test_zipfile_decoder.py index 731895e2e..f5c988d0f 100644 --- a/unit_tests/sources/declarative/decoders/test_zipfile_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_zipfile_decoder.py @@ -43,7 +43,12 @@ def test_zipfile_decoder_with_single_file_response(requests_mock, json_data): zipfile_decoder = ZipfileDecoder(parser=GzipParser(inner_parser=JsonParser())) compressed_data = gzip.compress(json.dumps(json_data).encode()) zipped_data = create_zip_from_dict(compressed_data) - requests_mock.register_uri("GET", "https://airbyte.io/", content=zipped_data) + requests_mock.register_uri( + "GET", + "https://airbyte.io/", + content=zipped_data, + headers={"Content-Encoding": "application/zip"}, + ) response = requests.get("https://airbyte.io/") if isinstance(json_data, list): From ead326dc20e5e85346f37baa70fa75ab7d8a6954 Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Fri, 7 Mar 2025 00:25:23 +0200 Subject: [PATCH 08/14] reverted operational_decoder to decoder --- .../parsers/model_to_component_factory.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index d270750c5..09f42282e 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2756,7 +2756,7 @@ def _get_download_retriever() 
-> SimpleRetrieverTestReadDecorator | SimpleRetrie paginator = ( self._create_component_from_model( model=model.download_paginator, - decoder=operational_decoder, + decoder=decoder, config=config, url_base="", ) @@ -2787,7 +2787,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie parameters={}, ) - operational_decoder = ( + decoder = ( self._create_component_from_model(model=model.decoder, config=config) if model.decoder else JsonDecoder(parameters={}) @@ -2795,7 +2795,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie record_selector = self._create_component_from_model( model=model.record_selector, config=config, - decoder=operational_decoder, + decoder=decoder, name=name, transformations=transformations, client_side_incremental_sync=client_side_incremental_sync, @@ -2803,13 +2803,13 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie stream_slicer = stream_slicer or SinglePartitionRouter(parameters={}) creation_requester = self._create_component_from_model( model=model.creation_requester, - decoder=operational_decoder, + decoder=decoder, config=config, name=f"job creation - {name}", ) polling_requester = self._create_component_from_model( model=model.polling_requester, - decoder=operational_decoder, + decoder=decoder, config=config, name=f"job polling - {name}", ) @@ -2844,7 +2844,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie abort_requester = ( self._create_component_from_model( model=model.abort_requester, - decoder=operational_decoder, + decoder=decoder, config=config, name=f"job abort - {name}", ) @@ -2854,7 +2854,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie delete_requester = ( self._create_component_from_model( model=model.delete_requester, - decoder=operational_decoder, + decoder=decoder, config=config, name=f"job delete - {name}", ) @@ -2864,7 +2864,7 @@ def 
_get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie download_target_requester = ( self._create_component_from_model( model=model.download_target_requester, - decoder=operational_decoder, + decoder=decoder, config=config, name=f"job extract_url - {name}", ) @@ -2872,11 +2872,11 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie else None ) status_extractor = self._create_component_from_model( - model=model.status_extractor, decoder=operational_decoder, config=config, name=name + model=model.status_extractor, decoder=decoder, config=config, name=name ) download_target_extractor = self._create_component_from_model( model=model.download_target_extractor, - decoder=operational_decoder, + decoder=decoder, config=config, name=name, ) From f1a71a66810b6c7d8356e6f67f64a29ed6eaca5f Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Fri, 7 Mar 2025 00:29:55 +0200 Subject: [PATCH 09/14] removed print --- .../sources/declarative/decoders/composite_raw_decoder.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 1162afa09..30b8c449f 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -68,7 +68,6 @@ def parse( """ if compressed: - print(f"\n\nHERE\n\n") with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj: yield from self.inner_parser.parse(gzipobj) else: From 3750f6d218261e9c83940b9b00956ac96560875d Mon Sep 17 00:00:00 2001 From: Oleksandr Bazarnov Date: Fri, 7 Mar 2025 08:34:13 +0200 Subject: [PATCH 10/14] updated compression types checks --- .../decoders/composite_raw_decoder.py | 23 ++++--------------- .../sources/declarative/decoders/decoder.py | 20 ++++++++++++++++ .../declarative/decoders/zipfile_decoder.py | 11 ++------- 3 files changed, 26 insertions(+), 28 deletions(-) 
diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 30b8c449f..60636b845 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -22,18 +22,6 @@ logger = logging.getLogger("airbyte") -COMPRESSION_TYPES = [ - "gzip", - "x-gzip", - "gzip, deflate", - "x-gzip, deflate", - "application/zip", - "application/gzip", - "application/x-gzip", - "application/x-zip-compressed", -] - - @dataclass class Parser(ABC): @abstractmethod @@ -186,12 +174,6 @@ class CompositeRawDecoder(Decoder): def is_stream_response(self) -> bool: return self.stream_response - def is_compressed(self, response: requests.Response) -> bool: - """ - Check if the response is compressed based on the Content-Encoding header. - """ - return response.headers.get("Content-Encoding") in COMPRESSION_TYPES - def decode( self, response: requests.Response, @@ -202,7 +184,10 @@ def decode( # We have indeed observed some issues with CSV parsing. # Hence, we will manage the closing of the file ourselves until we find a better solution. 
response.raw.auto_close = False - yield from self.parser.parse(data=response.raw, compressed=self.is_compressed(response)) # type: ignore[arg-type] + yield from self.parser.parse( + data=response.raw, # type: ignore[arg-type] + compressed=self.is_compressed_response(response), + ) response.raw.close() else: yield from self.parser.parse(data=io.BytesIO(response.content)) diff --git a/airbyte_cdk/sources/declarative/decoders/decoder.py b/airbyte_cdk/sources/declarative/decoders/decoder.py index 5fa9dc8f6..44445034a 100644 --- a/airbyte_cdk/sources/declarative/decoders/decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/decoder.py @@ -8,6 +8,17 @@ import requests +COMPRESSION_RESPONSE_TYPES = [ + "gzip", + "x-gzip", + "gzip, deflate", + "x-gzip, deflate", + "application/zip", + "application/gzip", + "application/x-gzip", + "application/x-zip-compressed", +] + @dataclass class Decoder: @@ -30,3 +41,12 @@ def decode( :param response: the response to decode :return: Generator of Mapping describing the response """ + + def is_compressed_response(self, response: requests.Response) -> bool: + """ + Check if the response is compressed based on the Content-Encoding header. 
+ """ + return ( + response.headers.get("Content-Encoding") in COMPRESSION_RESPONSE_TYPES + or response.headers.get("Content-Type") in COMPRESSION_RESPONSE_TYPES + ) diff --git a/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py b/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py index 637f742b1..4423519f9 100644 --- a/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py @@ -8,12 +8,11 @@ from io import BytesIO from typing import Any, Generator, MutableMapping -import orjson import requests from airbyte_cdk.models import FailureType from airbyte_cdk.sources.declarative.decoders import Decoder -from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import COMPRESSION_TYPES, Parser +from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import Parser from airbyte_cdk.utils import AirbyteTracedException logger = logging.getLogger("airbyte") @@ -26,12 +25,6 @@ class ZipfileDecoder(Decoder): def is_stream_response(self) -> bool: return False - def is_compressed(self, response: requests.Response) -> bool: - """ - Check if the response is compressed based on the Content-Encoding header. 
-        """
-        return response.headers.get("Content-Encoding") in COMPRESSION_TYPES
-
     def decode(
         self, response: requests.Response
     ) -> Generator[MutableMapping[str, Any], None, None]:
@@ -43,7 +36,7 @@ def decode(
             try:
                 yield from self.parser.parse(
                     buffered_content,
-                    compressed=self.is_compressed(response),
+                    compressed=self.is_compressed_response(response),
                 )
             except Exception as e:
                 logger.error(
                     f"Failed to parse file: {file_name} from zip file: {response.request.url} with exception {e}."

From 96f41dac7dd666266b3f4541de27ff5a1cb7da58 Mon Sep 17 00:00:00 2001
From: Oleksandr Bazarnov
Date: Fri, 7 Mar 2025 14:09:05 +0200
Subject: [PATCH 11/14] nit

---
 airbyte_cdk/sources/declarative/decoders/decoder.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/airbyte_cdk/sources/declarative/decoders/decoder.py b/airbyte_cdk/sources/declarative/decoders/decoder.py
index 44445034a..d195caeac 100644
--- a/airbyte_cdk/sources/declarative/decoders/decoder.py
+++ b/airbyte_cdk/sources/declarative/decoders/decoder.py
@@ -8,7 +8,7 @@

 import requests

-COMPRESSION_RESPONSE_TYPES = [
+COMPRESSED_RESPONSE_TYPES = [
     "gzip",
     "x-gzip",
     "gzip, deflate",
@@ -44,9 +44,9 @@ def decode(

     def is_compressed_response(self, response: requests.Response) -> bool:
         """
-        Check if the response is compressed based on the Content-Encoding header.
+        Check if the response is compressed based on the `Content-Encoding` or `Content-Type` header.
         """
         return (
-            response.headers.get("Content-Encoding") in COMPRESSION_RESPONSE_TYPES
-            or response.headers.get("Content-Type") in COMPRESSION_RESPONSE_TYPES
+            response.headers.get("Content-Encoding") in COMPRESSED_RESPONSE_TYPES
+            or response.headers.get("Content-Type") in COMPRESSED_RESPONSE_TYPES
         )

From 2bde838692cc9e71500ebe3dec109fb7c3121c7d Mon Sep 17 00:00:00 2001
From: maxi297
Date: Fri, 7 Mar 2025 10:07:46 -0500
Subject: [PATCH 12/14] Proposal for generic composite raw decoder

---
 .../decoders/composite_raw_decoder.py         | 50 ++++++++++++-------
 .../declarative/decoders/zipfile_decoder.py   |  1 -
 .../parsers/model_to_component_factory.py     | 19 ++++++-
 .../decoders/test_composite_decoder.py        | 46 +++++++++--------
 4 files changed, 75 insertions(+), 41 deletions(-)

diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
index 60636b845..05391e67c 100644
--- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
+++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
@@ -10,7 +10,7 @@
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
 from io import BufferedIOBase, TextIOWrapper
-from typing import Any, Generator, MutableMapping, Optional
+from typing import Any, Dict, Generator, List, MutableMapping, Optional, Set, Tuple

 import orjson
 import requests
@@ -28,7 +28,6 @@ class Parser(ABC):
     def parse(
         self,
         data: BufferedIOBase,
-        compressed: Optional[bool] = False,
     ) -> Generator[MutableMapping[str, Any], None, None]:
         """
         Parse data and yield dictionaries.
@@ -43,7 +42,6 @@ class GzipParser(Parser):
     def parse(
         self,
         data: BufferedIOBase,
-        compressed: Optional[bool] = False,
     ) -> Generator[MutableMapping[str, Any], None, None]:
         """
         Decompress gzipped bytes and pass decompressed data to the inner parser.
@@ -55,11 +53,8 @@ def parse(
         - The data is not decoded by default.
""" - if compressed: - with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj: - yield from self.inner_parser.parse(gzipobj) - else: - yield from self.inner_parser.parse(data) + with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj: + yield from self.inner_parser.parse(gzipobj) @dataclass @@ -69,7 +64,6 @@ class JsonParser(Parser): def parse( self, data: BufferedIOBase, - compressed: Optional[bool] = False, ) -> Generator[MutableMapping[str, Any], None, None]: """ Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data. @@ -113,7 +107,6 @@ class JsonLineParser(Parser): def parse( self, data: BufferedIOBase, - compressed: Optional[bool] = False, ) -> Generator[MutableMapping[str, Any], None, None]: for line in data: try: @@ -141,7 +134,6 @@ def _get_delimiter(self) -> Optional[str]: def parse( self, data: BufferedIOBase, - compressed: Optional[bool] = False, ) -> Generator[MutableMapping[str, Any], None, None]: """ Parse CSV data from decompressed bytes. 
@@ -152,7 +144,9 @@ def parse( yield row -@dataclass +_HEADER = str +_HEADER_VALUE = str + class CompositeRawDecoder(Decoder): """ Decoder strategy to transform a requests.Response into a Generator[MutableMapping[str, Any], None, None] @@ -168,26 +162,46 @@ class CompositeRawDecoder(Decoder): ) """ - parser: Parser - stream_response: bool = True + @classmethod + def by_headers(cls, parsers: List[Tuple[Set[_HEADER], Set[_HEADER_VALUE], Parser]], stream_response: bool, fallback_parser: Parser) -> "CompositeRawDecoder": + parsers_by_header = {} + for headers, header_values, parser in parsers: + for header in headers: + parsers_by_header[header] = {header_value: parser for header_value in header_values} + return cls(fallback_parser, stream_response, parsers_by_header) + + @classmethod + def from_parser(cls, parser: Parser, stream_response: bool) -> "CompositeRawDecoder": + return cls(parser, stream_response, {}) + + def __init__(self, parser: Parser, stream_response: bool = True, parsers_by_header: Optional[Dict[_HEADER, Dict[_HEADER_VALUE, Parser]]] = None) -> None: + self._parsers_by_header = parsers_by_header if parsers_by_header else {} + self._fallback_parser = parser + self._stream_response = stream_response def is_stream_response(self) -> bool: - return self.stream_response + return self._stream_response def decode( self, response: requests.Response, ) -> Generator[MutableMapping[str, Any], None, None]: + parser = self._select_parser(response) if self.is_stream_response(): # urllib mentions that some interfaces don't play nice with auto_close # More info here: https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content # We have indeed observed some issues with CSV parsing. # Hence, we will manage the closing of the file ourselves until we find a better solution. 
response.raw.auto_close = False - yield from self.parser.parse( + yield from parser.parse( data=response.raw, # type: ignore[arg-type] - compressed=self.is_compressed_response(response), ) response.raw.close() else: - yield from self.parser.parse(data=io.BytesIO(response.content)) + yield from parser.parse(data=io.BytesIO(response.content)) + + def _select_parser(self, response: requests.Response) -> Parser: + for header, parser_by_header_value in self._parsers_by_header.items(): + if header in response.headers and response.headers[header] in parser_by_header_value.keys(): + return parser_by_header_value[response.headers[header]] + return self._fallback_parser diff --git a/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py b/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py index 4423519f9..d2642acbb 100644 --- a/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py @@ -36,7 +36,6 @@ def decode( try: yield from self.parser.parse( buffered_content, - compressed=self.is_compressed_response(response), ) except Exception as e: logger.error( diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 09f42282e..e00f8f2fb 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -513,6 +513,17 @@ SchemaNormalizationModel.Default: TransformConfig.DefaultSchemaNormalization, } +_COMPRESSED_RESPONSE_TYPES = { + "gzip", + "x-gzip", + "gzip, deflate", + "x-gzip, deflate", + "application/zip", + "application/gzip", + "application/x-gzip", + "application/x-zip-compressed", +} + class ModelToComponentFactory: EPOCH_DATETIME_FORMAT = "%s" @@ -2204,9 +2215,13 @@ def create_jsonl_decoder( def create_gzip_decoder( self, model: GzipDecoderModel, config: Config, **kwargs: Any ) -> Decoder: - 
return CompositeRawDecoder( - parser=ModelToComponentFactory._get_parser(model, config), + gzip_parser: GzipParser = ModelToComponentFactory._get_parser(model, config) # type: ignore # based on the model, we know this will be a GzipParser + return CompositeRawDecoder.by_headers( + [ + ({"Content-Encoding", "Content-Type"}, _COMPRESSED_RESPONSE_TYPES, gzip_parser) + ], stream_response=False if self._emit_connector_builder_messages else True, + fallback_parser=gzip_parser.inner_parser, ) @staticmethod diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 98079686c..bccd1afea 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -8,7 +8,8 @@ from http.server import BaseHTTPRequestHandler, HTTPServer from io import BytesIO, StringIO from threading import Thread -from unittest.mock import patch +from typing import Iterable +from unittest.mock import Mock, patch import pytest import requests @@ -82,7 +83,7 @@ def test_composite_raw_decoder_gzip_csv_parser(requests_mock, encoding: str): assert counter == 3 -def generate_jsonlines(): +def generate_jsonlines() -> Iterable[str]: """ Generator function to yield data in JSON Lines format. This is useful for streaming large datasets. 
@@ -111,41 +112,46 @@ def test_composite_raw_decoder_gzip_jsonline_parser(requests_mock, encoding: str "GET", "https://airbyte.io/", content=generate_compressed_jsonlines(encoding=encoding), - headers={"Content-Encoding": "gzip"}, ) response = requests.get("https://airbyte.io/", stream=True) parser = GzipParser(inner_parser=JsonLineParser(encoding=encoding)) - composite_raw_decoder = CompositeRawDecoder(parser=parser) + composite_raw_decoder = CompositeRawDecoder(parser) counter = 0 for _ in composite_raw_decoder.decode(response): counter += 1 assert counter == 3 -@pytest.mark.parametrize("encoding", ["utf-8", "utf", "iso-8859-1"]) -def test_composite_raw_decoder_gzip_jsonline_parser_decodes_non_gzipped_raw_response( - requests_mock, encoding: str -) -> None: - """ - Test the GzipParser with a non-compressed response. - """ +def test_given_header_match_when_decode_then_select_parser(requests_mock): + requests_mock.register_uri( + "GET", + "https://airbyte.io/", + content=generate_compressed_jsonlines(), + headers={"Content-Encoding": "gzip"}, + ) + response = requests.get("https://airbyte.io/", stream=True) + parser = GzipParser(inner_parser=JsonLineParser()) + unused_parser = Mock() + composite_raw_decoder = CompositeRawDecoder.by_headers([({"Content-Encoding"}, {"gzip"}, parser)], stream_response=True, fallback_parser=unused_parser) + counter = 0 + for _ in composite_raw_decoder.decode(response): + counter += 1 + assert counter == 3 + + +def test_given_header_does_not_match_when_decode_then_select_fallback_parser(requests_mock): requests_mock.register_uri( "GET", "https://airbyte.io/", - # we encode the jsonl content as bytes here - content="".join(generate_jsonlines()).encode(encoding), - # we don't specify the `Content-Encoding` header here - # to simulate a non-compressed response - # but we still use the GzipParser to decode it - # to test the GzipParser's behavior with non-compressed data - # and to ensure it doesn't raise an error. 
+ content="".join(generate_jsonlines()).encode("utf-8"), + headers={"Content-Encoding": "not gzip in order to expect fallback"}, ) response = requests.get("https://airbyte.io/", stream=True) - parser = GzipParser(inner_parser=JsonLineParser(encoding=encoding)) - composite_raw_decoder = CompositeRawDecoder(parser=parser) + unused_parser = GzipParser(inner_parser=Mock()) + composite_raw_decoder = CompositeRawDecoder.by_headers([({"Content-Encoding"}, {"gzip"}, unused_parser)], stream_response=True, fallback_parser=JsonLineParser()) counter = 0 for _ in composite_raw_decoder.decode(response): counter += 1 From a13e80188dbeb2f225c4dd159ea0a9192579b2c2 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Fri, 7 Mar 2025 15:11:10 +0000 Subject: [PATCH 13/14] Auto-fix lint and format issues --- .../decoders/composite_raw_decoder.py | 20 ++++++++++++++++--- .../parsers/model_to_component_factory.py | 4 +--- .../decoders/test_composite_decoder.py | 12 +++++++++-- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 05391e67c..1e89db1d9 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -147,6 +147,7 @@ def parse( _HEADER = str _HEADER_VALUE = str + class CompositeRawDecoder(Decoder): """ Decoder strategy to transform a requests.Response into a Generator[MutableMapping[str, Any], None, None] @@ -163,7 +164,12 @@ class CompositeRawDecoder(Decoder): """ @classmethod - def by_headers(cls, parsers: List[Tuple[Set[_HEADER], Set[_HEADER_VALUE], Parser]], stream_response: bool, fallback_parser: Parser) -> "CompositeRawDecoder": + def by_headers( + cls, + parsers: List[Tuple[Set[_HEADER], Set[_HEADER_VALUE], Parser]], + stream_response: bool, + fallback_parser: Parser, + ) -> "CompositeRawDecoder": parsers_by_header = {} for 
headers, header_values, parser in parsers: for header in headers: @@ -174,7 +180,12 @@ def by_headers(cls, parsers: List[Tuple[Set[_HEADER], Set[_HEADER_VALUE], Parser def from_parser(cls, parser: Parser, stream_response: bool) -> "CompositeRawDecoder": return cls(parser, stream_response, {}) - def __init__(self, parser: Parser, stream_response: bool = True, parsers_by_header: Optional[Dict[_HEADER, Dict[_HEADER_VALUE, Parser]]] = None) -> None: + def __init__( + self, + parser: Parser, + stream_response: bool = True, + parsers_by_header: Optional[Dict[_HEADER, Dict[_HEADER_VALUE, Parser]]] = None, + ) -> None: self._parsers_by_header = parsers_by_header if parsers_by_header else {} self._fallback_parser = parser self._stream_response = stream_response @@ -202,6 +213,9 @@ def decode( def _select_parser(self, response: requests.Response) -> Parser: for header, parser_by_header_value in self._parsers_by_header.items(): - if header in response.headers and response.headers[header] in parser_by_header_value.keys(): + if ( + header in response.headers + and response.headers[header] in parser_by_header_value.keys() + ): return parser_by_header_value[response.headers[header]] return self._fallback_parser diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index e00f8f2fb..a926023fc 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2217,9 +2217,7 @@ def create_gzip_decoder( ) -> Decoder: gzip_parser: GzipParser = ModelToComponentFactory._get_parser(model, config) # type: ignore # based on the model, we know this will be a GzipParser return CompositeRawDecoder.by_headers( - [ - ({"Content-Encoding", "Content-Type"}, _COMPRESSED_RESPONSE_TYPES, gzip_parser) - ], + [({"Content-Encoding", "Content-Type"}, _COMPRESSED_RESPONSE_TYPES, gzip_parser)], 
stream_response=False if self._emit_connector_builder_messages else True, fallback_parser=gzip_parser.inner_parser, ) diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index bccd1afea..02c0993b6 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -134,7 +134,11 @@ def test_given_header_match_when_decode_then_select_parser(requests_mock): parser = GzipParser(inner_parser=JsonLineParser()) unused_parser = Mock() - composite_raw_decoder = CompositeRawDecoder.by_headers([({"Content-Encoding"}, {"gzip"}, parser)], stream_response=True, fallback_parser=unused_parser) + composite_raw_decoder = CompositeRawDecoder.by_headers( + [({"Content-Encoding"}, {"gzip"}, parser)], + stream_response=True, + fallback_parser=unused_parser, + ) counter = 0 for _ in composite_raw_decoder.decode(response): counter += 1 @@ -151,7 +155,11 @@ def test_given_header_does_not_match_when_decode_then_select_fallback_parser(req response = requests.get("https://airbyte.io/", stream=True) unused_parser = GzipParser(inner_parser=Mock()) - composite_raw_decoder = CompositeRawDecoder.by_headers([({"Content-Encoding"}, {"gzip"}, unused_parser)], stream_response=True, fallback_parser=JsonLineParser()) + composite_raw_decoder = CompositeRawDecoder.by_headers( + [({"Content-Encoding"}, {"gzip"}, unused_parser)], + stream_response=True, + fallback_parser=JsonLineParser(), + ) counter = 0 for _ in composite_raw_decoder.decode(response): counter += 1 From 10e6552f67b8d3d02f160e66c7184b8a0f6fb29a Mon Sep 17 00:00:00 2001 From: maxi297 Date: Fri, 7 Mar 2025 11:44:33 -0500 Subject: [PATCH 14/14] Builder shenanigans --- .../declarative/decoders/composite_raw_decoder.py | 11 +---------- .../declarative/parsers/model_to_component_factory.py | 11 +++++++++-- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git 
a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 1e89db1d9..b735d8e6e 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -176,16 +176,7 @@ def by_headers( parsers_by_header[header] = {header_value: parser for header_value in header_values} return cls(fallback_parser, stream_response, parsers_by_header) - @classmethod - def from_parser(cls, parser: Parser, stream_response: bool) -> "CompositeRawDecoder": - return cls(parser, stream_response, {}) - - def __init__( - self, - parser: Parser, - stream_response: bool = True, - parsers_by_header: Optional[Dict[_HEADER, Dict[_HEADER_VALUE, Parser]]] = None, - ) -> None: + def __init__(self, parser: Parser, stream_response: bool = True, parsers_by_header: Optional[Dict[_HEADER, Dict[_HEADER_VALUE, Parser]]] = None) -> None: self._parsers_by_header = parsers_by_header if parsers_by_header else {} self._fallback_parser = parser self._stream_response = stream_response diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index a926023fc..31b8595d9 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2216,9 +2216,16 @@ def create_gzip_decoder( self, model: GzipDecoderModel, config: Config, **kwargs: Any ) -> Decoder: gzip_parser: GzipParser = ModelToComponentFactory._get_parser(model, config) # type: ignore # based on the model, we know this will be a GzipParser + + if self._emit_connector_builder_messages: + # This is very surprising but if the response is not streamed, CompositeRawDecoder calls response.content and the requests library actually uncompress the data as opposed to response.raw which uses urllib3 directly and does 
not uncompress the data + return CompositeRawDecoder(gzip_parser.inner_parser, False) + return CompositeRawDecoder.by_headers( - [({"Content-Encoding", "Content-Type"}, _COMPRESSED_RESPONSE_TYPES, gzip_parser)], - stream_response=False if self._emit_connector_builder_messages else True, + [ + ({"Content-Encoding", "Content-Type"}, _COMPRESSED_RESPONSE_TYPES, gzip_parser) + ], + stream_response=True, fallback_parser=gzip_parser.inner_parser, )