Skip to content
1 change: 1 addition & 0 deletions airbyte_cdk/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
AirbyteMessage,
AirbyteProtocol,
AirbyteRecordMessage,
AirbyteRecordMessageFileReference,
AirbyteStateBlob,
AirbyteStateMessage,
AirbyteStateStats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
stream_name=record.stream_name,
data_or_message=record.data,
is_file_transfer_message=record.is_file_transfer_message,
file_reference=record.file_reference,
)
stream = self._stream_name_to_instance[record.stream_name]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ def _group_streams(
else None,
logger=self.logger,
cursor=cursor,
supports_file_transfer=bool(file_uploader),
)
)
elif (
Expand Down Expand Up @@ -371,6 +372,7 @@ def _group_streams(
cursor_field=None,
logger=self.logger,
cursor=final_state_cursor,
supports_file_transfer=bool(file_uploader),
)
)
elif (
Expand Down Expand Up @@ -425,6 +427,7 @@ def _group_streams(
cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
logger=self.logger,
cursor=perpartition_cursor,
supports_file_transfer=bool(file_uploader),
)
)
else:
Expand Down
28 changes: 26 additions & 2 deletions airbyte_cdk/sources/declarative/retrievers/file_uploader.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import json
import logging
from pathlib import Path
from typing import Optional

from airbyte_cdk.models import AirbyteRecordMessageFileReference
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
SafeResponse,
)
from airbyte_cdk.sources.declarative.requesters import Requester
from airbyte_cdk.sources.declarative.types import Record, StreamSlice
from airbyte_cdk.sources.utils.files_directory import get_files_directory

logger = logging.getLogger("airbyte")

