diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index fbdf6a296..0aae6be8a 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -229,10 +229,10 @@ def ensure_at_least_one_state_emitted(self) -> None: def _throttle_state_message(self) -> Optional[float]: """ - Throttles the state message emission to once every 60 seconds. + Throttles the state message emission to once every 600 seconds. """ current_time = time.time() - if current_time - self._last_emission_time <= 60: + if current_time - self._last_emission_time <= 600: return None return current_time diff --git a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py index bca9d78b2..b54fc4779 100644 --- a/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py +++ b/unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py @@ -3265,8 +3265,8 @@ def test_incremental_substream_request_options_provider( def test_state_throttling(mocker): """ - Verifies that _emit_state_message does not emit a new state if less than 60s - have passed since last emission, and does emit once 60s or more have passed. + Verifies that _emit_state_message does not emit a new state if less than 600s + have passed since last emission, and does emit once 600s or more have passed. """ cursor = ConcurrentPerPartitionCursor( cursor_factory=MagicMock(), @@ -3288,20 +3288,20 @@ def test_state_throttling(mocker): mock_time = mocker.patch("time.time") - # First attempt: only 10 seconds passed => NO emission - mock_time.return_value = 10 + # First attempt: only 100 seconds passed => NO emission + mock_time.return_value = 100 cursor._emit_state_message() mock_connector_manager.update_state_for_stream.assert_not_called() mock_repo.emit_message.assert_not_called() - # Second attempt: 30 seconds passed => still NO emission - mock_time.return_value = 30 + # Second attempt: 300 seconds passed => still NO emission + mock_time.return_value = 300 cursor._emit_state_message() mock_connector_manager.update_state_for_stream.assert_not_called() mock_repo.emit_message.assert_not_called() - # Advance time: 70 seconds => exceed 60s => MUST emit - mock_time.return_value = 70 + # Advance time: 700 seconds => exceed 600s => MUST emit + mock_time.return_value = 700 cursor._emit_state_message() mock_connector_manager.update_state_for_stream.assert_called_once() mock_repo.emit_message.assert_called_once()