diff --git a/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py b/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py index ccff41ba7..7489eaf40 100644 --- a/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py +++ b/airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py @@ -71,6 +71,10 @@ def deserialize(self, state: MutableMapping[str, Any]) -> MutableMapping[str, An for stream_slice in state.get("slices", []): stream_slice[self.START_KEY] = self._from_state_message(stream_slice[self.START_KEY]) stream_slice[self.END_KEY] = self._from_state_message(stream_slice[self.END_KEY]) + if self.MOST_RECENT_RECORD_KEY in stream_slice: + stream_slice[self.MOST_RECENT_RECORD_KEY] = self._from_state_message( + stream_slice[self.MOST_RECENT_RECORD_KEY] + ) return state def serialize( diff --git a/unit_tests/sources/streams/concurrent/test_cursor.py b/unit_tests/sources/streams/concurrent/test_cursor.py index e87976964..ddca1c689 100644 --- a/unit_tests/sources/streams/concurrent/test_cursor.py +++ b/unit_tests/sources/streams/concurrent/test_cursor.py @@ -1,7 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # - +from copy import deepcopy from datetime import datetime, timedelta, timezone from functools import partial from typing import Any, Mapping, Optional @@ -84,7 +84,7 @@ def _cursor_with_slice_boundary_fields( return ConcurrentCursor( _A_STREAM_NAME, _A_STREAM_NAMESPACE, - {}, + deepcopy(_NO_STATE), self._message_repository, self._state_manager, EpochValueConcurrentStreamStateConverter(is_sequential_state), @@ -99,7 +99,7 @@ def _cursor_without_slice_boundary_fields(self) -> ConcurrentCursor: return ConcurrentCursor( _A_STREAM_NAME, _A_STREAM_NAMESPACE, - {}, + deepcopy(_NO_STATE), self._message_repository, self._state_manager, EpochValueConcurrentStreamStateConverter(is_sequential_state=True), @@ -265,7 +265,7 @@ def test_given_no_state_when_generate_slices_then_create_slice_from_start_to_end cursor = ConcurrentCursor( _A_STREAM_NAME, _A_STREAM_NAMESPACE, - _NO_STATE, + deepcopy(_NO_STATE), self._message_repository, self._state_manager, EpochValueConcurrentStreamStateConverter(is_sequential_state=False), @@ -950,6 +950,60 @@ def test_given_initial_state_is_sequential_and_start_provided_when_generate_slic }, # State message is updated to the legacy format before being emitted ) + @freezegun.freeze_time(time_to_freeze=datetime.fromtimestamp(50, timezone.utc)) + def test_given_most_recent_cursor_value_in_input_state_when_emit_state_then_serialize_state_properly( + self, + ) -> None: + cursor = ConcurrentCursor( + _A_STREAM_NAME, + _A_STREAM_NAMESPACE, + { + "state_type": ConcurrencyCompatibleStateType.date_range.value, + "slices": [ + { + EpochValueConcurrentStreamStateConverter.START_KEY: 0, + EpochValueConcurrentStreamStateConverter.END_KEY: 20, + EpochValueConcurrentStreamStateConverter.MOST_RECENT_RECORD_KEY: 15, + }, + ], + }, + self._message_repository, + self._state_manager, + EpochValueConcurrentStreamStateConverter(is_sequential_state=False), + CursorField(_A_CURSOR_FIELD_KEY), + _SLICE_BOUNDARY_FIELDS, + datetime.fromtimestamp(0, timezone.utc), + EpochValueConcurrentStreamStateConverter.get_end_provider(), + _NO_LOOKBACK_WINDOW, + ) + + cursor.close_partition( + _partition( + StreamSlice( + partition={}, + cursor_slice={ + _LOWER_SLICE_BOUNDARY_FIELD: 20, + _UPPER_SLICE_BOUNDARY_FIELD: 50, + }, + ), + _stream_name=_A_STREAM_NAME, + ) + ) + + expected_state = { + "state_type": ConcurrencyCompatibleStateType.date_range.value, + "slices": [ + { + EpochValueConcurrentStreamStateConverter.START_KEY: 0, + EpochValueConcurrentStreamStateConverter.END_KEY: 50, + EpochValueConcurrentStreamStateConverter.MOST_RECENT_RECORD_KEY: 15, + }, + ], + } + self._state_manager.update_state_for_stream.assert_called_once_with( + _A_STREAM_NAME, _A_STREAM_NAMESPACE, expected_state + ) + class ClampingIntegrationTest(TestCase): def setUp(self) -> None: