Skip to content
Merged
25 changes: 23 additions & 2 deletions airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,36 @@ def parse(
class GzipParser(Parser):
inner_parser: Parser

def _reset_reader_pointer(self, data: BufferedIOBase) -> None:
"""
Reset the reader pointer to the beginning of the data.

Note:
- This is necessary because the gzip decompression will consume the data stream.
"""
data.seek(0)
Comment thread
bazarnov marked this conversation as resolved.
Outdated

def parse(
self,
data: BufferedIOBase,
) -> Generator[MutableMapping[str, Any], None, None]:
"""
Decompress gzipped bytes and pass decompressed data to the inner parser.

IMPORTANT:
- If the data is not gzipped, reset the pointer and pass the data to the inner parser as is.

Note:
- The data is not decoded by default.
"""
with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
yield from self.inner_parser.parse(gzipobj)

try:
Comment thread
bazarnov marked this conversation as resolved.
Outdated
with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
yield from self.inner_parser.parse(gzipobj)
except gzip.BadGzipFile:
logger.warning(f"GzipParser(): Received non-gzipped data, parsing the data as is.")
self._reset_reader_pointer(data)
yield from self.inner_parser.parse(data)


@dataclass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor

EMPTY_STR: str = ""
DEFAULT_ENCODING: str = "utf-8"
DOWNLOAD_CHUNK_SIZE: int = 1024 * 10

Expand Down Expand Up @@ -136,7 +135,6 @@ def _read_with_chunks(
"""

try:
# TODO: Add support for other file types, like `json`, with `pd.read_json()`
with open(path, "r", encoding=file_encoding) as data:
chunks = pd.read_csv(
data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2193,10 +2193,12 @@ def create_csv_decoder(self, model: CsvDecoderModel, config: Config, **kwargs: A
stream_response=False if self._emit_connector_builder_messages else True,
)

@staticmethod
def create_jsonl_decoder(model: JsonlDecoderModel, config: Config, **kwargs: Any) -> Decoder:
def create_jsonl_decoder(
self, model: JsonlDecoderModel, config: Config, **kwargs: Any
) -> Decoder:
return CompositeRawDecoder(
parser=ModelToComponentFactory._get_parser(model, config), stream_response=True
parser=ModelToComponentFactory._get_parser(model, config),
stream_response=False if self._emit_connector_builder_messages else True,
)

def create_gzip_decoder(
Expand Down Expand Up @@ -2753,7 +2755,10 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
)
paginator = (
self._create_component_from_model(
model=model.download_paginator, decoder=decoder, config=config, url_base=""
model=model.download_paginator,
decoder=operational_decoder,
config=config,
url_base="",
)
if model.download_paginator
else NoPagination(parameters={})
Expand Down Expand Up @@ -2782,29 +2787,29 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
parameters={},
)

decoder = (
operational_decoder = (
Comment thread
bazarnov marked this conversation as resolved.
Outdated
self._create_component_from_model(model=model.decoder, config=config)
if model.decoder
else JsonDecoder(parameters={})
)
record_selector = self._create_component_from_model(
model=model.record_selector,
config=config,
decoder=decoder,
decoder=operational_decoder,
name=name,
transformations=transformations,
client_side_incremental_sync=client_side_incremental_sync,
)
stream_slicer = stream_slicer or SinglePartitionRouter(parameters={})
creation_requester = self._create_component_from_model(
model=model.creation_requester,
decoder=decoder,
decoder=operational_decoder,
config=config,
name=f"job creation - {name}",
)
polling_requester = self._create_component_from_model(
model=model.polling_requester,
decoder=decoder,
decoder=operational_decoder,
config=config,
name=f"job polling - {name}",
)
Expand Down Expand Up @@ -2839,7 +2844,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
abort_requester = (
self._create_component_from_model(
model=model.abort_requester,
decoder=decoder,
decoder=operational_decoder,
config=config,
name=f"job abort - {name}",
)
Expand All @@ -2849,7 +2854,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
delete_requester = (
self._create_component_from_model(
model=model.delete_requester,
decoder=decoder,
decoder=operational_decoder,
config=config,
name=f"job delete - {name}",
)
Expand All @@ -2859,18 +2864,21 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
download_target_requester = (
self._create_component_from_model(
model=model.download_target_requester,
decoder=decoder,
decoder=operational_decoder,
config=config,
name=f"job extract_url - {name}",
)
if model.download_target_requester
else None
)
status_extractor = self._create_component_from_model(
model=model.status_extractor, decoder=decoder, config=config, name=name
model=model.status_extractor, decoder=operational_decoder, config=config, name=name
)
download_target_extractor = self._create_component_from_model(
model=model.download_target_extractor, decoder=decoder, config=config, name=name
model=model.download_target_extractor,
decoder=operational_decoder,
config=config,
name=name,
)
job_repository: AsyncJobRepository = AsyncHttpJobRepository(
creation_requester=creation_requester,
Expand Down
40 changes: 33 additions & 7 deletions airbyte_cdk/sources/declarative/requesters/http_requester.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
)
from airbyte_cdk.sources.declarative.decoders import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import (
InterpolatedString,
)
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
InterpolatedRequestOptionsProvider,
)
Expand All @@ -26,7 +28,10 @@
from airbyte_cdk.sources.streams.http import HttpClient
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler
from airbyte_cdk.sources.types import Config, EmptyString, StreamSlice, StreamState
from airbyte_cdk.utils.mapping_helpers import combine_mappings, get_interpolation_context
from airbyte_cdk.utils.mapping_helpers import (
combine_mappings,
get_interpolation_context,
)


@dataclass
Expand Down Expand Up @@ -155,7 +160,9 @@ def get_request_params(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
return self._request_options_provider.get_request_params(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
stream_state=stream_state,
stream_slice=stream_slice,
next_page_token=next_page_token,
)

def get_request_headers(
Expand All @@ -166,7 +173,9 @@ def get_request_headers(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._request_options_provider.get_request_headers(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
stream_state=stream_state,
stream_slice=stream_slice,
next_page_token=next_page_token,
)

# fixing request options provider types has a lot of dependencies
Expand Down Expand Up @@ -195,7 +204,9 @@ def get_request_body_json( # type: ignore
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping[str, Any]]:
return self._request_options_provider.get_request_body_json(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
stream_state=stream_state,
stream_slice=stream_slice,
next_page_token=next_page_token,
)

@property
Expand Down Expand Up @@ -350,9 +361,24 @@ def _join_url(cls, url_base: str, path: str) -> str:
path (str): The path to join with the base URL.

Returns:
str: The concatenated URL with the trailing slash (if any) removed.
str: The resulting joined URL.

Note:
Related issue: https://github.com/airbytehq/airbyte-internal-issues/issues/11869
- If the path is an empty string or None, the method returns the base URL with any trailing slash removed.

Example:
1) _join_url("https://example.com/api/", "endpoint") >> 'https://example.com/api/endpoint'
2) _join_url("https://example.com/api", "/endpoint") >> 'https://example.com/api/endpoint'
3) _join_url("https://example.com/api/", "") >> 'https://example.com/api'
4) _join_url("https://example.com/api", None) >> 'https://example.com/api'
"""
return urljoin(url_base, path).rstrip("/")

# return a full-url if provided directly from interpolation context
if path == EmptyString or path is None:
return url_base.rstrip("/")

return urljoin(url_base, path)

def send_request(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ def test_send_request_stream_slice_next_page_token():
"test_trailing_slash_on_path",
"https://airbyte.io",
"/my_endpoint/",
"https://airbyte.io/my_endpoint",
"https://airbyte.io/my_endpoint/",
),
(
"test_nested_path_no_leading_slash",
Expand Down