diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 86b52fd52..4ea710706 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -481,7 +481,8 @@ SimpleRetriever, SimpleRetrieverTestReadDecorator, ) -from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader +from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader, FileWriter, NoopFileWriter, \ + ConnectorBuilderFileUploader, BaseFileUploader from airbyte_cdk.sources.declarative.schema import ( ComplexFieldType, DefaultSchemaLoader, @@ -3590,7 +3591,7 @@ def create_fixed_window_call_rate_policy( def create_file_uploader( self, model: FileUploaderModel, config: Config, **kwargs: Any - ) -> FileUploader: + ) -> BaseFileUploader: name = "File Uploader" requester = self._create_component_from_model( model=model.requester, @@ -3604,14 +3605,18 @@ def create_file_uploader( name=name, **kwargs, ) - return FileUploader( + emit_connector_builder_messages = self._emit_connector_builder_messages + file_uploader = FileUploader( requester=requester, download_target_extractor=download_target_extractor, config=config, + file_writer=NoopFileWriter() if emit_connector_builder_messages else FileWriter(), parameters=model.parameters or {}, filename_extractor=model.filename_extractor if model.filename_extractor else None, ) + return ConnectorBuilderFileUploader(file_uploader) if emit_connector_builder_messages else file_uploader + def create_moving_window_call_rate_policy( self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any ) -> MovingWindowCallRatePolicy: diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader.py index 98342e1af..b025dde2b 100644 --- 
@dataclass
class BaseFileUploader(ABC):
    """Interface implemented by every component able to upload the file behind a record."""

    @abstractmethod
    def upload(self, record: "Record") -> None:
        """Upload the file associated with ``record``.

        :param record: the record whose file must be uploaded
        """
        ...


class BaseFileWriter(ABC):
    """Interface for persisting the content of a downloaded file."""

    @abstractmethod
    def write(self, file_path: Path, content: bytes) -> int:
        """Persist ``content`` at ``file_path``.

        :param file_path: destination of the file
        :param content: raw bytes to persist
        :return: the size, in bytes, of what was written
        """
        ...


class FileWriter(BaseFileWriter):
    """File writer that stores the content on the local filesystem."""

    def write(self, file_path: Path, content: bytes) -> int:
        """Write ``content`` to ``file_path``, replacing any existing file.

        :param file_path: destination of the file; its parent directory must already exist
        :param content: raw bytes to persist
        :return: the number of bytes written
        """
        # Path.write_bytes opens the file in binary mode, closes it and returns the
        # byte count, so no separate stat() call is needed to learn the size.
        return file_path.write_bytes(content)


class NoopFileWriter(BaseFileWriter):
    """File writer that writes nothing and always reports a size of 0."""

    def write(self, file_path: Path, content: bytes) -> int:
        """Ignore ``content`` and leave ``file_path`` untouched.

        :param file_path: unused
        :param content: unused
        :return: always 0, since nothing is written
        """
        return 0


@dataclass
class ConnectorBuilderFileUploader(BaseFileUploader):
    """Wrap a file uploader and mirror the resulting file reference into the record data.

    After delegating the upload, every public attribute of ``record.file_reference``
    is copied into ``record.data`` under the same name.
    """

    file_uploader: "FileUploader"

    def upload(self, record: "Record") -> None:
        """Delegate the upload, then copy the file reference fields into ``record.data``.

        :param record: the record to upload the file for; its ``data`` is mutated in place
        """
        self.file_uploader.upload(record=record)

        file_reference = record.file_reference
        if file_reference is None:
            # The wrapped uploader attached no reference, so there is nothing to mirror.
            return

        for attribute_name, attribute_value in vars(file_reference).items():
            # Names starting with an underscore are treated as private and not exposed.
            if not attribute_name.startswith("_"):
                record.data[attribute_name] = attribute_value
def test_get_article_attachments_messages_for_connector_builder(self) -> None:
    """Read attachments with connector builder messages enabled.

    Expects a single record whose file reference reports a size of 0 and whose
    file reference fields are also present in the record data.
    """
    with HttpMocker() as http_mocker:
        articles_body = json.dumps(find_template("file_api/articles", __file__))
        http_mocker.get(HttpRequest(url=STREAM_URL), HttpResponse(articles_body, 200))

        attachments_body = json.dumps(find_template("file_api/article_attachments", __file__))
        http_mocker.get(
            HttpRequest(url=STREAM_ATTACHMENTS_URL),
            HttpResponse(attachments_body, 200),
        )

        attachment_content = find_binary_response(
            "file_api/article_attachment_content.png", __file__
        )
        http_mocker.get(
            HttpRequest(url=STREAM_ATTACHMENT_CONTENT_URL),
            HttpResponse(attachment_content, 200),
        )

        config_builder = self._config()
        attachments_stream = ConfiguredAirbyteStreamBuilder().with_name("article_attachments")
        catalog = CatalogBuilder().with_stream(attachments_stream).build()
        output = read(
            config_builder,
            catalog,
            yaml_file="test_file_stream_with_filename_extractor.yaml",
            emit_connector_builder_messages=True,
        )

        assert len(output.records) == 1
        record = output.records[0].record

        file_reference = record.file_reference
        assert file_reference
        assert file_reference.staging_file_url
        assert file_reference.source_file_relative_path
        # Nothing is written to disk in this mode, hence the reported size of 0.
        assert file_reference.file_size_bytes == 0

        # The file reference fields must have been copied into the record data.
        record_data = record.data
        for field_name in (
            "staging_file_url",
            "source_file_relative_path",
            "file_size_bytes",
        ):
            assert record_data[field_name] == getattr(file_reference, field_name)