Skip to content

Commit 656a806

Browse files
author
maxime.c
committed
Merge branch 'main' into maxi297/cache_properties_from_endpoint
2 parents 2adf3ea + 26a9b98 commit 656a806

23 files changed

Lines changed: 1378 additions & 259 deletions

Dockerfile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
# A new version of source-declarative-manifest is built for every new Airbyte CDK release, and their versions are kept in sync.
66
#
77

8-
FROM docker.io/airbyte/python-connector-base:4.0.2@sha256:9fdb1888c4264cf6fee473649ecb593f56f58e5d0096a87ee0b231777e2e3e73
8+
FROM docker.io/airbyte/python-connector-base:4.1.0@sha256:1d1aa21d34e851df4e8a87b391c27724c06e2597608e7161f4d167be853bd7b6
99

1010
WORKDIR /airbyte/integration_code
1111

@@ -24,8 +24,8 @@ RUN pip install dist/*.whl
2424
RUN mkdir -p source_declarative_manifest \
2525
&& echo 'from source_declarative_manifest.run import run\n\nif __name__ == "__main__":\n run()' > main.py \
2626
&& touch source_declarative_manifest/__init__.py \
27-
&& cp /usr/local/lib/python3.11/site-packages/airbyte_cdk/cli/source_declarative_manifest/_run.py source_declarative_manifest/run.py \
28-
&& cp /usr/local/lib/python3.11/site-packages/airbyte_cdk/cli/source_declarative_manifest/spec.json source_declarative_manifest/
27+
&& cp /usr/local/lib/python3.13/site-packages/airbyte_cdk/cli/source_declarative_manifest/_run.py source_declarative_manifest/run.py \
28+
&& cp /usr/local/lib/python3.13/site-packages/airbyte_cdk/cli/source_declarative_manifest/spec.json source_declarative_manifest/
2929

3030
# Remove unnecessary build files
3131
RUN rm -rf dist/ pyproject.toml poetry.lock README.md

airbyte_cdk/destinations/vector_db_based/document_processor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88
from typing import Any, Dict, List, Mapping, Optional, Tuple
99

1010
import dpath
11-
from langchain.text_splitter import Language, RecursiveCharacterTextSplitter
12-
from langchain.utils import stringify_dict
1311
from langchain_core.documents.base import Document
12+
from langchain_core.utils.strings import stringify_dict
13+
from langchain_text_splitters import Language, RecursiveCharacterTextSplitter
1414

1515
from airbyte_cdk.destinations.vector_db_based.config import (
1616
ProcessingConfigModel,

airbyte_cdk/destinations/vector_db_based/embedder.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@
77
from dataclasses import dataclass
88
from typing import List, Optional, Union, cast
99

10-
from langchain.embeddings.cohere import CohereEmbeddings
11-
from langchain.embeddings.fake import FakeEmbeddings
12-
from langchain.embeddings.localai import LocalAIEmbeddings
13-
from langchain.embeddings.openai import OpenAIEmbeddings
10+
from langchain_community.embeddings import (
11+
CohereEmbeddings,
12+
FakeEmbeddings,
13+
LocalAIEmbeddings,
14+
OpenAIEmbeddings,
15+
)
1416

1517
from airbyte_cdk.destinations.vector_db_based.config import (
1618
AzureOpenAIEmbeddingConfigModel,
@@ -140,7 +142,9 @@ def __init__(self, config: CohereEmbeddingConfigModel):
140142
super().__init__()
141143
# Client is set internally
142144
self.embeddings = CohereEmbeddings(
143-
cohere_api_key=config.cohere_key, model="embed-english-light-v2.0"
145+
cohere_api_key=config.cohere_key,
146+
model="embed-english-light-v2.0",
147+
user_agent="airbyte-cdk",
144148
) # type: ignore
145149

146150
def check(self) -> Optional[str]:

airbyte_cdk/manifest_server/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
# Example:
66
# docker build -f airbyte_cdk/manifest_server/Dockerfile -t airbyte/manifest-server .
77

8-
FROM python:3.12-slim-bookworm
8+
FROM python:3.13.9-slim-bookworm@sha256:4c9fe962f6ce46ecf3633a7e9d0a9fb7f5622121ee00d628eff206da024147c9
99

1010
# Install git (needed for dynamic versioning) and poetry
1111
RUN apt-get update && \

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ def __init__(
169169
component_factory = ModelToComponentFactory(
170170
emit_connector_builder_messages=emit_connector_builder_messages,
171171
message_repository=ConcurrentMessageRepository(queue, message_repository),
172+
configured_catalog=catalog,
172173
connector_state_manager=self._connector_state_manager,
173174
max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
174175
limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None,

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2520,6 +2520,34 @@ definitions:
25202520
type:
25212521
type: string
25222522
enum: [JsonlDecoder]
2523+
JsonSchemaPropertySelector:
2524+
title: Json Schema Property Selector
2525+
description: When configured, the JSON schema supplied in the catalog containing which columns are selected for the current stream will be used to reduce which query properties will be included in the outbound API request. This can improve the performance of API requests, especially for those requiring multiple requests to get a complete record.
2526+
type: object
2527+
required:
2528+
- type
2529+
properties:
2530+
type:
2531+
type: string
2532+
enum: [JsonSchemaPropertySelector]
2533+
transformations:
2534+
title: Transformations
2535+
description: A list of transformations to be applied on the customer configured schema that will be used to filter out unselected fields when specifying query properties for API requests.
2536+
linkable: true
2537+
type: array
2538+
items:
2539+
anyOf:
2540+
- "$ref": "#/definitions/AddFields"
2541+
- "$ref": "#/definitions/RemoveFields"
2542+
- "$ref": "#/definitions/KeysToLower"
2543+
- "$ref": "#/definitions/KeysToSnakeCase"
2544+
- "$ref": "#/definitions/FlattenFields"
2545+
- "$ref": "#/definitions/DpathFlattenFields"
2546+
- "$ref": "#/definitions/KeysReplace"
2547+
- "$ref": "#/definitions/CustomTransformation"
2548+
$parameters:
2549+
type: object
2550+
additionalProperties: true
25232551
KeysToLower:
25242552
title: Keys to Lower Case
25252553
description: A transformation that renames all keys to lower case.
@@ -3410,6 +3438,10 @@ definitions:
34103438
title: Property Chunking
34113439
description: Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request.
34123440
"$ref": "#/definitions/PropertyChunking"
3441+
property_selector:
3442+
title: Property Selector
3443+
description: Defines where to look for and which query properties that should be sent in outbound API requests. For example, you can specify that only the selected columns of a stream should be in the request.
3444+
"$ref": "#/definitions/JsonSchemaPropertySelector"
34133445
$parameters:
34143446
type: object
34153447
additionalProperties: true
@@ -3746,7 +3778,7 @@ definitions:
37463778
properties:
37473779
type:
37483780
type: string
3749-
enum: [ PaginationReset ]
3781+
enum: [PaginationReset]
37503782
action:
37513783
type: string
37523784
enum:
@@ -3763,7 +3795,7 @@ definitions:
37633795
properties:
37643796
type:
37653797
type: string
3766-
enum: [ PaginationResetLimits ]
3798+
enum: [PaginationResetLimits]
37673799
number_of_records:
37683800
type: integer
37693801
GzipDecoder:

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
13
# generated by datamodel-codegen:
24
# filename: declarative_component_schema.yaml
35

@@ -2029,6 +2031,29 @@ class SessionTokenRequestApiKeyAuthenticator(BaseModel):
20292031
)
20302032

20312033

2034+
class JsonSchemaPropertySelector(BaseModel):
2035+
type: Literal["JsonSchemaPropertySelector"]
2036+
transformations: Optional[
2037+
List[
2038+
Union[
2039+
AddFields,
2040+
RemoveFields,
2041+
KeysToLower,
2042+
KeysToSnakeCase,
2043+
FlattenFields,
2044+
DpathFlattenFields,
2045+
KeysReplace,
2046+
CustomTransformation,
2047+
]
2048+
]
2049+
] = Field(
2050+
None,
2051+
description="A list of transformations to be applied on the customer configured schema that will be used to filter out unselected fields when specifying query properties for API requests.",
2052+
title="Transformations",
2053+
)
2054+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
2055+
2056+
20322057
class ListPartitionRouter(BaseModel):
20332058
type: Literal["ListPartitionRouter"]
20342059
cursor_field: str = Field(
@@ -2799,6 +2824,11 @@ class QueryProperties(BaseModel):
27992824
description="Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request.",
28002825
title="Property Chunking",
28012826
)
2827+
property_selector: Optional[JsonSchemaPropertySelector] = Field(
2828+
None,
2829+
description="Defines where to look for and which query properties that should be sent in outbound API requests. For example, you can specify that only the selected columns of a stream should be in the request.",
2830+
title="Property Selector",
2831+
)
28022832
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
28032833

28042834

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
get_type_hints,
2727
)
2828

29+
from airbyte_protocol_dataclasses.models import ConfiguredAirbyteStream
2930
from isodate import parse_duration
3031
from pydantic.v1 import BaseModel
3132
from requests import Response
@@ -42,6 +43,7 @@
4243
AirbyteStateMessage,
4344
AirbyteStateType,
4445
AirbyteStreamState,
46+
ConfiguredAirbyteCatalog,
4547
FailureType,
4648
Level,
4749
StreamDescriptor,
@@ -314,6 +316,9 @@
314316
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
315317
JsonlDecoder as JsonlDecoderModel,
316318
)
319+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
320+
JsonSchemaPropertySelector as JsonSchemaPropertySelectorModel,
321+
)
317322
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
318323
JwtAuthenticator as JwtAuthenticatorModel,
319324
)
@@ -501,6 +506,9 @@
501506
from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import (
502507
PropertyLimitType,
503508
)
509+
from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector import (
510+
JsonSchemaPropertySelector,
511+
)
504512
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import (
505513
GroupByKey,
506514
)
@@ -668,6 +676,7 @@ def __init__(
668676
message_repository: Optional[MessageRepository] = None,
669677
connector_state_manager: Optional[ConnectorStateManager] = None,
670678
max_concurrent_async_job_count: Optional[int] = None,
679+
configured_catalog: Optional[ConfiguredAirbyteCatalog] = None,
671680
):
672681
self._init_mappings()
673682
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
@@ -678,6 +687,9 @@ def __init__(
678687
self._message_repository = message_repository or InMemoryMessageRepository(
679688
self._evaluate_log_level(emit_connector_builder_messages)
680689
)
690+
self._stream_name_to_configured_stream = self._create_stream_name_to_configured_stream(
691+
configured_catalog
692+
)
681693
self._connector_state_manager = connector_state_manager or ConnectorStateManager()
682694
self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None
683695
self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1)
@@ -734,6 +746,7 @@ def _init_mappings(self) -> None:
734746
InlineSchemaLoaderModel: self.create_inline_schema_loader,
735747
JsonDecoderModel: self.create_json_decoder,
736748
JsonlDecoderModel: self.create_jsonl_decoder,
749+
JsonSchemaPropertySelectorModel: self.create_json_schema_property_selector,
737750
GzipDecoderModel: self.create_gzip_decoder,
738751
KeysToLowerModel: self.create_keys_to_lower_transformation,
739752
KeysToSnakeCaseModel: self.create_keys_to_snake_transformation,
@@ -796,6 +809,16 @@ def _init_mappings(self) -> None:
796809
# Needed for the case where we need to perform a second parse on the fields of a custom component
797810
self.TYPE_NAME_TO_MODEL = {cls.__name__: cls for cls in self.PYDANTIC_MODEL_TO_CONSTRUCTOR}
798811

812+
@staticmethod
813+
def _create_stream_name_to_configured_stream(
814+
configured_catalog: Optional[ConfiguredAirbyteCatalog],
815+
) -> Mapping[str, ConfiguredAirbyteStream]:
816+
return (
817+
{stream.stream.name: stream for stream in configured_catalog.streams}
818+
if configured_catalog
819+
else {}
820+
)
821+
799822
def create_component(
800823
self,
801824
model_type: Type[BaseModel],
@@ -2987,7 +3010,7 @@ def create_property_chunking(
29873010
)
29883011

29893012
def create_query_properties(
2990-
self, model: QueryPropertiesModel, config: Config, **kwargs: Any
3013+
self, model: QueryPropertiesModel, config: Config, *, stream_name: str, **kwargs: Any
29913014
) -> QueryProperties:
29923015
if isinstance(model.property_list, list):
29933016
property_list = model.property_list
@@ -3004,10 +3027,43 @@ def create_query_properties(
30043027
else None
30053028
)
30063029

3030+
property_selector = (
3031+
self._create_component_from_model(
3032+
model=model.property_selector, config=config, stream_name=stream_name, **kwargs
3033+
)
3034+
if model.property_selector
3035+
else None
3036+
)
3037+
30073038
return QueryProperties(
30083039
property_list=property_list,
30093040
always_include_properties=model.always_include_properties,
30103041
property_chunking=property_chunking,
3042+
property_selector=property_selector,
3043+
config=config,
3044+
parameters=model.parameters or {},
3045+
)
3046+
3047+
def create_json_schema_property_selector(
3048+
self,
3049+
model: JsonSchemaPropertySelectorModel,
3050+
config: Config,
3051+
*,
3052+
stream_name: str,
3053+
**kwargs: Any,
3054+
) -> JsonSchemaPropertySelector:
3055+
configured_stream = self._stream_name_to_configured_stream.get(stream_name)
3056+
3057+
transformations = []
3058+
if model.transformations:
3059+
for transformation_model in model.transformations:
3060+
transformations.append(
3061+
self._create_component_from_model(model=transformation_model, config=config)
3062+
)
3063+
3064+
return JsonSchemaPropertySelector(
3065+
configured_stream=configured_stream,
3066+
properties_transformations=transformations,
30113067
config=config,
30123068
parameters=model.parameters or {},
30133069
)
@@ -3235,7 +3291,7 @@ def _get_url(req: Requester) -> str:
32353291

32363292
if len(query_properties_definitions) == 1:
32373293
query_properties = self._create_component_from_model(
3238-
model=query_properties_definitions[0], config=config
3294+
model=query_properties_definitions[0], stream_name=name, config=config
32393295
)
32403296

32413297
# Removes QueryProperties components from the interpolated mappings because it has been designed
@@ -3261,11 +3317,13 @@ def _get_url(req: Requester) -> str:
32613317

32623318
query_properties = self.create_query_properties(
32633319
model=query_properties_definition,
3320+
stream_name=name,
32643321
config=config,
32653322
)
32663323
elif hasattr(model.requester, "query_properties") and model.requester.query_properties:
32673324
query_properties = self.create_query_properties(
32683325
model=model.requester.query_properties,
3326+
stream_name=name,
32693327
config=config,
32703328
)
32713329

airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from dataclasses import InitVar, dataclass
44
from enum import Enum
5-
from typing import Any, Iterable, List, Mapping, Optional
5+
from typing import Any, Iterable, List, Mapping, Optional, Set
66

77
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey
88
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import (
@@ -40,7 +40,10 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
4040
)
4141

4242
def get_request_property_chunks(
43-
self, property_fields: List[str], always_include_properties: Optional[List[str]]
43+
self,
44+
property_fields: List[str],
45+
always_include_properties: Optional[List[str]],
46+
configured_properties: Optional[Set[str]],
4447
) -> Iterable[List[str]]:
4548
if not self.property_limit:
4649
single_property_chunk = list(property_fields)
@@ -53,6 +56,8 @@ def get_request_property_chunks(
5356
for property_field in property_fields:
5457
# If property_limit_type is not defined, we default to property_count which is just an incrementing count
5558
# todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
59+
if configured_properties is not None and property_field not in configured_properties:
60+
continue
5661
property_field_size = (
5762
len(property_field)
5863
+ 3 # The +3 represents the extra characters for encoding the delimiter in between properties
airbyte_cdk/sources/declarative/requesters/query_properties/property_selector/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
3+
from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.json_schema_property_selector import (
4+
JsonSchemaPropertySelector,
5+
)
6+
from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.property_selector import (
7+
PropertySelector,
8+
)
9+
10+
__all__ = ["JsonSchemaPropertySelector", "PropertySelector"]

0 commit comments

Comments
 (0)