-
Notifications
You must be signed in to change notification settings - Fork 44
Expand file tree
/
Copy pathquery_properties.py
More file actions
48 lines (41 loc) · 2.15 KB
/
query_properties.py
File metadata and controls
48 lines (41 loc) · 2.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
from dataclasses import InitVar, dataclass
from typing import Any, Iterable, List, Mapping, Optional, Union
from airbyte_cdk.sources.declarative.requesters.query_properties import (
PropertiesFromEndpoint,
PropertyChunking,
)
from airbyte_cdk.sources.types import Config, StreamSlice
@dataclass
class QueryProperties:
"""
Low-code component that encompasses the behavior to inject additional property values into the outbound API
requests. Property values can be defined statically within the manifest or dynamically by making requests
to a partner API to retrieve the properties. Query properties also allow for splitting of the total set of
properties into smaller chunks to satisfy API restrictions around the total amount of data retrieved
"""
property_list: Optional[Union[List[str], PropertiesFromEndpoint]]
always_include_properties: Optional[List[str]]
property_chunking: Optional[PropertyChunking]
config: Config
parameters: InitVar[Mapping[str, Any]]
def get_request_property_chunks(
self, stream_slice: Optional[StreamSlice] = None
) -> Iterable[List[str]]:
"""
Uses the defined property_list to fetch the total set of properties dynamically or from a static list
and based on the resulting properties, performs property chunking if applicable.
:param stream_slice: The StreamSlice of the current partition being processed during the sync. This is included
because subcomponents of QueryProperties can make use of interpolation of the top-level StreamSlice object
"""
fields: List[str]
if isinstance(self.property_list, PropertiesFromEndpoint):
fields = self.property_list.get_properties_from_endpoint(stream_slice=stream_slice)
else:
fields = self.property_list if self.property_list else []
if self.property_chunking:
yield from self.property_chunking.get_request_property_chunks(
property_fields=fields, always_include_properties=self.always_include_properties
)
else:
yield list(fields)