diff --git a/airbyte_cdk/sources/file_based/availability_strategy/default_file_based_availability_strategy.py b/airbyte_cdk/sources/file_based/availability_strategy/default_file_based_availability_strategy.py
index c9d416a72..123cb5027 100644
--- a/airbyte_cdk/sources/file_based/availability_strategy/default_file_based_availability_strategy.py
+++ b/airbyte_cdk/sources/file_based/availability_strategy/default_file_based_availability_strategy.py
@@ -96,8 +96,13 @@ def _check_list_files(self, stream: AbstractFileBasedStream) -> RemoteFile:
 
         Returns the first file if successful, otherwise raises a CheckAvailabilityError.
         """
+        stream.logger.info(f"Starting to list files for stream: {stream.name}")
         try:
-            file = next(iter(stream.get_files()))
+            # Fetch only the first file: materializing the full listing can be
+            # prohibitively expensive on large remotes, and a single file is
+            # enough to prove the stream is listable. An empty listing raises
+            # StopIteration, which is mapped to EMPTY_STREAM below.
+            file = next(iter(stream.get_files()))
         except StopIteration:
             raise CheckAvailabilityError(FileBasedSourceError.EMPTY_STREAM, stream=stream.name)
         except CustomFileBasedException as exc:
@@ -107,6 +112,7 @@ def _check_list_files(self, stream: AbstractFileBasedStream) -> RemoteFile:
                 FileBasedSourceError.ERROR_LISTING_FILES, stream=stream.name
             ) from exc
 
+        stream.logger.info(f"Successfully verified file access for stream: {stream.name}")
         return file
 
     def _check_parse_record(
diff --git a/airbyte_cdk/sources/file_based/file_based_source.py b/airbyte_cdk/sources/file_based/file_based_source.py
index 17a7ee957..cb9197dfe 100644
--- a/airbyte_cdk/sources/file_based/file_based_source.py
+++ b/airbyte_cdk/sources/file_based/file_based_source.py
@@ -154,6 +154,7 @@ def check_connection(
         Otherwise, the "error" object should describe what went wrong.
 
         """
+        logger.info("Starting check connection for file-based source")
         try:
             streams = self.streams(config)
         except Exception as config_exception:
@@ -222,7 +223,9 @@ def check_connection(
                 failure_type=FailureType.config_error,
             )
 
-        return not bool(errors), (errors or None)
+        success = not bool(errors)
+        logger.info(f"Completed check connection for file-based source. Result: {success}")
+        return success, (errors or None)
 
     def streams(self, config: Mapping[str, Any]) -> List[Stream]:
         """
diff --git a/airbyte_cdk/sources/file_based/file_based_stream_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_reader.py
index cbf3d119b..f85ad4ba4 100644
--- a/airbyte_cdk/sources/file_based/file_based_stream_reader.py
+++ b/airbyte_cdk/sources/file_based/file_based_stream_reader.py
@@ -85,6 +85,10 @@ def get_matching_files(
 
         Utility method `self.filter_files_by_globs` and `self.get_prefixes_from_globs`
         are available, which may be helpful when implementing this method.
+
+        Implementation should include logging:
+        - At the beginning: logger.info(f"Starting to match files with glob patterns: {globs}")
+        - After filtering files: logger.info(f"Completed matching files with glob patterns: {globs}")
         """
         ...
 
diff --git a/airbyte_cdk/sources/file_based/stream/cursor/default_file_based_cursor.py b/airbyte_cdk/sources/file_based/stream/cursor/default_file_based_cursor.py
index 08ad8c3ae..b06ce9673 100644
--- a/airbyte_cdk/sources/file_based/stream/cursor/default_file_based_cursor.py
+++ b/airbyte_cdk/sources/file_based/stream/cursor/default_file_based_cursor.py
@@ -113,15 +113,18 @@ def _should_sync_file(self, file: RemoteFile, logger: logging.Logger) -> bool:
     def get_files_to_sync(
         self, all_files: Iterable[RemoteFile], logger: logging.Logger
     ) -> Iterable[RemoteFile]:
+        logger.info("Starting to determine files to sync based on cursor")
         if self._is_history_full():
             logger.warning(
                 f"The state history is full. "
                 f"This sync and future syncs won't be able to use the history to filter out duplicate files. "
                 f"It will instead use the time window of {self._time_window_if_history_is_full} to filter out files."
             )
-        for f in all_files:
-            if self._should_sync_file(f, logger):
-                yield f
+        files_to_sync = [f for f in all_files if self._should_sync_file(f, logger)]
+        # NOTE: `all_files` may be a one-shot iterator that is fully consumed by
+        # the comprehension above, so it must not be iterated a second time here.
+        logger.info(f"Determined {len(files_to_sync)} files to sync")
+        return files_to_sync
 
     def get_start_time(self) -> datetime:
         return self._start_time
diff --git a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
index 42d01577c..a47126457 100644
--- a/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
+++ b/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
@@ -342,10 +342,15 @@ def get_files(self) -> Iterable[RemoteFile]:
         )
 
     def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
+        self.logger.info(
+            f"Starting schema inference for stream {self.name} with {len(files)} files"
+        )
         loop = asyncio.get_event_loop()
         schema = loop.run_until_complete(self._infer_schema(files))
         # as infer schema returns a Mapping that is assumed to be immutable, we need to create a deepcopy to avoid modifying the reference
-        return self._fill_nulls(deepcopy(schema))
+        result = self._fill_nulls(deepcopy(schema))
+        self.logger.info(f"Completed schema inference for stream {self.name}")
+        return result
 
     @staticmethod
     def _fill_nulls(schema: Mapping[str, Any]) -> Mapping[str, Any]:
@@ -374,6 +379,9 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
 
         Each file type has a corresponding `infer_schema` handler. Dispatch on file type.
         """
+        self.logger.info(
+            f"Starting concurrent schema inference for {len(files)} files with {self._discovery_policy.n_concurrent_requests} concurrent requests"
+        )
         base_schema: SchemaType = {}
         pending_tasks: Set[asyncio.tasks.Task[SchemaType]] = set()
 
@@ -383,6 +391,7 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
         while len(pending_tasks) <= self._discovery_policy.n_concurrent_requests and (
             file := next(files_iterator, None)
         ):
+            self.logger.debug(f"Starting schema inference for file: {file.uri}")
             pending_tasks.add(asyncio.create_task(self._infer_file_schema(file)))
             n_started += 1
             # Return when the first task is completed so that we can enqueue a new task as soon as the
@@ -392,6 +401,9 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
             )
             for task in done:
                 try:
                     base_schema = merge_schemas(base_schema, task.result())
+                    # Log after the merge: task.result() may raise, in which case
+                    # a "completed" message would be misleading.
+                    self.logger.debug(f"Merged inferred schema for a file, {len(pending_tasks)} tasks still pending")
                 except AirbyteTracedException as ate:
                     raise ate
@@ -401,6 +413,7 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
                     exc_info=exc,
                 )
 
+        self.logger.info(f"Completed concurrent schema inference for stream {self.name}")
         return base_schema
 
     async def _infer_file_schema(self, file: RemoteFile) -> SchemaType: