From d5941c3d8943b57f8dd73676d737feec5e37c71f Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Thu, 2 Oct 2025 14:16:21 +0300 Subject: [PATCH 01/11] add AbstractFileBasedFileTransferReader --- .../file_based_file_transfer_reader.py | 64 +++++++++++++++++ .../file_based/file_based_stream_reader.py | 68 ++++++++++++++++++- 2 files changed, 130 insertions(+), 2 deletions(-) create mode 100644 airbyte_cdk/sources/file_based/file_based_file_transfer_reader.py diff --git a/airbyte_cdk/sources/file_based/file_based_file_transfer_reader.py b/airbyte_cdk/sources/file_based/file_based_file_transfer_reader.py new file mode 100644 index 000000000..a8419b9eb --- /dev/null +++ b/airbyte_cdk/sources/file_based/file_based_file_transfer_reader.py @@ -0,0 +1,64 @@ +from abc import ABC, abstractmethod + +from airbyte_cdk.sources.file_based.remote_file import RemoteFile + + +class AbstractFileBasedFileTransferReader(ABC): + FILE_SIZE_LIMIT = 1_500_000_000 + + def __init__(self, remote_file: RemoteFile) -> None: + self.remote_file = remote_file + + @property + @abstractmethod + def file_id(self) -> str: + """ + Unique identifier for the file being transferred. + """ + ... + + @property + @abstractmethod + def file_created_at(self) -> str: + """ + Date time when the file was created. + """ + ... + + @property + @abstractmethod + def file_updated_at(self) -> str: + """ + Date time when the file was last updated. + """ + ... + + @property + @abstractmethod + def file_size(self) -> int: + """ + Returns the file size in bytes. + """ + ... + + @abstractmethod + def download_to_local_directory(self, local_file_path: str) -> None: + """ + Download the file from remote source to local storage. + """ + ... + + @property + @abstractmethod + def source_file_relative_path(self) -> str: + """ + Returns the relative path of the source file. + """ + ... + + @property + def file_uri_for_logging(self): + """ + Returns the URI for the file being logged. + """ + return self.remote_file.uri diff --git a/airbyte_cdk/sources/file_based/file_based_stream_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_reader.py index a5fe44d42..a04483c72 100644 --- a/airbyte_cdk/sources/file_based/file_based_stream_reader.py +++ b/airbyte_cdk/sources/file_based/file_based_stream_reader.py @@ -3,6 +3,7 @@ # import logging +import time from abc import ABC, abstractmethod from datetime import datetime from enum import Enum @@ -19,8 +20,13 @@ preserve_directory_structure, use_file_transfer, ) +from airbyte_cdk.sources.file_based.file_based_file_transfer_reader import ( + AbstractFileBasedFileTransferReader, +) from airbyte_cdk.sources.file_based.file_record_data import FileRecordData from airbyte_cdk.sources.file_based.remote_file import RemoteFile +from airbyte_protocol_dataclasses.models import FailureType +from airbyte_cdk.sources.file_based.exceptions import FileSizeLimitError class FileReadMode(Enum): @@ -38,6 +44,14 @@ class AbstractFileBasedStreamReader(ABC): def __init__(self) -> None: self._config = None + if ( + self.file_transfer_reader_class is None + and self.upload.__func__ == AbstractFileBasedStreamReader.upload + ): + raise NotImplementedError( + "One of file_transfer_reader_class or upload method must be defined to support file transfer." + ) + @property def config(self) -> Optional[AbstractFileBasedSpec]: return self._config @@ -153,7 +167,10 @@ def include_identities_stream(self) -> bool: return include_identities_stream(self.config) return False - @abstractmethod + @property + def file_transfer_reader_class(self) -> AbstractFileBasedFileTransferReader | None: + return None + def upload( self, file: RemoteFile, local_directory: str, logger: logging.Logger ) -> Tuple[FileRecordData, AirbyteRecordMessageFileReference]: @@ -173,7 +190,54 @@ def upload( - file_size_bytes (int): The size of the referenced file in bytes. - source_file_relative_path (str): The relative path to the referenced file in source. """ - ... + # if self.file_transfer_reader_class is None and self.upload.__func__ == AbstractFileBasedStreamReader.upload: + # raise NotImplementedError("One of file_transfer_reader_class or upload method must be defined to support file transfer.") + + file_transfer = self.file_transfer_reader_class(file) + file_size = file_transfer.file_size + + if file_size > file_transfer.FILE_SIZE_LIMIT: + message = "File size exceeds the 1 GB limit." + raise FileSizeLimitError( + message=message, internal_message=message, failure_type=FailureType.config_error + ) + + file_paths = self._get_file_transfer_paths( + source_file_relative_path=file_transfer.source_file_relative_path, + staging_directory=local_directory, + ) + local_file_path = file_paths[self.LOCAL_FILE_PATH] + file_relative_path = file_paths[self.FILE_RELATIVE_PATH] + file_name = file_paths[self.FILE_NAME] + + logger.info( + f"Starting to download the file {file_transfer.file_uri_for_logging} with size: {file_size / (1024 * 1024):,.2f} MB ({file_size / (1024 * 1024 * 1024):.2f} GB)" + ) + start_download_time = time.time() + + file_transfer.download_to_local_directory(local_file_path) + + write_duration = time.time() - start_download_time + logger.info( + f"Finished downloading the file {file_transfer.file_uri_for_logging} and saved to {local_file_path} in {write_duration:,.2f} seconds." + ) + + file_record_data = FileRecordData( + folder=file_paths[self.FILE_FOLDER], + file_name=file_name, + bytes=file_size, + id=file_transfer.file_id, + mime_type=file.mime_type, + created_at=file_transfer.file_created_at, + updated_at=file_transfer.file_updated_at, + source_uri=file.uri, + ) + file_reference = AirbyteRecordMessageFileReference( + staging_file_url=local_file_path, + source_file_relative_path=file_relative_path, + file_size_bytes=file_size, + ) + return file_record_data, file_reference def _get_file_transfer_paths( self, source_file_relative_path: str, staging_directory: str From 997c451b509ae3f812e4efb0b4237b5a7c95550f Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Thu, 2 Oct 2025 11:21:52 +0000 Subject: [PATCH 02/11] Auto-fix lint and format issues --- airbyte_cdk/sources/file_based/file_based_stream_reader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/file_based/file_based_stream_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_reader.py index a04483c72..19062f405 100644 --- a/airbyte_cdk/sources/file_based/file_based_stream_reader.py +++ b/airbyte_cdk/sources/file_based/file_based_stream_reader.py @@ -11,6 +11,7 @@ from os import makedirs, path from typing import Any, Callable, Iterable, List, MutableMapping, Optional, Set, Tuple +from airbyte_protocol_dataclasses.models import FailureType from wcmatch.glob import GLOBSTAR, globmatch from airbyte_cdk.models import AirbyteRecordMessageFileReference @@ -20,13 +21,12 @@ preserve_directory_structure, use_file_transfer, ) +from airbyte_cdk.sources.file_based.exceptions import FileSizeLimitError from airbyte_cdk.sources.file_based.file_based_file_transfer_reader import ( AbstractFileBasedFileTransferReader, ) from airbyte_cdk.sources.file_based.file_record_data import FileRecordData from airbyte_cdk.sources.file_based.remote_file import RemoteFile -from airbyte_protocol_dataclasses.models import FailureType -from airbyte_cdk.sources.file_based.exceptions import FileSizeLimitError class FileReadMode(Enum): From 0bf4d54c6ae56b6d5d4bc18d0b2c8c0f42968ee1 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Thu, 2 Oct 2025 14:32:06 +0300 Subject: [PATCH 03/11] refactor code --- .../file_based_file_transfer_reader.py | 2 +- .../file_based/file_based_stream_reader.py | 25 ++++++------------- 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/airbyte_cdk/sources/file_based/file_based_file_transfer_reader.py b/airbyte_cdk/sources/file_based/file_based_file_transfer_reader.py index a8419b9eb..6dbbc3081 100644 --- a/airbyte_cdk/sources/file_based/file_based_file_transfer_reader.py +++ b/airbyte_cdk/sources/file_based/file_based_file_transfer_reader.py @@ -57,7 +57,7 @@ def source_file_relative_path(self) -> str: ... @property - def file_uri_for_logging(self): + def file_uri_for_logging(self) -> str: """ Returns the URI for the file being logged. """ diff --git a/airbyte_cdk/sources/file_based/file_based_stream_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_reader.py index 19062f405..c97388133 100644 --- a/airbyte_cdk/sources/file_based/file_based_stream_reader.py +++ b/airbyte_cdk/sources/file_based/file_based_stream_reader.py @@ -9,7 +9,7 @@ from enum import Enum from io import IOBase from os import makedirs, path -from typing import Any, Callable, Iterable, List, MutableMapping, Optional, Set, Tuple +from typing import Any, Callable, Iterable, List, MutableMapping, Optional, Set, Tuple, Type from airbyte_protocol_dataclasses.models import FailureType from wcmatch.glob import GLOBSTAR, globmatch @@ -43,10 +43,9 @@ class AbstractFileBasedStreamReader(ABC): def __init__(self) -> None: self._config = None - if ( self.file_transfer_reader_class is None - and self.upload.__func__ == AbstractFileBasedStreamReader.upload + and type(self).upload is AbstractFileBasedStreamReader.upload ): raise NotImplementedError( "One of file_transfer_reader_class or upload method must be defined to support file transfer." @@ -127,16 +126,6 @@ def filter_files_by_globs_and_start_date( seen.add(file.uri) yield file - @abstractmethod - def file_size(self, file: RemoteFile) -> int: - """Utility method to get size of the remote file. - - This is required for connectors that will support writing to - files. If the connector does not support writing files, then the - subclass can simply `return 0`. - """ - ... - @staticmethod def file_matches_globs(file: RemoteFile, globs: List[str]) -> bool: # Use the GLOBSTAR flag to enable recursive ** matching @@ -168,7 +157,7 @@ def include_identities_stream(self) -> bool: return False @property - def file_transfer_reader_class(self) -> AbstractFileBasedFileTransferReader | None: + def file_transfer_reader_class(self) -> Type[AbstractFileBasedFileTransferReader] | None: return None def upload( @@ -190,14 +179,16 @@ def upload( - file_size_bytes (int): The size of the referenced file in bytes. - source_file_relative_path (str): The relative path to the referenced file in source. """ - # if self.file_transfer_reader_class is None and self.upload.__func__ == AbstractFileBasedStreamReader.upload: - # raise NotImplementedError("One of file_transfer_reader_class or upload method must be defined to support file transfer.") + if self.file_transfer_reader_class is None: + raise NotImplementedError( + "file_transfer_reader_class must be defined to support default file transfer upload method." + ) file_transfer = self.file_transfer_reader_class(file) file_size = file_transfer.file_size if file_size > file_transfer.FILE_SIZE_LIMIT: - message = "File size exceeds the 1 GB limit." + message = "File size exceeds the 1.5 GB limit." raise FileSizeLimitError( message=message, internal_message=message, failure_type=FailureType.config_error ) From bee6d01c738538c3b2e954f951e5113e3e155d34 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Thu, 2 Oct 2025 14:39:19 +0300 Subject: [PATCH 04/11] update file size error message --- airbyte_cdk/sources/file_based/file_based_stream_reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/file_based/file_based_stream_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_reader.py index c97388133..2bc5f058c 100644 --- a/airbyte_cdk/sources/file_based/file_based_stream_reader.py +++ b/airbyte_cdk/sources/file_based/file_based_stream_reader.py @@ -188,7 +188,7 @@ def upload( file_size = file_transfer.file_size if file_size > file_transfer.FILE_SIZE_LIMIT: - message = "File size exceeds the 1.5 GB limit." + message = f"File size exceeds the {file_transfer.FILE_SIZE_LIMIT / 1e+9} GB limit." raise FileSizeLimitError( message=message, internal_message=message, failure_type=FailureType.config_error ) From 7e1b8502ff8e77a6fdb5b68ca514625d84dfc928 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Thu, 2 Oct 2025 11:44:30 +0000 Subject: [PATCH 05/11] Auto-fix lint and format issues --- airbyte_cdk/sources/file_based/file_based_stream_reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/file_based/file_based_stream_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_reader.py index 2bc5f058c..fbc25e9f4 100644 --- a/airbyte_cdk/sources/file_based/file_based_stream_reader.py +++ b/airbyte_cdk/sources/file_based/file_based_stream_reader.py @@ -188,7 +188,7 @@ def upload( file_size = file_transfer.file_size if file_size > file_transfer.FILE_SIZE_LIMIT: - message = f"File size exceeds the {file_transfer.FILE_SIZE_LIMIT / 1e+9} GB limit." + message = f"File size exceeds the {file_transfer.FILE_SIZE_LIMIT / 1e9} GB limit." raise FileSizeLimitError( message=message, internal_message=message, failure_type=FailureType.config_error ) From 3f7d4586c744b572ea6a20ca986daf3a1aa37a41 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Thu, 2 Oct 2025 15:50:52 +0300 Subject: [PATCH 06/11] add unit tests --- .../test_file_based_stream_reader.py | 89 +++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/unit_tests/sources/file_based/test_file_based_stream_reader.py b/unit_tests/sources/file_based/test_file_based_stream_reader.py index ffd5ba5af..170f47d35 100644 --- a/unit_tests/sources/file_based/test_file_based_stream_reader.py +++ b/unit_tests/sources/file_based/test_file_based_stream_reader.py @@ -7,11 +7,16 @@ from io import IOBase from os import path from typing import Any, ClassVar, Dict, Iterable, List, Mapping, Optional, Set +from unittest.mock import MagicMock import pytest from pydantic.v1 import AnyUrl from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec +from airbyte_cdk.sources.file_based.exceptions import FileSizeLimitError +from airbyte_cdk.sources.file_based.file_based_file_transfer_reader import ( + AbstractFileBasedFileTransferReader, +) from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader from airbyte_cdk.sources.file_based.remote_file import RemoteFile from airbyte_cdk.sources.utils.files_directory import get_files_directory @@ -64,6 +69,69 @@ } +class TestFileBasedFileTransferReader(AbstractFileBasedFileTransferReader): + @property + def file_id(self) -> str: + return "test_file_id" + + @property + def file_created_at(self) -> str: + return "2025-05-05" + + @property + def file_updated_at(self) -> str: + return "2025-05-06" + + @property + def file_size(self) -> int: + return self.remote_file.size + + def download_to_local_directory(self, local_file_path: str) -> None: + pass + + @property + def source_file_relative_path(self) -> str: + return "source/path" + + @property + def file_uri_for_logging(self) -> str: + return "logging/url" + + +class TestStreamReaderWithFileTransferClass(AbstractFileBasedStreamReader): + __test__: ClassVar[bool] = False # Tell Pytest this is not a Pytest class, despite its name + + file_transfer_reader_class = TestFileBasedFileTransferReader + + @property + def config(self) -> Optional[AbstractFileBasedSpec]: + return self._config + + @config.setter + def config(self, value: AbstractFileBasedSpec) -> None: + self._config = value + + def get_matching_files(self, globs: List[str]) -> Iterable[RemoteFile]: + pass + + def open_file(self, file: RemoteFile) -> IOBase: + pass + + def get_file_acl_permissions(self, file: RemoteFile, logger: logging.Logger) -> Dict[str, Any]: + return {} + + def load_identity_groups(self, logger: logging.Logger) -> Iterable[Dict[str, Any]]: + return [{}] + + @property + def file_permissions_schema(self) -> Dict[str, Any]: + return {"type": "object", "properties": {}} + + @property + def identities_schema(self) -> Dict[str, Any]: + return {"type": "object", "properties": {}} + + class TestStreamReader(AbstractFileBasedStreamReader): __test__: ClassVar[bool] = False # Tell Pytest this is not a Pytest class, despite its name @@ -458,3 +526,24 @@ def test_preserve_sub_directories_scenarios( assert file_paths[AbstractFileBasedStreamReader.LOCAL_FILE_PATH] == expected_local_file_path assert file_paths[AbstractFileBasedStreamReader.FILE_NAME] == path.basename(source_file_path) assert file_paths[AbstractFileBasedStreamReader.FILE_FOLDER] == path.dirname(source_file_path) + + +def test_upload_with_file_transfer_reader(): + stream_reader = TestStreamReaderWithFileTransferClass() + logger = logging.getLogger("airbyte") + + remote_file = MagicMock() + remote_file.size = 200 + remote_file.uri = "test_url" + remote_file.mime_type = "test_mime_type" + file_record_data, file_reference = stream_reader.upload(remote_file, "test_directory", logger) + assert file_record_data + assert file_reference + + remote_file = MagicMock() + remote_file.size = 2_500_000_000 + remote_file.uri = "test_url" + remote_file.mime_type = "test_mime_type" + logger = logging.getLogger("airbyte") + with pytest.raises(FileSizeLimitError): + stream_reader.upload(remote_file, "test_directory", logger) From 1be0b73ecebd5b116e0d093668ec0b446c90afc7 Mon Sep 17 00:00:00 2001 From: Daryna Ishchenko <80129833+darynaishchenko@users.noreply.github.com> Date: Thu, 2 Oct 2025 16:03:00 +0300 Subject: [PATCH 07/11] Update unit_tests/sources/file_based/test_file_based_stream_reader.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- unit_tests/sources/file_based/test_file_based_stream_reader.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/unit_tests/sources/file_based/test_file_based_stream_reader.py b/unit_tests/sources/file_based/test_file_based_stream_reader.py index 170f47d35..4b30b6188 100644 --- a/unit_tests/sources/file_based/test_file_based_stream_reader.py +++ b/unit_tests/sources/file_based/test_file_based_stream_reader.py @@ -544,6 +544,7 @@ def test_upload_with_file_transfer_reader(): remote_file.size = 2_500_000_000 remote_file.uri = "test_url" remote_file.mime_type = "test_mime_type" - logger = logging.getLogger("airbyte") + with pytest.raises(FileSizeLimitError): + stream_reader.upload(remote_file, "test_directory", logger) with pytest.raises(FileSizeLimitError): stream_reader.upload(remote_file, "test_directory", logger) From b8c9a2ce9d13509d42c1ea8c0fe95fec08992232 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Fri, 3 Oct 2025 14:56:31 +0300 Subject: [PATCH 08/11] added UploadableRemoteFile --- .../file_based_file_transfer_reader.py | 64 ---------------- .../file_based/file_based_stream_reader.py | 48 ++++-------- airbyte_cdk/sources/file_based/remote_file.py | 41 +++++++++- .../test_file_based_stream_reader.py | 74 +++++++------------ 4 files changed, 82 insertions(+), 145 deletions(-) delete mode 100644 airbyte_cdk/sources/file_based/file_based_file_transfer_reader.py diff --git a/airbyte_cdk/sources/file_based/file_based_file_transfer_reader.py b/airbyte_cdk/sources/file_based/file_based_file_transfer_reader.py deleted file mode 100644 index 6dbbc3081..000000000 --- a/airbyte_cdk/sources/file_based/file_based_file_transfer_reader.py +++ /dev/null @@ -1,64 +0,0 @@ -from abc import ABC, abstractmethod - -from airbyte_cdk.sources.file_based.remote_file import RemoteFile - - -class AbstractFileBasedFileTransferReader(ABC): - FILE_SIZE_LIMIT = 1_500_000_000 - - def __init__(self, remote_file: RemoteFile) -> None: - self.remote_file = remote_file - - @property - @abstractmethod - def file_id(self) -> str: - """ - Unique identifier for the file being transferred. - """ - ... - - @property - @abstractmethod - def file_created_at(self) -> str: - """ - Date time when the file was created. - """ - ... - - @property - @abstractmethod - def file_updated_at(self) -> str: - """ - Date time when the file was last updated. - """ - ... - - @property - @abstractmethod - def file_size(self) -> int: - """ - Returns the file size in bytes. - """ - ... - - @abstractmethod - def download_to_local_directory(self, local_file_path: str) -> None: - """ - Download the file from remote source to local storage. - """ - ... - - @property - @abstractmethod - def source_file_relative_path(self) -> str: - """ - Returns the relative path of the source file. - """ - ... - - @property - def file_uri_for_logging(self) -> str: - """ - Returns the URI for the file being logged. - """ - return self.remote_file.uri diff --git a/airbyte_cdk/sources/file_based/file_based_stream_reader.py b/airbyte_cdk/sources/file_based/file_based_stream_reader.py index fbc25e9f4..4c0cb7c9e 100644 --- a/airbyte_cdk/sources/file_based/file_based_stream_reader.py +++ b/airbyte_cdk/sources/file_based/file_based_stream_reader.py @@ -9,7 +9,7 @@ from enum import Enum from io import IOBase from os import makedirs, path -from typing import Any, Callable, Iterable, List, MutableMapping, Optional, Set, Tuple, Type +from typing import Any, Iterable, List, MutableMapping, Optional, Set, Tuple from airbyte_protocol_dataclasses.models import FailureType from wcmatch.glob import GLOBSTAR, globmatch @@ -22,11 +22,8 @@ use_file_transfer, ) from airbyte_cdk.sources.file_based.exceptions import FileSizeLimitError -from airbyte_cdk.sources.file_based.file_based_file_transfer_reader import ( - AbstractFileBasedFileTransferReader, -) from airbyte_cdk.sources.file_based.file_record_data import FileRecordData -from airbyte_cdk.sources.file_based.remote_file import RemoteFile +from airbyte_cdk.sources.file_based.remote_file import RemoteFile, UploadableRemoteFile class FileReadMode(Enum): @@ -40,16 +37,10 @@ class AbstractFileBasedStreamReader(ABC): FILE_NAME = "file_name" LOCAL_FILE_PATH = "local_file_path" FILE_FOLDER = "file_folder" + FILE_SIZE_LIMIT = 1_500_000_000 def __init__(self) -> None: self._config = None - if ( - self.file_transfer_reader_class is None - and type(self).upload is AbstractFileBasedStreamReader.upload - ): - raise NotImplementedError( - "One of file_transfer_reader_class or upload method must be defined to support file transfer." - ) @property def config(self) -> Optional[AbstractFileBasedSpec]: @@ -156,12 +147,8 @@ def include_identities_stream(self) -> bool: return include_identities_stream(self.config) return False - @property - def file_transfer_reader_class(self) -> Type[AbstractFileBasedFileTransferReader] | None: - return None - def upload( - self, file: RemoteFile, local_directory: str, logger: logging.Logger + self, file: UploadableRemoteFile, local_directory: str, logger: logging.Logger ) -> Tuple[FileRecordData, AirbyteRecordMessageFileReference]: """ This is required for connectors that will support writing to @@ -179,22 +166,19 @@ def upload( - file_size_bytes (int): The size of the referenced file in bytes. - source_file_relative_path (str): The relative path to the referenced file in source. """ - if self.file_transfer_reader_class is None: - raise NotImplementedError( - "file_transfer_reader_class must be defined to support default file transfer upload method." - ) + if not isinstance(file, UploadableRemoteFile): + raise TypeError(f"Expected UploadableRemoteFile, got {type(file)}") - file_transfer = self.file_transfer_reader_class(file) - file_size = file_transfer.file_size + file_size = file.size - if file_size > file_transfer.FILE_SIZE_LIMIT: - message = f"File size exceeds the {file_transfer.FILE_SIZE_LIMIT / 1e9} GB limit." + if file_size > self.FILE_SIZE_LIMIT: + message = f"File size exceeds the {self.FILE_SIZE_LIMIT / 1e9} GB limit." raise FileSizeLimitError( message=message, internal_message=message, failure_type=FailureType.config_error ) file_paths = self._get_file_transfer_paths( - source_file_relative_path=file_transfer.source_file_relative_path, + source_file_relative_path=file.source_file_relative_path, staging_directory=local_directory, ) local_file_path = file_paths[self.LOCAL_FILE_PATH] @@ -202,25 +186,25 @@ def upload( file_name = file_paths[self.FILE_NAME] logger.info( - f"Starting to download the file {file_transfer.file_uri_for_logging} with size: {file_size / (1024 * 1024):,.2f} MB ({file_size / (1024 * 1024 * 1024):.2f} GB)" + f"Starting to download the file {file.file_uri_for_logging} with size: {file_size / (1024 * 1024):,.2f} MB ({file_size / (1024 * 1024 * 1024):.2f} GB)" ) start_download_time = time.time() - file_transfer.download_to_local_directory(local_file_path) + file.download_to_local_directory(local_file_path) write_duration = time.time() - start_download_time logger.info( - f"Finished downloading the file {file_transfer.file_uri_for_logging} and saved to {local_file_path} in {write_duration:,.2f} seconds." + f"Finished downloading the file {file.file_uri_for_logging} and saved to {local_file_path} in {write_duration:,.2f} seconds." ) file_record_data = FileRecordData( folder=file_paths[self.FILE_FOLDER], file_name=file_name, bytes=file_size, - id=file_transfer.file_id, + id=file.id, mime_type=file.mime_type, - created_at=file_transfer.file_created_at, - updated_at=file_transfer.file_updated_at, + created_at=file.created_at, + updated_at=file.updated_at, source_uri=file.uri, ) file_reference = AirbyteRecordMessageFileReference( diff --git a/airbyte_cdk/sources/file_based/remote_file.py b/airbyte_cdk/sources/file_based/remote_file.py index 0197a35fd..faaf14001 100644 --- a/airbyte_cdk/sources/file_based/remote_file.py +++ b/airbyte_cdk/sources/file_based/remote_file.py @@ -1,7 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # - +from abc import abstractmethod, ABC from datetime import datetime from typing import Optional @@ -16,3 +16,42 @@ class RemoteFile(BaseModel): uri: str last_modified: datetime mime_type: Optional[str] = None + + +class UploadableRemoteFile(RemoteFile, ABC): + """ + A file in a file-based stream that supports uploading(file transferring). + """ + + id: Optional[str] = None + created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None + + @property + @abstractmethod + def size(self) -> int: + """ + Returns the file size in bytes. + """ + ... + + @abstractmethod + def download_to_local_directory(self, local_file_path: str) -> None: + """ + Download the file from remote source to local storage. + """ + ... + + @property + def source_file_relative_path(self) -> str: + """ + Returns the relative path of the source file. + """ + return self.uri + + @property + def file_uri_for_logging(self) -> str: + """ + Returns the URI for the file being logged. + """ + return self.uri diff --git a/unit_tests/sources/file_based/test_file_based_stream_reader.py b/unit_tests/sources/file_based/test_file_based_stream_reader.py index 4b30b6188..13fa1025c 100644 --- a/unit_tests/sources/file_based/test_file_based_stream_reader.py +++ b/unit_tests/sources/file_based/test_file_based_stream_reader.py @@ -14,11 +14,8 @@ from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec from airbyte_cdk.sources.file_based.exceptions import FileSizeLimitError -from airbyte_cdk.sources.file_based.file_based_file_transfer_reader import ( - AbstractFileBasedFileTransferReader, -) from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader -from airbyte_cdk.sources.file_based.remote_file import RemoteFile +from airbyte_cdk.sources.file_based.remote_file import RemoteFile, UploadableRemoteFile from airbyte_cdk.sources.utils.files_directory import get_files_directory from unit_tests.sources.file_based.helpers import make_remote_files @@ -69,40 +66,9 @@ } -class TestFileBasedFileTransferReader(AbstractFileBasedFileTransferReader): - @property - def file_id(self) -> str: - return "test_file_id" - - @property - def file_created_at(self) -> str: - return "2025-05-05" - - @property - def file_updated_at(self) -> str: - return "2025-05-06" - - @property - def file_size(self) -> int: - return self.remote_file.size - - def download_to_local_directory(self, local_file_path: str) -> None: - pass - - @property - def source_file_relative_path(self) -> str: - return "source/path" - - @property - def file_uri_for_logging(self) -> str: - return "logging/url" - - -class TestStreamReaderWithFileTransferClass(AbstractFileBasedStreamReader): +class TestStreamReaderWithDefaultUpload(AbstractFileBasedStreamReader): __test__: ClassVar[bool] = False # Tell Pytest this is not a Pytest class, despite its name - file_transfer_reader_class = TestFileBasedFileTransferReader - @property def config(self) -> Optional[AbstractFileBasedSpec]: return self._config @@ -529,22 +495,34 @@ def test_preserve_sub_directories_scenarios( def test_upload_with_file_transfer_reader(): - stream_reader = TestStreamReaderWithFileTransferClass() + stream_reader = TestStreamReaderWithDefaultUpload() + + class TestUploadableRemoteFile(UploadableRemoteFile): + blob: Any + + @property + def size(self) -> int: + return self.blob.size + + def download_to_local_directory(self, local_file_path: str) -> None: + pass + + blob = MagicMock() + blob.size = 200 + uploadable_remote_file = TestUploadableRemoteFile( + uri="test/uri", last_modified=datetime.now(), blob=blob + ) + logger = logging.getLogger("airbyte") - remote_file = MagicMock() - remote_file.size = 200 - remote_file.uri = "test_url" - remote_file.mime_type = "test_mime_type" - file_record_data, file_reference = stream_reader.upload(remote_file, "test_directory", logger) + file_record_data, file_reference = stream_reader.upload( + uploadable_remote_file, "test_directory", logger + ) assert file_record_data assert file_reference - remote_file = MagicMock() - remote_file.size = 2_500_000_000 - remote_file.uri = "test_url" - remote_file.mime_type = "test_mime_type" + blob.size = 2_500_000_000 with pytest.raises(FileSizeLimitError): - stream_reader.upload(remote_file, "test_directory", logger) + stream_reader.upload(uploadable_remote_file, "test_directory", logger) with pytest.raises(FileSizeLimitError): - stream_reader.upload(remote_file, "test_directory", logger) + stream_reader.upload(uploadable_remote_file, "test_directory", logger) From e40bb9bec8c3d73235466046fc5b8820ad410ab5 Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Fri, 3 Oct 2025 15:26:18 +0300 Subject: [PATCH 09/11] updated typing --- airbyte_cdk/sources/file_based/remote_file.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/file_based/remote_file.py b/airbyte_cdk/sources/file_based/remote_file.py index faaf14001..d82f6e1c5 100644 --- a/airbyte_cdk/sources/file_based/remote_file.py +++ b/airbyte_cdk/sources/file_based/remote_file.py @@ -24,8 +24,8 @@ class UploadableRemoteFile(RemoteFile, ABC): """ id: Optional[str] = None - created_at: Optional[datetime] = None - updated_at: Optional[datetime] = None + created_at: Optional[str] = None + updated_at: Optional[str] = None @property @abstractmethod From eb1c883e8202a4f97f6fa8f39a60490b643f5804 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Fri, 3 Oct 2025 12:28:03 +0000 Subject: [PATCH 10/11] Auto-fix lint and format issues --- airbyte_cdk/sources/file_based/remote_file.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/file_based/remote_file.py b/airbyte_cdk/sources/file_based/remote_file.py index d82f6e1c5..b2bfb23f5 100644 --- a/airbyte_cdk/sources/file_based/remote_file.py +++ b/airbyte_cdk/sources/file_based/remote_file.py @@ -1,7 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # -from abc import abstractmethod, ABC +from abc import ABC, abstractmethod from datetime import datetime from typing import Optional From 7a0f9626bfe2bf67b139bd6c9efefb96fca738fe Mon Sep 17 00:00:00 2001 From: darynaishchenko Date: Fri, 3 Oct 2025 15:38:48 +0300 Subject: [PATCH 11/11] updated type in FileTransfer --- airbyte_cdk/sources/file_based/file_types/file_transfer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/file_based/file_types/file_transfer.py b/airbyte_cdk/sources/file_based/file_types/file_transfer.py index ddc70e4b9..d260a092d 100644 --- a/airbyte_cdk/sources/file_based/file_types/file_transfer.py +++ b/airbyte_cdk/sources/file_based/file_types/file_transfer.py @@ -7,7 +7,7 @@ from airbyte_cdk.models import AirbyteRecordMessageFileReference from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader from airbyte_cdk.sources.file_based.file_record_data import FileRecordData -from airbyte_cdk.sources.file_based.remote_file import RemoteFile +from airbyte_cdk.sources.file_based.remote_file import UploadableRemoteFile from airbyte_cdk.sources.utils.files_directory import get_files_directory @@ -17,7 +17,7 @@ def __init__(self) -> None: def upload( self, - file: RemoteFile, + file: UploadableRemoteFile, stream_reader: AbstractFileBasedStreamReader, logger: logging.Logger, ) -> Iterable[Tuple[FileRecordData, AirbyteRecordMessageFileReference]]: