Skip to content
Merged
69 changes: 59 additions & 10 deletions airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import csv
import gzip
import io
Expand All @@ -18,12 +22,25 @@
logger = logging.getLogger("airbyte")


# Header values (Content-Encoding, and a few Content-Type media types) that
# indicate the raw response payload is compressed and must be decompressed
# (gunzipped) before it can be handed to an inner parser.
COMPRESSION_TYPES = [
    "gzip",
    "x-gzip",
    "gzip, deflate",
    "x-gzip, deflate",
    "application/zip",
    "application/gzip",
    "application/x-gzip",
    "application/x-zip-compressed",
]


@dataclass
class Parser(ABC):
@abstractmethod
def parse(
self,
data: BufferedIOBase,
compressed: Optional[bool] = False,
Comment thread
bazarnov marked this conversation as resolved.
Outdated
) -> Generator[MutableMapping[str, Any], None, None]:
"""
Parse data and yield dictionaries.
Expand All @@ -38,19 +55,35 @@ class GzipParser(Parser):
def parse(
    self,
    data: BufferedIOBase,
    compressed: Optional[bool] = False,
) -> Generator[MutableMapping[str, Any], None, None]:
    """
    Decompress gzipped bytes and pass decompressed data to the inner parser.

    Args:
        data: Raw byte stream read from the response.
        compressed: Whether ``data`` is actually gzip-compressed. When False,
            the stream is forwarded to the inner parser untouched.

    IMPORTANT:
        - If the data is not gzipped, the data is passed to the inner parser as is.

    Note:
        - The data is not decoded by default.
    """
    if compressed:
        # Wrap the stream so the inner parser sees the decompressed bytes.
        with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
            yield from self.inner_parser.parse(gzipobj)
    else:
        yield from self.inner_parser.parse(data)


@dataclass
class JsonParser(Parser):
encoding: str = "utf-8"

def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], None, None]:
def parse(
self,
data: BufferedIOBase,
compressed: Optional[bool] = False,
) -> Generator[MutableMapping[str, Any], None, None]:
"""
Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data.
"""
Expand Down Expand Up @@ -93,6 +126,7 @@ class JsonLineParser(Parser):
def parse(
self,
data: BufferedIOBase,
compressed: Optional[bool] = False,
) -> Generator[MutableMapping[str, Any], None, None]:
for line in data:
try:
Expand Down Expand Up @@ -120,6 +154,7 @@ def _get_delimiter(self) -> Optional[str]:
def parse(
self,
data: BufferedIOBase,
compressed: Optional[bool] = False,
) -> Generator[MutableMapping[str, Any], None, None]:
"""
Parse CSV data from decompressed bytes.
Expand All @@ -135,10 +170,15 @@ class CompositeRawDecoder(Decoder):
"""
Decoder strategy to transform a requests.Response into a Generator[MutableMapping[str, Any], None, None]
passed response.raw to parser(s).
Note: response.raw is not decoded/decompressed by default.
parsers should be instantiated recursively.

Note: response.raw is not decoded/decompressed by default. Parsers should be instantiated recursively.

Example:
composite_raw_decoder = CompositeRawDecoder(parser=GzipParser(inner_parser=JsonLineParser(encoding="iso-8859-1")))
composite_raw_decoder = CompositeRawDecoder(
parser=GzipParser(
inner_parser=JsonLineParser(encoding="iso-8859-1")
)
)
"""

parser: Parser
Expand All @@ -147,14 +187,23 @@ class CompositeRawDecoder(Decoder):
def is_stream_response(self) -> bool:
    # Whether decode() should consume response.raw as a stream (configured
    # via the `stream_response` dataclass field).
    return self.stream_response

def is_compressed(self, response: requests.Response) -> bool:
    """
    Check if the response is compressed, based on the Content-Encoding header.

    HTTP header values are case-insensitive, so normalize to lowercase before
    comparing against the (all-lowercase) COMPRESSION_TYPES entries. A missing
    header yields False.
    """
    return (response.headers.get("Content-Encoding") or "").lower() in COMPRESSION_TYPES

