diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 7ceb19f14..914c90e59 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -26,6 +26,7 @@ from isodate import parse_duration from pydantic.v1 import BaseModel +from requests import Response from airbyte_cdk.connector_builder.models import ( LogMessage as ConnectorBuilderLogMessage, @@ -529,6 +530,7 @@ from airbyte_cdk.sources.declarative.transformations.keys_to_snake_transformation import ( KeysToSnakeCaseTransformation, ) +from airbyte_cdk.sources.http_logger import format_http_message from airbyte_cdk.sources.message import ( InMemoryMessageRepository, LogAppenderMessageRepositoryDecorator, @@ -2390,15 +2392,24 @@ def create_dynamic_schema_loader( schema_transformations.append( self._create_component_from_model(model=transformation_model, config=config) ) - + name = "dynamic_properties" retriever = self._create_component_from_model( model=model.retriever, config=config, - name="dynamic_properties", + name=name, primary_key=None, stream_slicer=combined_slicers, transformations=[], use_cache=True, + log_formatter=( + lambda response: format_http_message( + response, + f"Schema loader '{name}' request", + f"Request performed in order to extract schema.", + name, + is_auxiliary=True, + ) + ), ) schema_type_identifier = self._create_component_from_model( model.schema_type_identifier, config=config, parameters=model.parameters or {} @@ -2985,6 +2996,7 @@ def create_simple_retriever( ] ] = None, use_cache: Optional[bool] = None, + log_formatter: Optional[Callable[[Response], Any]] = None, **kwargs: Any, ) -> SimpleRetriever: def _get_url() -> str: @@ -3161,6 +3173,7 @@ def _get_url() -> str: config=config, maximum_number_of_slices=self._limit_slices_fetched or 5, ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests, + log_formatter=log_formatter, parameters=model.parameters or {}, ) return SimpleRetriever( diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 3aa3ffb87..5bd68e5bd 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -10,7 +10,6 @@ from typing import ( Any, Callable, - Dict, Iterable, List, Mapping, @@ -93,6 +92,7 @@ class SimpleRetriever(Retriever): cursor: Optional[DeclarativeCursor] = None ignore_stream_slicer_parameters_on_paginated_requests: bool = False additional_query_properties: Optional[QueryProperties] = None + log_formatter: Optional[Callable[[requests.Response], Any]] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._paginator = self.paginator or NoPagination(parameters=parameters) @@ -353,6 +353,7 @@ def _fetch_next_page( stream_slice=stream_slice, next_page_token=next_page_token, ), + log_formatter=self.log_formatter, ) # This logic is similar to _read_pages in the HttpStream class. When making changes here, consider making changes there as well. @@ -655,6 +656,19 @@ class SimpleRetrieverTestReadDecorator(SimpleRetriever): def __post_init__(self, options: Mapping[str, Any]) -> None: super().__post_init__(options) + self.log_formatter = ( + ( + lambda response: format_http_message( + response, + f"Stream '{self.name}' request", + f"Request performed in order to extract records for stream '{self.name}'", + self.name, + ) + ) + if not self.log_formatter + else self.log_formatter + ) + if self.maximum_number_of_slices and self.maximum_number_of_slices < 1: raise ValueError( f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}" @@ -664,49 +678,6 @@ def __post_init__(self, options: Mapping[str, Any]) -> None: def stream_slices(self) -> Iterable[Optional[StreamSlice]]: # type: ignore return islice(super().stream_slices(), self.maximum_number_of_slices) - def _fetch_next_page( - self, - stream_state: Mapping[str, Any], - stream_slice: StreamSlice, - next_page_token: Optional[Mapping[str, Any]] = None, - ) -> Optional[requests.Response]: - return self.requester.send_request( - path=self._paginator_path( - next_page_token=next_page_token, - stream_state=stream_state, - stream_slice=stream_slice, - ), - stream_state=stream_state, - stream_slice=stream_slice, - next_page_token=next_page_token, - request_headers=self._request_headers( - stream_state=stream_state, - stream_slice=stream_slice, - next_page_token=next_page_token, - ), - request_params=self._request_params( - stream_state=stream_state, - stream_slice=stream_slice, - next_page_token=next_page_token, - ), - request_body_data=self._request_body_data( - stream_state=stream_state, - stream_slice=stream_slice, - next_page_token=next_page_token, - ), - request_body_json=self._request_body_json( - stream_state=stream_state, - stream_slice=stream_slice, - next_page_token=next_page_token, - ), - log_formatter=lambda response: format_http_message( - response, - f"Stream '{self.name}' request", - f"Request performed in order to extract records for stream '{self.name}'", - self.name, - ), - ) - @deprecated( "This class is experimental. Use at your own risk.", diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index ff09df2ca..76a94aec3 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -921,11 +921,8 @@ def test_emit_log_request_response_messages(mocker): stream_state={}, stream_slice=StreamSlice(cursor_slice={}, partition={}) ) - assert requester.send_request.call_args_list[0][1]["log_formatter"] is not None - assert ( - requester.send_request.call_args_list[0][1]["log_formatter"](response) - == format_http_message_mock.return_value - ) + assert retriever.log_formatter is not None + assert retriever.log_formatter(response) == format_http_message_mock.return_value def test_retriever_last_page_size_for_page_increment():