From 3fdba6ef5f95c57509191b3c59ca8502f12e05b5 Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Wed, 10 Sep 2025 09:05:17 -0400 Subject: [PATCH 1/3] do not break on bad cursor value in perpartition cursor --- .../concurrent_partition_cursor.py | 14 ++++++-- .../test_concurrent_perpartitioncursor.py | 32 ++++++++++++++++++- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index 75669d5e3..e3af2e1e1 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -189,6 +189,7 @@ def __init__( # FIXME this is a temporary field the time of the migration from declarative cursors to concurrent ones self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided self._synced_some_data = False + self._logged_regarding_datetime_format_error = False @property def cursor_field(self) -> CursorField: @@ -518,10 +519,17 @@ def observe(self, record: Record) -> None: except ValueError: return + try: + record_cursor = self._connector_state_converter.output_format( + self._connector_state_converter.parse_value(record_cursor_value) + ) + except ValueError as exception: + if not self._logged_regarding_datetime_format_error: + logger.warning(f"Tried to parse cursor value `{record_cursor_value}` but go error: {exception}") + self._logged_regarding_datetime_format_error = True + return + self._synced_some_data = True - record_cursor = self._connector_state_converter.output_format( - self._connector_state_converter.parse_value(record_cursor_value) - ) self._update_global_cursor(record_cursor) if not self._use_global_cursor: self._cursor_per_partition[ diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index 2602c52fc..bbb6adae7 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -20,6 +20,7 @@ ConcurrentDeclarativeSource, ) from airbyte_cdk.sources.declarative.incremental import ConcurrentPerPartitionCursor +from airbyte_cdk.sources.declarative.partition_routers import ListPartitionRouter from airbyte_cdk.sources.declarative.schema import InlineSchemaLoader from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import ( DeclarativePartition, @@ -29,7 +30,7 @@ from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import ( CustomFormatConcurrentStreamStateConverter, ) -from airbyte_cdk.sources.types import StreamSlice +from airbyte_cdk.sources.types import StreamSlice, Record from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read @@ -4400,3 +4401,32 @@ def test_duplicate_partition_while_processing(): assert len(cursor._processing_partitions_indexes) == 0 assert len(cursor._partition_key_to_index) == 0 assert len(cursor._partitions_done_generating_stream_slices) == 0 + + +def test_given_record_with_bad_cursor_value_the_global_state_parsing_does_not_break_sync(): + cursor_factory_mock = MagicMock() + cursor_factory_mock.create.side_effect = [_make_inner_cursor("2024-01-01T00:00:00Z")] + cursor = ConcurrentPerPartitionCursor( + cursor_factory=MagicMock(), + partition_router=ListPartitionRouter(values=["1"], cursor_field="partition_id", config={}, parameters={}), + stream_name="test_stream", + stream_namespace=None, + stream_state={}, + message_repository=MagicMock(), + connector_state_manager=MagicMock(), + connector_state_converter=CustomFormatConcurrentStreamStateConverter( + datetime_format="%Y-%m-%dT%H:%M:%SZ", + input_datetime_formats=["%Y-%m-%dT%H:%M:%SZ"], + is_sequential_state=True, + cursor_granularity=timedelta(0), + ), + cursor_field=CursorField(cursor_field_key="updated_at"), + ) + + cursor.observe( + Record( + data={"updated_at": ""}, + stream_name="test_stream", + associated_slice=StreamSlice(partition={"partition_id": "1"}, cursor_slice={}) + ) + ) From c7c5a9503b232f9b2ca750c11eb5fb62bd10a0fa Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Wed, 10 Sep 2025 13:09:34 +0000 Subject: [PATCH 2/3] Auto-fix lint and format issues --- .../incremental/concurrent_partition_cursor.py | 4 +++- .../incremental/test_concurrent_perpartitioncursor.py | 8 +++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index e3af2e1e1..4d7c1502e 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -525,7 +525,9 @@ def observe(self, record: Record) -> None: ) except ValueError as exception: if not self._logged_regarding_datetime_format_error: - logger.warning(f"Tried to parse cursor value `{record_cursor_value}` but go error: {exception}") + logger.warning( + f"Tried to parse cursor value `{record_cursor_value}` but go error: {exception}" + ) self._logged_regarding_datetime_format_error = True return diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index bbb6adae7..28b9b8460 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -30,7 +30,7 @@ from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import ( CustomFormatConcurrentStreamStateConverter, ) -from airbyte_cdk.sources.types import StreamSlice, Record +from airbyte_cdk.sources.types import Record, StreamSlice from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput, read @@ -4408,7 +4408,9 @@ def test_given_record_with_bad_cursor_value_the_global_state_parsing_does_not_br cursor_factory_mock.create.side_effect = [_make_inner_cursor("2024-01-01T00:00:00Z")] cursor = ConcurrentPerPartitionCursor( cursor_factory=MagicMock(), - partition_router=ListPartitionRouter(values=["1"], cursor_field="partition_id", config={}, parameters={}), + partition_router=ListPartitionRouter( + values=["1"], cursor_field="partition_id", config={}, parameters={} + ), stream_name="test_stream", stream_namespace=None, stream_state={}, @@ -4427,6 +4429,6 @@ def test_given_record_with_bad_cursor_value_the_global_state_parsing_does_not_br Record( data={"updated_at": ""}, stream_name="test_stream", - associated_slice=StreamSlice(partition={"partition_id": "1"}, cursor_slice={}) + associated_slice=StreamSlice(partition={"partition_id": "1"}, cursor_slice={}), ) ) From 53a5a0e9f155cc570a798fcbef342648ae99b80c Mon Sep 17 00:00:00 2001 From: "maxime.c" Date: Wed, 10 Sep 2025 13:05:02 -0400 Subject: [PATCH 3/3] improve log message --- .../declarative/incremental/concurrent_partition_cursor.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index 4d7c1502e..30e2c7eac 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -526,7 +526,11 @@ def observe(self, record: Record) -> None: except ValueError as exception: if not self._logged_regarding_datetime_format_error: logger.warning( - f"Tried to parse cursor value `{record_cursor_value}` but go error: {exception}" + "Skipping cursor update for stream '%s': failed to parse cursor field '%s' value %r: %s", + self._stream_name, + self._cursor_field.cursor_field_key, + record_cursor_value, + exception, ) self._logged_regarding_datetime_format_error = True return