Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def __init__(
component_factory
if component_factory
else ModelToComponentFactory(
emit_connector_builder_messages,
emit_connector_builder_messages=emit_connector_builder_messages,
Comment thread
aldogonzalez8 marked this conversation as resolved.
max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3299,23 +3299,10 @@ def _get_url() -> str:
parameters=model.parameters or {},
)

if self._limit_slices_fetched or self._emit_connector_builder_messages:
return SimpleRetrieverTestReadDecorator(
name=name,
paginator=paginator,
primary_key=primary_key,
requester=requester,
record_selector=record_selector,
stream_slicer=stream_slicer,
request_option_provider=request_options_provider,
cursor=cursor,
config=config,
maximum_number_of_slices=self._limit_slices_fetched or 5,
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
log_formatter=log_formatter,
parameters=model.parameters or {},
)
return SimpleRetriever(
test_read_enabled = self._limit_slices_fetched or self._emit_connector_builder_messages
maximum_number_of_slices = self._limit_slices_fetched or 5 if test_read_enabled else 0
retriever_log_formatter = log_formatter if test_read_enabled else None
return SimpleRetrieverTestReadDecorator(
name=name,
paginator=paginator,
primary_key=primary_key,
Expand All @@ -3325,8 +3312,11 @@ def _get_url() -> str:
request_option_provider=request_options_provider,
cursor=cursor,
config=config,
maximum_number_of_slices=maximum_number_of_slices,
emit_connector_builder_messages=self._emit_connector_builder_messages,
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
additional_query_properties=query_properties,
log_formatter=retriever_log_formatter,
parameters=model.parameters or {},
)

Expand Down Expand Up @@ -3440,28 +3430,19 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
if model.download_paginator
else NoPagination(parameters={})
)
maximum_number_of_slices = self._limit_slices_fetched or 5
test_read_enabled = self._limit_slices_fetched or self._emit_connector_builder_messages
maximum_number_of_slices = self._limit_slices_fetched or 5 if test_read_enabled else 0

if self._limit_slices_fetched or self._emit_connector_builder_messages:
return SimpleRetrieverTestReadDecorator(
requester=download_requester,
record_selector=record_selector,
primary_key=None,
name=job_download_components_name,
paginator=paginator,
config=config,
parameters={},
maximum_number_of_slices=maximum_number_of_slices,
)

return SimpleRetriever(
return SimpleRetrieverTestReadDecorator(
requester=download_requester,
record_selector=record_selector,
primary_key=None,
name=job_download_components_name,
paginator=paginator,
config=config,
parameters={},
maximum_number_of_slices=maximum_number_of_slices,
emit_connector_builder_messages=self._emit_connector_builder_messages,
)

def _get_job_timeout() -> datetime.timedelta:
Expand Down
52 changes: 33 additions & 19 deletions airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,33 +650,47 @@ class SimpleRetrieverTestReadDecorator(SimpleRetriever):
"""
In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of
slices that are queried throughout a read command.

maximum_number_of_slices must be provided when test read is enabled.
"""

maximum_number_of_slices: int = 5

def __post_init__(self, options: Mapping[str, Any]) -> None:
super().__post_init__(options)
self.log_formatter = (
(
lambda response: format_http_message(
response,
f"Stream '{self.name}' request",
f"Request performed in order to extract records for stream '{self.name}'",
self.name,
maximum_number_of_slices: int = 0
emit_connector_builder_messages: bool = False

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
super().__post_init__(parameters)
if self.test_read_enabled():
self.log_formatter = (
(
lambda response: format_http_message(
response,
f"Stream '{self.name}' request",
f"Request performed in order to extract records for stream '{self.name}'",
self.name,
)
)
if not self.log_formatter
else self.log_formatter
)
if not self.log_formatter
else self.log_formatter
)

if self.maximum_number_of_slices and self.maximum_number_of_slices < 1:
raise ValueError(
f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}"
)
if self.maximum_number_of_slices and self.maximum_number_of_slices < 1:
raise ValueError(
f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}"
)

def test_read_enabled(self) -> bool:
"""
Indicates whether the retriever is in test read mode.
This is used to limit the number of slices processed during a test read.
"""
return bool(self.maximum_number_of_slices or self.emit_connector_builder_messages)

# stream_slices is defined with arguments on the HTTP stream, and fixing this has a long tail of dependencies. It will be resolved by decoupling the HTTP stream from the simple retriever.
def stream_slices(self) -> Iterable[Optional[StreamSlice]]: # type: ignore
return islice(super().stream_slices(), self.maximum_number_of_slices)
if not self.test_read_enabled():
return super().stream_slices()
else:
return islice(super().stream_slices(), self.maximum_number_of_slices)


@deprecated(
Expand Down
243 changes: 243 additions & 0 deletions unit_tests/connector_builder/test_property_chunking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

import copy
import json

import freezegun

from airbyte_cdk.connector_builder.connector_builder_handler import (
TestLimits,
)
from airbyte_cdk.connector_builder.main import (
handle_connector_builder_request,
)
from airbyte_cdk.models import (
AirbyteStateBlob,
AirbyteStateMessage,
AirbyteStreamState,
ConfiguredAirbyteCatalogSerializer,
Level,
StreamDescriptor,
)
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
from airbyte_cdk.test.mock_http.response_builder import find_template

# Host and path prefix used to build the URLs of the mocked HTTP requests in this module.
BASE_URL = "https://api.apilayer.com/exchangerates_data/"
# Date the clock is frozen at; also used as the configured start date and as the last
# segment of the mocked request path.
FREEZE_DATE = "2025-05-23"

# Name of the request parameter that carries the chunked query properties.
PROPERTY_KEY = "test"
# Full list of properties referenced by the manifest's QueryProperties definition.
PROPERTY_LIST = ["one", "two", "three", "four"]

# Declarative manifest used by the test below. It declares a single "Rates" stream whose
# HttpRequester sends PROPERTY_LIST through a QueryProperties request parameter with
# PropertyChunking configured as property_count / property_limit=2.
MANIFEST = {
"version": "6.48.15",
"type": "DeclarativeSource",
"check": {"type": "CheckStream", "stream_names": ["Rates"]},
"streams": [
{
"type": "DeclarativeStream",
"name": "Rates",
"retriever": {
"type": "SimpleRetriever",
"requester": {
"type": "HttpRequester",
"path": "/exchangerates_data/{{stream_interval.start_time}}",
"url_base": "https://api.apilayer.com",
"http_method": "GET",
"authenticator": {
"type": "ApiKeyAuthenticator",
"api_token": "{{ config['api_key'] }}",
"inject_into": {
"type": "RequestOption",
"field_name": "apikey",
"inject_into": "header",
},
},
"request_parameters": {
"base": "{{ config['base'] }}",
# The query properties are sent under the PROPERTY_KEY request parameter,
# chunked according to the "property_chunking" settings below.
PROPERTY_KEY: {
"type": "QueryProperties",
"property_list": PROPERTY_LIST,
"property_chunking": {
"type": "PropertyChunking",
"property_limit_type": "property_count",
"property_limit": 2,
},
},
},
},
"record_selector": {
"type": "RecordSelector",
"extractor": {"type": "DpathExtractor", "field_path": []},
},
},
"primary_key": [],
"schema_loader": {
"type": "InlineSchemaLoader",
"schema": {
"type": "object",
"$schema": "http://json-schema.org/schema#",
"properties": {
"base": {"type": "string"},
"date": {"type": "string"},
"rates": {
"type": "object",
"properties": {"fake_currency": {"type": "number"}},
},
"success": {"type": "boolean"},
"timestamp": {"type": "number"},
"historical": {"type": "boolean"},
},
},
},
"transformations": [],
# Datetime-based cursor with a one-day step (P1D), running from config['start_date']
# to the current UTC time as rendered by now_utc().
"incremental_sync": {
"type": "DatetimeBasedCursor",
"step": "P1D",
"cursor_field": "date",
"end_datetime": {
"type": "MinMaxDatetime",
"datetime": "{{ now_utc().strftime('%Y-%m-%dT%H:%M:%SZ') }}",
"datetime_format": "%Y-%m-%dT%H:%M:%SZ",
},
"start_datetime": {
"type": "MinMaxDatetime",
"datetime": "{{ config['start_date'] }}",
"datetime_format": "%Y-%m-%dT%H:%M:%SZ",
},
"datetime_format": "%Y-%m-%d",
"cursor_granularity": "P1D",
"cursor_datetime_formats": ["%Y-%m-%d"],
},
"state_migrations": [],
}
],
"spec": {
"type": "Spec",
"documentation_url": "https://example.org",
"connection_specification": {
"type": "object",
"$schema": "http://json-schema.org/draft-07/schema#",
"required": ["start_date", "api_key", "base"],
"properties": {
"base": {"type": "string", "order": 2, "title": "Base"},
"api_key": {
"type": "string",
"order": 1,
"title": "API Key",
"airbyte_secret": True,
},
"start_date": {
"type": "string",
"order": 0,
"title": "Start date",
"format": "date-time",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
},
},
"additionalProperties": True,
},
},
# NOTE(review): this looks like bookkeeping metadata written by the connector builder;
# presumably it does not influence the read under test - confirm.
"metadata": {
"testedStreams": {
"Rates": {
"hasRecords": False,
"streamHash": "4dce031d602258dd3bcc478731d6862a5cdeb70f",
"hasResponse": False,
"primaryKeysAreUnique": False,
"primaryKeysArePresent": False,
"responsesAreSuccessful": False,
}
},
"autoImportSchema": {"Rates": True},
},
"dynamic_streams": [],
}

# Name of the only stream declared in MANIFEST; reused by the state and catalog fixtures.
_stream_name = "Rates"

# Stream state handed to handle_connector_builder_request for the Rates stream.
# NOTE(review): the {"key": "value"} blob looks like a placeholder - confirm.
_A_STATE = [
AirbyteStateMessage(
type="STREAM",
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(name=_stream_name),
stream_state=AirbyteStateBlob({"key": "value"}),
),
)
]

# Base connector config for the "test_read" command. test_read() deep-copies it and adds
# the start_date, base and api_key entries before issuing the request.
TEST_READ_CONFIG = {
"__injected_declarative_manifest": MANIFEST,
"__command": "test_read",
"__test_read_config": {"max_pages_per_slice": 2, "max_slices": 5, "max_records": 10},
}

# Configured catalog selecting the Rates stream with full_refresh sync and overwrite
# destination mode.
CONFIGURED_CATALOG = {
"streams": [
{
"stream": {
"name": _stream_name,
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {},
},
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": False,
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
}
]
}


@freezegun.freeze_time(f"{FREEZE_DATE}T00:00:00Z")
def test_read():
    """Run a connector-builder ``test_read`` on a stream that uses property chunking.

    The manifest chunks ``PROPERTY_LIST`` with a property limit of 2, so two HTTP
    requests are mocked: one for ``one,two`` and one for ``three,four``. The test
    checks that the first slice of the builder output contains two pages and that
    none of the emitted log entries reports an error.
    """
    conversion_base = "USD"
    config = copy.deepcopy(TEST_READ_CONFIG)
    config["start_date"] = f"{FREEZE_DATE}T00:00:00Z"
    config["base"] = conversion_base
    config["api_key"] = "test_api_key"

    # Common URL prefix of both mocked requests; only the property chunk appended
    # after "<PROPERTY_KEY>=" differs between them.
    stream_url = f"{BASE_URL}{FREEZE_DATE}?base={conversion_base}&{PROPERTY_KEY}="

    with HttpMocker() as http_mocker:
        source = ManifestDeclarativeSource(
            source_config=MANIFEST, emit_connector_builder_messages=True
        )
        limits = TestLimits()

        # "%2C" is the URL-encoded comma joining the properties of one chunk.
        http_mocker.get(
            HttpRequest(url=f"{stream_url}{PROPERTY_LIST[0]}%2C{PROPERTY_LIST[1]}"),
            HttpResponse(
                json.dumps(find_template("declarative/property_chunking/rates_one_two", __file__)),
                200,
            ),
        )
        http_mocker.get(
            HttpRequest(url=f"{stream_url}{PROPERTY_LIST[2]}%2C{PROPERTY_LIST[3]}"),
            HttpResponse(
                json.dumps(
                    find_template("declarative/property_chunking/rates_three_four", __file__)
                ),
                200,
            ),
        )
        output_record = handle_connector_builder_request(
            source,
            "test_read",
            config,
            ConfiguredAirbyteCatalogSerializer.load(CONFIGURED_CATALOG),
            _A_STATE,
            limits,
        )
        # In the connector builder output each property chunk is expected to show up
        # as its own page: two chunks -> two pages in the first slice.
        assert len(output_record.record.data["slices"][0]["pages"]) == 2
        for current_log in output_record.record.data["logs"]:
            assert "Something went wrong in the connector" not in current_log["message"]
            assert not current_log["internal_message"]
            assert current_log["level"] != Level.ERROR
            assert not current_log["stacktrace"]
Loading
Loading