diff --git a/airbyte_cdk/sources/file_based/config/__init__.py b/airbyte_cdk/sources/file_based/config/__init__.py index e69de29bb..702734390 100644 --- a/airbyte_cdk/sources/file_based/config/__init__.py +++ b/airbyte_cdk/sources/file_based/config/__init__.py @@ -0,0 +1,8 @@ +from .avro_format import AvroFormat +from .csv_format import CsvFormat +from .excel_format import ExcelFormat +from .file_based_stream_config import FileBasedStreamConfig +from .jsonl_format import JsonlFormat +from .parquet_format import ParquetFormat +from .raw_format import RawFormat +from .unstructured_format import UnstructuredFormat diff --git a/airbyte_cdk/sources/file_based/config/file_based_stream_config.py b/airbyte_cdk/sources/file_based/config/file_based_stream_config.py index eb592a4aa..620f7443a 100644 --- a/airbyte_cdk/sources/file_based/config/file_based_stream_config.py +++ b/airbyte_cdk/sources/file_based/config/file_based_stream_config.py @@ -12,6 +12,7 @@ from airbyte_cdk.sources.file_based.config.excel_format import ExcelFormat from airbyte_cdk.sources.file_based.config.jsonl_format import JsonlFormat from airbyte_cdk.sources.file_based.config.parquet_format import ParquetFormat +from airbyte_cdk.sources.file_based.config.raw_format import RawFormat from airbyte_cdk.sources.file_based.config.unstructured_format import UnstructuredFormat from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedSourceError from airbyte_cdk.sources.file_based.schema_helpers import type_mapping_to_jsonschema @@ -58,7 +59,13 @@ class FileBasedStreamConfig(BaseModel): default=3, ) format: Union[ - AvroFormat, CsvFormat, JsonlFormat, ParquetFormat, UnstructuredFormat, ExcelFormat + AvroFormat, + CsvFormat, + JsonlFormat, + ParquetFormat, + RawFormat, + UnstructuredFormat, + ExcelFormat, ] = Field( title="Format", description="The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.", diff --git a/airbyte_cdk/sources/file_based/config/raw_format.py b/airbyte_cdk/sources/file_based/config/raw_format.py new file mode 100644 index 000000000..1831331fc --- /dev/null +++ b/airbyte_cdk/sources/file_based/config/raw_format.py @@ -0,0 +1,19 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +from pydantic.v1 import BaseModel, Field + +from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig + + +class RawFormat(BaseModel): + class Config(OneOfOptionConfig): + title = "Raw Files Format" + description = "Use this format when you want to copy files without parsing them. Must be used with the 'Copy Raw Files' delivery method." + discriminator = "filetype" + + filetype: str = Field( + "raw", + const=True, + ) diff --git a/airbyte_cdk/sources/file_based/file_types/__init__.py b/airbyte_cdk/sources/file_based/file_types/__init__.py index b9d8f1d52..3d7c1a924 100644 --- a/airbyte_cdk/sources/file_based/file_types/__init__.py +++ b/airbyte_cdk/sources/file_based/file_types/__init__.py @@ -5,6 +5,7 @@ from airbyte_cdk.sources.file_based.config.excel_format import ExcelFormat from airbyte_cdk.sources.file_based.config.jsonl_format import JsonlFormat from airbyte_cdk.sources.file_based.config.parquet_format import ParquetFormat +from airbyte_cdk.sources.file_based.config.raw_format import RawFormat from airbyte_cdk.sources.file_based.config.unstructured_format import UnstructuredFormat from .avro_parser import AvroParser @@ -14,6 +15,7 @@ from .file_type_parser import FileTypeParser from .jsonl_parser import JsonlParser from .parquet_parser import ParquetParser +from .raw_parser import RawParser from .unstructured_parser import UnstructuredParser default_parsers: Mapping[Type[Any], FileTypeParser] = { @@ -22,6 +24,7 @@ ExcelFormat: ExcelParser(), JsonlFormat: JsonlParser(), ParquetFormat: ParquetParser(), + RawFormat: RawParser(), UnstructuredFormat: UnstructuredParser(), } @@ -31,6 +34,7 @@ "ExcelParser", "JsonlParser", "ParquetParser", + "RawParser", "UnstructuredParser", "FileTransfer", "default_parsers", diff --git a/airbyte_cdk/sources/file_based/file_types/raw_parser.py b/airbyte_cdk/sources/file_based/file_types/raw_parser.py new file mode 100644 index 000000000..1e7d26bdf --- /dev/null +++ b/airbyte_cdk/sources/file_based/file_types/raw_parser.py @@ -0,0 +1,92 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +import logging +from typing import Any, Dict, Iterable, Mapping, Optional, Tuple + +from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig +from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError +from airbyte_cdk.sources.file_based.file_based_stream_reader import ( + AbstractFileBasedStreamReader, + FileReadMode, +) +from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser +from airbyte_cdk.sources.file_based.remote_file import RemoteFile +from airbyte_cdk.sources.file_based.schema_helpers import SchemaType + + +class RawParser(FileTypeParser): + """ + A parser that doesn't actually parse files. It's designed to be used with the "Copy Raw Files" delivery method. + """ + + @property + def parser_max_n_files_for_schema_inference(self) -> Optional[int]: + """ + Just check one file as the schema is static + """ + return 1 + + @property + def parser_max_n_files_for_parsability(self) -> Optional[int]: + """ + Do not check any files for parsability since we're not actually parsing them + """ + return 0 + + def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]: + """ + Verify that this parser is only used with the "Copy Raw Files" delivery method. + """ + from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec + from airbyte_cdk.sources.file_based.config.validate_config_transfer_modes import use_file_transfer + + # Create a mock config to check if the delivery method is set to use file transfer + mock_config = type('MockConfig', (AbstractFileBasedSpec,), { + 'delivery_method': config.source_config.delivery_method, + 'documentation_url': staticmethod(lambda: ""), + })() + + if not use_file_transfer(mock_config): + return False, "The 'Raw Files' parser can only be used with the 'Copy Raw Files' delivery method." + + return True, None + + async def infer_schema( + self, + config: FileBasedStreamConfig, + file: RemoteFile, + stream_reader: AbstractFileBasedStreamReader, + logger: logging.Logger, + ) -> SchemaType: + """ + Return a minimal schema since we're not actually parsing the files. + """ + return {} + + def parse_records( + self, + config: FileBasedStreamConfig, + file: RemoteFile, + stream_reader: AbstractFileBasedStreamReader, + logger: logging.Logger, + discovered_schema: Optional[Mapping[str, SchemaType]], + ) -> Iterable[Dict[str, Any]]: + """ + This method should never be called since we're using the "Copy Raw Files" delivery method. + """ + + # This is a safeguard in case this method is called + # Since we're not actually parsing files, just return an empty iterator + # The validation that this format is only used with "Copy Raw Files" delivery method + # will be handled at a higher level in the availability strategy + # Return an empty iterable + return iter([]) + + @property + def file_read_mode(self) -> FileReadMode: + """ + We don't actually read the files, but if we did, we'd use binary mode. + """ + return FileReadMode.READ_BINARY