Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1464,7 +1464,7 @@ definitions:
properties:
type:
type: string
enum: [ FileUploader ]
enum: [FileUploader]
requester:
description: Requester component that describes how to prepare HTTP requests to send to the source API.
anyOf:
Expand Down Expand Up @@ -1978,6 +1978,10 @@ definitions:
- "$ref": "#/definitions/SelectiveAuthenticator"
- "$ref": "#/definitions/CustomAuthenticator"
- "$ref": "#/definitions/LegacySessionTokenAuthenticator"
fetch_properties_from_endpoint:
Comment thread
brianjlai marked this conversation as resolved.
title: Fetch Properties from Endpoint
description: Allows for retrieving a dynamic set of properties from an API endpoint which can be injected into outbound request using the stream_partition.extra_fields.
"$ref": "#/definitions/PropertiesFromEndpoint"
request_body_data:
title: Request Body Payload (Non-JSON)
description: Specifies how to populate the body of the request with a non-JSON payload. Plain text will be sent as is, whereas objects will be converted to a urlencoded form.
Expand Down Expand Up @@ -2370,7 +2374,7 @@ definitions:
properties:
type:
type: string
enum: [ KeyTransformation ]
enum: [KeyTransformation]
prefix:
title: Key Prefix
description: Prefix to add for object keys. If not provided original keys remain unchanged.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -2252,6 +2254,11 @@ class HttpRequester(BaseModel):
description="Authentication method to use for requests sent to the API.",
title="Authenticator",
)
fetch_properties_from_endpoint: Optional[PropertiesFromEndpoint] = Field(
None,
description="Allows for retrieving a dynamic set of properties from an API endpoint which can be injected into outbound request using the stream_partition.extra_fields.",
title="Fetch Properties from Endpoint",
)
request_body_data: Optional[Union[Dict[str, str], str]] = Field(
None,
description="Specifies how to populate the body of the request with a non-JSON payload. Plain text will be sent as is, whereas objects will be converted to a urlencoded form.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2945,16 +2945,19 @@ def create_simple_retriever(

query_properties: Optional[QueryProperties] = None
query_properties_key: Optional[str] = None
if (
hasattr(model.requester, "request_parameters")
and model.requester.request_parameters
and isinstance(model.requester.request_parameters, Mapping)
):
if self._query_properties_in_request_parameters(model.requester):
# It is better to be explicit about an error if PropertiesFromEndpoint is defined in multiple
# places instead of default to request_parameters which isn't clearly documented
if (
hasattr(model.requester, "fetch_properties_from_endpoint")
and model.requester.fetch_properties_from_endpoint
):
raise ValueError(
f"PropertiesFromEndpoint should only be specified once per stream, but found in {model.requester.type}.fetch_properties_from_endpoint and {model.requester.type}.request_parameters"
)

query_properties_definitions = []
for key, request_parameter in model.requester.request_parameters.items():
# When translating JSON schema into Pydantic models, enforcing types for arrays containing both
# concrete string complex object definitions like QueryProperties would get resolved to Union[str, Any].
# This adds the extra validation that we couldn't get for free in Pydantic model generation
for key, request_parameter in model.requester.request_parameters.items(): # type: ignore # request_parameters is already validated to be a Mapping using _query_properties_in_request_parameters()
if isinstance(request_parameter, QueryPropertiesModel):
query_properties_key = key
query_properties_definitions.append(request_parameter)
Expand All @@ -2968,6 +2971,21 @@ def create_simple_retriever(
query_properties = self._create_component_from_model(
model=query_properties_definitions[0], config=config
)
elif (
hasattr(model.requester, "fetch_properties_from_endpoint")
and model.requester.fetch_properties_from_endpoint
Comment thread
brianjlai marked this conversation as resolved.
):
query_properties_definition = QueryPropertiesModel(
type="QueryProperties",
property_list=model.requester.fetch_properties_from_endpoint,
always_include_properties=None,
property_chunking=None,
) # type: ignore # $parameters has a default value

query_properties = self.create_query_properties(
model=query_properties_definition,
config=config,
)

requester = self._create_component_from_model(
model=model.requester,
Expand Down Expand Up @@ -3087,6 +3105,19 @@ def create_simple_retriever(
parameters=model.parameters or {},
)

@staticmethod
def _query_properties_in_request_parameters(
requester: Union[HttpRequesterModel, CustomRequesterModel],
) -> bool:
if not hasattr(requester, "request_parameters"):
return False
request_parameters = requester.request_parameters
if request_parameters and isinstance(request_parameters, Mapping):
for request_parameter in request_parameters.values():
if isinstance(request_parameter, QueryPropertiesModel):
return True
return False

@staticmethod
def _remove_query_properties(
request_parameters: Mapping[str, Union[str, QueryPropertiesModel]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,3 @@ def get_request_property_chunks(
)
else:
yield list(fields)

# delete later, but leaving this to keep the discussion thread on the PR from getting hidden
def has_multiple_chunks(self, stream_slice: Optional[StreamSlice]) -> bool:
property_chunks = iter(self.get_request_property_chunks(stream_slice=stream_slice))
try:
next(property_chunks)
next(property_chunks)
return True
except StopIteration:
return False
Original file line number Diff line number Diff line change
Expand Up @@ -4119,7 +4119,7 @@ def test_simple_retriever_with_query_properties():
assert request_options_provider.request_parameters.get("nonary") == "{{config['nonary'] }}"


def test_simple_retriever_with_properties_from_endpoint():
def test_simple_retriever_with_request_parameters_properties_from_endpoint():
content = """
selector:
type: RecordSelector
Expand Down Expand Up @@ -4216,6 +4216,88 @@ def test_simple_retriever_with_properties_from_endpoint():
assert property_chunking.property_limit == 3


def test_simple_retriever_with_requester_properties_from_endpoint():
content = """
selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path: ["extractor_path"]
record_filter:
type: RecordFilter
condition: "{{ record['id'] > stream_state['id'] }}"
requester:
type: HttpRequester
name: "{{ parameters['name'] }}"
url_base: "https://api.hubapi.com"
http_method: "GET"
path: "adAnalytics"
fetch_properties_from_endpoint:
type: PropertiesFromEndpoint
property_field_path: [ "name" ]
retriever:
type: SimpleRetriever
requester:
type: HttpRequester
url_base: https://api.hubapi.com
path: "/properties/v2/dynamics/properties"
http_method: GET
record_selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path: []
dynamic_properties_stream:
type: DeclarativeStream
incremental_sync:
type: DatetimeBasedCursor
$parameters:
datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"
start_datetime: "{{ config['start_time'] }}"
cursor_field: "created"
retriever:
type: SimpleRetriever
name: "{{ parameters['name'] }}"
requester:
$ref: "#/requester"
record_selector:
$ref: "#/selector"
$parameters:
name: "dynamics"
"""

parsed_manifest = YamlDeclarativeSource._parse(content)
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
stream_manifest = transformer.propagate_types_and_parameters(
"", resolved_manifest["dynamic_properties_stream"], {}
)

stream = factory.create_component(
model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config
)

query_properties = stream.retriever.additional_query_properties
assert isinstance(query_properties, QueryProperties)
assert query_properties.always_include_properties is None
assert query_properties.property_chunking is None

properties_from_endpoint = stream.retriever.additional_query_properties.property_list
assert isinstance(properties_from_endpoint, PropertiesFromEndpoint)
assert properties_from_endpoint.property_field_path == ["name"]

properties_from_endpoint_retriever = (
stream.retriever.additional_query_properties.property_list.retriever
)
assert isinstance(properties_from_endpoint_retriever, SimpleRetriever)

properties_from_endpoint_requester = (
stream.retriever.additional_query_properties.property_list.retriever.requester
)
assert isinstance(properties_from_endpoint_requester, HttpRequester)
assert properties_from_endpoint_requester.url_base == "https://api.hubapi.com"
assert properties_from_endpoint_requester.path == "/properties/v2/dynamics/properties"


def test_request_parameters_raise_error_if_not_of_type_query_properties():
content = """
selector:
Expand Down Expand Up @@ -4360,6 +4442,89 @@ def test_create_simple_retriever_raise_error_if_multiple_request_properties():
)


def test_create_simple_retriever_raise_error_properties_from_endpoint_defined_multiple_times():
content = """
selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path: ["extractor_path"]
record_filter:
type: RecordFilter
condition: "{{ record['id'] > stream_state['id'] }}"
requester:
type: HttpRequester
name: "{{ parameters['name'] }}"
url_base: "https://api.linkedin.com/rest/"
http_method: "GET"
path: "adAnalytics"
fetch_properties_from_endpoint:
type: PropertiesFromEndpoint
property_field_path: [ "name" ]
retriever:
type: SimpleRetriever
requester:
type: HttpRequester
url_base: https://api.hubapi.com
path: "/properties/v2/dynamics/properties"
http_method: GET
record_selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path: []
request_parameters:
properties:
type: QueryProperties
property_list:
- first_name
- last_name
- status
- organization
- created_at
always_include_properties:
- id
property_chunking:
type: PropertyChunking
property_limit_type: property_count
property_limit: 3
record_merge_strategy:
type: GroupByKeyMergeStrategy
key: ["id"]
nonary: "{{config['nonary'] }}"
analytics_stream:
type: DeclarativeStream
incremental_sync:
type: DatetimeBasedCursor
$parameters:
datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"
start_datetime: "{{ config['start_time'] }}"
cursor_field: "created"
retriever:
type: SimpleRetriever
name: "{{ parameters['name'] }}"
requester:
$ref: "#/requester"
record_selector:
$ref: "#/selector"
$parameters:
name: "analytics"
"""

parsed_manifest = YamlDeclarativeSource._parse(content)
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
stream_manifest = transformer.propagate_types_and_parameters(
"", resolved_manifest["analytics_stream"], {}
)

with pytest.raises(ValueError):
factory.create_component(
model_type=DeclarativeStreamModel,
component_definition=stream_manifest,
config=input_config,
)


def test_create_property_chunking_characters():
property_chunking_model = {
"type": "PropertyChunking",
Expand Down
Loading