Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,14 @@ def _check_list_files(self, stream: AbstractFileBasedStream) -> RemoteFile:

Returns the first file if successful, otherwise raises a CheckAvailabilityError.
"""
stream.logger.info(f"Starting to list files for stream: {stream.name}")
try:
file = next(iter(stream.get_files()))
files = list(stream.get_files())
file_count = len(files)
stream.logger.info(f"Found {file_count} files for stream: {stream.name}")
if file_count == 0:
raise CheckAvailabilityError(FileBasedSourceError.EMPTY_STREAM, stream=stream.name)
file = files[0]
except StopIteration:
raise CheckAvailabilityError(FileBasedSourceError.EMPTY_STREAM, stream=stream.name)
except CustomFileBasedException as exc:
Expand All @@ -107,6 +113,7 @@ def _check_list_files(self, stream: AbstractFileBasedStream) -> RemoteFile:
FileBasedSourceError.ERROR_LISTING_FILES, stream=stream.name
) from exc

stream.logger.info(f"Successfully verified file access for stream: {stream.name}")
return file

def _check_parse_record(
Expand Down
5 changes: 4 additions & 1 deletion airbyte_cdk/sources/file_based/file_based_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def check_connection(

Otherwise, the "error" object should describe what went wrong.
"""
logger.info("Starting check connection for file-based source")
try:
streams = self.streams(config)
except Exception as config_exception:
Expand Down Expand Up @@ -222,7 +223,9 @@ def check_connection(
failure_type=FailureType.config_error,
)

return not bool(errors), (errors or None)
success = not bool(errors)
logger.info(f"Completed check connection for file-based source. Result: {success}")
return success, (errors or None)

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
Expand Down
4 changes: 4 additions & 0 deletions airbyte_cdk/sources/file_based/file_based_stream_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ def get_matching_files(

Utility method `self.filter_files_by_globs` and `self.get_prefixes_from_globs`
are available, which may be helpful when implementing this method.

Implementation should include logging:
- At the beginning: logger.info(f"Starting to match files with glob patterns: {globs}")
- After filtering files: logger.info(f"Completed matching files with glob patterns: {globs}")
"""
...

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,18 @@ def _should_sync_file(self, file: RemoteFile, logger: logging.Logger) -> bool:
def get_files_to_sync(
    self, all_files: Iterable[RemoteFile], logger: logging.Logger
) -> Iterable[RemoteFile]:
    """Filter ``all_files`` down to the files that still need to be synced.

    ``all_files`` may be a one-shot iterator, so it must be traversed exactly
    once. The previous implementation iterated it twice (once in the filtering
    comprehension, then again via ``len(list(all_files))`` for the log line),
    which made the logged total wrong — 0 for an exhausted generator — and
    re-listed files for re-iterable inputs. Here the total is counted during
    the single filtering pass.

    Returns the (eagerly built) list of files accepted by ``_should_sync_file``.
    """
    logger.info("Starting to determine files to sync based on cursor")
    if self._is_history_full():
        logger.warning(
            f"The state history is full. "
            f"This sync and future syncs won't be able to use the history to filter out duplicate files. "
            f"It will instead use the time window of {self._time_window_if_history_is_full} to filter out files."
        )
    files_to_sync = []
    total_count = 0  # counted inline so the iterable is consumed only once
    for file in all_files:
        total_count += 1
        if self._should_sync_file(file, logger):
            files_to_sync.append(file)
    logger.info(
        f"Determined {len(files_to_sync)} files to sync out of {total_count} total files"
    )
    return files_to_sync

def get_start_time(self) -> datetime:
    """Return the stored ``_start_time`` (presumably the sync window's lower bound — confirm against cursor init)."""
    return self._start_time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,10 +342,15 @@ def get_files(self) -> Iterable[RemoteFile]:
)

def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
    """Infer the stream schema from ``files``, driving the async inference to completion.

    The mapping returned by ``_infer_schema`` is assumed to be immutable by
    its callers, so a deepcopy is taken before ``_fill_nulls`` mutates it.
    """
    self.logger.info(
        f"Starting schema inference for stream {self.name} with {len(files)} files"
    )
    inferred = asyncio.get_event_loop().run_until_complete(self._infer_schema(files))
    # Deep-copy first: _fill_nulls mutates, and the inferred mapping is shared.
    filled = self._fill_nulls(deepcopy(inferred))
    self.logger.info(f"Completed schema inference for stream {self.name}")
    return filled

@staticmethod
def _fill_nulls(schema: Mapping[str, Any]) -> Mapping[str, Any]:
Expand Down Expand Up @@ -374,6 +379,9 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
Each file type has a corresponding `infer_schema` handler.
Dispatch on file type.
"""
self.logger.info(
f"Starting concurrent schema inference for {len(files)} files with {self._discovery_policy.n_concurrent_requests} concurrent requests"
)
base_schema: SchemaType = {}
pending_tasks: Set[asyncio.tasks.Task[SchemaType]] = set()

Expand All @@ -383,6 +391,7 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
while len(pending_tasks) <= self._discovery_policy.n_concurrent_requests and (
file := next(files_iterator, None)
):
self.logger.debug(f"Starting schema inference for file: {file.uri}")
pending_tasks.add(asyncio.create_task(self._infer_file_schema(file)))
n_started += 1
# Return when the first task is completed so that we can enqueue a new task as soon as the
Expand All @@ -392,6 +401,9 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
)
for task in done:
try:
self.logger.debug(
f"Completed schema inference for a file, {len(pending_tasks)} files remaining"
)
base_schema = merge_schemas(base_schema, task.result())
except AirbyteTracedException as ate:
raise ate
Expand All @@ -401,6 +413,7 @@ async def _infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
exc_info=exc,
)

self.logger.info(f"Completed concurrent schema inference for stream {self.name}")
return base_schema

async def _infer_file_schema(self, file: RemoteFile) -> SchemaType:
Expand Down
Loading