diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 153a5d105..bf677cc3e 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1912,6 +1912,10 @@ definitions: - ["data", "records"] - ["data", "{{ parameters.name }}"] - ["data", "*", "record"] + record_expander: + title: Record Expander + description: Optional component to expand records by extracting items from nested array fields. + "$ref": "#/definitions/RecordExpander" $parameters: type: object additionalProperties: true @@ -1928,6 +1932,46 @@ definitions: $parameters: type: object additionalProperties: true + RecordExpander: + title: Record Expander + description: Expands records by extracting items from a nested array field. When configured, this component extracts items from a specified nested array path within each record and emits each item as a separate record. Optionally, the original parent record can be embedded in each expanded item for context preservation. Supports wildcards (*) for matching multiple arrays. + type: object + required: + - type + - expand_records_from_field + properties: + type: + type: string + enum: [RecordExpander] + expand_records_from_field: + title: Expand Records From Field + description: Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays. + type: array + items: + type: string + interpolation_context: + - config + examples: + - ["lines", "data"] + - ["items"] + - ["nested", "array"] + - ["sections", "*", "items"] + remain_original_record: + title: Remain Original Record + description: If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false. + type: boolean + default: false + on_no_records: + title: On No Records + description: Behavior when the expansion path is missing, not a list, or an empty list. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged. + type: string + enum: + - skip + - emit_parent + default: skip + $parameters: + type: object + additionalProperties: true ExponentialBackoffStrategy: title: Exponential Backoff description: Backoff strategy with an exponential backoff interval. The interval is defined as factor * 2^attempt_count. diff --git a/airbyte_cdk/sources/declarative/expanders/__init__.py b/airbyte_cdk/sources/declarative/expanders/__init__.py new file mode 100644 index 000000000..1b10ee82d --- /dev/null +++ b/airbyte_cdk/sources/declarative/expanders/__init__.py @@ -0,0 +1,7 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from airbyte_cdk.sources.declarative.expanders.record_expander import OnNoRecords, RecordExpander + +__all__ = ["OnNoRecords", "RecordExpander"] diff --git a/airbyte_cdk/sources/declarative/expanders/record_expander.py b/airbyte_cdk/sources/declarative/expanders/record_expander.py new file mode 100644 index 000000000..6b489243d --- /dev/null +++ b/airbyte_cdk/sources/declarative/expanders/record_expander.py @@ -0,0 +1,128 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +import copy +from dataclasses import InitVar, dataclass +from enum import Enum +from typing import Any, Iterable, Mapping, MutableMapping, Sequence + +import dpath + +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.types import Config + + +class OnNoRecords(Enum): + """ + Behavior when record expansion produces no records. + """ + + skip = "skip" + emit_parent = "emit_parent" + + +@dataclass +class RecordExpander: + """Expands records by extracting items from a nested array field. + + When configured, this component extracts items from a specified nested array path + within each record and emits each item as a separate record. Set `remain_original_record: true` + to embed the full parent record under `original_record` in each expanded item when you need + downstream transformations to access parent context. + + The expand_records_from_field path supports wildcards (*) for matching multiple arrays. + When wildcards are used, items from all matched arrays are extracted and emitted. + + Examples of instantiating this component: + ``` + record_expander: + type: RecordExpander + expand_records_from_field: + - "lines" + - "data" + remain_original_record: true + ``` + + ``` + record_expander: + type: RecordExpander + expand_records_from_field: + - "sections" + - "*" + - "items" + on_no_records: emit_parent + ``` + + Attributes: + expand_records_from_field: Path to a nested array field within each record. + Items from this array will be extracted and emitted as separate records. + Supports wildcards (*). + remain_original_record: If True, each expanded record will include the original + parent record in an "original_record" field. Defaults to False. + on_no_records: Behavior when expansion produces no records. "skip" (default) + emits nothing. "emit_parent" emits the original parent record unchanged. + config: The user-provided configuration as specified by the source's spec. + """ + + expand_records_from_field: Sequence[str] + config: Config + parameters: InitVar[Mapping[str, Any]] + remain_original_record: bool = False + on_no_records: OnNoRecords = OnNoRecords.skip + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self._expand_path: list[InterpolatedString] = [ + InterpolatedString.create(path, parameters=parameters) + for path in self.expand_records_from_field + ] + + def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]: + """Expand a record by extracting items from a nested array field.""" + if not isinstance(record, Mapping): + # If the input isn't a mapping, expansion can't proceed; yield as-is. + yield record + return + + if not self._expand_path: + yield record + return + + parent_record = record + expand_path = [path.eval(self.config) for path in self._expand_path] + expanded_any = False + + try: + extracted_values = dpath.values(parent_record, expand_path) + except KeyError: + extracted_values = [] + + for extracted in extracted_values: + if not isinstance(extracted, list): + continue + items = extracted + for item in items: + if isinstance(item, dict): + expanded_record = dict(item) + self._apply_parent_context(parent_record, expanded_record) + yield expanded_record + expanded_any = True + else: + if self.remain_original_record: + yield { + "value": item, + "original_record": copy.deepcopy(parent_record), + } + else: + yield item + expanded_any = True + + if not expanded_any and self.on_no_records == OnNoRecords.emit_parent: + yield parent_record + + def _apply_parent_context( + self, parent_record: Mapping[str, Any], child_record: MutableMapping[str, Any] + ) -> None: + """Apply parent context to a child record.""" + if self.remain_original_record: + child_record["original_record"] = copy.deepcopy(parent_record) diff --git a/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py b/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py index 9c97773e3..1d8831056 100644 --- a/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py +++ b/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py @@ -3,12 +3,13 @@ # from dataclasses import InitVar, dataclass, field -from typing import Any, Iterable, List, Mapping, MutableMapping, Union +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union import dpath import requests from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder +from airbyte_cdk.sources.declarative.expanders.record_expander import RecordExpander from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.types import Config @@ -24,6 +25,11 @@ class DpathExtractor(RecordExtractor): If the field path points to an empty object, an empty array is returned. If the field path points to a non-existing path, an empty array is returned. + Optionally, records can be expanded by providing a RecordExpander component. + When record_expander is configured, each extracted record is passed through the + expander which extracts items from nested array fields and emits each item as a + separate record. + Examples of instantiating this transform: ``` extractor: @@ -47,16 +53,32 @@ class DpathExtractor(RecordExtractor): field_path: [] ``` + ``` + extractor: + type: DpathExtractor + field_path: + - "data" + - "object" + record_expander: + type: RecordExpander + expand_records_from_field: + - "lines" + - "data" + remain_original_record: true + ``` + Attributes: field_path (Union[InterpolatedString, str]): Path to the field that should be extracted config (Config): The user-provided configuration as specified by the source's spec decoder (Decoder): The decoder responsible to transfom the response in a Mapping + record_expander (Optional[RecordExpander]): Optional component to expand records by extracting items from nested array fields """ field_path: List[Union[InterpolatedString, str]] config: Config parameters: InitVar[Mapping[str, Any]] decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={})) + record_expander: Optional[RecordExpander] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._field_path = [ @@ -79,8 +101,15 @@ def extract_records(self, response: requests.Response) -> Iterable[MutableMappin else: extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure if isinstance(extracted, list): - yield from extracted + if not self.record_expander: + yield from extracted + else: + for record in extracted: + yield from self.record_expander.expand_record(record) elif extracted: - yield extracted + if self.record_expander: + yield from self.record_expander.expand_record(extracted) + else: + yield extracted else: yield from [] diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 0f5c0f1f9..f535e6a3e 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -482,27 +482,16 @@ class Config: ) -class DpathExtractor(BaseModel): - type: Literal["DpathExtractor"] - field_path: List[str] = Field( - ..., - description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).', - examples=[ - ["data"], - ["data", "records"], - ["data", "{{ parameters.name }}"], - ["data", "*", "record"], - ], - title="Field Path", - ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") - - class ResponseToFileExtractor(BaseModel): type: Literal["ResponseToFileExtractor"] parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class OnNoRecords(Enum): + skip = "skip" + emit_parent = "emit_parent" + + class ExponentialBackoffStrategy(BaseModel): type: Literal["ExponentialBackoffStrategy"] factor: Optional[Union[float, str]] = Field( @@ -2034,6 +2023,32 @@ class DefaultPaginator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class RecordExpander(BaseModel): + type: Literal["RecordExpander"] + expand_records_from_field: List[str] = Field( + ..., + description="Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.", + examples=[ + ["lines", "data"], + ["items"], + ["nested", "array"], + ["sections", "*", "items"], + ], + title="Expand Records From Field", + ) + remain_original_record: Optional[bool] = Field( + False, + description='If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.', + title="Remain Original Record", + ) + on_no_records: Optional[OnNoRecords] = Field( + OnNoRecords.skip, + description='Behavior when the expansion path is missing, not a list, or an empty list. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged.', + title="On No Records", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class SessionTokenRequestApiKeyAuthenticator(BaseModel): type: Literal["ApiKey"] inject_into: RequestOption = Field( @@ -2092,27 +2107,6 @@ class ListPartitionRouter(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class RecordSelector(BaseModel): - type: Literal["RecordSelector"] - extractor: Union[DpathExtractor, CustomRecordExtractor] - record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field( - None, - description="Responsible for filtering records to be emitted by the Source.", - title="Record Filter", - ) - schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( - None, - description="Responsible for normalization according to the schema.", - title="Schema Normalization", - ) - transform_before_filtering: Optional[bool] = Field( - None, - description="If true, transformation will be applied before record filtering.", - title="Transform Before Filtering", - ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") - - class PaginationReset(BaseModel): type: Literal["PaginationReset"] action: Action1 @@ -2233,6 +2227,27 @@ class Config: ) +class DpathExtractor(BaseModel): + type: Literal["DpathExtractor"] + field_path: List[str] = Field( + ..., + description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).', + examples=[ + ["data"], + ["data", "records"], + ["data", "{{ parameters.name }}"], + ["data", "*", "record"], + ], + title="Field Path", + ) + record_expander: Optional[RecordExpander] = Field( + None, + description="Optional component to expand records by extracting items from nested array fields.", + title="Record Expander", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class ZipfileDecoder(BaseModel): class Config: extra = Extra.allow @@ -2245,6 +2260,27 @@ class Config: ) +class RecordSelector(BaseModel): + type: Literal["RecordSelector"] + extractor: Union[DpathExtractor, CustomRecordExtractor] + record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field( + None, + description="Responsible for filtering records to be emitted by the Source.", + title="Record Filter", + ) + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( + None, + description="Responsible for normalization according to the schema.", + title="Schema Normalization", + ) + transform_before_filtering: Optional[bool] = Field( + None, + description="If true, transformation will be applied before record filtering.", + title="Transform Before Filtering", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class ConfigMigration(BaseModel): type: Literal["ConfigMigration"] description: Optional[str] = Field( diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py index 3ed86bf06..00b1a18ff 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py @@ -42,6 +42,7 @@ "DefaultPaginator.page_size_option": "RequestOption", # DpathExtractor "DpathExtractor.decoder": "JsonDecoder", + "DpathExtractor.record_expander": "RecordExpander", # HttpRequester "HttpRequester.error_handler": "DefaultErrorHandler", # ListPartitionRouter diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 867c93a22..dc3c098d3 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -94,6 +94,10 @@ JsonParser, Parser, ) +from airbyte_cdk.sources.declarative.expanders.record_expander import ( + OnNoRecords, + RecordExpander, +) from airbyte_cdk.sources.declarative.extractors import ( DpathExtractor, RecordFilter, @@ -391,6 +395,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( Rate as RateModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + RecordExpander as RecordExpanderModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( RecordFilter as RecordFilterModel, ) @@ -778,6 +785,7 @@ def _init_mappings(self) -> None: PropertiesFromEndpointModel: self.create_properties_from_endpoint, PropertyChunkingModel: self.create_property_chunking, QueryPropertiesModel: self.create_query_properties, + RecordExpanderModel: self.create_record_expander, RecordFilterModel: self.create_record_filter, RecordSelectorModel: self.create_record_selector, RemoveFieldsModel: self.create_remove_fields, @@ -2314,11 +2322,36 @@ def create_dpath_extractor( else: decoder_to_use = JsonDecoder(parameters={}) model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path] + + record_expander = None + if model.record_expander: + record_expander = self._create_component_from_model( + model=model.record_expander, + config=config, + ) + return DpathExtractor( decoder=decoder_to_use, field_path=model_field_path, config=config, parameters=model.parameters or {}, + record_expander=record_expander, + ) + + def create_record_expander( + self, + model: RecordExpanderModel, + config: Config, + **kwargs: Any, + ) -> RecordExpander: + return RecordExpander( + expand_records_from_field=model.expand_records_from_field, + config=config, + parameters=model.parameters or {}, + remain_original_record=model.remain_original_record or False, + on_no_records=OnNoRecords(model.on_no_records.value) + if model.on_no_records + else OnNoRecords.skip, ) @staticmethod diff --git a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py index 05e586592..94fb72743 100644 --- a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py +++ b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py @@ -14,6 +14,7 @@ IterableDecoder, JsonDecoder, ) +from airbyte_cdk.sources.declarative.expanders.record_expander import OnNoRecords, RecordExpander from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor config = {"field": "record_array"} @@ -121,3 +122,375 @@ def test_dpath_extractor(field_path: List, decoder: Decoder, body, expected_reco actual_records = list(extractor.extract_records(response)) assert actual_records == expected_records + + +def test_dpath_extractor_interpolated_expand_path(): + cfg = {"nested": "items"} + record_expander = RecordExpander( + expand_records_from_field=["{{ config['nested'] }}", "data"], + config=cfg, + parameters=parameters, + remain_original_record=True, + ) + extractor = DpathExtractor( + field_path=["data", "object"], + config=cfg, + decoder=decoder_json, + parameters=parameters, + record_expander=record_expander, + ) + + body = {"data": {"object": {"id": "parent", "items": {"data": [{"id": "child"}]}}}} + response = create_response(body) + actual_records = list(extractor.extract_records(response)) + assert actual_records == [ + {"id": "child", "original_record": {"id": "parent", "items": {"data": [{"id": "child"}]}}}, + ] + + +def test_dpath_extractor_expands_non_mapping_safely(): + record_expander = RecordExpander( + expand_records_from_field=["items"], + config=config, + parameters=parameters, + ) + extractor = DpathExtractor( + field_path=["value"], + config=config, + decoder=decoder_json, + parameters=parameters, + record_expander=record_expander, + ) + + response = create_response({"value": 3}) + actual_records = list(extractor.extract_records(response)) + assert actual_records == [3] + + +def test_dpath_extractor_non_dict_items_with_parent_context(): + parent = {"items": [1, "a"], "meta": "m"} + record_expander = RecordExpander( + expand_records_from_field=["items"], + config=config, + parameters=parameters, + remain_original_record=True, + ) + extractor = DpathExtractor( + field_path=["data"], + config=config, + decoder=decoder_json, + parameters=parameters, + record_expander=record_expander, + ) + + response = create_response({"data": parent}) + actual_records = list(extractor.extract_records(response)) + assert actual_records == [ + {"value": 1, "original_record": parent}, + {"value": "a", "original_record": parent}, + ] + + +@pytest.mark.parametrize( + "field_path, expand_records_from_field, remain_original_record, body, expected_records", + [ + ( + ["data", "object"], + ["lines", "data"], + False, + { + "data": { + "object": { + "id": "in_123", + "created": 1234567890, + "lines": { + "data": [ + {"id": "il_1", "amount": 100}, + {"id": "il_2", "amount": 200}, + ] + }, + } + } + }, + [ + {"id": "il_1", "amount": 100}, + {"id": "il_2", "amount": 200}, + ], + ), + ( + ["data", "object"], + ["lines", "data"], + True, + { + "data": { + "object": { + "id": "in_123", + "created": 1234567890, + "lines": { + "data": [ + {"id": "il_1", "amount": 100}, + ] + }, + } + } + }, + [ + { + "id": "il_1", + "amount": 100, + "original_record": { + "id": "in_123", + "created": 1234567890, + "lines": {"data": [{"id": "il_1", "amount": 100}]}, + }, + }, + ], + ), + ( + ["data"], + ["items"], + False, + {"data": {"id": "parent_1", "items": []}}, + [], + ), + ( + ["data"], + ["items"], + False, + {"data": {"id": "parent_1"}}, + [], + ), + ( + ["data"], + ["items"], + False, + {"data": {"id": "parent_1", "items": "not_an_array"}}, + [], + ), + ( + ["data"], + ["nested", "array"], + False, + { + "data": { + "id": "parent_1", + "nested": {"array": [{"id": "child_1"}, {"id": "child_2"}]}, + } + }, + [{"id": "child_1"}, {"id": "child_2"}], + ), + ( + ["data"], + ["items"], + False, + {"data": {"id": "parent_1", "items": [1, 2, "string", {"id": "dict_item"}]}}, + [1, 2, "string", {"id": "dict_item"}], + ), + ( + [], + ["items"], + False, + [ + {"id": "parent_1", "items": [{"id": "child_1"}]}, + {"id": "parent_2", "items": [{"id": "child_2"}, {"id": "child_3"}]}, + ], + [{"id": "child_1"}, {"id": "child_2"}, {"id": "child_3"}], + ), + ( + ["data"], + ["sections", "*", "items"], + False, + { + "data": { + "sections": [ + {"name": "section1", "items": [{"id": "item_1"}, {"id": "item_2"}]}, + {"name": "section2", "items": [{"id": "item_3"}]}, + ] + } + }, + [{"id": "item_1"}, {"id": "item_2"}, {"id": "item_3"}], + ), + ( + ["data"], + ["sections", "*", "items"], + True, + { + "data": { + "sections": [ + {"name": "section1", "items": [{"id": "item_1"}]}, + ] + } + }, + [ + { + "id": "item_1", + "original_record": { + "sections": [ + {"name": "section1", "items": [{"id": "item_1"}]}, + ] + }, + } + ], + ), + ( + ["data"], + ["sections", "*", "items"], + False, + { + "data": { + "sections": [ + {"name": "section1", "items": []}, + {"name": "section2", "items": []}, + ] + } + }, + [], + ), + ( + ["data"], + ["sections", "*", "items"], + False, + { + "data": { + "sections": [ + {"name": "section1"}, + {"name": "section2", "items": "not_an_array"}, + ] + } + }, + [], + ), + ( + ["data"], + ["*", "items"], + False, + { + "data": { + "group1": {"items": [{"id": "item_1"}]}, + "group2": {"items": [{"id": "item_2"}, {"id": "item_3"}]}, + } + }, + [{"id": "item_1"}, {"id": "item_2"}, {"id": "item_3"}], + ), + ], + ids=[ + "test_expand_nested_array", + "test_expand_with_original_record", + "test_expand_empty_array_yields_nothing", + "test_expand_missing_path_yields_nothing", + "test_expand_non_array_yields_nothing", + "test_expand_deeply_nested_path", + "test_expand_mixed_types_in_array", + "test_expand_multiple_parent_records", + "test_expand_wildcard_multiple_lists", + "test_expand_wildcard_with_original_record", + "test_expand_wildcard_all_empty_arrays", + "test_expand_wildcard_no_list_matches", + "test_expand_wildcard_dict_values", + ], +) +def test_dpath_extractor_with_expansion( + field_path: List, + expand_records_from_field: List, + remain_original_record: bool, + body, + expected_records: List, +): + record_expander = RecordExpander( + expand_records_from_field=expand_records_from_field, + config=config, + parameters=parameters, + remain_original_record=remain_original_record, + ) + extractor = DpathExtractor( + field_path=field_path, + config=config, + decoder=decoder_json, + parameters=parameters, + record_expander=record_expander, + ) + + response = create_response(body) + actual_records = list(extractor.extract_records(response)) + + assert actual_records == expected_records + + +@pytest.mark.parametrize( + "field_path, expand_records_from_field, on_no_records, body, expected_records", + [ + pytest.param( + ["data"], + ["items"], + OnNoRecords.skip, + {"data": {"id": "parent_1"}}, + [], + id="on_no_records_skip_missing_path", + ), + pytest.param( + ["data"], + ["items"], + OnNoRecords.skip, + {"data": {"id": "parent_1", "items": []}}, + [], + id="on_no_records_skip_empty_array", + ), + pytest.param( + ["data"], + ["items"], + OnNoRecords.emit_parent, + {"data": {"id": "parent_1"}}, + [{"id": "parent_1"}], + id="on_no_records_emit_parent_missing_path", + ), + pytest.param( + ["data"], + ["items"], + OnNoRecords.emit_parent, + {"data": {"id": "parent_1", "items": []}}, + [{"id": "parent_1", "items": []}], + id="on_no_records_emit_parent_empty_array", + ), + pytest.param( + ["data"], + ["items"], + OnNoRecords.emit_parent, + {"data": {"id": "parent_1", "items": "not_an_array"}}, + [{"id": "parent_1", "items": "not_an_array"}], + id="on_no_records_emit_parent_non_array", + ), + pytest.param( + ["data"], + ["items"], + OnNoRecords.emit_parent, + {"data": {"id": "parent_1", "items": [{"id": "child_1"}]}}, + [{"id": "child_1"}], + id="on_no_records_emit_parent_has_items_extracts_normally", + ), + ], +) +def test_dpath_extractor_on_no_records( + field_path: List, + expand_records_from_field: List, + on_no_records: OnNoRecords, + body, + expected_records: List, +): + record_expander = RecordExpander( + expand_records_from_field=expand_records_from_field, + config=config, + parameters=parameters, + on_no_records=on_no_records, + ) + extractor = DpathExtractor( + field_path=field_path, + config=config, + decoder=decoder_json, + parameters=parameters, + record_expander=record_expander, + ) + + response = create_response(body) + actual_records = list(extractor.extract_records(response)) + + assert actual_records == expected_records