Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 8 additions & 41 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ def _group_streams(
partition_generator = StreamSlicerPartitionGenerator(
partition_factory=DeclarativePartitionFactory(
declarative_stream.name,
declarative_stream.get_json_schema(),
declarative_stream.schema_loader,
retriever,
self.message_repository,
),
Expand Down Expand Up @@ -344,7 +344,7 @@ def _group_streams(
partition_generator = StreamSlicerPartitionGenerator(
DeclarativePartitionFactory(
declarative_stream.name,
declarative_stream.get_json_schema(),
declarative_stream.schema_loader,
declarative_stream.retriever,
self.message_repository,
),
Expand All @@ -361,7 +361,7 @@ def _group_streams(
DefaultStream(
partition_generator=partition_generator,
name=declarative_stream.name,
json_schema=declarative_stream.get_json_schema(),
json_schema=lambda: declarative_stream.schema_loader.get_json_schema(),
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
cursor_field=None,
Expand All @@ -377,7 +377,7 @@ def _group_streams(
and hasattr(declarative_stream.retriever, "stream_slicer")
and isinstance(
declarative_stream.retriever.stream_slicer,
(GlobalSubstreamCursor, PerPartitionWithGlobalCursor),
ConcurrentPerPartitionCursor,
)
):
stream_state = self._connector_state_manager.get_stream_state(
Expand Down Expand Up @@ -435,6 +435,8 @@ def _group_streams(
and self.is_partially_declarative
):
concurrent_streams.append(declarative_stream.get_underlying_stream())
elif isinstance(declarative_stream, DefaultStream): # FIXME added temporarily until the ConcurrentDeclarativeSource is cleaned up
concurrent_streams.append(declarative_stream)
else:
synchronous_streams.append(declarative_stream)

Expand All @@ -453,49 +455,14 @@ def _is_concurrent_cursor_incremental_without_partition_routing(
in (DatetimeBasedCursorModel.__name__, IncrementingCountCursorModel.__name__)
)
and hasattr(declarative_stream.retriever, "stream_slicer")
and (
isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor)
# IncrementingCountCursorModel is hardcoded to be of type DatetimeBasedCursor
# add isintance check here if we want to create a Declarative IncrementingCountCursor
# or isinstance(
# declarative_stream.retriever.stream_slicer, IncrementingCountCursor
# )
or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter)
)
and isinstance(declarative_stream.retriever.stream_slicer, (ConcurrentCursor, AsyncJobPartitionRouter))
)

@staticmethod
def _get_retriever(
declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
) -> Retriever:
retriever = declarative_stream.retriever

# This is an optimization so that we don't invoke any cursor or state management flows within the
# low-code framework because state management is handled through the ConcurrentCursor.
if declarative_stream and isinstance(retriever, SimpleRetriever):
# Also a temporary hack. In the legacy Stream implementation, as part of the read,
# set_initial_state() is called to instantiate incoming state on the cursor. Although we no
# longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
# like StopConditionPaginationStrategyDecorator still rely on a DatetimeBasedCursor that is
# properly initialized with state.
if retriever.cursor:
retriever.cursor.set_initial_state(stream_state=stream_state)

# Similar to above, the ClientSideIncrementalRecordFilterDecorator cursor is a separate instance
# from the one initialized on the SimpleRetriever, so it also must also have state initialized
# for semi-incremental streams using is_client_side_incremental to filter properly
if isinstance(retriever.record_selector, RecordSelector) and isinstance(
retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
):
retriever.record_selector.record_filter._cursor.set_initial_state(
stream_state=stream_state
) # type: ignore # After non-concurrent cursors are deprecated we can remove these cursor workarounds

# We zero it out here, but since this is a cursor reference, the state is still properly
# instantiated for the other components that reference it
retriever.cursor = None

return retriever
return declarative_stream.retriever

@staticmethod
def _select_streams(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from airbyte_cdk.sources.declarative.incremental import (
DatetimeBasedCursor,
GlobalSubstreamCursor,
PerPartitionWithGlobalCursor,
PerPartitionWithGlobalCursor, ConcurrentPerPartitionCursor,
)
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
from collections import OrderedDict
from copy import deepcopy
from datetime import timedelta
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Union

from airbyte_cdk.models import AirbyteStateMessage, AirbyteStateBlob, AirbyteStreamState, AirbyteStateType, StreamDescriptor
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import (
Timer,
iterate_with_last_flag_and_state,
)
# It is interesting that this file depends on the declarative stuff. If we ever think that per partition cursors will ever be needed outside the declarative package, we would need to add an interface here to ensure that we avoid circular dependencies
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.checkpoint.per_partition_key_serializer import (
Expand Down Expand Up @@ -124,6 +126,7 @@ def __init__(
self._timer = Timer()

self._set_initial_state(stream_state)
self._synced_some_data = False

@property
def cursor_field(self) -> CursorField:
Expand Down Expand Up @@ -154,6 +157,7 @@ def state(self) -> MutableMapping[str, Any]:

def close_partition(self, partition: Partition) -> None:
# Attempt to retrieve the stream slice
logger.warning(f"GODO: stream {self._stream_name} closing partition {partition.to_slice()}")
stream_slice: Optional[StreamSlice] = partition.to_slice() # type: ignore[assignment]

# Ensure stream_slice is not None
Expand Down Expand Up @@ -209,8 +213,10 @@ def ensure_at_least_one_state_emitted(self) -> None:
if not any(
semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items()
):
self._global_cursor = self._new_global_cursor
self._lookback_window = self._timer.finish()
if self._synced_some_data:
# we only update those if we actually synced some data
self._global_cursor = self._new_global_cursor
self._lookback_window = self._timer.finish()
self._parent_state = self._partition_router.get_stream_state()
self._emit_state_message(throttle=False)

Expand Down Expand Up @@ -454,6 +460,7 @@ def observe(self, record: Record) -> None:
except ValueError:
return

self._synced_some_data = True
record_cursor = self._connector_state_converter.output_format(
self._connector_state_converter.parse_value(record_cursor_value)
)
Expand Down Expand Up @@ -522,3 +529,23 @@ def _get_cursor(self, record: Record) -> ConcurrentCursor:

def limit_reached(self) -> bool:
return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT

@staticmethod
def get_parent_state(stream_state: Optional[StreamState], parent_stream_name: str) -> Optional[AirbyteStateMessage]:
return AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(parent_stream_name, None),
stream_state=AirbyteStateBlob(stream_state["parent_state"][parent_stream_name])
)
) if stream_state and "parent_state" in stream_state else None

@staticmethod
def get_global_state(stream_state: Optional[StreamState], parent_stream_name: str) -> Optional[AirbyteStateMessage]:
return AirbyteStateMessage(
type=AirbyteStateType.STREAM,
stream=AirbyteStreamState(
stream_descriptor=StreamDescriptor(parent_stream_name, None),
stream_state=AirbyteStateBlob(stream_state["state"])
)
) if stream_state and "state" in stream_state else None
Original file line number Diff line number Diff line change
Expand Up @@ -429,17 +429,6 @@ def _send_log(self, level: Level, message: str) -> None:
)
)

def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
cursor_field = self.cursor_field.eval(self.config) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
first_cursor_value = first.get(cursor_field)
second_cursor_value = second.get(cursor_field)
if first_cursor_value and second_cursor_value:
return self.parse_date(first_cursor_value) >= self.parse_date(second_cursor_value)
elif first_cursor_value:
return True
else:
return False

def set_runtime_lookback_window(self, lookback_window_in_seconds: int) -> None:
"""
Updates the lookback window based on a given number of seconds if the new duration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,12 +338,6 @@ def get_request_body_json(
def should_be_synced(self, record: Record) -> bool:
return self._stream_cursor.should_be_synced(self._convert_record_to_cursor_record(record))

def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
return self._stream_cursor.is_greater_than_or_equal(
self._convert_record_to_cursor_record(first),
self._convert_record_to_cursor_record(second),
)

@staticmethod
def _convert_record_to_cursor_record(record: Record) -> Record:
return Record(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,21 +315,6 @@ def should_be_synced(self, record: Record) -> bool:
self._convert_record_to_cursor_record(record)
)

def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
if not first.associated_slice or not second.associated_slice:
raise ValueError(
f"Both records should have an associated slice but got {first.associated_slice} and {second.associated_slice}"
)
if first.associated_slice.partition != second.associated_slice.partition:
raise ValueError(
f"To compare records, partition should be the same but got {first.associated_slice.partition} and {second.associated_slice.partition}"
)

return self._get_cursor(first).is_greater_than_or_equal(
self._convert_record_to_cursor_record(first),
self._convert_record_to_cursor_record(second),
)

@staticmethod
def _convert_record_to_cursor_record(record: Record) -> Record:
return Record(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,3 @@ def get_request_body_json(

def should_be_synced(self, record: Record) -> bool:
return self._get_active_cursor().should_be_synced(record)

def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
return self._global_cursor.is_greater_than_or_equal(first, second)
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@ def should_be_synced(self, record: Record) -> bool:
"""
return True

def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
"""
RFR record don't have ordering to be compared between one another.
"""
return False

def select_state(self, stream_slice: Optional[StreamSlice] = None) -> Optional[StreamState]:
# A top-level RFR cursor only manages the state of a single partition
return self._cursor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ConditionalStreams as ConditionalStreamsModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DeclarativeStream as DeclarativeStreamModel,
)
Expand Down Expand Up @@ -297,7 +294,7 @@ def connection_checker(self) -> ConnectionChecker:
f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}"
)

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
def streams(self, config: Mapping[str, Any]) -> List[Stream]: # FIXME with the recent change, this method can return Stream and AbstractStream which means it does not align with AbstractSource interface anymore. How big of a deal is this?
if self._spec_component:
self._spec_component.validate_config(config)

Expand Down
Loading
Loading