Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 25 additions & 28 deletions airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,49 +50,42 @@


class AbstractSource(Source, ABC):
"""
Abstract base class for an Airbyte Source. Consumers should implement any abstract methods
in this class to create an Airbyte Specification compliant Source.
"""
"""Base class for Airbyte source connectors that orchestrates stream reading and state management."""

@abstractmethod
def check_connection(
self, logger: logging.Logger, config: Mapping[str, Any]
) -> Tuple[bool, Optional[Any]]:
"""
:param logger: source logger
:param config: The user-provided configuration as specified by the source's spec.
This usually contains information required to check connection e.g. tokens, secrets and keys etc.
:return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
and we can connect to the underlying data source using the provided configuration.
Otherwise, the input config cannot be used to connect to the underlying data source,
and the "error" object should describe what went wrong.
The error object will be cast to string to display the problem to the user.
"""Validates that the provided configuration can successfully connect to the data source.

Args:
logger: Source logger for diagnostic output.
config: User-provided configuration containing credentials and connection parameters.

Returns:
Tuple of (success boolean, error object). If success is True, connection is valid.
If False, error object describes what went wrong and will be displayed to the user.
"""

@abstractmethod
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
:param config: The user-provided configuration as specified by the source's spec.
Any stream construction related operation should happen here.
:return: A list of the streams in this source connector.
"""Returns the list of streams available in this source connector.

Args:
config: User-provided configuration for initializing streams.
Copy link

Copilot AI Nov 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring for streams() is missing a Returns: section to document that it returns a List[Stream], which would be helpful for developers implementing this abstract method.

Suggested change
config: User-provided configuration for initializing streams.
config: User-provided configuration for initializing streams.
Returns:
List[Stream]: The list of stream instances available for this source connector.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aside (Devin-ignore) -

No, Copilot. No point in documenting what the function signature already declares. DRY is better.

"""

# Stream name to instance map for applying output object transformation
_stream_to_instance_map: Dict[str, Stream] = {}
_slice_logger: SliceLogger = DebugSliceLogger()

def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
"""Implements the Discover operation from the Airbyte Specification.
See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
"""
"""Discovers available streams and their schemas from the data source."""
streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)]
return AirbyteCatalog(streams=streams)

def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
"""Implements the Check Connection operation from the Airbyte Specification.
See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.
"""
"""Validates connection to the data source using the provided configuration."""
check_succeeded, error = self.check_connection(logger, config)
if not check_succeeded:
return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
Expand All @@ -105,7 +98,7 @@ def read(
catalog: ConfiguredAirbyteCatalog,
state: Optional[List[AirbyteStateMessage]] = None,
) -> Iterator[AirbyteMessage]:
"""Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/."""
"""Reads records from configured streams and emits them as Airbyte messages."""
logger.info(f"Starting syncing {self.name}")
config, internal_config = split_config(config)
# TODO assert all streams exist in the connector
Expand Down Expand Up @@ -214,6 +207,7 @@ def read(
def _serialize_exception(
stream_descriptor: StreamDescriptor, e: Exception, stream_instance: Optional[Stream] = None
) -> AirbyteTracedException:
"""Converts an exception into an AirbyteTracedException with optional stream-specific error message."""
display_message = stream_instance.get_error_display_message(e) if stream_instance else None
if display_message:
return AirbyteTracedException.from_exception(
Expand All @@ -223,6 +217,7 @@ def _serialize_exception(

@property
def raise_exception_on_missing_stream(self) -> bool:
"""Controls whether to raise an exception when a configured stream is not found in the source."""
return False

def _read_stream(
Expand All @@ -233,6 +228,7 @@ def _read_stream(
state_manager: ConnectorStateManager,
internal_config: InternalConfig,
) -> Iterator[AirbyteMessage]:
"""Reads records from a single stream and emits them as Airbyte messages."""
if internal_config.page_size and isinstance(stream_instance, HttpStream):
logger.info(
f"Setting page size for {stream_instance.name} to {internal_config.page_size}"
Expand Down Expand Up @@ -289,16 +285,15 @@ def _read_stream(
logger.info(f"Read {record_counter} records from {stream_name} stream")

def _emit_queued_messages(self) -> Iterable[AirbyteMessage]:
"""Emits any messages that have been queued in the message repository."""
if self.message_repository:
yield from self.message_repository.consume_queue()
return

def _get_message(
self, record_data_or_message: Union[StreamData, AirbyteMessage], stream: Stream
) -> AirbyteMessage:
"""
Converts the input to an AirbyteMessage if it is a StreamData. Returns the input as is if it is already an AirbyteMessage
"""
"""Converts StreamData to AirbyteMessage or returns the input if already an AirbyteMessage."""
match record_data_or_message:
case AirbyteMessage():
return record_data_or_message
Expand All @@ -312,11 +307,13 @@ def _get_message(

@property
def message_repository(self) -> Union[None, MessageRepository]:
"""Returns the message repository used for queuing messages during sync operations."""
return _default_message_repository

@property
def stop_sync_on_stream_failure(self) -> bool:
"""
"""Controls whether to stop the entire sync when a single stream fails.

WARNING: This function is in-development which means it is subject to change. Use at your own risk.

By default, when a source encounters an exception while syncing a stream, it will emit an error trace message and then
Expand Down
13 changes: 10 additions & 3 deletions airbyte_cdk/sources/declarative/yaml_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@


class YamlDeclarativeSource(ConcurrentDeclarativeSource):
"""Declarative source defined by a yaml file"""
"""Entry point for declarative YAML-based source connectors that loads and executes manifest files."""

def __init__(
self,
Expand All @@ -26,8 +26,15 @@ def __init__(
state: Optional[List[AirbyteStateMessage]] = None,
config_path: Optional[str] = None,
) -> None:
"""
:param path_to_yaml: Path to the yaml file describing the source
"""Initializes a declarative source from a YAML manifest file.

Args:
path_to_yaml: Path to the manifest YAML file describing the source.
debug: Enable debug logging for manifest parsing and execution.
catalog: Configured catalog for the sync operation.
config: User-provided configuration for the source.
state: Current state for incremental syncs.
config_path: Path to the configuration file.
"""
self._path_to_yaml = path_to_yaml
source_config = self._read_and_parse_yaml_file(path_to_yaml)
Expand Down
53 changes: 21 additions & 32 deletions airbyte_cdk/sources/streams/http/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@


