diff --git a/airbyte_cdk/sources/abstract_source.py b/airbyte_cdk/sources/abstract_source.py index ab9ee48b8..ca7b4ebee 100644 --- a/airbyte_cdk/sources/abstract_source.py +++ b/airbyte_cdk/sources/abstract_source.py @@ -50,32 +50,29 @@ class AbstractSource(Source, ABC): - """ - Abstract base class for an Airbyte Source. Consumers should implement any abstract methods - in this class to create an Airbyte Specification compliant Source. - """ + """Base class for Airbyte source connectors that orchestrates stream reading and state management.""" @abstractmethod def check_connection( self, logger: logging.Logger, config: Mapping[str, Any] ) -> Tuple[bool, Optional[Any]]: - """ - :param logger: source logger - :param config: The user-provided configuration as specified by the source's spec. - This usually contains information required to check connection e.g. tokens, secrets and keys etc. - :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful - and we can connect to the underlying data source using the provided configuration. - Otherwise, the input config cannot be used to connect to the underlying data source, - and the "error" object should describe what went wrong. - The error object will be cast to string to display the problem to the user. + """Validates that the provided configuration can successfully connect to the data source. + + Args: + logger: Source logger for diagnostic output. + config: User-provided configuration containing credentials and connection parameters. + + Returns: + Tuple of (success boolean, error object). If success is True, connection is valid. + If False, error object describes what went wrong and will be displayed to the user. """ @abstractmethod def streams(self, config: Mapping[str, Any]) -> List[Stream]: - """ - :param config: The user-provided configuration as specified by the source's spec. - Any stream construction related operation should happen here. - :return: A list of the streams in this source connector. + """Returns the list of streams available in this source connector. + + Args: + config: User-provided configuration for initializing streams. """ # Stream name to instance map for applying output object transformation @@ -83,16 +80,12 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: _slice_logger: SliceLogger = DebugSliceLogger() def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog: - """Implements the Discover operation from the Airbyte Specification. - See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover. - """ + """Discovers available streams and their schemas from the data source.""" streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)] return AirbyteCatalog(streams=streams) def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: - """Implements the Check Connection operation from the Airbyte Specification. - See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check. - """ + """Validates connection to the data source using the provided configuration.""" check_succeeded, error = self.check_connection(logger, config) if not check_succeeded: return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error)) @@ -105,7 +98,7 @@ def read( catalog: ConfiguredAirbyteCatalog, state: Optional[List[AirbyteStateMessage]] = None, ) -> Iterator[AirbyteMessage]: - """Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/.""" + """Reads records from configured streams and emits them as Airbyte messages.""" logger.info(f"Starting syncing {self.name}") config, internal_config = split_config(config) # TODO assert all streams exist in the connector @@ -214,6 +207,7 @@ def read( def _serialize_exception( stream_descriptor: StreamDescriptor, e: Exception, stream_instance: Optional[Stream] = None ) -> AirbyteTracedException: + """Converts an exception into an AirbyteTracedException with optional stream-specific error message.""" display_message = stream_instance.get_error_display_message(e) if stream_instance else None if display_message: return AirbyteTracedException.from_exception( @@ -223,6 +217,7 @@ def _serialize_exception( @property def raise_exception_on_missing_stream(self) -> bool: + """Controls whether to raise an exception when a configured stream is not found in the source.""" return False def _read_stream( @@ -233,6 +228,7 @@ def _read_stream( state_manager: ConnectorStateManager, internal_config: InternalConfig, ) -> Iterator[AirbyteMessage]: + """Reads records from a single stream and emits them as Airbyte messages.""" if internal_config.page_size and isinstance(stream_instance, HttpStream): logger.info( f"Setting page size for {stream_instance.name} to {internal_config.page_size}" @@ -289,6 +285,7 @@ def _read_stream( logger.info(f"Read {record_counter} records from {stream_name} stream") def _emit_queued_messages(self) -> Iterable[AirbyteMessage]: + """Emits any messages that have been queued in the message repository.""" if self.message_repository: yield from self.message_repository.consume_queue() return @@ -296,9 +293,7 @@ def _emit_queued_messages(self) -> Iterable[AirbyteMessage]: def _get_message( self, record_data_or_message: Union[StreamData, AirbyteMessage], stream: Stream ) -> AirbyteMessage: - """ - Converts the input to an AirbyteMessage if it is a StreamData. Returns the input as is if it is already an AirbyteMessage - """ + """Converts StreamData to AirbyteMessage or returns the input if already an AirbyteMessage.""" match record_data_or_message: case AirbyteMessage(): return record_data_or_message @@ -312,11 +307,13 @@ def _get_message( @property def message_repository(self) -> Union[None, MessageRepository]: + """Returns the message repository used for queuing messages during sync operations.""" return _default_message_repository @property def stop_sync_on_stream_failure(self) -> bool: - """ + """Controls whether to stop the entire sync when a single stream fails. + WARNING: This function is in-development which means it is subject to change. Use at your own risk. By default, when a source encounters an exception while syncing a stream, it will emit an error trace message and then diff --git a/airbyte_cdk/sources/declarative/yaml_declarative_source.py b/airbyte_cdk/sources/declarative/yaml_declarative_source.py index 7af07a5c8..5dc44576e 100644 --- a/airbyte_cdk/sources/declarative/yaml_declarative_source.py +++ b/airbyte_cdk/sources/declarative/yaml_declarative_source.py @@ -15,7 +15,7 @@ class YamlDeclarativeSource(ConcurrentDeclarativeSource): - """Declarative source defined by a yaml file""" + """Entry point for declarative YAML-based source connectors that loads and executes manifest files.""" def __init__( self, @@ -26,8 +26,15 @@ def __init__( state: Optional[List[AirbyteStateMessage]] = None, config_path: Optional[str] = None, ) -> None: - """ - :param path_to_yaml: Path to the yaml file describing the source + """Initializes a declarative source from a YAML manifest file. + + Args: + path_to_yaml: Path to the manifest YAML file describing the source. + debug: Enable debug logging for manifest parsing and execution. + catalog: Configured catalog for the sync operation. + config: User-provided configuration for the source. + state: Current state for incremental syncs. + config_path: Path to the configuration file. """ self._path_to_yaml = path_to_yaml source_config = self._read_and_parse_yaml_file(path_to_yaml) diff --git a/airbyte_cdk/sources/streams/http/http.py b/airbyte_cdk/sources/streams/http/http.py index fbf4fe35d..f7e009791 100644 --- a/airbyte_cdk/sources/streams/http/http.py +++ b/airbyte_cdk/sources/streams/http/http.py @@ -42,9 +42,7 @@ class HttpStream(Stream, CheckpointMixin, ABC): - """ - Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for a HTTP API. - """ + """Base class for streams that fetch data from HTTP APIs with built-in pagination and error handling.""" source_defined_cursor = True # Most HTTP streams use a source defined cursor (i.e: the user can't configure it like on a SQL table) page_size: Optional[int] = ( @@ -108,15 +106,11 @@ def use_cache(self) -> bool: @property @abstractmethod def url_base(self) -> str: - """ - :return: URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/" - """ + """Returns the base URL for API requests (e.g., "https://api.example.com/v1/").""" @property def http_method(self) -> str: - """ - Override if needed. See get_request_data/get_request_json if using POST/PUT/PATCH. - """ + """Returns the HTTP method to use for requests (default: "GET").""" return "GET" @property @@ -165,12 +159,13 @@ def retry_factor(self) -> float: @abstractmethod def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - """ - Override this method to define a pagination strategy. + """Returns the token for the next page of results, or None if no more pages exist. - The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params. + Args: + response: HTTP response from the current page request. - :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response. + Returns: + Mapping containing pagination token, or None if pagination is complete. """ @abstractmethod @@ -181,9 +176,7 @@ def path( stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> str: - """ - Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity" - """ + """Returns the URL path for the API endpoint (e.g., "users" or "v2/customers").""" def request_params( self, @@ -191,11 +184,7 @@ def request_params( stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> MutableMapping[str, Any]: - """ - Override this method to define the query parameters that should be set on an outgoing HTTP request given the inputs. - - E.g: you might want to define query parameters for paging if next_page_token is not None. - """ + """Returns query parameters to include in the HTTP request.""" return {} def request_headers( @@ -204,9 +193,7 @@ def request_headers( stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: - """ - Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method. - """ + """Returns non-authentication headers to include in the HTTP request.""" return {} def request_body_data( @@ -261,14 +248,16 @@ def parse_response( stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Iterable[Mapping[str, Any]]: - """ - Parses the raw response object into a list of records. - By default, this returns an iterable containing the input. Override to parse differently. - :param response: - :param stream_state: - :param stream_slice: - :param next_page_token: - :return: An iterable containing the parsed response + """Parses the HTTP response into an iterable of record dictionaries. + + Args: + response: HTTP response object from the API request. + stream_state: Current state for incremental syncs. + stream_slice: Current partition being processed. + next_page_token: Token for the current page of results. + + Returns: + Iterable of record dictionaries extracted from the response. """ def get_backoff_strategy(self) -> Optional[Union[BackoffStrategy, List[BackoffStrategy]]]: