@@ -26,6 +26,7 @@
 
 from isodate import parse_duration
 from pydantic.v1 import BaseModel
+from requests import Response
 
 from airbyte_cdk.connector_builder.models import (
     LogMessage as ConnectorBuilderLogMessage,
@@ -529,6 +530,7 @@
 from airbyte_cdk.sources.declarative.transformations.keys_to_snake_transformation import (
     KeysToSnakeCaseTransformation,
 )
+from airbyte_cdk.sources.http_logger import format_http_message
 from airbyte_cdk.sources.message import (
     InMemoryMessageRepository,
     LogAppenderMessageRepositoryDecorator,
@@ -2390,15 +2392,24 @@ def create_dynamic_schema_loader(
         schema_transformations.append(
             self._create_component_from_model(model=transformation_model, config=config)
         )
 
+        name = "dynamic_properties"
         retriever = self._create_component_from_model(
             model=model.retriever,
             config=config,
-            name="dynamic_properties",
+            name=name,
             primary_key=None,
             stream_slicer=combined_slicers,
             transformations=[],
             use_cache=True,
+            log_formatter=(
+                lambda response: format_http_message(
+                    response,
+                    f"Schema loader '{name}' request",
+                    "Request performed in order to extract schema.",
+                    name,
+                    is_auxiliary=True,
+                )
+            ),
         )
         schema_type_identifier = self._create_component_from_model(
             model.schema_type_identifier, config=config, parameters=model.parameters or {}
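With this hunk, the retriever behind the dynamic schema loader carries a log formatter, so the schema-discovery request shows up in test-read logs as an auxiliary HTTP message instead of being invisible. Hoisting `name` into a variable lets the lambda close over the same stream name that is passed to the component factory. Below is a minimal, self-contained sketch of that closure being built and handed to a consumer; `format_http_message` and its arguments are taken from the diff, while the `emit` helper is an illustrative assumption:

    from typing import Any, Callable

    import requests

    from airbyte_cdk.sources.http_logger import format_http_message

    name = "dynamic_properties"

    # The factory builds this callable; the requester invokes it later with the
    # live HTTP response for the schema request.
    log_formatter: Callable[[requests.Response], Any] = lambda response: format_http_message(
        response,
        f"Schema loader '{name}' request",
        "Request performed in order to extract schema.",
        name,
        is_auxiliary=True,  # mark as a helper request, not a record fetch
    )

    def emit(formatter: Callable[[requests.Response], Any], response: requests.Response) -> Any:
        # Hypothetical consumer: a real implementation would hand the formatted
        # payload to a message repository instead of returning it.
        return formatter(response)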
@@ -2985,6 +2996,7 @@ def create_simple_retriever(
            ]
        ] = None,
        use_cache: Optional[bool] = None,
+        log_formatter: Optional[Callable[[Response], Any]] = None,
        **kwargs: Any,
    ) -> SimpleRetriever:
        def _get_url() -> str:
@@ -3161,6 +3173,7 @@ def _get_url() -> str:
                config=config,
                maximum_number_of_slices=self._limit_slices_fetched or 5,
                ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
+                log_formatter=log_formatter,
                parameters=model.parameters or {},
            )
        return SimpleRetriever(
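The new `log_formatter` parameter is a plain pass-through: `create_simple_retriever` forwards whatever optional callable it receives into the retriever it constructs, and `None` simply means no per-request logging is configured at build time. A toy, self-contained sketch of the pattern (names mirror the CDK, but this is not CDK code):

    from dataclasses import dataclass
    from typing import Any, Callable, Optional

    from requests import Response

    @dataclass
    class ToyRetriever:
        log_formatter: Optional[Callable[[Response], Any]] = None

    def create_toy_retriever(
        log_formatter: Optional[Callable[[Response], Any]] = None, **kwargs: Any
    ) -> ToyRetriever:
        # Forwarded unchanged, exactly like the factory hunk above.
        return ToyRetriever(log_formatter=log_formatter)

    retriever = create_toy_retriever(log_formatter=lambda r: {"status": r.status_code})
    assert retriever.log_formatter is not None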
59 changes: 15 additions & 44 deletions airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
@@ -10,7 +10,6 @@
 from typing import (
     Any,
     Callable,
-    Dict,
     Iterable,
     List,
     Mapping,
@@ -93,6 +92,7 @@ class SimpleRetriever(Retriever):
     cursor: Optional[DeclarativeCursor] = None
     ignore_stream_slicer_parameters_on_paginated_requests: bool = False
     additional_query_properties: Optional[QueryProperties] = None
+    log_formatter: Optional[Callable[[requests.Response], Any]] = None
 
     def __post_init__(self, parameters: Mapping[str, Any]) -> None:
         self._paginator = self.paginator or NoPagination(parameters=parameters)
@@ -353,6 +353,7 @@ def _fetch_next_page(
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
+            log_formatter=self.log_formatter,
        )
 
    # This logic is similar to _read_pages in the HttpStream class. When making changes here, consider making changes there as well.
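Because `log_formatter` is now a field on the base `SimpleRetriever`, every page fetch forwards it to `requester.send_request`, and the test-read decorator no longer needs its own copy of `_fetch_next_page` (deleted below). The diff confirms that `send_request` accepts a `log_formatter` keyword; how a requester might consume the callable is sketched here under stated assumptions rather than taken from the CDK source:

    from typing import Any, Callable, Optional

    import requests

    def send_request_sketch(
        session: requests.Session,
        prepared: requests.PreparedRequest,
        log_formatter: Optional[Callable[[requests.Response], Any]] = None,
    ) -> requests.Response:
        response = session.send(prepared)
        if log_formatter is not None:
            # Assumption: the real requester routes this payload to its message
            # repository; printing stands in for that side effect here.
            print(log_formatter(response))
        return response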
@@ -655,6 +656,19 @@ class SimpleRetrieverTestReadDecorator(SimpleRetriever):
 
     def __post_init__(self, options: Mapping[str, Any]) -> None:
         super().__post_init__(options)
+        self.log_formatter = (
+            (
+                lambda response: format_http_message(
+                    response,
+                    f"Stream '{self.name}' request",
+                    f"Request performed in order to extract records for stream '{self.name}'",
+                    self.name,
+                )
+            )
+            if not self.log_formatter
+            else self.log_formatter
+        )
+
         if self.maximum_number_of_slices and self.maximum_number_of_slices < 1:
             raise ValueError(
                 f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}"
@@ -664,49 +678,6 @@ def __post_init__(self, options: Mapping[str, Any]) -> None:
     def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
         return islice(super().stream_slices(), self.maximum_number_of_slices)
 
-    def _fetch_next_page(
-        self,
-        stream_state: Mapping[str, Any],
-        stream_slice: StreamSlice,
-        next_page_token: Optional[Mapping[str, Any]] = None,
-    ) -> Optional[requests.Response]:
-        return self.requester.send_request(
-            path=self._paginator_path(
-                next_page_token=next_page_token,
-                stream_state=stream_state,
-                stream_slice=stream_slice,
-            ),
-            stream_state=stream_state,
-            stream_slice=stream_slice,
-            next_page_token=next_page_token,
-            request_headers=self._request_headers(
-                stream_state=stream_state,
-                stream_slice=stream_slice,
-                next_page_token=next_page_token,
-            ),
-            request_params=self._request_params(
-                stream_state=stream_state,
-                stream_slice=stream_slice,
-                next_page_token=next_page_token,
-            ),
-            request_body_data=self._request_body_data(
-                stream_state=stream_state,
-                stream_slice=stream_slice,
-                next_page_token=next_page_token,
-            ),
-            request_body_json=self._request_body_json(
-                stream_state=stream_state,
-                stream_slice=stream_slice,
-                next_page_token=next_page_token,
-            ),
-            log_formatter=lambda response: format_http_message(
-                response,
-                f"Stream '{self.name}' request",
-                f"Request performed in order to extract records for stream '{self.name}'",
-                self.name,
-            ),
-        )
-
 
 @deprecated(
     "This class is experimental. Use at your own risk.",
@@ -921,11 +921,8 @@ def test_emit_log_request_response_messages(mocker):
         stream_state={}, stream_slice=StreamSlice(cursor_slice={}, partition={})
     )
 
-    assert requester.send_request.call_args_list[0][1]["log_formatter"] is not None
-    assert (
-        requester.send_request.call_args_list[0][1]["log_formatter"](response)
-        == format_http_message_mock.return_value
-    )
+    assert retriever.log_formatter is not None
+    assert retriever.log_formatter(response) == format_http_message_mock.return_value
 
 
 def test_retriever_last_page_size_for_page_increment():
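With the formatter promoted to a retriever attribute, the test asserts against `retriever.log_formatter` directly instead of digging the callable out of `send_request`'s recorded kwargs. A minimal sketch of that pattern, assuming `format_http_message` is patched at its import site in `simple_retriever` (as the surrounding test's `format_http_message_mock` suggests):

    from unittest.mock import MagicMock, patch

    def test_formatter_delegates_to_format_http_message() -> None:
        with patch(
            "airbyte_cdk.sources.declarative.retrievers.simple_retriever.format_http_message"
        ) as format_http_message_mock:
            from airbyte_cdk.sources.declarative.retrievers import simple_retriever

            # Stand-in for the formatter the decorator builds in __post_init__.
            def formatter(response):
                return simple_retriever.format_http_message(
                    response, "title", "description", "stream"
                )

            # The formatter's output is exactly what the patched function returns.
            assert formatter(MagicMock()) == format_http_message_mock.return_value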