11#
2- # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+ # Copyright (c) 2025 Airbyte, Inc., all rights reserved.
33#
44
55import json
88from copy import deepcopy
99from importlib import metadata
1010from types import ModuleType
11- from typing import Any , Dict , Iterator , List , Mapping , Optional , Set
11+ from typing import Any , Dict , Iterator , List , Mapping , MutableMapping , Optional , Set
1212
13+ import orjson
1314import yaml
1415from jsonschema .exceptions import ValidationError
1516from jsonschema .validators import validate
1617from packaging .version import InvalidVersion , Version
1718
19+ from airbyte_cdk .config_observation import create_connector_config_control_message
1820from airbyte_cdk .connector_builder .models import (
1921 LogMessage as ConnectorBuilderLogMessage ,
2022)
2931 ConnectorSpecification ,
3032 FailureType ,
3133)
34+ from airbyte_cdk .models .airbyte_protocol_serializers import AirbyteMessageSerializer
3235from airbyte_cdk .sources .declarative .checks import COMPONENTS_CHECKER_TYPE_MAPPING
3336from airbyte_cdk .sources .declarative .checks .connection_checker import ConnectionChecker
3437from airbyte_cdk .sources .declarative .declarative_source import DeclarativeSource
5760 ModelToComponentFactory ,
5861)
5962from airbyte_cdk .sources .declarative .resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
63+ from airbyte_cdk .sources .declarative .spec .spec import Spec
6064from airbyte_cdk .sources .message import MessageRepository
6165from airbyte_cdk .sources .streams .core import Stream
62- from airbyte_cdk .sources .types import ConnectionDefinition
66+ from airbyte_cdk .sources .types import Config , ConnectionDefinition
6367from airbyte_cdk .sources .utils .slice_logger import (
6468 AlwaysLogSliceLogger ,
6569 DebugSliceLogger ,
@@ -99,6 +103,7 @@ def __init__(
99103 component_factory : Optional [ModelToComponentFactory ] = None ,
100104 migrate_manifest : Optional [bool ] = False ,
101105 normalize_manifest : Optional [bool ] = False ,
106+ config_path : Optional [str ] = None ,
102107 ) -> None :
103108 """
104109 Args:
@@ -108,6 +113,7 @@ def __init__(
108113 emit_connector_builder_messages: True if messages should be emitted to the connector builder.
109114 component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
110115 normalize_manifest: Optional flag to indicate if the manifest should be normalized.
116+ config_path: Optional path to the config file.
111117 """
112118 self .logger = logging .getLogger (f"airbyte.{ self .name } " )
113119 self ._should_normalize = normalize_manifest
@@ -130,7 +136,6 @@ def __init__(
130136 self ._slice_logger : SliceLogger = (
131137 AlwaysLogSliceLogger () if emit_connector_builder_messages else DebugSliceLogger ()
132138 )
133- self ._config = config or {}
134139
135140 # resolve all components in the manifest
136141 self ._source_config = self ._pre_process_manifest (dict (source_config ))
@@ -139,6 +144,12 @@ def __init__(
139144 # apply additional post-processing to the manifest
140145 self ._post_process_manifest ()
141146
147+ spec : Optional [Mapping [str , Any ]] = self ._source_config .get ("spec" )
148+ self ._spec_component : Optional [Spec ] = (
149+ self ._constructor .create_component (SpecModel , spec , dict ()) if spec else None
150+ )
151+ self ._config = self ._migrate_and_transform_config (config_path , config ) or {}
152+
142153 @property
143154 def resolved_manifest (self ) -> Mapping [str , Any ]:
144155 """
@@ -199,6 +210,30 @@ def _normalize_manifest(self) -> None:
199210 normalizer = ManifestNormalizer (self ._source_config , self ._declarative_component_schema )
200211 self ._source_config = normalizer .normalize ()
201212
213+ def _migrate_and_transform_config (
214+ self ,
215+ config_path : Optional [str ],
216+ config : Optional [Config ],
217+ ) -> Optional [Config ]:
218+ if not config :
219+ return None
220+ if not self ._spec_component :
221+ return config
222+ mutable_config = dict (config )
223+ self ._spec_component .migrate_config (mutable_config )
224+ if mutable_config != config :
225+ if config_path :
226+ with open (config_path , "w" ) as f :
227+ json .dump (mutable_config , f )
228+ self .message_repository .emit_message (
229+ create_connector_config_control_message (mutable_config )
230+ )
231+ # We have no mechanism for consuming the queue, so we print the messages to stdout
232+ for message in self .message_repository .consume_queue ():
233+ print (orjson .dumps (AirbyteMessageSerializer .dump (message )).decode ())
234+ self ._spec_component .transform_config (mutable_config )
235+ return mutable_config
236+
202237 def _migrate_manifest (self ) -> None :
203238 """
204239 This method is used to migrate the manifest. It should be called after the manifest has been validated.
@@ -255,6 +290,9 @@ def connection_checker(self) -> ConnectionChecker:
255290 )
256291
257292 def streams (self , config : Mapping [str , Any ]) -> List [Stream ]:
293+ if self ._spec_component :
294+ self ._spec_component .validate_config (config )
295+
258296 self ._emit_manifest_debug_message (
259297 extra_args = {
260298 "source_name" : self .name ,
@@ -355,14 +393,9 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
355393 }
356394 )
357395
358- spec = self ._source_config .get ("spec" )
359- if spec :
360- if "type" not in spec :
361- spec ["type" ] = "Spec"
362- spec_component = self ._constructor .create_component (SpecModel , spec , dict ())
363- return spec_component .generate_spec ()
364- else :
365- return super ().spec (logger )
396+ return (
397+ self ._spec_component .generate_spec () if self ._spec_component else super ().spec (logger )
398+ )
366399
367400 def check (self , logger : logging .Logger , config : Mapping [str , Any ]) -> AirbyteConnectionStatus :
368401 self ._configure_logger_level (logger )
0 commit comments