class HttpStream(Stream, CheckpointMixin, ABC):
"""
Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for a HTTP API.
"""
"""Base class for streams that fetch data from HTTP APIs with built-in pagination and error handling."""

source_defined_cursor = True # Most HTTP streams use a source defined cursor (i.e: the user can't configure it like on a SQL table)
page_size: Optional[int] = (
Expand Down Expand Up @@ -108,15 +106,11 @@ def use_cache(self) -> bool:
@property
@abstractmethod
def url_base(self) -> str:
"""
:return: URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
"""
"""Returns the base URL for API requests (e.g., "https://api.example.com/v1/")."""

@property
def http_method(self) -> str:
"""
Override if needed. See get_request_data/get_request_json if using POST/PUT/PATCH.
"""
"""Returns the HTTP method to use for requests (default: "GET")."""
return "GET"

@property
Expand Down Expand Up @@ -165,12 +159,13 @@ def retry_factor(self) -> float:

@abstractmethod
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
"""
Override this method to define a pagination strategy.
"""Returns the token for the next page of results, or None if no more pages exist.

The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.
Args:
response: HTTP response from the current page request.

:return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
Returns:
Mapping containing pagination token, or None if pagination is complete.
"""

@abstractmethod
Expand All @@ -181,21 +176,15 @@ def path(
stream_slice: Optional[Mapping[str, Any]] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> str:
"""
Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
"""
"""Returns the URL path for the API endpoint (e.g., "users" or "v2/customers")."""

def request_params(
self,
stream_state: Optional[Mapping[str, Any]],
stream_slice: Optional[Mapping[str, Any]] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
"""
Override this method to define the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.
"""
"""Returns query parameters to include in the HTTP request."""
return {}

def request_headers(
Expand All @@ -204,9 +193,7 @@ def request_headers(
stream_slice: Optional[Mapping[str, Any]] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
"""
Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
"""
"""Returns non-authentication headers to include in the HTTP request."""
return {}

def request_body_data(
Expand Down Expand Up @@ -261,14 +248,16 @@ def parse_response(
stream_slice: Optional[Mapping[str, Any]] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Iterable[Mapping[str, Any]]:
"""
Parses the raw response object into a list of records.
By default, this returns an iterable containing the input. Override to parse differently.
:param response:
:param stream_state:
:param stream_slice:
:param next_page_token:
:return: An iterable containing the parsed response
"""Parses the HTTP response into an iterable of record dictionaries.

Args:
response: HTTP response object from the API request.
stream_state: Current state for incremental syncs.
stream_slice: Current partition being processed.
next_page_token: Token for the current page of results.

Returns:
Iterable of record dictionaries extracted from the response.
"""

def get_backoff_strategy(self) -> Optional[Union[BackoffStrategy, List[BackoffStrategy]]]:
Expand Down
Loading