def decode(
    self,
    response: requests.Response,
) -> Generator[MutableMapping[str, Any], None, None]:
    """
    Transform a requests.Response into a generator of parsed records.

    Streamed responses are parsed directly from ``response.raw``, which is
    neither decoded nor decompressed by default, so the compression flag is
    forwarded to the parser. Non-streamed responses use ``response.content``,
    which `requests` has already decoded/decompressed for us.
    """
    if self.is_stream_response():
        # urllib3 mentions that some interfaces don't play nice with auto_close
        # More info here: https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content
        # We have indeed observed some issues with CSV parsing.
        # Hence, we will manage the closing of the file ourselves until we find a better solution.
        response.raw.auto_close = False
        yield from self.parser.parse(data=response.raw, compressed=self.is_compressed(response))  # type: ignore[arg-type]
        response.raw.close()
    else:
        yield from self.parser.parse(data=io.BytesIO(response.content))
15 changes: 11 additions & 4 deletions airbyte_cdk/sources/declarative/decoders/zipfile_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.decoders import Decoder
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
Parser,
)
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import COMPRESSION_TYPES, Parser
from airbyte_cdk.utils import AirbyteTracedException

logger = logging.getLogger("airbyte")
Expand All @@ -28,6 +26,12 @@ class ZipfileDecoder(Decoder):
def is_stream_response(self) -> bool:
    # Zip archives must be fully downloaded before members can be extracted,
    # so this decoder never operates on a streamed response.
    return False

def is_compressed(self, response: requests.Response) -> bool:
    """
    Check if the response is compressed, based on the Content-Encoding header.

    Header values are compared case-insensitively (HTTP header values may vary
    in case) against the all-lowercase COMPRESSION_TYPES list; a missing
    header yields False.
    """
    return (response.headers.get("Content-Encoding") or "").lower() in COMPRESSION_TYPES

