Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 3 additions & 26 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,34 +468,11 @@ def _is_concurrent_cursor_incremental_without_partition_routing(
def _get_retriever(
declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
) -> Retriever:
retriever = declarative_stream.retriever

# This is an optimization so that we don't invoke any cursor or state management flows within the
# low-code framework because state management is handled through the ConcurrentCursor.
if declarative_stream and isinstance(retriever, SimpleRetriever):
# Also a temporary hack. In the legacy Stream implementation, as part of the read,
# set_initial_state() is called to instantiate incoming state on the cursor. Although we no
# longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
# like StopConditionPaginationStrategyDecorator still rely on a DatetimeBasedCursor that is
# properly initialized with state.
if retriever.cursor:
retriever.cursor.set_initial_state(stream_state=stream_state)

# Similar to above, the ClientSideIncrementalRecordFilterDecorator cursor is a separate instance
# from the one initialized on the SimpleRetriever, so it also must also have state initialized
# for semi-incremental streams using is_client_side_incremental to filter properly
if isinstance(retriever.record_selector, RecordSelector) and isinstance(
retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
):
retriever.record_selector.record_filter._cursor.set_initial_state(
stream_state=stream_state
) # type: ignore # After non-concurrent cursors are deprecated we can remove these cursor workarounds

if declarative_stream and isinstance(declarative_stream.retriever, SimpleRetriever):
# We zero it out here, but since this is a cursor reference, the state is still properly
# instantiated for the other components that reference it
retriever.cursor = None

return retriever
declarative_stream.retriever.cursor = None
Comment thread
maxi297 marked this conversation as resolved.
return declarative_stream.retriever

@staticmethod
def _select_streams(
Expand Down
12 changes: 4 additions & 8 deletions airbyte_cdk/sources/declarative/extractors/record_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,8 @@
from dataclasses import InitVar, dataclass
from typing import Any, Iterable, Mapping, Optional, Union

from airbyte_cdk.sources.declarative.incremental import (
DatetimeBasedCursor,
GlobalSubstreamCursor,
PerPartitionWithGlobalCursor,
)
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState


Expand Down Expand Up @@ -53,13 +49,13 @@ class ClientSideIncrementalRecordFilterDecorator(RecordFilter):
"""
Applies a filter to a list of records to exclude those that are older than the stream_state/start_date.

:param DatetimeBasedCursor date_time_based_cursor: Cursor used to extract datetime values
:param Cursor cursor: Cursor used to filter out values
:param PerPartitionCursor per_partition_cursor: Optional Cursor used for mapping cursor value in nested stream_state
"""

def __init__(
self,
cursor: Union[DatetimeBasedCursor, PerPartitionWithGlobalCursor, GlobalSubstreamCursor],
cursor: Union[Cursor],
**kwargs: Any,
):
super().__init__(**kwargs)
Expand All @@ -77,7 +73,7 @@ def filter_records(
for record in records
if self._cursor.should_be_synced(
# Record is created on the fly to align with cursors interface; stream name is ignored as we don't need it here
# Record stream name is empty cause it is not used durig the filtering
# Record stream name is empty because it is not used during the filtering
Record(data=record, associated_slice=stream_slice, stream_name="")
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def __init__(
connector_state_converter: AbstractStreamStateConverter,
cursor_field: CursorField,
use_global_cursor: bool = False,
attempt_to_create_cursor_if_not_provided: bool = False,
) -> None:
self._global_cursor: Optional[StreamState] = {}
self._stream_name = stream_name
Expand Down Expand Up @@ -125,6 +126,9 @@ def __init__(

self._set_initial_state(stream_state)

# FIXME this is a temporary field needed only for the duration of the migration from declarative cursors to concurrent ones
self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided

@property
def cursor_field(self) -> CursorField:
return self._cursor_field
Expand Down Expand Up @@ -512,13 +516,28 @@ def _get_cursor(self, record: Record) -> ConcurrentCursor:
raise ValueError(
"Invalid state as stream slices that are emitted should refer to an existing cursor"
)

if self._use_global_cursor:
Comment thread
maxi297 marked this conversation as resolved.
return self._create_cursor(
self._global_cursor,
self._lookback_window if self._global_cursor else 0,
)

partition_key = self._to_partition_key(record.associated_slice.partition)
if partition_key not in self._cursor_per_partition:
if (
partition_key not in self._cursor_per_partition
and not self._attempt_to_create_cursor_if_not_provided
):
raise ValueError(
"Invalid state as stream slices that are emitted should refer to an existing cursor"
)
cursor = self._cursor_per_partition[partition_key]
return cursor
elif partition_key not in self._cursor_per_partition:
return self._create_cursor(
self._global_cursor,
self._lookback_window if self._global_cursor else 0,
)
else:
return self._cursor_per_partition[partition_key]

def limit_reached(self) -> bool:
return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT
104 changes: 77 additions & 27 deletions airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
)
from airbyte_cdk.models import FailureType, Level
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative import transformations
from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator
from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker
from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository
Expand Down Expand Up @@ -604,7 +603,7 @@
WeekClampingStrategy,
Weekday,
)
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, CursorField
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, Cursor, CursorField
from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import (
CustomFormatConcurrentStreamStateConverter,
DateTimeStreamStateConverter,
Expand Down Expand Up @@ -1561,6 +1560,7 @@ def create_concurrent_cursor_from_perpartition_cursor(
stream_state: MutableMapping[str, Any],
partition_router: PartitionRouter,
stream_state_migrations: Optional[List[Any]] = None,
attempt_to_create_cursor_if_not_provided: bool = False,
**kwargs: Any,
) -> ConcurrentPerPartitionCursor:
component_type = component_definition.get("type")
Expand Down Expand Up @@ -1631,6 +1631,7 @@ def create_concurrent_cursor_from_perpartition_cursor(
connector_state_converter=connector_state_converter,
cursor_field=cursor_field,
use_global_cursor=use_global_cursor,
attempt_to_create_cursor_if_not_provided=attempt_to_create_cursor_if_not_provided,
)

@staticmethod
Expand Down Expand Up @@ -1931,30 +1932,17 @@ def create_declarative_stream(
and hasattr(model.incremental_sync, "is_data_feed")
and model.incremental_sync.is_data_feed
)
client_side_incremental_sync = None
if (
client_side_filtering_enabled = (
model.incremental_sync
and hasattr(model.incremental_sync, "is_client_side_incremental")
and model.incremental_sync.is_client_side_incremental
):
supported_slicers = (
DatetimeBasedCursor,
GlobalSubstreamCursor,
PerPartitionWithGlobalCursor,
)
if combined_slicers and not isinstance(combined_slicers, supported_slicers):
raise ValueError(
"Unsupported Slicer is used. PerPartitionWithGlobalCursor should be used here instead"
)
cursor = (
combined_slicers
if isinstance(
combined_slicers, (PerPartitionWithGlobalCursor, GlobalSubstreamCursor)
)
else self._create_component_from_model(model=model.incremental_sync, config=config)
)
concurrent_cursor = None
if stop_condition_on_cursor or client_side_filtering_enabled:
stream_slicer = self._build_stream_slicer_from_partition_router(
model.retriever, config, stream_name=model.name
)

client_side_incremental_sync = {"cursor": cursor}
concurrent_cursor = self._build_concurrent_cursor(model, stream_slicer, config)

if model.incremental_sync and isinstance(model.incremental_sync, DatetimeBasedCursorModel):
cursor_model = model.incremental_sync
Expand Down Expand Up @@ -2029,8 +2017,10 @@ def create_declarative_stream(
primary_key=primary_key,
stream_slicer=combined_slicers,
request_options_provider=request_options_provider,
stop_condition_on_cursor=stop_condition_on_cursor,
client_side_incremental_sync=client_side_incremental_sync,
stop_condition_cursor=concurrent_cursor,
client_side_incremental_sync={"cursor": concurrent_cursor}
if client_side_filtering_enabled
else None,
transformations=transformations,
file_uploader=file_uploader,
incremental_sync=model.incremental_sync,
Expand Down Expand Up @@ -2185,6 +2175,66 @@ def _build_incremental_cursor(
return self._create_component_from_model(model=model.incremental_sync, config=config) # type: ignore[no-any-return] # Will be created Cursor as stream_slicer_model is model.incremental_sync
return None

def _build_concurrent_cursor(
    self,
    model: DeclarativeStreamModel,
    stream_slicer: Optional[PartitionRouter],
    config: Config,
) -> Optional[StreamSlicer]:
    """
    Build the concurrent cursor that matches the stream's `incremental_sync` definition.

    :param model: declarative stream model; its `name`, `incremental_sync` and `state_migrations` are read
    :param stream_slicer: partition router of the stream, if any. When it is provided, a per-partition
        concurrent cursor is created regardless of the concrete `incremental_sync` type
    :param config: connector configuration forwarded to the cursor factory methods
    :return: the created cursor, or None when the stream defines no `incremental_sync`
    :raises ValueError: when no partition router is provided and `incremental_sync` is neither an
        IncrementingCountCursorModel nor a DatetimeBasedCursorModel
    """
    stream_state = self._connector_state_manager.get_stream_state(
        stream_name=model.name or "", namespace=None
    )

    if not model.incremental_sync:
        return None

    # Reject unsupported cursor models up front so that nothing is instantiated for them.
    # The comparison is on the exact type (not isinstance) on purpose: subclasses are not accepted.
    if not stream_slicer and type(model.incremental_sync) not in (
        IncrementingCountCursorModel,
        DatetimeBasedCursorModel,
    ):
        raise ValueError(
            f"Incremental sync of type {type(model.incremental_sync)} is not supported"
        )

    # State migrations are turned into components once, so that every branch below hands the same
    # kind of objects to `stream_state_migrations` (previously only the per-partition branch did so
    # and the other branches forwarded the raw models).
    # NOTE(review): assumes the incrementing-count and datetime factories expect instantiated
    # migrations exactly like the per-partition factory does - confirm against their implementation.
    # FIXME should this be in create_concurrent_cursor_from_perpartition_cursor
    state_transformations = (
        [
            self._create_component_from_model(state_migration, config, declarative_stream=model)
            for state_migration in model.state_migrations
        ]
        if model.state_migrations
        else []
    )

    # Known typing issue for the three `type: ignore` below: the created ConcurrentCursor does not
    # technically implement the (low-code) StreamSlicer, but both expose `stream_slices()` which is
    # the primary method needed for checkpointing.
    if stream_slicer:
        return self.create_concurrent_cursor_from_perpartition_cursor(  # type: ignore
            state_manager=self._connector_state_manager,
            model_type=DatetimeBasedCursorModel,
            component_definition=model.incremental_sync.__dict__,
            stream_name=model.name or "",
            stream_namespace=None,
            config=config or {},
            stream_state=stream_state,
            stream_state_migrations=state_transformations,
            partition_router=stream_slicer,
            attempt_to_create_cursor_if_not_provided=True,
        )

    if type(model.incremental_sync) == IncrementingCountCursorModel:
        return self.create_concurrent_cursor_from_incrementing_count_cursor(  # type: ignore
            model_type=IncrementingCountCursorModel,
            component_definition=model.incremental_sync.__dict__,
            stream_name=model.name or "",
            stream_namespace=None,
            config=config or {},
            stream_state_migrations=state_transformations,
        )

    # Only DatetimeBasedCursorModel can reach this point thanks to the guard above.
    return self.create_concurrent_cursor_from_datetime_based_cursor(  # type: ignore
        model_type=type(model.incremental_sync),
        component_definition=model.incremental_sync.__dict__,
        stream_name=model.name or "",
        stream_namespace=None,
        config=config or {},
        stream_state_migrations=state_transformations,
        attempt_to_create_cursor_if_not_provided=True,
    )

Comment thread
maxi297 marked this conversation as resolved.
Comment thread
coderabbitai[bot] marked this conversation as resolved.
def _build_resumable_cursor(
self,
model: Union[
Expand Down Expand Up @@ -2285,7 +2335,7 @@ def create_default_paginator(
url_base: str,
extractor_model: Optional[Union[CustomRecordExtractorModel, DpathExtractorModel]] = None,
decoder: Optional[Decoder] = None,
cursor_used_for_stop_condition: Optional[DeclarativeCursor] = None,
cursor_used_for_stop_condition: Optional[Cursor] = None,
) -> Union[DefaultPaginator, PaginatorTestReadDecorator]:
if decoder:
if self._is_supported_decoder_for_pagination(decoder):
Expand Down Expand Up @@ -3146,7 +3196,7 @@ def create_simple_retriever(
primary_key: Optional[Union[str, List[str], List[List[str]]]],
stream_slicer: Optional[StreamSlicer],
request_options_provider: Optional[RequestOptionsProvider] = None,
stop_condition_on_cursor: bool = False,
stop_condition_cursor: Optional[Cursor] = None,
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
transformations: List[RecordTransformation],
file_uploader: Optional[DefaultFileUploader] = None,
Expand Down Expand Up @@ -3277,7 +3327,7 @@ def _get_url() -> str:
),
)

cursor_used_for_stop_condition = cursor if stop_condition_on_cursor else None
cursor_used_for_stop_condition = cursor if stop_condition_cursor else None
Comment thread
maxi297 marked this conversation as resolved.
Outdated
paginator = (
self._create_component_from_model(
model=model.paginator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@

import requests

from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import (
PaginationStrategy,
)
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.types import Record


Expand All @@ -29,8 +28,7 @@ def is_met(self, record: Record) -> bool:
class CursorStopCondition(PaginationStopCondition):
def __init__(
self,
cursor: DeclarativeCursor
| ConcurrentCursor, # migrate to use both old and concurrent versions
cursor: Cursor,
):
self._cursor = cursor

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,3 +311,6 @@ def set_initial_state(self, value: StreamState) -> None:

def ensure_at_least_one_state_emitted(self) -> None:
    """
    Emit a state message by delegating to `emit_state_message`.
    """
    self.emit_state_message()

def should_be_synced(self, record: Record) -> bool:
    """
    Report whether `record` should be synced.

    :param record: record under evaluation; it is not inspected by this implementation
    :return: always True
    """
    return True
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,6 @@ def ensure_at_least_one_state_emitted(self) -> None:
self._stream_name, self._stream_namespace
)
self._message_repository.emit_message(state_message)

def should_be_synced(self, record: Record) -> bool:
    """
    Report whether `record` should be synced.

    :param record: record under evaluation; it is not inspected by this implementation
    :return: always True
    """
    return True
25 changes: 23 additions & 2 deletions airbyte_cdk/sources/streams/concurrent/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ def ensure_at_least_one_state_emitted(self) -> None:
"""
raise NotImplementedError()

@abstractmethod
def should_be_synced(self, record: Record) -> bool:
    """
    Tell whether `record` should be synced.

    Concrete cursors must override this method.

    :param record: record under evaluation
    :return: True when the record should be synced
        (NOTE(review): the exact filtering contract is defined by the concrete cursors - confirm there)
    """
    pass

def stream_slices(self) -> Iterable[StreamSlice]:
"""
Default placeholder implementation of generate_slices.
Expand Down Expand Up @@ -123,6 +127,9 @@ def ensure_at_least_one_state_emitted(self) -> None:
)
self._message_repository.emit_message(state_message)

def should_be_synced(self, record: Record) -> bool:
    """
    Report whether `record` should be synced.

    :param record: record under evaluation; it is not inspected by this implementation
    :return: always True
    """
    return True


class ConcurrentCursor(Cursor):
_START_BOUNDARY = 0
Expand Down Expand Up @@ -192,9 +199,23 @@ def _get_concurrent_state(
self, state: MutableMapping[str, Any]
) -> Tuple[CursorValueType, MutableMapping[str, Any]]:
if self._connector_state_converter.is_state_message_compatible(state):
partitioned_state = self._connector_state_converter.deserialize(state)
slices_from_partitioned_state = partitioned_state.get("slices", [])

value_from_partitioned_state = None
if slices_from_partitioned_state:
# We assume here that the slices have been already merged
first_slice = slices_from_partitioned_state[0]
value_from_partitioned_state = (
first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
else first_slice[self._connector_state_converter.END_KEY]
)
return (
self._start or self._connector_state_converter.zero_value,
self._connector_state_converter.deserialize(state),
value_from_partitioned_state
or self._start
or self._connector_state_converter.zero_value,
partitioned_state,
)
return self._connector_state_converter.convert_from_sequential_state(
self._cursor_field, state, self._start
Expand Down
Loading
Loading