diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 1d629f0c7..7accd1ac6 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -468,34 +468,11 @@ def _is_concurrent_cursor_incremental_without_partition_routing( def _get_retriever( declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any] ) -> Retriever: - retriever = declarative_stream.retriever - - # This is an optimization so that we don't invoke any cursor or state management flows within the - # low-code framework because state management is handled through the ConcurrentCursor. - if declarative_stream and isinstance(retriever, SimpleRetriever): - # Also a temporary hack. In the legacy Stream implementation, as part of the read, - # set_initial_state() is called to instantiate incoming state on the cursor. Although we no - # longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components - # like StopConditionPaginationStrategyDecorator still rely on a DatetimeBasedCursor that is - # properly initialized with state. 
- if retriever.cursor: - retriever.cursor.set_initial_state(stream_state=stream_state) - - # Similar to above, the ClientSideIncrementalRecordFilterDecorator cursor is a separate instance - # from the one initialized on the SimpleRetriever, so it also must also have state initialized - # for semi-incremental streams using is_client_side_incremental to filter properly - if isinstance(retriever.record_selector, RecordSelector) and isinstance( - retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator - ): - retriever.record_selector.record_filter._cursor.set_initial_state( - stream_state=stream_state - ) # type: ignore # After non-concurrent cursors are deprecated we can remove these cursor workarounds - + if declarative_stream and isinstance(declarative_stream.retriever, SimpleRetriever): # We zero it out here, but since this is a cursor reference, the state is still properly # instantiated for the other components that reference it - retriever.cursor = None - - return retriever + declarative_stream.retriever.cursor = None + return declarative_stream.retriever @staticmethod def _select_streams( diff --git a/airbyte_cdk/sources/declarative/extractors/record_filter.py b/airbyte_cdk/sources/declarative/extractors/record_filter.py index 373669612..943068f87 100644 --- a/airbyte_cdk/sources/declarative/extractors/record_filter.py +++ b/airbyte_cdk/sources/declarative/extractors/record_filter.py @@ -4,12 +4,8 @@ from dataclasses import InitVar, dataclass from typing import Any, Iterable, Mapping, Optional, Union -from airbyte_cdk.sources.declarative.incremental import ( - DatetimeBasedCursor, - GlobalSubstreamCursor, - PerPartitionWithGlobalCursor, -) from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean +from airbyte_cdk.sources.streams.concurrent.cursor import Cursor from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState @@ -53,13 +49,13 @@ class 
ClientSideIncrementalRecordFilterDecorator(RecordFilter): """ Applies a filter to a list of records to exclude those that are older than the stream_state/start_date. - :param DatetimeBasedCursor date_time_based_cursor: Cursor used to extract datetime values + :param Cursor cursor: Cursor used to filter out values :param PerPartitionCursor per_partition_cursor: Optional Cursor used for mapping cursor value in nested stream_state """ def __init__( self, - cursor: Union[DatetimeBasedCursor, PerPartitionWithGlobalCursor, GlobalSubstreamCursor], + cursor: Union[Cursor], **kwargs: Any, ): super().__init__(**kwargs) @@ -77,7 +73,7 @@ def filter_records( for record in records if self._cursor.should_be_synced( # Record is created on the fly to align with cursors interface; stream name is ignored as we don't need it here - # Record stream name is empty cause it is not used durig the filtering + # Record stream name is empty because it is not used during the filtering Record(data=record, associated_slice=stream_slice, stream_name="") ) ) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index 77c65523a..2a7cfd1d3 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -81,6 +81,7 @@ def __init__( connector_state_converter: AbstractStreamStateConverter, cursor_field: CursorField, use_global_cursor: bool = False, + attempt_to_create_cursor_if_not_provided: bool = False, ) -> None: self._global_cursor: Optional[StreamState] = {} self._stream_name = stream_name @@ -125,6 +126,9 @@ def __init__( self._set_initial_state(stream_state) + # FIXME this is a temporary field for the time of the migration from declarative cursors to concurrent ones + self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided + @property def 
cursor_field(self) -> CursorField: return self._cursor_field @@ -512,13 +516,28 @@ def _get_cursor(self, record: Record) -> ConcurrentCursor: raise ValueError( "Invalid state as stream slices that are emitted should refer to an existing cursor" ) + + if self._use_global_cursor: + return self._create_cursor( + self._global_cursor, + self._lookback_window if self._global_cursor else 0, + ) + partition_key = self._to_partition_key(record.associated_slice.partition) - if partition_key not in self._cursor_per_partition: + if ( + partition_key not in self._cursor_per_partition + and not self._attempt_to_create_cursor_if_not_provided + ): raise ValueError( "Invalid state as stream slices that are emitted should refer to an existing cursor" ) - cursor = self._cursor_per_partition[partition_key] - return cursor + elif partition_key not in self._cursor_per_partition: + return self._create_cursor( + self._global_cursor, + self._lookback_window if self._global_cursor else 0, + ) + else: + return self._cursor_per_partition[partition_key] def limit_reached(self) -> bool: return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 628bea575..5d2415525 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -34,7 +34,6 @@ ) from airbyte_cdk.models import FailureType, Level from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager -from airbyte_cdk.sources.declarative import transformations from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository @@ -604,7 +603,7 @@ WeekClampingStrategy, Weekday, ) -from 
airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField +from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, Cursor, CursorField from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import ( CustomFormatConcurrentStreamStateConverter, DateTimeStreamStateConverter, @@ -1475,6 +1474,7 @@ def create_concurrent_cursor_from_incrementing_count_cursor( stream_namespace: Optional[str], config: Config, message_repository: Optional[MessageRepository] = None, + stream_state_migrations: Optional[List[Any]] = None, **kwargs: Any, ) -> ConcurrentCursor: # Per-partition incremental streams can dynamically create child cursors which will pass their current @@ -1485,6 +1485,7 @@ def create_concurrent_cursor_from_incrementing_count_cursor( if "stream_state" not in kwargs else kwargs["stream_state"] ) + stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state) component_type = component_definition.get("type") if component_definition.get("type") != model_type.__name__: @@ -1561,6 +1562,7 @@ def create_concurrent_cursor_from_perpartition_cursor( stream_state: MutableMapping[str, Any], partition_router: PartitionRouter, stream_state_migrations: Optional[List[Any]] = None, + attempt_to_create_cursor_if_not_provided: bool = False, **kwargs: Any, ) -> ConcurrentPerPartitionCursor: component_type = component_definition.get("type") @@ -1631,6 +1633,7 @@ def create_concurrent_cursor_from_perpartition_cursor( connector_state_converter=connector_state_converter, cursor_field=cursor_field, use_global_cursor=use_global_cursor, + attempt_to_create_cursor_if_not_provided=attempt_to_create_cursor_if_not_provided, ) @staticmethod @@ -1931,30 +1934,17 @@ def create_declarative_stream( and hasattr(model.incremental_sync, "is_data_feed") and model.incremental_sync.is_data_feed ) - client_side_incremental_sync = None - if ( + client_side_filtering_enabled = ( model.incremental_sync 
and hasattr(model.incremental_sync, "is_client_side_incremental") and model.incremental_sync.is_client_side_incremental - ): - supported_slicers = ( - DatetimeBasedCursor, - GlobalSubstreamCursor, - PerPartitionWithGlobalCursor, - ) - if combined_slicers and not isinstance(combined_slicers, supported_slicers): - raise ValueError( - "Unsupported Slicer is used. PerPartitionWithGlobalCursor should be used here instead" - ) - cursor = ( - combined_slicers - if isinstance( - combined_slicers, (PerPartitionWithGlobalCursor, GlobalSubstreamCursor) - ) - else self._create_component_from_model(model=model.incremental_sync, config=config) + ) + concurrent_cursor = None + if stop_condition_on_cursor or client_side_filtering_enabled: + stream_slicer = self._build_stream_slicer_from_partition_router( + model.retriever, config, stream_name=model.name ) - - client_side_incremental_sync = {"cursor": cursor} + concurrent_cursor = self._build_concurrent_cursor(model, stream_slicer, config) if model.incremental_sync and isinstance(model.incremental_sync, DatetimeBasedCursorModel): cursor_model = model.incremental_sync @@ -2029,8 +2019,10 @@ def create_declarative_stream( primary_key=primary_key, stream_slicer=combined_slicers, request_options_provider=request_options_provider, - stop_condition_on_cursor=stop_condition_on_cursor, - client_side_incremental_sync=client_side_incremental_sync, + stop_condition_cursor=concurrent_cursor, + client_side_incremental_sync={"cursor": concurrent_cursor} + if client_side_filtering_enabled + else None, transformations=transformations, file_uploader=file_uploader, incremental_sync=model.incremental_sync, @@ -2185,6 +2177,67 @@ def _build_incremental_cursor( return self._create_component_from_model(model=model.incremental_sync, config=config) # type: ignore[no-any-return] # Will be created Cursor as stream_slicer_model is model.incremental_sync return None + def _build_concurrent_cursor( + self, + model: DeclarativeStreamModel, + stream_slicer: 
Optional[PartitionRouter], + config: Config, + ) -> Optional[StreamSlicer]: + stream_state = self._connector_state_manager.get_stream_state( + stream_name=model.name or "", namespace=None + ) + + if model.incremental_sync and stream_slicer: + # FIXME there is a discrepancy where this logic is applied on the create_*_cursor methods for + # ConcurrentCursor but it is applied outside of create_concurrent_cursor_from_perpartition_cursor + if model.state_migrations: + state_transformations = [ + self._create_component_from_model( + state_migration, config, declarative_stream=model + ) + for state_migration in model.state_migrations + ] + else: + state_transformations = [] + + return self.create_concurrent_cursor_from_perpartition_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing + state_manager=self._connector_state_manager, + model_type=DatetimeBasedCursorModel, + component_definition=model.incremental_sync.__dict__, + stream_name=model.name or "", + stream_namespace=None, + config=config or {}, + stream_state=stream_state, + stream_state_migrations=state_transformations, + partition_router=stream_slicer, + attempt_to_create_cursor_if_not_provided=True, + ) + elif model.incremental_sync: + if type(model.incremental_sync) == IncrementingCountCursorModel: + return self.create_concurrent_cursor_from_incrementing_count_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. 
However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing + model_type=IncrementingCountCursorModel, + component_definition=model.incremental_sync.__dict__, + stream_name=model.name or "", + stream_namespace=None, + config=config or {}, + stream_state_migrations=model.state_migrations, + ) + elif type(model.incremental_sync) == DatetimeBasedCursorModel: + return self.create_concurrent_cursor_from_datetime_based_cursor( # type: ignore # This is a known issue that we are creating and returning a ConcurrentCursor which does not technically implement the (low-code) StreamSlicer. However, (low-code) StreamSlicer and ConcurrentCursor both implement StreamSlicer.stream_slices() which is the primary method needed for checkpointing + model_type=type(model.incremental_sync), + component_definition=model.incremental_sync.__dict__, + stream_name=model.name or "", + stream_namespace=None, + config=config or {}, + stream_state_migrations=model.state_migrations, + attempt_to_create_cursor_if_not_provided=True, + ) + else: + raise ValueError( + f"Incremental sync of type {type(model.incremental_sync)} is not supported" + ) + return None + def _build_resumable_cursor( self, model: Union[ @@ -2285,7 +2338,7 @@ def create_default_paginator( url_base: str, extractor_model: Optional[Union[CustomRecordExtractorModel, DpathExtractorModel]] = None, decoder: Optional[Decoder] = None, - cursor_used_for_stop_condition: Optional[DeclarativeCursor] = None, + cursor_used_for_stop_condition: Optional[Cursor] = None, ) -> Union[DefaultPaginator, PaginatorTestReadDecorator]: if decoder: if self._is_supported_decoder_for_pagination(decoder): @@ -3146,7 +3199,7 @@ def create_simple_retriever( primary_key: Optional[Union[str, List[str], List[List[str]]]], stream_slicer: Optional[StreamSlicer], request_options_provider: Optional[RequestOptionsProvider] = None, - stop_condition_on_cursor: bool = False, + 
stop_condition_cursor: Optional[Cursor] = None, client_side_incremental_sync: Optional[Dict[str, Any]] = None, transformations: List[RecordTransformation], file_uploader: Optional[DefaultFileUploader] = None, @@ -3277,7 +3330,6 @@ def _get_url() -> str: ), ) - cursor_used_for_stop_condition = cursor if stop_condition_on_cursor else None paginator = ( self._create_component_from_model( model=model.paginator, @@ -3285,7 +3337,7 @@ def _get_url() -> str: url_base=_get_url(), extractor_model=model.record_selector.extractor, decoder=decoder, - cursor_used_for_stop_condition=cursor_used_for_stop_condition, + cursor_used_for_stop_condition=stop_condition_cursor or None, ) if model.paginator else NoPagination(parameters={}) diff --git a/airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py b/airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py index 7c89ba552..068df72cb 100644 --- a/airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py +++ b/airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py @@ -7,11 +7,10 @@ import requests -from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import ( PaginationStrategy, ) -from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor +from airbyte_cdk.sources.streams.concurrent.cursor import Cursor from airbyte_cdk.sources.types import Record @@ -29,8 +28,7 @@ def is_met(self, record: Record) -> bool: class CursorStopCondition(PaginationStopCondition): def __init__( self, - cursor: DeclarativeCursor - | ConcurrentCursor, # migrate to use both old and concurrent versions + cursor: Cursor, ): self._cursor = cursor diff --git a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py 
b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py index a70169197..0ae9178c0 100644 --- a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py +++ b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py @@ -311,3 +311,6 @@ def set_initial_state(self, value: StreamState) -> None: def ensure_at_least_one_state_emitted(self) -> None: self.emit_state_message() + + def should_be_synced(self, record: Record) -> bool: + return True diff --git a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py index e219292d1..b9cb621a5 100644 --- a/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py +++ b/airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_final_state_cursor.py @@ -81,3 +81,6 @@ def ensure_at_least_one_state_emitted(self) -> None: self._stream_name, self._stream_namespace ) self._message_repository.emit_message(state_message) + + def should_be_synced(self, record: Record) -> bool: + return True diff --git a/airbyte_cdk/sources/streams/concurrent/cursor.py b/airbyte_cdk/sources/streams/concurrent/cursor.py index 88d15bc8a..318076835 100644 --- a/airbyte_cdk/sources/streams/concurrent/cursor.py +++ b/airbyte_cdk/sources/streams/concurrent/cursor.py @@ -74,6 +74,10 @@ def ensure_at_least_one_state_emitted(self) -> None: """ raise NotImplementedError() + @abstractmethod + def should_be_synced(self, record: Record) -> bool: + pass + def stream_slices(self) -> Iterable[StreamSlice]: """ Default placeholder implementation of generate_slices. 
@@ -123,6 +127,9 @@ def ensure_at_least_one_state_emitted(self) -> None: ) self._message_repository.emit_message(state_message) + def should_be_synced(self, record: Record) -> bool: + return True + class ConcurrentCursor(Cursor): _START_BOUNDARY = 0 @@ -192,9 +199,23 @@ def _get_concurrent_state( self, state: MutableMapping[str, Any] ) -> Tuple[CursorValueType, MutableMapping[str, Any]]: if self._connector_state_converter.is_state_message_compatible(state): + partitioned_state = self._connector_state_converter.deserialize(state) + slices_from_partitioned_state = partitioned_state.get("slices", []) + + value_from_partitioned_state = None + if slices_from_partitioned_state: + # We assume here that the slices have been already merged + first_slice = slices_from_partitioned_state[0] + value_from_partitioned_state = ( + first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY] + if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice + else first_slice[self._connector_state_converter.END_KEY] + ) return ( - self._start or self._connector_state_converter.zero_value, - self._connector_state_converter.deserialize(state), + value_from_partitioned_state + or self._start + or self._connector_state_converter.zero_value, + partitioned_state, ) return self._connector_state_converter.convert_from_sequential_state( self._cursor_field, state, self._start diff --git a/unit_tests/sources/declarative/extractors/test_record_filter.py b/unit_tests/sources/declarative/extractors/test_record_filter.py index 03274e732..9f0cf46d9 100644 --- a/unit_tests/sources/declarative/extractors/test_record_filter.py +++ b/unit_tests/sources/declarative/extractors/test_record_filter.py @@ -1,7 +1,9 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
# +from datetime import datetime, timedelta, timezone from typing import List, Mapping, Optional +from unittest.mock import Mock import pytest @@ -11,10 +13,8 @@ RecordFilter, ) from airbyte_cdk.sources.declarative.incremental import ( - CursorFactory, - DatetimeBasedCursor, - GlobalSubstreamCursor, - PerPartitionWithGlobalCursor, + ConcurrentCursorFactory, + ConcurrentPerPartitionCursor, ) from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.models import ( @@ -24,7 +24,12 @@ ) from airbyte_cdk.sources.declarative.partition_routers import SubstreamPartitionRouter from airbyte_cdk.sources.declarative.types import StreamSlice +from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField +from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import ( + CustomFormatConcurrentStreamStateConverter, +) from airbyte_cdk.sources.types import Record +from airbyte_cdk.utils.datetime_helpers import ab_datetime_now, ab_datetime_parse DATE_FORMAT = "%Y-%m-%d" RECORDS_TO_FILTER_DATE_FORMAT = [ @@ -272,25 +277,27 @@ def test_client_side_record_filter_decorator_no_parent_stream( records_to_filter: List[Mapping], expected_record_ids: List[int], ): - date_time_based_cursor = DatetimeBasedCursor( - start_datetime=MinMaxDatetime( - datetime="2021-01-01", datetime_format=DATE_FORMAT, parameters={} + datetime_based_cursor = ConcurrentCursor( + stream_name="any_stream", + stream_namespace=None, + stream_state=stream_state, + message_repository=Mock(), + connector_state_manager=Mock(), + connector_state_converter=CustomFormatConcurrentStreamStateConverter( + datetime_format=datetime_format ), - end_datetime=MinMaxDatetime(datetime=end_datetime, parameters={}) if end_datetime else None, - step="P10Y", - cursor_field=InterpolatedString.create("created_at", parameters={}), - datetime_format=datetime_format, - cursor_granularity="P1D", - config={}, - parameters={}, + 
cursor_field=CursorField("created_at"), + slice_boundary_fields=("start", "end"), + start=datetime(2021, 1, 1, tzinfo=timezone.utc), + end_provider=lambda: ab_datetime_parse(end_datetime) if end_datetime else ab_datetime_now(), + slice_range=timedelta(days=365 * 10), ) - date_time_based_cursor.set_initial_state(stream_state) record_filter_decorator = ClientSideIncrementalRecordFilterDecorator( config={}, condition=record_filter_expression, parameters={}, - cursor=date_time_based_cursor, + cursor=datetime_based_cursor, ) filtered_records = list( @@ -341,7 +348,7 @@ def test_client_side_record_filter_decorator_no_parent_stream( } ], }, - "per_partition_with_global", + "global_substream", [2, 3], ), # Use PerPartitionWithGlobalCursor with partition state missing, global cursor used @@ -363,23 +370,26 @@ def test_client_side_record_filter_decorator_no_parent_stream( def test_client_side_record_filter_decorator_with_cursor_types( stream_state: Optional[Mapping], cursor_type: str, expected_record_ids: List[int] ): - def date_time_based_cursor_factory() -> DatetimeBasedCursor: - return DatetimeBasedCursor( - start_datetime=MinMaxDatetime( - datetime="2021-01-01", datetime_format=DATE_FORMAT, parameters={} - ), - end_datetime=MinMaxDatetime( - datetime="2021-01-05", datetime_format=DATE_FORMAT, parameters={} + def date_time_based_cursor_factory(stream_state, runtime_lookback_window) -> ConcurrentCursor: + return ConcurrentCursor( + stream_name="any_stream", + stream_namespace=None, + stream_state=stream_state, + message_repository=Mock(), + connector_state_manager=Mock(), + connector_state_converter=CustomFormatConcurrentStreamStateConverter( + datetime_format=DATE_FORMAT ), - step="P10Y", - cursor_field=InterpolatedString.create("created_at", parameters={}), - datetime_format=DATE_FORMAT, - cursor_granularity="P1D", - config={}, - parameters={}, + cursor_field=CursorField("created_at"), + slice_boundary_fields=("start", "end"), + start=datetime(2021, 1, 1, 
tzinfo=timezone.utc), + end_provider=lambda: datetime(2021, 1, 5, tzinfo=timezone.utc), + slice_range=timedelta(days=365 * 10), + cursor_granularity=timedelta(days=1), + lookback_window=runtime_lookback_window, ) - date_time_based_cursor = date_time_based_cursor_factory() + date_time_based_cursor = date_time_based_cursor_factory(stream_state, timedelta(0)) substream_cursor = None partition_router = SubstreamPartitionRouter( @@ -401,29 +411,26 @@ def date_time_based_cursor_factory() -> DatetimeBasedCursor: if cursor_type == "datetime": # Use only DatetimeBasedCursor pass # No additional cursor needed - elif cursor_type == "global_substream": + elif cursor_type in ["global_substream", "per_partition_with_global"]: # Create GlobalSubstreamCursor instance - substream_cursor = GlobalSubstreamCursor( - stream_cursor=date_time_based_cursor, + substream_cursor = ConcurrentPerPartitionCursor( + cursor_factory=ConcurrentCursorFactory(date_time_based_cursor_factory), partition_router=partition_router, - ) - if stream_state: - substream_cursor.set_initial_state(stream_state) - elif cursor_type == "per_partition_with_global": - # Create PerPartitionWithGlobalCursor instance - substream_cursor = PerPartitionWithGlobalCursor( - cursor_factory=CursorFactory(date_time_based_cursor_factory), - partition_router=partition_router, - stream_cursor=date_time_based_cursor, + stream_name="a_stream", + stream_namespace=None, + stream_state=stream_state, + message_repository=Mock(), + connector_state_manager=Mock(), + connector_state_converter=CustomFormatConcurrentStreamStateConverter( + datetime_format=DATE_FORMAT + ), + cursor_field=CursorField("created_at"), + use_global_cursor=cursor_type == "global_substream", + attempt_to_create_cursor_if_not_provided=True, ) else: raise ValueError(f"Unsupported cursor type: {cursor_type}") - if substream_cursor and stream_state: - substream_cursor.set_initial_state(stream_state) - elif stream_state: - 
date_time_based_cursor.set_initial_state(stream_state) - # Create the record_filter_decorator with appropriate cursor record_filter_decorator = ClientSideIncrementalRecordFilterDecorator( config={}, diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 4ac0b11e7..f7df8d6d4 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -188,7 +188,12 @@ transformer = ManifestComponentTransformer() -input_config = {"apikey": "verysecrettoken", "repos": ["airbyte", "airbyte-cloud"]} +input_config = { + "apikey": "verysecrettoken", + "repos": ["airbyte", "airbyte-cloud"], + "start_time": "2024-01-01T00:00:00.000+00:00", + "end_time": "2025-01-01T00:00:00.000+00:00", +} def get_factory_with_parameters( @@ -1424,7 +1429,7 @@ def test_client_side_incremental_with_partition_router(): assert stream.retriever.record_selector.transform_before_filtering == True assert isinstance( stream.retriever.record_selector.record_filter._cursor, - PerPartitionWithGlobalCursor, + ConcurrentPerPartitionCursor, ) @@ -1997,7 +2002,7 @@ def test_create_default_paginator(): "subcomponent_field_with_hint", DpathExtractor( field_path=[], - config={"apikey": "verysecrettoken", "repos": ["airbyte", "airbyte-cloud"]}, + config=input_config, decoder=JsonDecoder(parameters={}), parameters={}, ), @@ -2013,7 +2018,7 @@ def test_create_default_paginator(): "subcomponent_field_with_hint", DpathExtractor( field_path=[], - config={"apikey": "verysecrettoken", "repos": ["airbyte", "airbyte-cloud"]}, + config=input_config, parameters={}, ), None, @@ -2102,11 +2107,11 @@ def test_create_default_paginator(): pagination_strategy=OffsetIncrement( page_size=10, extractor=None, - config={"apikey": "verysecrettoken", "repos": ["airbyte", "airbyte-cloud"]}, + config=input_config, parameters={}, ), 
url_base="https://physical_100.com", - config={"apikey": "verysecrettoken", "repos": ["airbyte", "airbyte-cloud"]}, + config=input_config, parameters={"decoder": {"type": "JsonDecoder"}}, ), None, diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index fec9170ed..bdfefbd80 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -1891,9 +1891,7 @@ def test_stream_using_is_client_side_incremental_has_cursor_state(): simple_retriever = locations_stream._stream_partition_generator._partition_factory._retriever record_filter = simple_retriever.record_selector.record_filter assert isinstance(record_filter, ClientSideIncrementalRecordFilterDecorator) - client_side_incremental_cursor_state = record_filter._cursor._cursor - - assert client_side_incremental_cursor_state == expected_cursor_value + assert list(record_filter._cursor.state.values()) == [expected_cursor_value] @pytest.mark.parametrize( diff --git a/unit_tests/sources/streams/concurrent/test_cursor.py b/unit_tests/sources/streams/concurrent/test_cursor.py index ddca1c689..326deab49 100644 --- a/unit_tests/sources/streams/concurrent/test_cursor.py +++ b/unit_tests/sources/streams/concurrent/test_cursor.py @@ -1779,59 +1779,50 @@ def test_close_partition_with_slice_range_granularity_concurrent_cursor_from_dat assert state_manager.update_state_for_stream.call_count == 3 +_SHOULD_BE_SYNCED_START = 10 + + @pytest.mark.parametrize( "record, should_be_synced", [ [ Record( - data={_A_CURSOR_FIELD_KEY: 10}, + data={_A_CURSOR_FIELD_KEY: _SHOULD_BE_SYNCED_START}, stream_name="test_stream", - associated_slice=_partition( - {_LOWER_SLICE_BOUNDARY_FIELD: 0, _UPPER_SLICE_BOUNDARY_FIELD: 10} - ).to_slice(), ), True, ], [ Record( - data={_A_CURSOR_FIELD_KEY: 9}, + data={_A_CURSOR_FIELD_KEY: _SHOULD_BE_SYNCED_START - 1}, 
stream_name="test_stream", - associated_slice=_partition( - {_LOWER_SLICE_BOUNDARY_FIELD: 0, _UPPER_SLICE_BOUNDARY_FIELD: 10} - ).to_slice(), ), False, ], [ Record( - data={_A_CURSOR_FIELD_KEY: 21}, + data={_A_CURSOR_FIELD_KEY: _SHOULD_BE_SYNCED_START + 1}, stream_name="test_stream", - associated_slice=_partition( - {_LOWER_SLICE_BOUNDARY_FIELD: 0, _UPPER_SLICE_BOUNDARY_FIELD: 10} - ).to_slice(), ), - False, + True, ], [ Record( data={"not_a_cursor_field": "some_data"}, stream_name="test_stream", - associated_slice=_partition( - {_LOWER_SLICE_BOUNDARY_FIELD: 0, _UPPER_SLICE_BOUNDARY_FIELD: 10} - ).to_slice(), ), True, ], ], ids=[ "with_cursor_field_inside_range", - "with_cursor_field_lower_than_range", - "with_cursor_field_higher_than_range", + "with_cursor_field_lower_than_start", + "with_cursor_field_higher_than_end", "no_cursor", ], ) @freezegun.freeze_time(time_to_freeze=datetime.fromtimestamp(20, timezone.utc)) -def test_should_be_synced(record: Record, should_be_synced: bool): +def test_should_be_synced_non_partitioned_state_no_state(record: Record, should_be_synced: bool): cursor = ConcurrentCursor( _A_STREAM_NAME, _A_STREAM_NAMESPACE, @@ -1841,8 +1832,217 @@ def test_should_be_synced(record: Record, should_be_synced: bool): EpochValueConcurrentStreamStateConverter(True), CursorField(_A_CURSOR_FIELD_KEY), _SLICE_BOUNDARY_FIELDS, - datetime.fromtimestamp(10, timezone.utc), + datetime.fromtimestamp(_SHOULD_BE_SYNCED_START, timezone.utc), EpochValueConcurrentStreamStateConverter.get_end_provider(), _NO_LOOKBACK_WINDOW, ) assert cursor.should_be_synced(record) == should_be_synced + + +def test_given_state_when_should_be_synced_then_use_cursor_value_to_filter(): + state_value = _SHOULD_BE_SYNCED_START + 5 + cursor = ConcurrentCursor( + _A_STREAM_NAME, + _A_STREAM_NAMESPACE, + {_A_CURSOR_FIELD_KEY: state_value}, + Mock(spec=MessageRepository), + Mock(spec=ConnectorStateManager), + EpochValueConcurrentStreamStateConverter(True), + CursorField(_A_CURSOR_FIELD_KEY), 
+ _SLICE_BOUNDARY_FIELDS, + datetime.fromtimestamp(_SHOULD_BE_SYNCED_START, timezone.utc), + EpochValueConcurrentStreamStateConverter.get_end_provider(), + _NO_LOOKBACK_WINDOW, + ) + + assert ( + cursor.should_be_synced( + Record(data={_A_CURSOR_FIELD_KEY: state_value - 1}, stream_name="test_stream") + ) + == False + ) + assert ( + cursor.should_be_synced( + Record(data={_A_CURSOR_FIELD_KEY: state_value}, stream_name="test_stream") + ) + == True + ) + + +def test_given_partitioned_state_without_slices_nor_start_when_should_be_synced_then_use_zero_value_to_filter(): + cursor = ConcurrentCursor( + _A_STREAM_NAME, + _A_STREAM_NAMESPACE, + { + "slices": [], + "state_type": "date-range", + }, + Mock(spec=MessageRepository), + Mock(spec=ConnectorStateManager), + EpochValueConcurrentStreamStateConverter(True), + CursorField(_A_CURSOR_FIELD_KEY), + _SLICE_BOUNDARY_FIELDS, + None, + EpochValueConcurrentStreamStateConverter.get_end_provider(), + _NO_LOOKBACK_WINDOW, + ) + + assert ( + cursor.should_be_synced(Record(data={_A_CURSOR_FIELD_KEY: -1}, stream_name="test_stream")) + == False + ) + assert ( + cursor.should_be_synced(Record(data={_A_CURSOR_FIELD_KEY: 0}, stream_name="test_stream")) + == True + ) + + +def test_given_partitioned_state_without_slices_but_start_when_should_be_synced_then_use_start_value_to_filter(): + cursor = ConcurrentCursor( + _A_STREAM_NAME, + _A_STREAM_NAMESPACE, + { + "slices": [], + "state_type": "date-range", + }, + Mock(spec=MessageRepository), + Mock(spec=ConnectorStateManager), + EpochValueConcurrentStreamStateConverter(True), + CursorField(_A_CURSOR_FIELD_KEY), + _SLICE_BOUNDARY_FIELDS, + datetime.fromtimestamp(_SHOULD_BE_SYNCED_START, timezone.utc), + EpochValueConcurrentStreamStateConverter.get_end_provider(), + _NO_LOOKBACK_WINDOW, + ) + + assert ( + cursor.should_be_synced( + Record( + data={_A_CURSOR_FIELD_KEY: _SHOULD_BE_SYNCED_START - 1}, stream_name="test_stream" + ) + ) + == False + ) + assert ( + cursor.should_be_synced( + 
Record(data={_A_CURSOR_FIELD_KEY: _SHOULD_BE_SYNCED_START}, stream_name="test_stream") + ) + == True + ) + + +def test_given_partitioned_state_with_one_slice_and_most_recent_cursor_value_when_should_be_synced_then_use_most_recent_cursor_value_of_slice_to_filter(): + most_recent_cursor_value = 5 + cursor = ConcurrentCursor( + _A_STREAM_NAME, + _A_STREAM_NAMESPACE, + { + "slices": [ + {"end": 10, "most_recent_cursor_value": most_recent_cursor_value, "start": 0}, + ], + "state_type": "date-range", + }, + Mock(spec=MessageRepository), + Mock(spec=ConnectorStateManager), + EpochValueConcurrentStreamStateConverter(True), + CursorField(_A_CURSOR_FIELD_KEY), + _SLICE_BOUNDARY_FIELDS, + datetime.fromtimestamp(_SHOULD_BE_SYNCED_START, timezone.utc), + EpochValueConcurrentStreamStateConverter.get_end_provider(), + _NO_LOOKBACK_WINDOW, + ) + + assert ( + cursor.should_be_synced( + Record( + data={_A_CURSOR_FIELD_KEY: most_recent_cursor_value - 1}, stream_name="test_stream" + ) + ) + == False + ) + assert ( + cursor.should_be_synced( + Record(data={_A_CURSOR_FIELD_KEY: most_recent_cursor_value}, stream_name="test_stream") + ) + == True + ) + + +def test_given_partitioned_state_with_one_slice_without_most_recent_cursor_value_when_should_be_synced_then_use_upper_boundary_of_slice_to_filter(): + slice_end = 5 + cursor = ConcurrentCursor( + _A_STREAM_NAME, + _A_STREAM_NAMESPACE, + { + "slices": [ + {"end": slice_end, "start": 0}, + ], + "state_type": "date-range", + }, + Mock(spec=MessageRepository), + Mock(spec=ConnectorStateManager), + EpochValueConcurrentStreamStateConverter(True), + CursorField(_A_CURSOR_FIELD_KEY), + _SLICE_BOUNDARY_FIELDS, + datetime.fromtimestamp(_SHOULD_BE_SYNCED_START, timezone.utc), + EpochValueConcurrentStreamStateConverter.get_end_provider(), + _NO_LOOKBACK_WINDOW, + ) + + assert ( + cursor.should_be_synced( + Record(data={_A_CURSOR_FIELD_KEY: slice_end - 1}, stream_name="test_stream") + ) + == False + ) + assert ( + cursor.should_be_synced( + 
Record(data={_A_CURSOR_FIELD_KEY: slice_end}, stream_name="test_stream") + ) + == True + ) + + +def test_given_partitioned_state_with_multiple_slices_when_should_be_synced_then_use_upper_boundary_of_first_slice_to_filter(): + first_slice_end = 5 + second_slice_start = first_slice_end + 10 + cursor = ConcurrentCursor( + _A_STREAM_NAME, + _A_STREAM_NAMESPACE, + { + "slices": [ + {"end": first_slice_end, "start": 0}, + {"end": first_slice_end + 100, "start": second_slice_start}, + ], + "state_type": "date-range", + }, + Mock(spec=MessageRepository), + Mock(spec=ConnectorStateManager), + EpochValueConcurrentStreamStateConverter(True), + CursorField(_A_CURSOR_FIELD_KEY), + _SLICE_BOUNDARY_FIELDS, + datetime.fromtimestamp(_SHOULD_BE_SYNCED_START, timezone.utc), + EpochValueConcurrentStreamStateConverter.get_end_provider(), + _NO_LOOKBACK_WINDOW, + ) + + assert ( + cursor.should_be_synced( + Record(data={_A_CURSOR_FIELD_KEY: first_slice_end - 1}, stream_name="test_stream") + ) + == False + ) + assert ( + cursor.should_be_synced( + Record(data={_A_CURSOR_FIELD_KEY: first_slice_end}, stream_name="test_stream") + ) + == True + ) + # even if this is within a boundary that has been synced, we don't take any chance and we sync it + # anyway in most cases, it shouldn't be pulled because we query for specific slice boundaries to the API + assert ( + cursor.should_be_synced( + Record(data={_A_CURSOR_FIELD_KEY: second_slice_start}, stream_name="test_stream") + ) + == True + ) diff --git a/unit_tests/sources/streams/test_stream_read.py b/unit_tests/sources/streams/test_stream_read.py index ac11b7499..bf13ac351 100644 --- a/unit_tests/sources/streams/test_stream_read.py +++ b/unit_tests/sources/streams/test_stream_read.py @@ -147,6 +147,9 @@ def close_partition(self, partition: Partition) -> None: def ensure_at_least_one_state_emitted(self) -> None: pass + def should_be_synced(self, record: Record) -> bool: + return True + def _stream(slice_to_partition_mapping, slice_logger, logger, 
message_repository, json_schema=None): return _MockStream(slice_to_partition_mapping, json_schema=json_schema)