-
Notifications
You must be signed in to change notification settings - Fork 44
Expand file tree
/
Copy pathcomponent_constructor.py
More file actions
102 lines (89 loc) · 3.43 KB
/
component_constructor.py
File metadata and controls
102 lines (89 loc) · 3.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
from dataclasses import dataclass
from typing import Any, Callable, Generic, Mapping, Optional, Type, TypeVar
from pydantic.v1 import BaseModel
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ValueType
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.types import Config
M = TypeVar("M", bound=BaseModel)
@dataclass
class AdditionalFlags:
def __init__(
self,
emit_connector_builder_messages: bool,
disable_retries: bool,
message_repository: MessageRepository,
connector_state_manager: ConnectorStateManager,
limit_pages_fetched_per_slice: Optional[int],
limit_slices_fetched: Optional[int],
):
self.emit_connector_builder_messages = emit_connector_builder_messages
self.disable_retries = disable_retries
self.message_repository = message_repository
self.connector_state_manager = connector_state_manager
self.limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
self.limit_slices_fetched = limit_slices_fetched
@property
def should_limit_slices_fetched(self) -> bool:
"""
Returns True if the number of slices fetched should be limited, False otherwise.
This is used to limit the number of slices fetched during tests.
"""
return bool(self.limit_slices_fetched or self.emit_connector_builder_messages)
@dataclass
class ComponentConstructor(Generic[M]):
@classmethod
def resolve_dependencies(
cls,
model: M,
config: Config,
dependency_constructor: Callable[..., Any],
additional_flags: AdditionalFlags,
**kwargs: Any,
) -> Mapping[str, Any]:
"""
Resolves the component's dependencies, this method should be created in the component,
if there are any dependencies on other components, or we need to adopt / change / adjust / fine-tune
specific component's behavior.
"""
return {}
@classmethod
def build(
cls,
model: M,
config: Config,
dependency_constructor: Callable[..., Any],
additional_flags: AdditionalFlags,
**kwargs: Any,
) -> "ComponentConstructor[M]":
"""
Builds up the Component and it's component-specific dependencies.
Order of operations:
- build the dependencies first
- build the component with the resolved dependencies
"""
# resolve the component dependencies first
resolved_dependencies: Mapping[str, Any] = cls.resolve_dependencies(
model=model,
config=config,
dependency_constructor=dependency_constructor,
additional_flags=additional_flags,
**kwargs,
)
# returns the instance of the component class,
# with resolved dependencies and model-specific arguments.
return cls(**resolved_dependencies)
@staticmethod
def _json_schema_type_name_to_type(value_type: Optional[ValueType]) -> Optional[Type[Any]]:
if not value_type:
return None
names_to_types = {
ValueType.string: str,
ValueType.number: float,
ValueType.integer: int,
ValueType.boolean: bool,
}
return names_to_types[value_type]