Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
ac278b9
files-mode-api: initial changes to emit record
aldogonzalez8 Mar 25, 2025
7a02ec8
Add file_based information in discovered catalog (#446)
maxi297 Mar 26, 2025
962ddbe
file-mode-api: make uploader updates record file_reference field
aldogonzalez8 Mar 26, 2025
05bc2cb
file-mode-api: add logging to file creation
aldogonzalez8 Mar 27, 2025
3565455
file-mode-api: fix file_url to be full path
aldogonzalez8 Mar 28, 2025
188f9a5
feat(file-mode-api): move file uploader to record selector level. (#449)
aldogonzalez8 Mar 31, 2025
68480b7
feat(file-mode-api: add filename extractor component (#453)
aldogonzalez8 Mar 31, 2025
6ebfc05
file-mode-api: merge from maxi297/pox-file-upload
aldogonzalez8 Apr 2, 2025
b7bfef5
file-mode-api: initial commit for file based cdk to align with new pr…
aldogonzalez8 Apr 3, 2025
6445211
file-mode-api: merge from maxi297/pox-file-upload
aldogonzalez8 Apr 3, 2025
2cbd065
Auto-fix lint and format issues
Apr 3, 2025
69b7835
file-mode-api: fix lint
aldogonzalez8 Apr 3, 2025
682318e
file-mode-api: bump protocol to latest pre-dev
aldogonzalez8 Apr 4, 2025
8701c27
file-mode-api: add more tests
aldogonzalez8 Apr 4, 2025
9fb3b8c
file-mode-api: limit schema on cdk side with a model
aldogonzalez8 Apr 4, 2025
61ccf7c
file-mode-api: remove unnecesary imports
aldogonzalez8 Apr 4, 2025
aa1f394
file-mode-api: fix for mypy
aldogonzalez8 Apr 4, 2025
abeba93
Auto-fix lint and format issues
Apr 4, 2025
e68873a
file-mode-api: minor fix for schema not having extra fields
aldogonzalez8 Apr 4, 2025
2be82e9
file-mode-api: refactor _get_file_transfer_paths
aldogonzalez8 Apr 5, 2025
8080867
file-mode-api: remove file reference of the FileBasedStreamFacade
aldogonzalez8 Apr 7, 2025
6db0406
file-mode-api: add new file_reference in the FileBasedStreamFacade wh…
aldogonzalez8 Apr 7, 2025
199d99a
file-mode-api: remove_unnecesary absolut_path field from _get_file_tr…
aldogonzalez8 Apr 7, 2025
8920154
file-mode-api: provide ability to source to provide a path extractor …
aldogonzalez8 Apr 7, 2025
cb5884e
file-mode-api: bump to latest version (non-dev) of airbyte-protocol-m…
aldogonzalez8 Apr 7, 2025
fe77b13
file-mode-api: fix lint, andd ruff
aldogonzalez8 Apr 8, 2025
b4ce3fa
file-mode-api: fix for file_uploader as protocol attributes name chan…
aldogonzalez8 Apr 8, 2025
e746feb
Auto-fix lint and format issues
Apr 8, 2025
8d567ac
file-mode-api: add missing test file
aldogonzalez8 Apr 8, 2025
703c268
file-mode-api: add missing file_based key for some scenatios
aldogonzalez8 Apr 8, 2025
18a31d2
file-mode-api: fix tests for new is_file_based field in Stream
aldogonzalez8 Apr 8, 2025
682598b
Auto-fix lint and format issues
Apr 8, 2025
f8179ec
file-mode-api: remove unused fields from file-based schema
aldogonzalez8 Apr 9, 2025
9369f26
fil-mode-api: make file_transfer an internal field and remove unneces…
aldogonzalez8 Apr 14, 2025
c758f3b
merge from main
aldogonzalez8 Apr 14, 2025
9dc319e
fil-mode-api: poetry lock
aldogonzalez8 Apr 14, 2025
a4156fb
file-mode-api: uri make connectors pass the relative path of the file…
aldogonzalez8 Apr 14, 2025
8693830
file-mode-api: ruff format
aldogonzalez8 Apr 14, 2025
8623551
file-mode-api: add doc strings to _get_file_transfer_paths
aldogonzalez8 Apr 14, 2025
a50905b
file-mode-api: add source_uri to schema
aldogonzalez8 Apr 14, 2025
4658790
file-mode-api: fix unit tests
aldogonzalez8 Apr 14, 2025
0ac0d5c
merge from main
aldogonzalez8 Apr 15, 2025
e9474c7
merge from maxi297/poc-file-upload
aldogonzalez8 Apr 15, 2025
f144f98
file-api: add created_at to schema
aldogonzalez8 Apr 16, 2025
47abb52
merge from main
aldogonzalez8 Apr 16, 2025
3fc844c
poetry lockl
aldogonzalez8 Apr 16, 2025
26ae61c
connector builder: initial changes to pass file reference info to data
aldogonzalez8 Apr 17, 2025
fe03e0a
merge from maxi297/poc-file-upload
aldogonzalez8 Apr 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,8 @@
SimpleRetriever,
SimpleRetrieverTestReadDecorator,
)
from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader
from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader, FileWriter, NoopFileWriter, \
ConnectorBuilderFileUploader, BaseFileUploader
from airbyte_cdk.sources.declarative.schema import (
ComplexFieldType,
DefaultSchemaLoader,
Expand Down Expand Up @@ -3590,7 +3591,7 @@ def create_fixed_window_call_rate_policy(

def create_file_uploader(
self, model: FileUploaderModel, config: Config, **kwargs: Any
) -> FileUploader:
) -> BaseFileUploader:
name = "File Uploader"
requester = self._create_component_from_model(
model=model.requester,
Expand All @@ -3604,14 +3605,18 @@ def create_file_uploader(
name=name,
**kwargs,
)
return FileUploader(
emit_connector_builder_messages = self._emit_connector_builder_messages
file_uploader = FileUploader(
requester=requester,
download_target_extractor=download_target_extractor,
config=config,
file_writer=NoopFileWriter() if emit_connector_builder_messages else FileWriter(),
parameters=model.parameters or {},
filename_extractor=model.filename_extractor if model.filename_extractor else None,
)

return ConnectorBuilderFileUploader(file_uploader) if emit_connector_builder_messages else file_uploader

def create_moving_window_call_rate_policy(
self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any
) -> MovingWindowCallRatePolicy:
Expand Down
61 changes: 57 additions & 4 deletions airbyte_cdk/sources/declarative/retrievers/file_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pathlib import Path
from typing import Any, Mapping, Optional, Union

from abc import ABC, abstractmethod
from airbyte_cdk.models import AirbyteRecordMessageFileReference
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import (
Expand All @@ -24,12 +25,56 @@

logger = logging.getLogger("airbyte")

@dataclass
class BaseFileUploader(ABC):
"""
Base class for file uploader
"""

@abstractmethod
def upload(self, record: Record) -> None:
"""
Uploads the file to the specified location
"""
...

class BaseFileWriter(ABC):
"""
Base File writer class
"""

@abstractmethod
def write(self, file_path: Path, content: bytes) -> int:
"""
Writes the file to the specified location
"""
...

class FileWriter(BaseFileWriter):

def write(self, file_path: Path, content: bytes) -> int:
"""
Writes the file to the specified location
"""
with open(str(file_path), "wb") as f:
f.write(content)

return file_path.stat().st_size

class NoopFileWriter(BaseFileWriter):

def write(self, file_path: Path, content: bytes) -> int:
"""
Noop file writer
"""
return 0

@dataclass
class FileUploader:
class FileUploader(BaseFileUploader):
requester: Requester
download_target_extractor: RecordExtractor
config: Config
file_writer: BaseFileWriter
parameters: InitVar[Mapping[str, Any]]

filename_extractor: Optional[Union[InterpolatedString, str]] = None
Expand Down Expand Up @@ -77,9 +122,7 @@ def upload(self, record: Record) -> None:
full_path = files_directory / file_relative_path
full_path.parent.mkdir(parents=True, exist_ok=True)

with open(str(full_path), "wb") as f:
f.write(response.content)
file_size_bytes = full_path.stat().st_size
file_size_bytes = self.file_writer.write(full_path, content=response.content)

logger.info("File uploaded successfully")
logger.info(f"File url: {str(full_path)}")
Expand All @@ -91,3 +134,13 @@ def upload(self, record: Record) -> None:
source_file_relative_path=str(file_relative_path),
file_size_bytes=file_size_bytes,
)


@dataclass
class ConnectorBuilderFileUploader(BaseFileUploader):
file_uploader: FileUploader

def upload(self, record: Record) -> None:
self.file_uploader.upload(record=record)
for file_reference_attribute in [file_reference_attribute for file_reference_attribute in record.file_reference.__dict__ if not file_reference_attribute.startswith('_')]:
record.data[file_reference_attribute] = getattr(record.file_reference, file_reference_attribute)
2 changes: 2 additions & 0 deletions airbyte_cdk/sources/declarative/yaml_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def __init__(
catalog: Optional[ConfiguredAirbyteCatalog] = None,
config: Optional[Mapping[str, Any]] = None,
state: Optional[List[AirbyteStateMessage]] = None,
emit_connector_builder_messages: Optional[bool] = False
) -> None:
"""
:param path_to_yaml: Path to the yaml file describing the source
Expand All @@ -36,6 +37,7 @@ def __init__(
config=config or {},
state=state or [],
source_config=source_config,
emit_connector_builder_messages=emit_connector_builder_messages
)

def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition:
Expand Down
49 changes: 47 additions & 2 deletions unit_tests/sources/declarative/file/test_file_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pathlib import Path
from typing import Any, Dict, List, Optional
from unittest import TestCase
from unittest.mock import Mock
from unittest.mock import Mock, patch

from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog, Status
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
Expand Down Expand Up @@ -34,6 +34,7 @@ def _source(
config: Dict[str, Any],
state: Optional[List[AirbyteStateMessage]] = None,
yaml_file: Optional[str] = None,
emit_connector_builder_messages: Optional[bool] = False
) -> YamlDeclarativeSource:
if not yaml_file:
yaml_file = "file_stream_manifest.yaml"
Expand All @@ -42,6 +43,7 @@ def _source(
catalog=catalog,
config=config,
state=state,
emit_connector_builder_messages=emit_connector_builder_messages
)


Expand All @@ -51,11 +53,12 @@ def read(
state_builder: Optional[StateBuilder] = None,
expecting_exception: bool = False,
yaml_file: Optional[str] = None,
emit_connector_builder_messages: Optional[bool] = False
) -> EntrypointOutput:
config = config_builder.build()
state = state_builder.build() if state_builder else StateBuilder().build()
return entrypoint_read(
_source(catalog, config, state, yaml_file), config, catalog, state, expecting_exception
_source(catalog, config, state, yaml_file, emit_connector_builder_messages), config, catalog, state, expecting_exception
)


Expand Down Expand Up @@ -190,6 +193,48 @@ def test_get_article_attachments_with_filename_extractor(self) -> None:
)
assert file_reference.file_size_bytes

def test_get_article_attachments_messages_for_connector_builder(self) -> None:
with HttpMocker() as http_mocker:
http_mocker.get(
HttpRequest(url=STREAM_URL),
HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200),
)
http_mocker.get(
HttpRequest(url=STREAM_ATTACHMENTS_URL),
HttpResponse(
json.dumps(find_template("file_api/article_attachments", __file__)), 200
),
)
http_mocker.get(
HttpRequest(url=STREAM_ATTACHMENT_CONTENT_URL),
HttpResponse(
find_binary_response("file_api/article_attachment_content.png", __file__), 200
),
)

output = read(
self._config(),
CatalogBuilder()
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
.build(),
yaml_file="test_file_stream_with_filename_extractor.yaml",
emit_connector_builder_messages=True,
)

assert len(output.records) == 1
file_reference = output.records[0].record.file_reference
assert file_reference
assert file_reference.staging_file_url
assert file_reference.source_file_relative_path
# because we didn't write the file, the size is 0
assert file_reference.file_size_bytes == 0

# Assert file reference fields are copied to record data
record_data = output.records[0].record.data
assert record_data["staging_file_url"] == file_reference.staging_file_url
assert record_data["source_file_relative_path"] == file_reference.source_file_relative_path
assert record_data["file_size_bytes"] == file_reference.file_size_bytes

def test_discover_article_attachments(self) -> None:
output = discover(self._config())

Expand Down
Loading