Skip to content

Commit 39738d2

Browse files
authored
Merge branch 'main' into aj/feat/cli/accept-dynamic-gcp-project-id-from-env-var
2 parents e8687c4 + 77a68a5 commit 39738d2

10 files changed

Lines changed: 564 additions & 37 deletions
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# When a PR has the auto-merge feature enabled or disabled, this workflow adds or updates
2+
# warning text at the bottom of the PR description.
3+
4+
name: "Add Auto-Merge Notification Text"
5+
on:
6+
pull_request:
7+
types: [auto_merge_enabled, auto_merge_disabled]
8+
9+
jobs:
10+
update-description:
11+
runs-on: ubuntu-latest
12+
permissions:
13+
pull-requests: write
14+
steps:
15+
- name: Add Auto-Merge Notice
16+
if: github.event.action == 'auto_merge_enabled'
17+
uses: bcgov/action-pr-description-add@v2.0.0
18+
with:
19+
add_markdown: |
20+
> [!IMPORTANT]
21+
> **Auto-merge enabled.**
22+
>
23+
> _This PR is set to merge automatically when all requirements are met._
24+
25+
- name: Remove Auto-Merge Notice
26+
if: github.event.action == 'auto_merge_disabled'
27+
uses: bcgov/action-pr-description-add@v2.0.0
28+
with:
29+
add_markdown: |
30+
> [!NOTE]
31+
> **Auto-merge may have been disabled. Please check the PR status to confirm.**

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,10 +229,10 @@ def ensure_at_least_one_state_emitted(self) -> None:
229229

230230
def _throttle_state_message(self) -> Optional[float]:
231231
"""
232-
Throttles the state message emission to once every 60 seconds.
232+
Throttles the state message emission to once every 600 seconds.
233233
"""
234234
current_time = time.time()
235-
if current_time - self._last_emission_time <= 60:
235+
if current_time - self._last_emission_time <= 600:
236236
return None
237237
return current_time
238238

airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -78,16 +78,20 @@ def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
7878
"<cursor_field>" : "<cursor_value>"
7979
}
8080
"""
81-
if stream_state:
82-
for key, value in stream_state.items():
83-
if isinstance(value, dict):
84-
keys = list(value.keys())
85-
if len(keys) != 1:
86-
# The input partitioned state should only have one key
87-
return False
88-
if keys[0] != self._cursor_field:
89-
# Unexpected key. Found {keys[0]}. Expected {self._cursor.cursor_field}
90-
return False
81+
if not stream_state:
82+
return False
83+
for key, value in stream_state.items():
84+
# The inner value is expected to be a dictionary, as described in the docstring
85+
if not isinstance(value, dict):
86+
return False
87+
keys = list(value.keys())
88+
if len(keys) != 1:
89+
# The input partitioned state should only have one key
90+
return False
91+
if keys[0] != self._cursor_field:
92+
# Unexpected key. Found {keys[0]}. Expected {self._cursor_field}
93+
return False
94+
9195
return True
9296

9397
def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:

airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -120,14 +120,6 @@ def propagate_types_and_parameters(
120120
if found_type:
121121
propagated_component["type"] = found_type
122122

123-
# When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters
124-
# When the type refers to a json schema, we're not processing a component as well. This check is currently imperfect as there could
125-
# be json_schema are not objects but we believe this is not likely in our case because:
126-
# * records are Mapping so objects hence SchemaLoader root should be an object
127-
# * connection_specification is a Mapping
128-
if "type" not in propagated_component or self._is_json_schema_object(propagated_component):
129-
return propagated_component
130-
131123
# Combines parameters defined at the current level with parameters from parent components. Parameters at the current
132124
# level take precedence
133125
current_parameters = dict(copy.deepcopy(parent_parameters))
@@ -138,6 +130,27 @@ def propagate_types_and_parameters(
138130
else {**current_parameters, **component_parameters}
139131
)
140132

133+
# When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters
134+
# When the type refers to a json schema, we're not processing a component as well. This check is currently imperfect as there could
135+
# be json_schema that are not objects but we believe this is not likely in our case because:
136+
# * records are Mapping so objects hence SchemaLoader root should be an object
137+
# * connection_specification is a Mapping
138+
if self._is_json_schema_object(propagated_component):
139+
return propagated_component
140+
141+
# For objects that don't have a type, check whether their fields contain nested components that should receive `$parameters`.
142+
# For example, QueryProperties in requester.request_parameters, etc.
143+
# Update propagated_component value with nested components with parent `$parameters` if needed and return propagated_component.
144+
if "type" not in propagated_component:
145+
if self._has_nested_components(propagated_component):
146+
propagated_component = self._process_nested_components(
147+
propagated_component,
148+
parent_field_identifier,
149+
current_parameters,
150+
use_parent_parameters,
151+
)
152+
return propagated_component
153+
141154
# Parameters should be applied to the current component fields with the existing field taking precedence over parameters if
142155
# both exist
143156
for parameter_key, parameter_value in current_parameters.items():
@@ -181,4 +194,33 @@ def propagate_types_and_parameters(
181194

182195
@staticmethod
183196
def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool:
184-
return propagated_component.get("type") == "object"
197+
return propagated_component.get("type") == "object" or propagated_component.get("type") == [
198+
"null",
199+
"object",
200+
]
201+
202+
@staticmethod
203+
def _has_nested_components(propagated_component: Dict[str, Any]) -> bool:
204+
for k, v in propagated_component.items():
205+
if isinstance(v, dict) and v.get("type"):
206+
return True
207+
return False
208+
209+
def _process_nested_components(
210+
self,
211+
propagated_component: Dict[str, Any],
212+
parent_field_identifier: str,
213+
current_parameters: Mapping[str, Any],
214+
use_parent_parameters: Optional[bool] = None,
215+
) -> Dict[str, Any]:
216+
for field_name, field_value in propagated_component.items():
217+
if isinstance(field_value, dict) and field_value.get("type"):
218+
nested_component_with_parameters = self.propagate_types_and_parameters(
219+
parent_field_identifier,
220+
field_value,
221+
current_parameters,
222+
use_parent_parameters=use_parent_parameters,
223+
)
224+
propagated_component[field_name] = nested_component_with_parameters
225+
226+
return propagated_component

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -928,9 +928,9 @@ def create_legacy_to_per_partition_state_migration(
928928
declarative_stream: DeclarativeStreamModel,
929929
) -> LegacyToPerPartitionStateMigration:
930930
retriever = declarative_stream.retriever
931-
if not isinstance(retriever, SimpleRetrieverModel):
931+
if not isinstance(retriever, (SimpleRetrieverModel, AsyncRetrieverModel)):
932932
raise ValueError(
933-
f"LegacyToPerPartitionStateMigrations can only be applied on a DeclarativeStream with a SimpleRetriever. Got {type(retriever)}"
933+
f"LegacyToPerPartitionStateMigrations can only be applied on a DeclarativeStream with a SimpleRetriever or AsyncRetriever. Got {type(retriever)}"
934934
)
935935
partition_router = retriever.partition_router
936936
if not isinstance(
@@ -1484,6 +1484,7 @@ def create_concurrent_cursor_from_perpartition_cursor(
14841484
stream_state_migrations=stream_state_migrations,
14851485
)
14861486
)
1487+
14871488
stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state)
14881489
# Per-partition state doesn't make sense for GroupingPartitionRouter, so force the global state
14891490
use_global_cursor = isinstance(
@@ -1993,14 +1994,31 @@ def _build_incremental_cursor(
19931994
) -> Optional[StreamSlicer]:
19941995
if model.incremental_sync and stream_slicer:
19951996
if model.retriever.type == "AsyncRetriever":
1997+
stream_name = model.name or ""
1998+
stream_namespace = None
1999+
stream_state = self._connector_state_manager.get_stream_state(
2000+
stream_name, stream_namespace
2001+
)
2002+
state_transformations = (
2003+
[
2004+
self._create_component_from_model(
2005+
state_migration, config, declarative_stream=model
2006+
)
2007+
for state_migration in model.state_migrations
2008+
]
2009+
if model.state_migrations
2010+
else []
2011+
)
2012+
19962013
return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing
19972014
state_manager=self._connector_state_manager,
19982015
model_type=DatetimeBasedCursorModel,
19992016
component_definition=model.incremental_sync.__dict__,
2000-
stream_name=model.name or "",
2001-
stream_namespace=None,
2017+
stream_name=stream_name,
2018+
stream_namespace=stream_namespace,
20022019
config=config or {},
2003-
stream_state={},
2020+
stream_state=stream_state,
2021+
stream_state_migrations=state_transformations,
20042022
partition_router=stream_slicer,
20052023
)
20062024

unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3265,8 +3265,8 @@ def test_incremental_substream_request_options_provider(
32653265

32663266
def test_state_throttling(mocker):
32673267
"""
3268-
Verifies that _emit_state_message does not emit a new state if less than 60s
3269-
have passed since last emission, and does emit once 60s or more have passed.
3268+
Verifies that _emit_state_message does not emit a new state if less than 600s
3269+
have passed since last emission, and does emit once more than 600s have passed.
32703270
"""
32713271
cursor = ConcurrentPerPartitionCursor(
32723272
cursor_factory=MagicMock(),
@@ -3288,20 +3288,20 @@ def test_state_throttling(mocker):
32883288

32893289
mock_time = mocker.patch("time.time")
32903290

3291-
# First attempt: only 10 seconds passed => NO emission
3292-
mock_time.return_value = 10
3291+
# First attempt: only 100 seconds passed => NO emission
3292+
mock_time.return_value = 100
32933293
cursor._emit_state_message()
32943294
mock_connector_manager.update_state_for_stream.assert_not_called()
32953295
mock_repo.emit_message.assert_not_called()
32963296

3297-
# Second attempt: 30 seconds passed => still NO emission
3298-
mock_time.return_value = 30
3297+
# Second attempt: 300 seconds passed => still NO emission
3298+
mock_time.return_value = 300
32993299
cursor._emit_state_message()
33003300
mock_connector_manager.update_state_for_stream.assert_not_called()
33013301
mock_repo.emit_message.assert_not_called()
33023302

3303-
# Advance time: 70 seconds => exceed 60s => MUST emit
3304-
mock_time.return_value = 70
3303+
# Advance time: 700 seconds => exceed 600s => MUST emit
3304+
mock_time.return_value = 700
33053305
cursor._emit_state_message()
33063306
mock_connector_manager.update_state_for_stream.assert_called_once()
33073307
mock_repo.emit_message.assert_called_once()

unit_tests/sources/declarative/migrations/test_legacy_to_per_partition_migration.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,14 @@ def test_migrate_a_valid_legacy_state_to_per_partition():
189189
},
190190
id="test_should_not_migrate_if_the_partitioned_state_key_is_not_the_cursor_field",
191191
),
192+
pytest.param(
193+
{"last_changed": "2022-12-27T08:34:39+00:00"},
194+
id="test_should_not_migrate_if_the_partitioned_state_is_not_in_correct_format",
195+
),
196+
pytest.param(
197+
{},
198+
id="test_should_not_migrate_if_not_state_is_passed",
199+
),
192200
],
193201
)
194202
def test_should_not_migrate(input_state):
@@ -277,7 +285,7 @@ def _migrator_with_multiple_parent_streams():
277285
CustomPartitionRouter,
278286
True,
279287
ValueError,
280-
"LegacyToPerPartitionStateMigrations can only be applied on a DeclarativeStream with a SimpleRetriever. Got <class 'unittest.mock.MagicMock'>",
288+
"LegacyToPerPartitionStateMigrations can only be applied on a DeclarativeStream with a SimpleRetriever or AsyncRetriever. Got <class 'unittest.mock.MagicMock'>",
281289
),
282290
(
283291
SimpleRetriever,

0 commit comments

Comments
 (0)