From 1afcc98dc4ec80acd76e6d0a40257e0437fa0fe0 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 10 Mar 2026 21:02:53 +0000 Subject: [PATCH 1/6] feat: add markdown_json output format for unstructured document parsing Add a new output_format config option to UnstructuredFormat with two choices: - markdown_text (default, backward-compatible): renders documents as flat Markdown text - markdown_json: outputs a JSON array of structured elements with type, text, and metadata fields This preserves document structure (element types, page numbers, coordinates, section depth) for easier downstream processing. The change is additive and non-breaking. Co-Authored-By: Ryan Waskewich --- .../file_based/config/unstructured_format.py | 8 ++ .../file_types/unstructured_parser.py | 73 +++++++++- .../file_types/test_unstructured_parser.py | 126 ++++++++++++++++++ 3 files changed, 202 insertions(+), 5 deletions(-) diff --git a/airbyte_cdk/sources/file_based/config/unstructured_format.py b/airbyte_cdk/sources/file_based/config/unstructured_format.py index c03540ce6..8bdbc3440 100644 --- a/airbyte_cdk/sources/file_based/config/unstructured_format.py +++ b/airbyte_cdk/sources/file_based/config/unstructured_format.py @@ -100,3 +100,11 @@ class Config(OneOfOptionConfig): discriminator="mode", type="object", ) + + output_format: str = Field( + default="markdown_text", + always_show=True, + title="Output Format", + enum=["markdown_text", "markdown_json"], + description="The output format for parsed document content. `markdown_text` renders the document as flat Markdown text. `markdown_json` outputs a JSON array of structured elements with type, text, and metadata fields, preserving document structure for easier downstream processing.", + ) diff --git a/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py b/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py index f55675e0a..f04ea132d 100644 --- a/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py +++ b/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py @@ -1,6 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import json import logging import os import traceback @@ -147,10 +148,17 @@ async def infer_schema( self._get_file_type_error_message(filetype), ) + if format.output_format == "markdown_json": + content_description = "Content of the file as a JSON array of structured elements with type, text, and metadata fields. Might be null if the file could not be parsed" + else: + content_description = ( + "Content of the file as markdown. Might be null if the file could not be parsed" + ) + return { "content": { "type": "string", - "description": "Content of the file as markdown. Might be null if the file could not be parsed", + "description": content_description, }, "document_key": { "type": "string", @@ -227,15 +235,18 @@ def _read_file( decoded_content: str = optional_decode(file_content) return decoded_content if format.processing.mode == "local": - return self._read_file_locally( + elements = self._read_file_locally_elements( file_handle, filetype, format.strategy, remote_file, ) + if format.output_format == "markdown_json": + return json.dumps(elements) + return self._render_markdown(elements) elif format.processing.mode == "api": try: - result: str = self._read_file_remotely_with_retries( + elements = self._read_file_remotely_elements_with_retries( file_handle, format.processing, filetype, @@ -253,7 +264,9 @@ def _read_file( e, failure_type=FailureType.config_error ) - return result + if format.output_format == "markdown_json": + return json.dumps(elements) + return self._render_markdown(elements) def _params_to_dict( self, params: Optional[List[APIParameterConfigModel]], strategy: str @@ -323,6 +336,24 @@ def _read_file_remotely_with_retries( """ return self._read_file_remotely(file_handle, format, filetype, strategy, remote_file) + @backoff.on_exception( + backoff.expo, requests.exceptions.RequestException, max_tries=5, giveup=user_error + ) + def _read_file_remotely_elements_with_retries( + self, + file_handle: IOBase, + format: APIProcessingConfigModel, + filetype: FileType, + strategy: str, + remote_file: RemoteFile, + ) -> List[Dict[str, Any]]: + """ + Read a file remotely and return the raw JSON elements, retrying up to 5 times if the error is not caused by user error. + """ + return self._read_file_remotely_elements( + file_handle, format, filetype, strategy, remote_file + ) + def _read_file_remotely( self, file_handle: IOBase, @@ -352,9 +383,41 @@ def _read_file_remotely( return self._render_markdown(json_response) + def _read_file_remotely_elements( + self, + file_handle: IOBase, + format: APIProcessingConfigModel, + filetype: FileType, + strategy: str, + remote_file: RemoteFile, + ) -> List[Dict[str, Any]]: + headers = {"accept": "application/json", "unstructured-api-key": format.api_key} + + data = self._params_to_dict(format.parameters, strategy) + + file_data = {"files": ("filename", file_handle, FILETYPE_TO_MIMETYPE[filetype])} + + response = requests.post( + f"{format.api_url}/general/v0/general", headers=headers, data=data, files=file_data + ) + + if response.status_code == 422: + raise self._create_parse_error(remote_file, response.json()) + else: + response.raise_for_status() + + json_response: List[Dict[str, Any]] = response.json() + return json_response + def _read_file_locally( self, file_handle: IOBase, filetype: FileType, strategy: str, remote_file: RemoteFile ) -> str: + elements = self._read_file_locally_elements(file_handle, filetype, strategy, remote_file) + return self._render_markdown(elements) + + def _read_file_locally_elements( + self, file_handle: IOBase, filetype: FileType, strategy: str, remote_file: RemoteFile + ) -> List[Dict[str, Any]]: _import_unstructured() if ( (not unstructured_partition_pdf) @@ -385,7 +448,7 @@ def _read_file_locally( except Exception as e: raise self._create_parse_error(remote_file, str(e)) - return self._render_markdown([element.to_dict() for element in elements]) + return [element.to_dict() for element in elements] def _create_parse_error( self, diff --git a/unit_tests/sources/file_based/file_types/test_unstructured_parser.py b/unit_tests/sources/file_based/file_types/test_unstructured_parser.py index 374a02eed..56be05a03 100644 --- a/unit_tests/sources/file_based/file_types/test_unstructured_parser.py +++ b/unit_tests/sources/file_based/file_types/test_unstructured_parser.py @@ -3,6 +3,7 @@ # import asyncio +import json from datetime import datetime from unittest import mock from unittest.mock import MagicMock, call, mock_open, patch @@ -674,3 +675,128 @@ def test_parse_records_remotely( requests_mock.post.assert_has_calls(expected_requests) else: requests_mock.post.assert_not_called() + + +@patch("airbyte_cdk.sources.file_based.file_types.unstructured_parser.detect_filetype") +def test_infer_schema_markdown_json(mock_detect_filetype): + """Test that infer_schema returns correct description for markdown_json output format.""" + main_loop = asyncio.get_event_loop() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + mock_detect_filetype.return_value = FileType.PDF + config = MagicMock() + config.format = UnstructuredFormat( + skip_unprocessable_files=False, output_format="markdown_json" + ) + schema = loop.run_until_complete( + UnstructuredParser().infer_schema(config, MagicMock(), MagicMock(), MagicMock()) + ) + assert schema == { + "content": { + "type": "string", + "description": "Content of the file as a JSON array of structured elements with type, text, and metadata fields. Might be null if the file could not be parsed", + }, + "document_key": { + "type": "string", + "description": "Unique identifier of the document, e.g. the file path", + }, + "_ab_source_file_parse_error": { + "type": "string", + "description": "Error message if the file could not be parsed even though the file is supported", + }, + } + loop.close() + asyncio.set_event_loop(main_loop) + + +@patch("unstructured.partition.pdf.partition_pdf") +@patch("unstructured.partition.pptx.partition_pptx") +@patch("unstructured.partition.docx.partition_docx") +@patch("airbyte_cdk.sources.file_based.file_types.unstructured_parser.detect_filetype") +def test_parse_records_markdown_json_local( + mock_detect_filetype, + mock_partition_docx, + mock_partition_pptx, + mock_partition_pdf, +): + """Test that parse_records returns JSON elements when output_format is markdown_json for local processing.""" + elements = [ + Title("heading"), + Text("This is the text"), + ListItem("This is a list item"), + Formula("This is a formula"), + ] + mock_partition_pdf.return_value = elements + + stream_reader = MagicMock() + mock_open(stream_reader.open_file, read_data=b"fake pdf content") + fake_file = RemoteFile(uri=FILE_URI, last_modified=datetime.now()) + logger = MagicMock() + config = MagicMock() + config.format = UnstructuredFormat( + skip_unprocessable_files=False, + output_format="markdown_json", + ) + mock_detect_filetype.return_value = FileType.PDF + + records = list( + UnstructuredParser().parse_records(config, fake_file, stream_reader, logger, MagicMock()) + ) + assert len(records) == 1 + assert records[0]["document_key"] == FILE_URI + assert records[0]["_ab_source_file_parse_error"] is None + # Verify content is valid JSON + content = json.loads(records[0]["content"]) + assert isinstance(content, list) + assert len(content) == 4 + assert content[0]["type"] == "Title" + assert content[0]["text"] == "heading" + assert content[1]["type"] == "NarrativeText" + assert content[1]["text"] == "This is the text" + assert content[2]["type"] == "ListItem" + assert content[2]["text"] == "This is a list item" + assert content[3]["type"] == "Formula" + assert content[3]["text"] == "This is a formula" + + +@patch("airbyte_cdk.sources.file_based.file_types.unstructured_parser.requests") +@patch("airbyte_cdk.sources.file_based.file_types.unstructured_parser.detect_filetype") +@patch("time.sleep", side_effect=lambda _: None) +def test_parse_records_markdown_json_remote( + time_mock, + mock_detect_filetype, + requests_mock, +): + """Test that parse_records returns JSON elements when output_format is markdown_json for API processing.""" + json_response = [ + {"type": "Title", "text": "heading", "metadata": {"page_number": 1}}, + {"type": "NarrativeText", "text": "Some text", "metadata": {"page_number": 1}}, + ] + + stream_reader = MagicMock() + mock_open(stream_reader.open_file, read_data=b"fake pdf content") + fake_file = RemoteFile(uri=FILE_URI, last_modified=datetime.now()) + logger = MagicMock() + config = MagicMock() + config.format = UnstructuredFormat( + skip_unprocessable_file_types=False, + output_format="markdown_json", + processing=APIProcessingConfigModel(mode="api", api_key="test"), + ) + mock_detect_filetype.return_value = FileType.PDF + mock_response = MagicMock() + mock_response.json.return_value = json_response + mock_response.status_code = 200 + requests_mock.post.return_value = mock_response + requests_mock.exceptions.RequestException = requests.exceptions.RequestException + + records = list( + UnstructuredParser().parse_records(config, fake_file, stream_reader, logger, MagicMock()) + ) + assert len(records) == 1 + assert records[0]["document_key"] == FILE_URI + assert records[0]["_ab_source_file_parse_error"] is None + # Verify content is valid JSON matching the API response + content = json.loads(records[0]["content"]) + assert content == json_response From a8f4e04d9475417e53431269252fb81cae81bb9a Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 10 Mar 2026 21:07:22 +0000 Subject: [PATCH 2/6] fix: wrap MD/TXT content in JSON element when output_format is markdown_json When output_format='markdown_json', MD and TXT files now return a JSON array with a single NarrativeText element instead of raw text, ensuring consistent output format across all file types. Co-Authored-By: Ryan Waskewich --- .../file_types/unstructured_parser.py | 4 ++ .../file_types/test_unstructured_parser.py | 37 +++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py b/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py index f04ea132d..bcfb53d36 100644 --- a/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py +++ b/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py @@ -233,6 +233,10 @@ def _read_file( if filetype in {FileType.MD, FileType.TXT}: file_content: bytes = file_handle.read() decoded_content: str = optional_decode(file_content) + if format.output_format == "markdown_json": + return json.dumps( + [{"type": "NarrativeText", "text": decoded_content, "metadata": {}}] + ) return decoded_content if format.processing.mode == "local": elements = self._read_file_locally_elements( diff --git a/unit_tests/sources/file_based/file_types/test_unstructured_parser.py b/unit_tests/sources/file_based/file_types/test_unstructured_parser.py index 56be05a03..a6b23d391 100644 --- a/unit_tests/sources/file_based/file_types/test_unstructured_parser.py +++ b/unit_tests/sources/file_based/file_types/test_unstructured_parser.py @@ -800,3 +800,40 @@ def test_parse_records_markdown_json_remote( # Verify content is valid JSON matching the API response content = json.loads(records[0]["content"]) assert content == json_response + + +@patch("unstructured.partition.pdf.partition_pdf") +@patch("unstructured.partition.pptx.partition_pptx") +@patch("unstructured.partition.docx.partition_docx") +@patch("airbyte_cdk.sources.file_based.file_types.unstructured_parser.detect_filetype") +def test_parse_records_markdown_json_md_file( + mock_detect_filetype, + mock_partition_docx, + mock_partition_pptx, + mock_partition_pdf, +): + """Test that MD/TXT files return a JSON element array when output_format is markdown_json.""" + stream_reader = MagicMock() + mock_open(stream_reader.open_file, read_data=b"# Hello World\n\nSome text content") + fake_file = RemoteFile(uri="path/to/file.md", last_modified=datetime.now()) + logger = MagicMock() + config = MagicMock() + config.format = UnstructuredFormat( + skip_unprocessable_files=False, + output_format="markdown_json", + ) + mock_detect_filetype.return_value = FileType.MD + + records = list( + UnstructuredParser().parse_records(config, fake_file, stream_reader, logger, MagicMock()) + ) + assert len(records) == 1 + assert records[0]["document_key"] == "path/to/file.md" + assert records[0]["_ab_source_file_parse_error"] is None + # Verify content is valid JSON with a single NarrativeText element + content = json.loads(records[0]["content"]) + assert isinstance(content, list) + assert len(content) == 1 + assert content[0]["type"] == "NarrativeText" + assert content[0]["text"] == "# Hello World\n\nSome text content" + assert content[0]["metadata"] == {} From dc9bb1d88a76da1c92427e6b3a724079475cc5c7 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 10 Mar 2026 21:12:44 +0000 Subject: [PATCH 3/6] fix: correct skip_unprocessable_file_types typo to skip_unprocessable_files in test Co-Authored-By: Ryan Waskewich --- .../sources/file_based/file_types/test_unstructured_parser.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unit_tests/sources/file_based/file_types/test_unstructured_parser.py b/unit_tests/sources/file_based/file_types/test_unstructured_parser.py index a6b23d391..f892708d1 100644 --- a/unit_tests/sources/file_based/file_types/test_unstructured_parser.py +++ b/unit_tests/sources/file_based/file_types/test_unstructured_parser.py @@ -780,7 +780,7 @@ def test_parse_records_markdown_json_remote( logger = MagicMock() config = MagicMock() config.format = UnstructuredFormat( - skip_unprocessable_file_types=False, + skip_unprocessable_files=False, output_format="markdown_json", processing=APIProcessingConfigModel(mode="api", api_key="test"), ) From e484f050dae016bee465cce61b8531936240ac23 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 10 Mar 2026 21:23:10 +0000 Subject: [PATCH 4/6] fix: update expected spec to include output_format field in unstructured format Co-Authored-By: Ryan Waskewich --- .../sources/file_based/scenarios/csv_scenarios.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/unit_tests/sources/file_based/scenarios/csv_scenarios.py b/unit_tests/sources/file_based/scenarios/csv_scenarios.py index f16d83e20..675aaf1ca 100644 --- a/unit_tests/sources/file_based/scenarios/csv_scenarios.py +++ b/unit_tests/sources/file_based/scenarios/csv_scenarios.py @@ -459,6 +459,17 @@ }, ], }, + "output_format": { + "title": "Output Format", + "description": "The output format for parsed document content. `markdown_text` renders the document as flat Markdown text. `markdown_json` outputs a JSON array of structured elements with type, text, and metadata fields, preserving document structure for easier downstream processing.", + "default": "markdown_text", + "always_show": True, + "enum": [ + "markdown_text", + "markdown_json", + ], + "type": "string", + }, }, "description": "Extract text from document formats (.pdf, .docx, .md, .pptx) and emit as one record per file.", "required": ["filetype"], From eecb6c26f6f6458a333798deb988bbfc80522454 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 10 Mar 2026 21:27:56 +0000 Subject: [PATCH 5/6] fix: move rendering/serialization inside try/except for API path error wrapping Co-Authored-By: Ryan Waskewich --- .../sources/file_based/file_types/unstructured_parser.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py b/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py index bcfb53d36..cccf39cd4 100644 --- a/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py +++ b/airbyte_cdk/sources/file_based/file_types/unstructured_parser.py @@ -257,6 +257,9 @@ def _read_file( format.strategy, remote_file, ) + if format.output_format == "markdown_json": + return json.dumps(elements) + return self._render_markdown(elements) except Exception as e: # If a parser error happens during remotely processing the file, this means the file is corrupted. This case is handled by the parse_records method, so just rethrow. # @@ -268,10 +271,6 @@ def _read_file( e, failure_type=FailureType.config_error ) - if format.output_format == "markdown_json": - return json.dumps(elements) - return self._render_markdown(elements) - def _params_to_dict( self, params: Optional[List[APIParameterConfigModel]], strategy: str ) -> Dict[str, Union[str, List[str]]]: From 25bb2d1b8c5d910cf1a97ca8ecd40df06572c852 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 10 Mar 2026 21:37:27 +0000 Subject: [PATCH 6/6] fix: correct element type assertion from NarrativeText to UncategorizedText in test Co-Authored-By: Ryan Waskewich --- .../sources/file_based/file_types/test_unstructured_parser.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unit_tests/sources/file_based/file_types/test_unstructured_parser.py b/unit_tests/sources/file_based/file_types/test_unstructured_parser.py index f892708d1..6b0549275 100644 --- a/unit_tests/sources/file_based/file_types/test_unstructured_parser.py +++ b/unit_tests/sources/file_based/file_types/test_unstructured_parser.py @@ -752,7 +752,7 @@ def test_parse_records_markdown_json_local( assert len(content) == 4 assert content[0]["type"] == "Title" assert content[0]["text"] == "heading" - assert content[1]["type"] == "NarrativeText" + assert content[1]["type"] == "UncategorizedText" assert content[1]["text"] == "This is the text" assert content[2]["type"] == "ListItem" assert content[2]["text"] == "This is a list item"