88from copy import deepcopy
99from importlib import metadata
1010from types import ModuleType
11- from typing import Any , Dict , Iterator , List , Mapping , MutableMapping , Optional , Set
11+ from typing import Any , Dict , Iterator , List , Mapping , Optional , Set
1212
1313import orjson
1414import yaml
3535from airbyte_cdk .sources .declarative .checks import COMPONENTS_CHECKER_TYPE_MAPPING
3636from airbyte_cdk .sources .declarative .checks .connection_checker import ConnectionChecker
3737from airbyte_cdk .sources .declarative .declarative_source import DeclarativeSource
38+ from airbyte_cdk .sources .declarative .interpolation import InterpolatedBoolean
39+ from airbyte_cdk .sources .declarative .models .declarative_component_schema import (
40+ ConditionalStreams as ConditionalStreamsModel ,
41+ )
3842from airbyte_cdk .sources .declarative .models .declarative_component_schema import (
3943 DeclarativeStream as DeclarativeStreamModel ,
4044)
@@ -300,7 +304,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
300304 }
301305 )
302306
303- stream_configs = self ._stream_configs (self ._source_config ) + self .dynamic_streams
307+ stream_configs = (
308+ self ._stream_configs (self ._source_config , config = config ) + self .dynamic_streams
309+ )
304310
305311 api_budget_model = self ._source_config .get ("api_budget" )
306312 if api_budget_model :
@@ -319,7 +325,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
319325 )
320326 for stream_config in self ._initialize_cache_for_parent_streams (deepcopy (stream_configs ))
321327 ]
322-
323328 return source_streams
324329
325330 @staticmethod
@@ -373,7 +378,6 @@ def update_with_cache_parent_configs(
373378 )
374379 else :
375380 stream_config ["retriever" ]["requester" ]["use_cache" ] = True
376-
377381 return stream_configs
378382
379383 def spec (self , logger : logging .Logger ) -> ConnectorSpecification :
@@ -477,12 +481,27 @@ def _parse_version(
477481 # No exception
478482 return parsed_version
479483
480- def _stream_configs (self , manifest : Mapping [str , Any ]) -> List [Dict [str , Any ]]:
484+ def _stream_configs (
485+ self , manifest : Mapping [str , Any ], config : Mapping [str , Any ]
486+ ) -> List [Dict [str , Any ]]:
481487 # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config
482- stream_configs : List [Dict [str , Any ]] = manifest .get ("streams" , [])
483- for s in stream_configs :
484- if "type" not in s :
485- s ["type" ] = "DeclarativeStream"
488+ stream_configs = []
489+ for current_stream_config in manifest .get ("streams" , []):
490+ if (
491+ "type" in current_stream_config
492+ and current_stream_config ["type" ] == "ConditionalStreams"
493+ ):
494+ interpolated_boolean = InterpolatedBoolean (
495+ condition = current_stream_config .get ("condition" ),
496+ parameters = {},
497+ )
498+
499+ if interpolated_boolean .eval (config = config ):
500+ stream_configs .extend (current_stream_config .get ("streams" , []))
501+ else :
502+ if "type" not in current_stream_config :
503+ current_stream_config ["type" ] = "DeclarativeStream"
504+ stream_configs .append (current_stream_config )
486505 return stream_configs
487506
488507 def _dynamic_stream_configs (
0 commit comments