Skip to content

Commit 5a1644d

Browse files
author
Baz
authored
fix: (CDK) (Manifest) - Add Manifest Normalization module (reduce commonalities + handle schema $refs) (#447)
1 parent 1b1d1a8 commit 5a1644d

13 files changed

Lines changed: 1796 additions & 51 deletions

airbyte_cdk/connector_builder/connector_builder_handler.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,22 @@ def get_limits(config: Mapping[str, Any]) -> TestLimits:
5656
return TestLimits(max_records, max_pages_per_slice, max_slices, max_streams)
5757

5858

59+
def should_normalize_manifest(config: Mapping[str, Any]) -> bool:
60+
"""
61+
Check if the manifest should be normalized.
62+
:param config: The configuration to check
63+
:return: True if the manifest should be normalized, False otherwise.
64+
"""
65+
return config.get("__should_normalize", False)
66+
67+
5968
def create_source(config: Mapping[str, Any], limits: TestLimits) -> ManifestDeclarativeSource:
6069
manifest = config["__injected_declarative_manifest"]
6170
return ManifestDeclarativeSource(
6271
config=config,
6372
emit_connector_builder_messages=True,
6473
source_config=manifest,
74+
normalize_manifest=should_normalize_manifest(config),
6575
component_factory=ModelToComponentFactory(
6676
emit_connector_builder_messages=True,
6777
limit_pages_fetched_per_slice=limits.max_pages_per_slice,

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1917,8 +1917,9 @@ definitions:
19171917
type: string
19181918
enum: [HttpRequester]
19191919
url_base:
1920+
linkable: true
19201921
title: API Base URL
1921-
description: Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
1922+
description: The Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
19221923
type: string
19231924
interpolation_context:
19241925
- config
@@ -1936,7 +1937,7 @@ definitions:
19361937
- "https://example.com/api/v1/resource/{{ next_page_token['id'] }}"
19371938
path:
19381939
title: URL Path
1939-
description: Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
1940+
description: The Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.
19401941
type: string
19411942
interpolation_context:
19421943
- config
@@ -1964,6 +1965,7 @@ definitions:
19641965
- POST
19651966
authenticator:
19661967
title: Authenticator
1968+
linkable: true
19671969
description: Authentication method to use for requests sent to the API.
19681970
anyOf:
19691971
- "$ref": "#/definitions/NoAuth"

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 122 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
3030
DeclarativeStream as DeclarativeStreamModel,
3131
)
32-
from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
32+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
33+
Spec as SpecModel,
34+
)
3335
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
3436
StateDelegatingStream as StateDelegatingStreamModel,
3537
)
@@ -39,6 +41,9 @@
3941
from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import (
4042
ManifestComponentTransformer,
4143
)
44+
from airbyte_cdk.sources.declarative.parsers.manifest_normalizer import (
45+
ManifestNormalizer,
46+
)
4247
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import (
4348
ManifestReferenceResolver,
4449
)
@@ -57,6 +62,24 @@
5762
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
5863

5964

65+
def _get_declarative_component_schema() -> Dict[str, Any]:
66+
try:
67+
raw_component_schema = pkgutil.get_data(
68+
"airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
69+
)
70+
if raw_component_schema is not None:
71+
declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader)
72+
return declarative_component_schema # type: ignore
73+
else:
74+
raise RuntimeError(
75+
"Failed to read manifest component json schema required for deduplication"
76+
)
77+
except FileNotFoundError as e:
78+
raise FileNotFoundError(
79+
f"Failed to read manifest component json schema required for deduplication: {e}"
80+
)
81+
82+
6083
class ManifestDeclarativeSource(DeclarativeSource):
6184
"""Declarative source defined by a manifest of low-code components that define source connector behavior"""
6285

@@ -68,29 +91,25 @@ def __init__(
6891
debug: bool = False,
6992
emit_connector_builder_messages: bool = False,
7093
component_factory: Optional[ModelToComponentFactory] = None,
71-
):
94+
normalize_manifest: Optional[bool] = False,
95+
) -> None:
7296
"""
7397
Args:
7498
config: The provided config dict.
7599
source_config: The manifest of low-code components that describe the source connector.
76100
debug: True if debug mode is enabled.
77101
emit_connector_builder_messages: True if messages should be emitted to the connector builder.
78102
component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
103+
normalize_manifest: Optional flag to indicate if the manifest should be normalized.
79104
"""
80105
self.logger = logging.getLogger(f"airbyte.{self.name}")
81-
# For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
82-
manifest = dict(source_config)
83-
if "type" not in manifest:
84-
manifest["type"] = "DeclarativeSource"
85-
106+
self._should_normalize = normalize_manifest
107+
self._declarative_component_schema = _get_declarative_component_schema()
86108
# If custom components are needed, locate and/or register them.
87109
self.components_module: ModuleType | None = get_registered_components_module(config=config)
110+
# resolve all components in the manifest
111+
self._source_config = self._preprocess_manifest(dict(source_config))
88112

89-
resolved_source_config = ManifestReferenceResolver().preprocess_manifest(manifest)
90-
propagated_source_config = ManifestComponentTransformer().propagate_types_and_parameters(
91-
"", resolved_source_config, {}
92-
)
93-
self._source_config = propagated_source_config
94113
self._debug = debug
95114
self._emit_connector_builder_messages = emit_connector_builder_messages
96115
self._constructor = (
@@ -105,22 +124,91 @@ def __init__(
105124
self._slice_logger: SliceLogger = (
106125
AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
107126
)
108-
109127
self._config = config or {}
128+
129+
# validate resolved manifest against the declarative component schema
110130
self._validate_source()
111131

132+
# apply additional post-processing to the manifest
133+
self._postprocess_manifest()
134+
112135
@property
113136
def resolved_manifest(self) -> Mapping[str, Any]:
137+
"""
138+
Returns the resolved manifest configuration for the source.
139+
140+
This property provides access to the internal source configuration as a mapping,
141+
which contains all settings and parameters required to define the source's behavior.
142+
143+
Returns:
144+
Mapping[str, Any]: The resolved source configuration manifest.
145+
"""
114146
return self._source_config
115147

148+
def _preprocess_manifest(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
149+
"""
150+
Preprocesses the provided manifest dictionary by resolving any manifest references.
151+
152+
This method modifies the input manifest in place, resolving references using the
153+
ManifestReferenceResolver to ensure all references within the manifest are properly handled.
154+
155+
Args:
156+
manifest (Dict[str, Any]): The manifest dictionary to preprocess and resolve references in.
157+
158+
Returns:
159+
None
160+
"""
161+
# For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
162+
manifest = self._fix_source_type(manifest)
163+
# Resolve references in the manifest
164+
resolved_manifest = ManifestReferenceResolver().preprocess_manifest(manifest)
165+
# Propagate types and parameters throughout the manifest
166+
propagated_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
167+
"", resolved_manifest, {}
168+
)
169+
170+
return propagated_manifest
171+
172+
def _postprocess_manifest(self) -> None:
173+
"""
174+
Post-processes the manifest after validation.
175+
This method is responsible for any additional modifications or transformations needed
176+
after the manifest has been validated and before it is used in the source.
177+
"""
178+
# apply manifest normalization, if required
179+
self._normalize_manifest()
180+
181+
def _normalize_manifest(self) -> None:
182+
"""
183+
This method is used to normalize the manifest. It should be called after the manifest has been validated.
184+
185+
Connector Builder UI rendering requires the manifest to be in a specific format.
186+
- references have been resolved
187+
- the commonly used definitions are extracted to the `definitions.linked.*`
188+
"""
189+
if self._should_normalize:
190+
normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
191+
self._source_config = normalizer.normalize()
192+
193+
def _fix_source_type(self, manifest: Dict[str, Any]) -> Dict[str, Any]:
194+
"""
195+
Fix the source type in the manifest. This is necessary because the source type is not always set in the manifest.
196+
"""
197+
if "type" not in manifest:
198+
manifest["type"] = "DeclarativeSource"
199+
200+
return manifest
201+
116202
@property
117203
def message_repository(self) -> MessageRepository:
118204
return self._message_repository
119205

120206
@property
121207
def dynamic_streams(self) -> List[Dict[str, Any]]:
122208
return self._dynamic_stream_configs(
123-
manifest=self._source_config, config=self._config, with_dynamic_stream_name=True
209+
manifest=self._source_config,
210+
config=self._config,
211+
with_dynamic_stream_name=True,
124212
)
125213

126214
@property
@@ -143,7 +231,10 @@ def connection_checker(self) -> ConnectionChecker:
143231

144232
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
145233
self._emit_manifest_debug_message(
146-
extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
234+
extra_args={
235+
"source_name": self.name,
236+
"parsed_config": json.dumps(self._source_config),
237+
}
147238
)
148239

149240
stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
@@ -156,9 +247,11 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
156247

157248
source_streams = [
158249
self._constructor.create_component(
159-
StateDelegatingStreamModel
160-
if stream_config.get("type") == StateDelegatingStreamModel.__name__
161-
else DeclarativeStreamModel,
250+
(
251+
StateDelegatingStreamModel
252+
if stream_config.get("type") == StateDelegatingStreamModel.__name__
253+
else DeclarativeStreamModel
254+
),
162255
stream_config,
163256
config,
164257
emit_connector_builder_messages=self._emit_connector_builder_messages,
@@ -174,7 +267,9 @@ def _initialize_cache_for_parent_streams(
174267
) -> List[Dict[str, Any]]:
175268
parent_streams = set()
176269

177-
def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> None:
270+
def update_with_cache_parent_configs(
271+
parent_configs: list[dict[str, Any]],
272+
) -> None:
178273
for parent_config in parent_configs:
179274
parent_streams.add(parent_config["stream"]["name"])
180275
if parent_config["stream"]["type"] == "StateDelegatingStream":
@@ -229,7 +324,10 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
229324
"""
230325
self._configure_logger_level(logger)
231326
self._emit_manifest_debug_message(
232-
extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}
327+
extra_args={
328+
"source_name": self.name,
329+
"parsed_config": json.dumps(self._source_config),
330+
}
233331
)
234332

235333
spec = self._source_config.get("spec")
@@ -266,25 +364,9 @@ def _validate_source(self) -> None:
266364
"""
267365
Validates the connector manifest against the declarative component schema
268366
"""
269-
try:
270-
raw_component_schema = pkgutil.get_data(
271-
"airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
272-
)
273-
if raw_component_schema is not None:
274-
declarative_component_schema = yaml.load(
275-
raw_component_schema, Loader=yaml.SafeLoader
276-
)
277-
else:
278-
raise RuntimeError(
279-
"Failed to read manifest component json schema required for validation"
280-
)
281-
except FileNotFoundError as e:
282-
raise FileNotFoundError(
283-
f"Failed to read manifest component json schema required for validation: {e}"
284-
)
285367

286368
try:
287-
validate(self._source_config, declarative_component_schema)
369+
validate(self._source_config, self._declarative_component_schema)
288370
except ValidationError as e:
289371
raise ValidationError(
290372
"Validation against json schema defined in declarative_component_schema.yaml schema failed"
@@ -382,7 +464,9 @@ def _dynamic_stream_configs(
382464

383465
# Create a resolver for dynamic components based on type
384466
components_resolver = self._constructor.create_component(
385-
COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], components_resolver_config, config
467+
COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
468+
components_resolver_config,
469+
config,
386470
)
387471

388472
stream_template_config = dynamic_definition["stream_template"]

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2209,7 +2209,7 @@ class HttpRequester(BaseModel):
22092209
type: Literal["HttpRequester"]
22102210
url_base: str = Field(
22112211
...,
2212-
description="Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.",
2212+
description="The Base URL of the API source. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.",
22132213
examples=[
22142214
"https://connect.squareup.com/v2",
22152215
"{{ config['base_url'] or 'https://app.posthog.com'}}/api",
@@ -2220,7 +2220,7 @@ class HttpRequester(BaseModel):
22202220
)
22212221
path: Optional[str] = Field(
22222222
None,
2223-
description="Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.",
2223+
description="The Path the specific API endpoint that this stream represents. Do not put sensitive information (e.g. API tokens) into this field - Use the Authentication component for this.",
22242224
examples=[
22252225
"/products",
22262226
"/quotes/{{ stream_partition['id'] }}/quote_line_groups",

airbyte_cdk/sources/declarative/parsers/custom_exceptions.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,12 @@ class UndefinedReferenceException(Exception):
1919

2020
def __init__(self, path: str, reference: str) -> None:
2121
super().__init__(f"Undefined reference {reference} from {path}")
22+
23+
24+
class ManifestNormalizationException(Exception):
25+
"""
26+
Raised when a circular reference is detected in a manifest.
27+
"""
28+
29+
def __init__(self, message: str) -> None:
30+
super().__init__(f"Failed to deduplicate manifest: {message}")

airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import copy
66
import typing
7-
from typing import Any, Mapping, Optional
7+
from typing import Any, Dict, Mapping, Optional
88

99
PARAMETERS_STR = "$parameters"
1010

@@ -95,7 +95,7 @@ def propagate_types_and_parameters(
9595
declarative_component: Mapping[str, Any],
9696
parent_parameters: Mapping[str, Any],
9797
use_parent_parameters: Optional[bool] = None,
98-
) -> Mapping[str, Any]:
98+
) -> Dict[str, Any]:
9999
"""
100100
Recursively transforms the specified declarative component and subcomponents to propagate parameters and insert the
101101
default component type if it was not already present. The resulting transformed components are a deep copy of the input

0 commit comments

Comments
 (0)