From da8e3b293e4326e2c07fda4038d4e96c44b9cebb Mon Sep 17 00:00:00 2001 From: maxi297 Date: Wed, 26 Mar 2025 09:16:49 -0400 Subject: [PATCH 1/2] Add file_based information in discovered catalog --- .../declarative/concurrent_declarative_source.py | 3 +++ .../sources/streams/concurrent/default_stream.py | 3 +++ .../sources/declarative/file/test_file_stream.py | 11 +++++++++++ 3 files changed, 17 insertions(+) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 7a0063b5e..657ab2df4 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -339,6 +339,7 @@ def _group_streams( else None, logger=self.logger, cursor=cursor, + supports_file_transfer=bool(file_uploader), ) ) elif ( @@ -371,6 +372,7 @@ def _group_streams( cursor_field=None, logger=self.logger, cursor=final_state_cursor, + supports_file_transfer=bool(file_uploader), ) ) elif ( @@ -425,6 +427,7 @@ def _group_streams( cursor_field=perpartition_cursor.cursor_field.cursor_field_key, logger=self.logger, cursor=perpartition_cursor, + supports_file_transfer=bool(file_uploader), ) ) else: diff --git a/airbyte_cdk/sources/streams/concurrent/default_stream.py b/airbyte_cdk/sources/streams/concurrent/default_stream.py index 7679a1eb6..54600d635 100644 --- a/airbyte_cdk/sources/streams/concurrent/default_stream.py +++ b/airbyte_cdk/sources/streams/concurrent/default_stream.py @@ -29,6 +29,7 @@ def __init__( logger: Logger, cursor: Cursor, namespace: Optional[str] = None, + supports_file_transfer: bool = False, ) -> None: self._stream_partition_generator = partition_generator self._name = name @@ -39,6 +40,7 @@ def __init__( self._logger = logger self._cursor = cursor self._namespace = namespace + self._supports_file_transfer = supports_file_transfer def generate_partitions(self) -> Iterable[Partition]: yield from self._stream_partition_generator.generate() @@ -68,6 +70,7 @@ def as_airbyte_stream(self) -> AirbyteStream: json_schema=dict(self._json_schema), supported_sync_modes=[SyncMode.full_refresh], is_resumable=False, + is_file_based=self._supports_file_transfer, ) if self._namespace: diff --git a/unit_tests/sources/declarative/file/test_file_stream.py b/unit_tests/sources/declarative/file/test_file_stream.py index a90a2705c..63d207ac1 100644 --- a/unit_tests/sources/declarative/file/test_file_stream.py +++ b/unit_tests/sources/declarative/file/test_file_stream.py @@ -7,6 +7,7 @@ from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput +from airbyte_cdk.test.entrypoint_wrapper import discover as entrypoint_discover from airbyte_cdk.test.entrypoint_wrapper import read as entrypoint_read from airbyte_cdk.test.state_builder import StateBuilder @@ -49,6 +50,10 @@ def read( _source(catalog, config, state), config, catalog, state, expecting_exception ) +def discover(config_builder: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput: + config = config_builder.build() + return entrypoint_discover(_source(CatalogBuilder().build(), config), config, expecting_exception) + class FileStreamTest(TestCase): def _config(self) -> ConfigBuilder: @@ -90,3 +95,9 @@ def test_get_article_attachments(self) -> None: assert file_reference.file_url assert file_reference.file_relative_path assert file_reference.file_size_bytes + + def test_discover_article_attachments(self) -> None: + output = discover(self._config()) + + article_attachments_stream = next(filter(lambda stream: stream.name == "article_attachments", output.catalog.catalog.streams)) + assert article_attachments_stream.is_file_based From b2b54f7702fc482870a677974af4d58b6c8214a9 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Wed, 26 Mar 2025 14:53:19 +0000 Subject: [PATCH 2/2] Auto-fix lint and format issues --- .../sources/declarative/retrievers/file_uploader.py | 2 +- airbyte_cdk/sources/utils/record_helper.py | 2 +- .../sources/declarative/file/test_file_stream.py | 11 +++++++++-- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader.py index 83850881d..348aa81c7 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader.py @@ -2,11 +2,11 @@ from pathlib import Path from typing import Optional +from airbyte_cdk.models import AirbyteRecordMessageFileReference from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ( SafeResponse, ) -from airbyte_cdk.models import AirbyteRecordMessageFileReference from airbyte_cdk.sources.declarative.requesters import Requester from airbyte_cdk.sources.declarative.types import Record, StreamSlice from airbyte_cdk.sources.utils.files_directory import get_files_directory diff --git a/airbyte_cdk/sources/utils/record_helper.py b/airbyte_cdk/sources/utils/record_helper.py index a82078d10..d41907cf1 100644 --- a/airbyte_cdk/sources/utils/record_helper.py +++ b/airbyte_cdk/sources/utils/record_helper.py @@ -9,8 +9,8 @@ AirbyteLogMessage, AirbyteMessage, AirbyteRecordMessage, - AirbyteTraceMessage, AirbyteRecordMessageFileReference, + AirbyteTraceMessage, ) from airbyte_cdk.models import Type as MessageType from airbyte_cdk.models.file_transfer_record_message import AirbyteFileTransferRecordMessage diff --git a/unit_tests/sources/declarative/file/test_file_stream.py b/unit_tests/sources/declarative/file/test_file_stream.py index 63d207ac1..1c0547830 100644 --- a/unit_tests/sources/declarative/file/test_file_stream.py +++ b/unit_tests/sources/declarative/file/test_file_stream.py @@ -50,9 +50,12 @@ def read( _source(catalog, config, state), config, catalog, state, expecting_exception ) + def discover(config_builder: ConfigBuilder, expecting_exception: bool = False) -> EntrypointOutput: config = config_builder.build() - return entrypoint_discover(_source(CatalogBuilder().build(), config), config, expecting_exception) + return entrypoint_discover( + _source(CatalogBuilder().build(), config), config, expecting_exception + ) class FileStreamTest(TestCase): @@ -99,5 +102,9 @@ def test_get_article_attachments(self) -> None: def test_discover_article_attachments(self) -> None: output = discover(self._config()) - article_attachments_stream = next(filter(lambda stream: stream.name == "article_attachments", output.catalog.catalog.streams)) + article_attachments_stream = next( + filter( + lambda stream: stream.name == "article_attachments", output.catalog.catalog.streams + ) + ) assert article_attachments_stream.is_file_based