class FileUploader:
def __init__(
Expand All @@ -24,7 +28,7 @@ def __init__(
def upload(self, record: Record) -> None:
# TODO validate record shape - is the transformation applied at this point?
mocked_response = SafeResponse()
mocked_response.content = json.dumps(record.data)
mocked_response.content = json.dumps(record.data).encode("utf-8")
download_target = list(self._download_target_extractor.extract_records(mocked_response))[0]
if not isinstance(download_target, str):
raise ValueError(
Expand All @@ -40,5 +44,25 @@ def upload(self, record: Record) -> None:
if self._content_extractor:
raise NotImplementedError("TODO")
else:
with open(str(Path(__file__).parent / record.data["file_name"]), "ab") as f:
files_directory = Path(get_files_directory())
# TODO:: we could either interpolate record data if some relative_path is provided or
# use partition_field value in the slice {"partition_field": some_value_id} to create a path
file_relative_path = Path(record.stream_name) / record.data["file_name"]
Comment thread
aldogonzalez8 marked this conversation as resolved.
Outdated

full_path = files_directory / file_relative_path
full_path.parent.mkdir(parents=True, exist_ok=True)

with open(str(full_path), "wb") as f:
f.write(response.content)
file_size_bytes = full_path.stat().st_size

logger.info("File uploaded successfully")
logger.info(f"File path: {full_path} ")
logger.info(f"File size: {file_size_bytes / 1024} KB")
logger.info(f"File download target: {download_target}")
Comment thread
aldogonzalez8 marked this conversation as resolved.
Outdated

record.file_reference = AirbyteRecordMessageFileReference(
file_url=download_target,
file_relative_path=str(file_relative_path),
file_size_bytes=file_size_bytes,
)
10 changes: 2 additions & 8 deletions airbyte_cdk/sources/file_based/file_types/file_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,12 @@
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.remote_file import RemoteFile

AIRBYTE_STAGING_DIRECTORY = os.getenv("AIRBYTE_STAGING_DIRECTORY", "/staging/files")
DEFAULT_LOCAL_DIRECTORY = "/tmp/airbyte-file-transfer"
from airbyte_cdk.sources.utils.files_directory import get_files_directory


class FileTransfer:
def __init__(self) -> None:
self._local_directory = (
AIRBYTE_STAGING_DIRECTORY
if os.path.exists(AIRBYTE_STAGING_DIRECTORY)
else DEFAULT_LOCAL_DIRECTORY
)
self._local_directory = get_files_directory()

def get_file(
self,
Expand Down
3 changes: 3 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/default_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(
logger: Logger,
cursor: Cursor,
namespace: Optional[str] = None,
supports_file_transfer: bool = False,
) -> None:
self._stream_partition_generator = partition_generator
self._name = name
Expand All @@ -39,6 +40,7 @@ def __init__(
self._logger = logger
self._cursor = cursor
self._namespace = namespace
self._supports_file_transfer = supports_file_transfer

def generate_partitions(self) -> Iterable[Partition]:
yield from self._stream_partition_generator.generate()
Expand Down Expand Up @@ -68,6 +70,7 @@ def as_airbyte_stream(self) -> AirbyteStream:
json_schema=dict(self._json_schema),
supported_sync_modes=[SyncMode.full_refresh],
is_resumable=False,
is_file_based=self._supports_file_transfer,
)

if self._namespace:
Expand Down
11 changes: 11 additions & 0 deletions airbyte_cdk/sources/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from typing import Any, ItemsView, Iterator, KeysView, List, Mapping, Optional, ValuesView

from airbyte_cdk.models import AirbyteRecordMessageFileReference
from airbyte_cdk.utils.slice_hasher import SliceHasher

# A FieldPointer designates a path to a field inside a mapping. For example, retrieving ["k1", "k1.2"] in the object {"k1" :{"k1.2":
Expand All @@ -24,11 +25,13 @@ def __init__(
stream_name: str,
associated_slice: Optional[StreamSlice] = None,
is_file_transfer_message: bool = False,
file_reference: Optional[AirbyteRecordMessageFileReference] = None,
):
self._data = data
self._associated_slice = associated_slice
self.stream_name = stream_name
self.is_file_transfer_message = is_file_transfer_message
self._file_reference = file_reference

@property
def data(self) -> Mapping[str, Any]:
Expand All @@ -38,6 +41,14 @@ def data(self) -> Mapping[str, Any]:
def associated_slice(self) -> Optional[StreamSlice]:
return self._associated_slice

@property
def file_reference(self) -> AirbyteRecordMessageFileReference:
return self._file_reference

@file_reference.setter
def file_reference(self, value: AirbyteRecordMessageFileReference):
self._file_reference = value

def __repr__(self) -> str:
return repr(self._data)

Expand Down
15 changes: 15 additions & 0 deletions airbyte_cdk/sources/utils/files_directory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
import os

AIRBYTE_STAGING_DIRECTORY = os.getenv("AIRBYTE_STAGING_DIRECTORY", "/staging/files")
DEFAULT_LOCAL_DIRECTORY = "/tmp/airbyte-file-transfer"


def get_files_directory() -> str:
return (
AIRBYTE_STAGING_DIRECTORY
if os.path.exists(AIRBYTE_STAGING_DIRECTORY)
else DEFAULT_LOCAL_DIRECTORY
)
9 changes: 8 additions & 1 deletion airbyte_cdk/sources/utils/record_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
AirbyteLogMessage,
AirbyteMessage,
AirbyteRecordMessage,
AirbyteRecordMessageFileReference,
AirbyteTraceMessage,
)
from airbyte_cdk.models import Type as MessageType
Expand All @@ -23,6 +24,7 @@ def stream_data_to_airbyte_message(
transformer: TypeTransformer = TypeTransformer(TransformConfig.NoTransform),
schema: Optional[Mapping[str, Any]] = None,
is_file_transfer_message: bool = False,
file_reference: Optional[AirbyteRecordMessageFileReference] = None,
) -> AirbyteMessage:
if schema is None:
schema = {}
Expand All @@ -41,7 +43,12 @@ def stream_data_to_airbyte_message(
stream=stream_name, file=data, emitted_at=now_millis, data={}
)
else:
message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis)
message = AirbyteRecordMessage(
stream=stream_name,
data=data,
emitted_at=now_millis,
file_reference=file_reference,
)
return AirbyteMessage(type=MessageType.RECORD, record=message)
case AirbyteTraceMessage():
return AirbyteMessage(type=MessageType.TRACE, trace=data_or_message)
Expand Down
Loading
Loading