-
Notifications
You must be signed in to change notification settings - Fork 42
feat: add Raw Files parser option for File CDK (do not merge) #427
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| from .avro_format import AvroFormat | ||
| from .csv_format import CsvFormat | ||
| from .excel_format import ExcelFormat | ||
| from .file_based_stream_config import FileBasedStreamConfig | ||
| from .jsonl_format import JsonlFormat | ||
| from .parquet_format import ParquetFormat | ||
| from .raw_format import RawFormat | ||
| from .unstructured_format import UnstructuredFormat |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| # | ||
| # Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
| # | ||
|
|
||
| from pydantic.v1 import BaseModel, Field | ||
|
|
||
| from airbyte_cdk.utils.oneof_option_config import OneOfOptionConfig | ||
|
|
||
|
|
||
| class RawFormat(BaseModel): | ||
| class Config(OneOfOptionConfig): | ||
| title = "Raw Files Format" | ||
| description = "Use this format when you want to copy files without parsing them. Must be used with the 'Copy Raw Files' delivery method." | ||
| discriminator = "filetype" | ||
|
|
||
| filetype: str = Field( | ||
| "raw", | ||
| const=True, | ||
| ) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,92 @@ | ||
| # | ||
| # Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
| # | ||
|
|
||
| import logging | ||
| from typing import Any, Dict, Iterable, Mapping, Optional, Tuple | ||
|
|
||
| from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig | ||
| from airbyte_cdk.sources.file_based.exceptions import FileBasedSourceError, RecordParseError | ||
| from airbyte_cdk.sources.file_based.file_based_stream_reader import ( | ||
| AbstractFileBasedStreamReader, | ||
| FileReadMode, | ||
| ) | ||
| from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser | ||
| from airbyte_cdk.sources.file_based.remote_file import RemoteFile | ||
| from airbyte_cdk.sources.file_based.schema_helpers import SchemaType | ||
|
|
||
|
|
||
| class RawParser(FileTypeParser): | ||
| """ | ||
| A parser that doesn't actually parse files. It's designed to be used with the "Copy Raw Files" delivery method. | ||
| """ | ||
|
|
||
| @property | ||
| def parser_max_n_files_for_schema_inference(self) -> Optional[int]: | ||
| """ | ||
| Just check one file as the schema is static | ||
| """ | ||
| return 1 | ||
|
|
||
| @property | ||
| def parser_max_n_files_for_parsability(self) -> Optional[int]: | ||
| """ | ||
| Do not check any files for parsability since we're not actually parsing them | ||
| """ | ||
| return 0 | ||
|
|
||
| def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]: | ||
| """ | ||
| Verify that this parser is only used with the "Copy Raw Files" delivery method. | ||
| """ | ||
| from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec | ||
| from airbyte_cdk.sources.file_based.config.validate_config_transfer_modes import use_file_transfer | ||
|
|
||
| # Create a mock config to check if the delivery method is set to use file transfer | ||
| mock_config = type('MockConfig', (AbstractFileBasedSpec,), { | ||
| 'delivery_method': config.source_config.delivery_method, | ||
| 'documentation_url': staticmethod(lambda: ""), | ||
| })() | ||
|
|
||
| if not use_file_transfer(mock_config): | ||
| return False, "The 'Raw Files' parser can only be used with the 'Copy Raw Files' delivery method." | ||
|
|
||
| return True, None | ||
|
|
||
| async def infer_schema( | ||
| self, | ||
| config: FileBasedStreamConfig, | ||
| file: RemoteFile, | ||
| stream_reader: AbstractFileBasedStreamReader, | ||
| logger: logging.Logger, | ||
| ) -> SchemaType: | ||
| """ | ||
| Return a minimal schema since we're not actually parsing the files. | ||
| """ | ||
| return {} | ||
|
|
||
| def parse_records( | ||
| self, | ||
| config: FileBasedStreamConfig, | ||
| file: RemoteFile, | ||
| stream_reader: AbstractFileBasedStreamReader, | ||
| logger: logging.Logger, | ||
| discovered_schema: Optional[Mapping[str, SchemaType]], | ||
| ) -> Iterable[Dict[str, Any]]: | ||
| """ | ||
| This method should never be called since we're using the "Copy Raw Files" delivery method. | ||
| """ | ||
|
|
||
| # This is a safeguard in case this method is called | ||
| # Since we're not actually parsing files, just return an empty iterator | ||
| # The validation that this format is only used with "Copy Raw Files" delivery method | ||
| # will be handled at a higher level in the availability strategy | ||
| # Return an empty iterable | ||
| return iter([]) | ||
|
|
||
| @property | ||
| def file_read_mode(self) -> FileReadMode: | ||
| """ | ||
| We don't actually read the files, but if we did, we'd use binary mode. | ||
| """ | ||
| return FileReadMode.READ_BINARY | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move these to the top of the file.