Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions airbyte_cdk/sources/file_based/config/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from .avro_format import AvroFormat
from .csv_format import CsvFormat
from .excel_format import ExcelFormat
from .file_based_stream_config import FileBasedStreamConfig
from .jsonl_format import JsonlFormat
from .parquet_format import ParquetFormat
from .raw_format import RawFormat
from .unstructured_format import UnstructuredFormat
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from airbyte_cdk.sources.file_based.config.excel_format import ExcelFormat
from airbyte_cdk.sources.file_based.config.jsonl_format import JsonlFormat
from airbyte_cdk.sources.file_based.config.parquet_format import ParquetFormat
from airbyte_cdk.sources.file_based.config.raw_format import RawFormat
from airbyte_cdk.sources.file_based.config.unstructured_format import UnstructuredFormat
from airbyte_cdk.sources.file_based.exceptions import ConfigValidationError, FileBasedSourceError
from airbyte_cdk.sources.file_based.schema_helpers import type_mapping_to_jsonschema
Expand Down Expand Up @@ -58,7 +59,13 @@ class FileBasedStreamConfig(BaseModel):
default=3,
)
format: Union[
AvroFormat, CsvFormat, JsonlFormat, ParquetFormat, UnstructuredFormat, ExcelFormat
AvroFormat,
CsvFormat,
JsonlFormat,
ParquetFormat,
RawFormat,
UnstructuredFormat,
ExcelFormat,
] = Field(
title="Format",
description="The configuration options that are used to alter how to read incoming files that deviate from the standard formatting.",
Expand Down
19 changes: 19 additions & 0 deletions airbyte_cdk/sources/file_based/config/raw_format.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from pydantic.v1 import BaseModel, Field

from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig


class RawFormat(BaseModel):
class Config(OneOfOptionConfig):
title = "Raw Files Format"
description = "Use this format when you want to copy files without parsing them. Must be used with the 'Copy Raw Files' delivery method."
discriminator = "filetype"

filetype: str = Field(
"raw",
const=True,
)
4 changes: 4 additions & 0 deletions airbyte_cdk/sources/file_based/file_types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from airbyte_cdk.sources.file_based.config.excel_format import ExcelFormat
from airbyte_cdk.sources.file_based.config.jsonl_format import JsonlFormat
from airbyte_cdk.sources.file_based.config.parquet_format import ParquetFormat
from airbyte_cdk.sources.file_based.config.raw_format import RawFormat
from airbyte_cdk.sources.file_based.config.unstructured_format import UnstructuredFormat

from .avro_parser import AvroParser
Expand All @@ -14,6 +15,7 @@
from .file_type_parser import FileTypeParser
from .jsonl_parser import JsonlParser
from .parquet_parser import ParquetParser
from .raw_parser import RawParser
from .unstructured_parser import UnstructuredParser

default_parsers: Mapping[Type[Any], FileTypeParser] = {
Expand All @@ -22,6 +24,7 @@
ExcelFormat: ExcelParser(),
JsonlFormat: JsonlParser(),
ParquetFormat: ParquetParser(),
RawFormat: RawParser(),
UnstructuredFormat: UnstructuredParser(),
}

Expand All @@ -31,6 +34,7 @@
"ExcelParser",
"JsonlParser",
"ParquetParser",
"RawParser",
"UnstructuredParser",
"FileTransfer",
"default_parsers",
Expand Down
92 changes: 92 additions & 0 deletions airbyte_cdk/sources/file_based/file_types/raw_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Any, Dict, Iterable, Mapping, Optional, Tuple

from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError
from airbyte_cdk.sources.file_based.file_based_stream_reader import (
AbstractFileBasedStreamReader,
FileReadMode,
)
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_helpers import SchemaType


class RawParser(FileTypeParser):
"""
A parser that doesn't actually parse files. It's designed to be used with the "Copy Raw Files" delivery method.
"""

@property
def parser_max_n_files_for_schema_inference(self) -> Optional[int]:
"""
Just check one file as the schema is static
"""
return 1

@property
def parser_max_n_files_for_parsability(self) -> Optional[int]:
"""
Do not check any files for parsability since we're not actually parsing them
"""
return 0

def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
"""
Verify that this parser is only used with the "Copy Raw Files" delivery method.
"""
from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec
from airbyte_cdk.sources.file_based.config.validate_config_transfer_modes import use_file_transfer
Comment on lines +42 to +43
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move these to the top of the file.


# Create a mock config to check if the delivery method is set to use file transfer
mock_config = type('MockConfig', (AbstractFileBasedSpec,), {
'delivery_method': config.source_config.delivery_method,
'documentation_url': staticmethod(lambda: ""),
})()

if not use_file_transfer(mock_config):
return False, "The 'Raw Files' parser can only be used with the 'Copy Raw Files' delivery method."

return True, None

async def infer_schema(
self,
config: FileBasedStreamConfig,
file: RemoteFile,
stream_reader: AbstractFileBasedStreamReader,
logger: logging.Logger,
) -> SchemaType:
"""
Return a minimal schema since we're not actually parsing the files.
"""
return {}

def parse_records(
self,
config: FileBasedStreamConfig,
file: RemoteFile,
stream_reader: AbstractFileBasedStreamReader,
logger: logging.Logger,
discovered_schema: Optional[Mapping[str, SchemaType]],
) -> Iterable[Dict[str, Any]]:
"""
This method should never be called since we're using the "Copy Raw Files" delivery method.
"""

# This is a safeguard in case this method is called
# Since we're not actually parsing files, just return an empty iterator
# The validation that this format is only used with "Copy Raw Files" delivery method
# will be handled at a higher level in the availability strategy
# Return an empty iterable
return iter([])

@property
def file_read_mode(self) -> FileReadMode:
"""
We don't actually read the files, but if we did, we'd use binary mode.
"""
return FileReadMode.READ_BINARY
Loading