diff --git a/airbyte_cdk/sources/file_based/config/unstructured_format.py b/airbyte_cdk/sources/file_based/config/unstructured_format.py index c03540ce6..8bdbc3440 100644 --- a/airbyte_cdk/sources/file_based/config/unstructured_format.py +++ b/airbyte_cdk/sources/file_based/config/unstructured_format.py @@ -100,3 +100,11 @@ class Config(OneOfOptionConfig): discriminator="mode", type="object", ) + + output_format: str = Field( + default="markdown_text", + always_show=True, + title="Output Format", + enum=["markdown_text", "markdown_json"], + description="The output format for parsed document content. `markdown_text` renders the document as flat Markdown text. `markdown_json` outputs a JSON array of structured elements with type, text, and metadata fields, preserving document structure for easier downstream processing.", + ) diff --git a/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py b/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py index f55675e0a..cccf39cd4 100644 --- a/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py +++ b/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py @@ -1,6 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import json import logging import os import traceback @@ -147,10 +148,17 @@ async def infer_schema( self._get_file_type_error_message(filetype), ) + if format.output_format == "markdown_json": + content_description = "Content of the file as a JSON array of structured elements with type, text, and metadata fields. Might be null if the file could not be parsed" + else: + content_description = ( + "Content of the file as markdown. Might be null if the file could not be parsed" + ) + return { "content": { "type": "string", - "description": "Content of the file as markdown. Might be null if the file could not be parsed", + "description": content_description, }, "document_key": { "type": "string", @@ -225,23 +233,33 @@ def _read_file( if filetype in {FileType.MD, FileType.TXT}: file_content: bytes = file_handle.read() decoded_content: str = optional_decode(file_content) + if format.output_format == "markdown_json": + return json.dumps( + [{"type": "NarrativeText", "text": decoded_content, "metadata": {}}] + ) return decoded_content if format.processing.mode == "local": - return self._read_file_locally( + elements = self._read_file_locally_elements( file_handle, filetype, format.strategy, remote_file, ) + if format.output_format == "markdown_json": + return json.dumps(elements) + return self._render_markdown(elements) elif format.processing.mode == "api": try: - result: str = self._read_file_remotely_with_retries( + elements = self._read_file_remotely_elements_with_retries( file_handle, format.processing, filetype, format.strategy, remote_file, ) + if format.output_format == "markdown_json": + return json.dumps(elements) + return self._render_markdown(elements) except Exception as e: # If a parser error happens during remotely processing the file, this means the file is corrupted. This case is handled by the parse_records method, so just rethrow. # @@ -253,8 +271,6 @@ def _read_file( e, failure_type=FailureType.config_error ) - return result - def _params_to_dict( self, params: Optional[List[APIParameterConfigModel]], strategy: str ) -> Dict[str, Union[str, List[str]]]: @@ -323,6 +339,24 @@ def _read_file_remotely_with_retries( """ return self._read_file_remotely(file_handle, format, filetype, strategy, remote_file) + @backoff.on_exception( + backoff.expo, requests.exceptions.RequestException, max_tries=5, giveup=user_error + ) + def _read_file_remotely_elements_with_retries( + self, + file_handle: IOBase, + format: APIProcessingConfigModel, + filetype: FileType, + strategy: str, + remote_file: RemoteFile, + ) -> List[Dict[str, Any]]: + """ + Read a file remotely and return the raw JSON elements, retrying up to 5 times if the error is not caused by user error. + """ + return self._read_file_remotely_elements( + file_handle, format, filetype, strategy, remote_file + ) + def _read_file_remotely( self, file_handle: IOBase, @@ -352,9 +386,41 @@ def _read_file_remotely( return self._render_markdown(json_response) + def _read_file_remotely_elements( + self, + file_handle: IOBase, + format: APIProcessingConfigModel, + filetype: FileType, + strategy: str, + remote_file: RemoteFile, + ) -> List[Dict[str, Any]]: + headers = {"accept": "application/json", "unstructured-api-key": format.api_key} + + data = self._params_to_dict(format.parameters, strategy) + + file_data = {"files": ("filename", file_handle, FILETYPE_TO_MIMETYPE[filetype])} + + response = requests.post( + f"{format.api_url}/general/v0/general", headers=headers, data=data, files=file_data + ) + + if response.status_code == 422: + raise self._create_parse_error(remote_file, response.json()) + else: + response.raise_for_status() + + json_response: List[Dict[str, Any]] = response.json() + return json_response + def _read_file_locally( self, file_handle: IOBase, filetype: FileType, strategy: str, remote_file: RemoteFile ) -> str: + elements = self._read_file_locally_elements(file_handle, filetype, strategy, remote_file) + return self._render_markdown(elements) + + def _read_file_locally_elements( + self, file_handle: IOBase, filetype: FileType, strategy: str, remote_file: RemoteFile + ) -> List[Dict[str, Any]]: _import_unstructured() if ( (not unstructured_partition_pdf) @@ -385,7 +451,7 @@ def _read_file_locally( except Exception as e: raise self._create_parse_error(remote_file, str(e)) - return self._render_markdown([element.to_dict() for element in elements]) + return [element.to_dict() for element in elements] def _create_parse_error( self, diff --git a/unit_tests/sources/file_based/file_types/test_unstructured_parser.py b/unit_tests/sources/file_based/file_types/test_unstructured_parser.py index 374a02eed..6b0549275 100644 --- a/unit_tests/sources/file_based/file_types/test_unstructured_parser.py +++ b/unit_tests/sources/file_based/file_types/test_unstructured_parser.py @@ -3,6 +3,7 @@ # import asyncio +import json from datetime import datetime from unittest import mock from unittest.mock import MagicMock, call, mock_open, patch @@ -674,3 +675,165 @@ def test_parse_records_remotely( requests_mock.post.assert_has_calls(expected_requests) else: requests_mock.post.assert_not_called() + + +@patch("airbyte_cdk.sources.file_based.file_types.unstructured_parser.detect_filetype") +def test_infer_schema_markdown_json(mock_detect_filetype): + """Test that infer_schema returns correct description for markdown_json output format.""" + main_loop = asyncio.get_event_loop() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + mock_detect_filetype.return_value = FileType.PDF + config = MagicMock() + config.format = UnstructuredFormat( + skip_unprocessable_files=False, output_format="markdown_json" + ) + schema = loop.run_until_complete( + UnstructuredParser().infer_schema(config, MagicMock(), MagicMock(), MagicMock()) + ) + assert schema == { + "content": { + "type": "string", + "description": "Content of the file as a JSON array of structured elements with type, text, and metadata fields. Might be null if the file could not be parsed", + }, + "document_key": { + "type": "string", + "description": "Unique identifier of the document, e.g. the file path", + }, + "_ab_source_file_parse_error": { + "type": "string", + "description": "Error message if the file could not be parsed even though the file is supported", + }, + } + loop.close() + asyncio.set_event_loop(main_loop) + + +@patch("unstructured.partition.pdf.partition_pdf") +@patch("unstructured.partition.pptx.partition_pptx") +@patch("unstructured.partition.docx.partition_docx") +@patch("airbyte_cdk.sources.file_based.file_types.unstructured_parser.detect_filetype") +def test_parse_records_markdown_json_local( + mock_detect_filetype, + mock_partition_docx, + mock_partition_pptx, + mock_partition_pdf, +): + """Test that parse_records returns JSON elements when output_format is markdown_json for local processing.""" + elements = [ + Title("heading"), + Text("This is the text"), + ListItem("This is a list item"), + Formula("This is a formula"), + ] + mock_partition_pdf.return_value = elements + + stream_reader = MagicMock() + mock_open(stream_reader.open_file, read_data=b"fake pdf content") + fake_file = RemoteFile(uri=FILE_URI, last_modified=datetime.now()) + logger = MagicMock() + config = MagicMock() + config.format = UnstructuredFormat( + skip_unprocessable_files=False, + output_format="markdown_json", + ) + mock_detect_filetype.return_value = FileType.PDF + + records = list( + UnstructuredParser().parse_records(config, fake_file, stream_reader, logger, MagicMock()) + ) + assert len(records) == 1 + assert records[0]["document_key"] == FILE_URI + assert records[0]["_ab_source_file_parse_error"] is None + # Verify content is valid JSON + content = json.loads(records[0]["content"]) + assert isinstance(content, list) + assert len(content) == 4 + assert content[0]["type"] == "Title" + assert content[0]["text"] == "heading" + assert content[1]["type"] == "UncategorizedText" + assert content[1]["text"] == "This is the text" + assert content[2]["type"] == "ListItem" + assert content[2]["text"] == "This is a list item" + assert content[3]["type"] == "Formula" + assert content[3]["text"] == "This is a formula" + + +@patch("airbyte_cdk.sources.file_based.file_types.unstructured_parser.requests") +@patch("airbyte_cdk.sources.file_based.file_types.unstructured_parser.detect_filetype") +@patch("time.sleep", side_effect=lambda _: None) +def test_parse_records_markdown_json_remote( + time_mock, + mock_detect_filetype, + requests_mock, +): + """Test that parse_records returns JSON elements when output_format is markdown_json for API processing.""" + json_response = [ + {"type": "Title", "text": "heading", "metadata": {"page_number": 1}}, + {"type": "NarrativeText", "text": "Some text", "metadata": {"page_number": 1}}, + ] + + stream_reader = MagicMock() + mock_open(stream_reader.open_file, read_data=b"fake pdf content") + fake_file = RemoteFile(uri=FILE_URI, last_modified=datetime.now()) + logger = MagicMock() + config = MagicMock() + config.format = UnstructuredFormat( + skip_unprocessable_files=False, + output_format="markdown_json", + processing=APIProcessingConfigModel(mode="api", api_key="test"), + ) + mock_detect_filetype.return_value = FileType.PDF + mock_response = MagicMock() + mock_response.json.return_value = json_response + mock_response.status_code = 200 + requests_mock.post.return_value = mock_response + requests_mock.exceptions.RequestException = requests.exceptions.RequestException + + records = list( + UnstructuredParser().parse_records(config, fake_file, stream_reader, logger, MagicMock()) + ) + assert len(records) == 1 + assert records[0]["document_key"] == FILE_URI + assert records[0]["_ab_source_file_parse_error"] is None + # Verify content is valid JSON matching the API response + content = json.loads(records[0]["content"]) + assert content == json_response + + +@patch("unstructured.partition.pdf.partition_pdf") +@patch("unstructured.partition.pptx.partition_pptx") +@patch("unstructured.partition.docx.partition_docx") +@patch("airbyte_cdk.sources.file_based.file_types.unstructured_parser.detect_filetype") +def test_parse_records_markdown_json_md_file( + mock_detect_filetype, + mock_partition_docx, + mock_partition_pptx, + mock_partition_pdf, +): + """Test that MD/TXT files return a JSON element array when output_format is markdown_json.""" + stream_reader = MagicMock() + mock_open(stream_reader.open_file, read_data=b"# Hello World\n\nSome text content") + fake_file = RemoteFile(uri="path/to/file.md", last_modified=datetime.now()) + logger = MagicMock() + config = MagicMock() + config.format = UnstructuredFormat( + skip_unprocessable_files=False, + output_format="markdown_json", + ) + mock_detect_filetype.return_value = FileType.MD + + records = list( + UnstructuredParser().parse_records(config, fake_file, stream_reader, logger, MagicMock()) + ) + assert len(records) == 1 + assert records[0]["document_key"] == "path/to/file.md" + assert records[0]["_ab_source_file_parse_error"] is None + # Verify content is valid JSON with a single NarrativeText element + content = json.loads(records[0]["content"]) + assert isinstance(content, list) + assert len(content) == 1 + assert content[0]["type"] == "NarrativeText" + assert content[0]["text"] == "# Hello World\n\nSome text content" + assert content[0]["metadata"] == {} diff --git a/unit_tests/sources/file_based/scenarios/csv_scenarios.py b/unit_tests/sources/file_based/scenarios/csv_scenarios.py index f16d83e20..675aaf1ca 100644 --- a/unit_tests/sources/file_based/scenarios/csv_scenarios.py +++ b/unit_tests/sources/file_based/scenarios/csv_scenarios.py @@ -459,6 +459,17 @@ }, ], }, + "output_format": { + "title": "Output Format", + "description": "The output format for parsed document content. `markdown_text` renders the document as flat Markdown text. `markdown_json` outputs a JSON array of structured elements with type, text, and metadata fields, preserving document structure for easier downstream processing.", + "default": "markdown_text", + "always_show": True, + "enum": [ + "markdown_text", + "markdown_json", + ], + "type": "string", + }, }, "description": "Extract text from document formats (.pdf, .docx, .md, .pptx) and emit as one record per file.", "required": ["filetype"],