def decode(
self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
Expand All @@ -37,7 +41,10 @@ def decode(
unzipped_content = zip_file.read(file_name)
buffered_content = BytesIO(unzipped_content)
try:
yield from self.parser.parse(buffered_content)
yield from self.parser.parse(
buffered_content,
compressed=self.is_compressed(response),
)
except Exception as e:
logger.error(
f"Failed to parse file: {file_name} from zip file: {response.request.url} with exception {e}."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor

EMPTY_STR: str = ""
DEFAULT_ENCODING: str = "utf-8"
DOWNLOAD_CHUNK_SIZE: int = 1024 * 10

Expand Down Expand Up @@ -136,7 +135,6 @@ def _read_with_chunks(
"""

try:
# TODO: Add support for other file types, like `json`, with `pd.read_json()`
with open(path, "r", encoding=file_encoding) as data:
chunks = pd.read_csv(
data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2193,10 +2193,12 @@ def create_csv_decoder(self, model: CsvDecoderModel, config: Config, **kwargs: A
stream_response=False if self._emit_connector_builder_messages else True,
)

def create_jsonl_decoder(
    self, model: JsonlDecoderModel, config: Config, **kwargs: Any
) -> Decoder:
    """
    Build a streaming JSONL decoder from its declarative model.

    In the connector builder we disable streaming so that complete responses
    are available for message emission; in normal syncs responses are streamed.
    """
    return CompositeRawDecoder(
        parser=ModelToComponentFactory._get_parser(model, config),
        # `not x` instead of `False if x else True`
        stream_response=not self._emit_connector_builder_messages,
    )

def create_gzip_decoder(
Expand Down Expand Up @@ -2753,7 +2755,10 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
)
paginator = (
self._create_component_from_model(
model=model.download_paginator, decoder=decoder, config=config, url_base=""
model=model.download_paginator,
decoder=operational_decoder,
config=config,
url_base="",
)
if model.download_paginator
else NoPagination(parameters={})
Expand Down Expand Up @@ -2782,29 +2787,29 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
parameters={},
)

decoder = (
operational_decoder = (
Comment thread
bazarnov marked this conversation as resolved.
Outdated
self._create_component_from_model(model=model.decoder, config=config)
if model.decoder
else JsonDecoder(parameters={})
)
record_selector = self._create_component_from_model(
model=model.record_selector,
config=config,
decoder=decoder,
decoder=operational_decoder,
name=name,
transformations=transformations,
client_side_incremental_sync=client_side_incremental_sync,
)
stream_slicer = stream_slicer or SinglePartitionRouter(parameters={})
creation_requester = self._create_component_from_model(
model=model.creation_requester,
decoder=decoder,
decoder=operational_decoder,
config=config,
name=f"job creation - {name}",
)
polling_requester = self._create_component_from_model(
model=model.polling_requester,
decoder=decoder,
decoder=operational_decoder,
config=config,
name=f"job polling - {name}",
)
Expand Down Expand Up @@ -2839,7 +2844,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
abort_requester = (
self._create_component_from_model(
model=model.abort_requester,
decoder=decoder,
decoder=operational_decoder,
config=config,
name=f"job abort - {name}",
)
Expand All @@ -2849,7 +2854,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
delete_requester = (
self._create_component_from_model(
model=model.delete_requester,
decoder=decoder,
decoder=operational_decoder,
config=config,
name=f"job delete - {name}",
)
Expand All @@ -2859,18 +2864,21 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
download_target_requester = (
self._create_component_from_model(
model=model.download_target_requester,
decoder=decoder,
decoder=operational_decoder,
config=config,
name=f"job extract_url - {name}",
)
if model.download_target_requester
else None
)
status_extractor = self._create_component_from_model(
model=model.status_extractor, decoder=decoder, config=config, name=name
model=model.status_extractor, decoder=operational_decoder, config=config, name=name
)
download_target_extractor = self._create_component_from_model(
model=model.download_target_extractor, decoder=decoder, config=config, name=name
model=model.download_target_extractor,
decoder=operational_decoder,
config=config,
name=name,
)
job_repository: AsyncJobRepository = AsyncHttpJobRepository(
creation_requester=creation_requester,
Expand Down
40 changes: 33 additions & 7 deletions airbyte_cdk/sources/declarative/requesters/http_requester.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
)
from airbyte_cdk.sources.declarative.decoders import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import (
InterpolatedString,
)
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
InterpolatedRequestOptionsProvider,
)
Expand All @@ -26,7 +28,10 @@
from airbyte_cdk.sources.streams.http import HttpClient
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler
from airbyte_cdk.sources.types import Config, EmptyString, StreamSlice, StreamState
from airbyte_cdk.utils.mapping_helpers import combine_mappings, get_interpolation_context
from airbyte_cdk.utils.mapping_helpers import (
combine_mappings,
get_interpolation_context,
)


@dataclass
Expand Down Expand Up @@ -155,7 +160,9 @@ def get_request_params(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
return self._request_options_provider.get_request_params(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
stream_state=stream_state,
stream_slice=stream_slice,
next_page_token=next_page_token,
)

def get_request_headers(
Expand All @@ -166,7 +173,9 @@ def get_request_headers(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._request_options_provider.get_request_headers(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
stream_state=stream_state,
stream_slice=stream_slice,
next_page_token=next_page_token,
)

# fixing request options provider types has a lot of dependencies
Expand Down Expand Up @@ -195,7 +204,9 @@ def get_request_body_json( # type: ignore
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping[str, Any]]:
return self._request_options_provider.get_request_body_json(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
stream_state=stream_state,
stream_slice=stream_slice,
next_page_token=next_page_token,
)

@property
Expand Down Expand Up @@ -350,9 +361,24 @@ def _join_url(cls, url_base: str, path: str) -> str:
path (str): The path to join with the base URL.

Returns:
str: The concatenated URL with the trailing slash (if any) removed.
str: The resulting joined URL.

Note:
Related issue: https://github.com/airbytehq/airbyte-internal-issues/issues/11869
- If the path is an empty string or None, the method returns the base URL with any trailing slash removed.

Example:
1) _join_url("https://example.com/api/", "endpoint") >> 'https://example.com/api/endpoint'
2) _join_url("https://example.com/api", "/endpoint") >> 'https://example.com/api/endpoint'
3) _join_url("https://example.com/api/", "") >> 'https://example.com/api'
4) _join_url("https://example.com/api", None) >> 'https://example.com/api'
"""
return urljoin(url_base, path).rstrip("/")

# return a full-url if provided directly from interpolation context
if path == EmptyString or path is None:
return url_base.rstrip("/")

return urljoin(url_base, path)

def send_request(
self,
Expand Down
Loading