From 0b54dcc21bc64b3548163b9876d54a28cf16fb43 Mon Sep 17 00:00:00 2001 From: Austin Walker Date: Wed, 20 May 2026 16:24:22 -0400 Subject: [PATCH 01/13] feat(PLU-161): flatten_metadata option for databricks_volume_delta_tables Adds an opt-in `flatten_metadata` flag to the Volumes Delta Tables uploader. When set, the stager recursively flattens the metadata dict into top-level `metadata_*` columns (stops at lists, preserves the `metadata_` prefix per the PRD) using the existing `flatten_dict` helper. The uploader skips auto-create and intersects incoming columns with the user-managed table schema, dropping unknowns with a log line. Default is False so existing Workflow DB-serialized configs continue to behave identically. Co-Authored-By: Claude Opus 4.7 (1M context) --- test/unit/connectors/databricks/__init__.py | 0 .../databricks/test_volumes_table.py | 112 ++++++++++++++++++ .../connectors/databricks/volumes_table.py | 57 +++++++-- 3 files changed, 162 insertions(+), 7 deletions(-) create mode 100644 test/unit/connectors/databricks/__init__.py create mode 100644 test/unit/connectors/databricks/test_volumes_table.py diff --git a/test/unit/connectors/databricks/__init__.py b/test/unit/connectors/databricks/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/test/unit/connectors/databricks/test_volumes_table.py b/test/unit/connectors/databricks/test_volumes_table.py new file mode 100644 index 000000000..716f5f025 --- /dev/null +++ b/test/unit/connectors/databricks/test_volumes_table.py @@ -0,0 +1,112 @@ +import json +from pathlib import Path + +import pytest + +from unstructured_ingest.data_types.file_data import FileData, SourceIdentifiers +from unstructured_ingest.processes.connectors.databricks.volumes_table import ( + DatabricksVolumeDeltaTableStager, + DatabricksVolumeDeltaTableStagerConfig, + DatabricksVolumeDeltaTableUploaderConfig, +) + + +def _file_data() -> FileData: + return FileData( + identifier="doc-1", + connector_type="databricks_volume_delta_tables", + source_identifiers=SourceIdentifiers( + filename="example.pdf", + fullpath="s3://bucket/example.pdf", + ), + ) + + +def _write_elements(path: Path, elements: list[dict]) -> Path: + path.write_text(json.dumps(elements)) + return path + + +def _run_stager(tmp_path: Path, elements: list[dict], flatten_metadata: bool) -> list[dict]: + elements_in = _write_elements(tmp_path / "elements.json", elements) + stager = DatabricksVolumeDeltaTableStager( + upload_stager_config=DatabricksVolumeDeltaTableStagerConfig( + flatten_metadata=flatten_metadata + ) + ) + out_path = stager.run( + elements_filepath=elements_in, + output_dir=tmp_path / "out", + output_filename="elements.json", + file_data=_file_data(), + ) + return json.loads(Path(out_path).read_text()) + + +def _baseline_metadata() -> dict: + return { + "filename": "example.pdf", + "filetype": "application/pdf", + "page_number": 1, + "languages": ["eng"], + "data_source": { + "url": "s3://bucket/example.pdf", + "version": "abc123", + "record_locator": {"protocol": "s3", "remote_file_path": "s3://bucket/"}, + }, + } + + +def test_stager_blob_mode_is_default(tmp_path: Path): + elements = [{"element_id": "el-1", "text": "hello", "metadata": _baseline_metadata()}] + [row] = _run_stager(tmp_path, elements, flatten_metadata=False) + + assert "metadata" in row + assert isinstance(row["metadata"], str) + assert json.loads(row["metadata"]) == _baseline_metadata() + assert not any(k.startswith("metadata_") for k in row) + + +def test_stager_flatten_preserves_metadata_prefix(tmp_path: Path): + elements = [{"element_id": "el-1", "text": "hello", "metadata": _baseline_metadata()}] + [row] = _run_stager(tmp_path, elements, flatten_metadata=True) + + assert "metadata" not in row + assert row["metadata_filename"] == "example.pdf" + assert row["metadata_filetype"] == "application/pdf" + assert row["metadata_page_number"] == 1 + assert row["metadata_data_source_url"] == "s3://bucket/example.pdf" + assert row["metadata_data_source_version"] == "abc123" + assert row["metadata_data_source_record_locator_protocol"] == "s3" + assert row["metadata_data_source_record_locator_remote_file_path"] == "s3://bucket/" + + +def test_stager_flatten_stops_at_lists(tmp_path: Path): + elements = [ + { + "element_id": "el-1", + "text": "hello", + "metadata": { + "languages": ["eng", "fra"], + "sent_to": ["a@example.com", "b@example.com"], + }, + } + ] + [row] = _run_stager(tmp_path, elements, flatten_metadata=True) + + assert row["metadata_languages"] == ["eng", "fra"] + assert row["metadata_sent_to"] == ["a@example.com", "b@example.com"] + assert "metadata_languages_0" not in row + assert "metadata_sent_to_0" not in row + + +@pytest.mark.parametrize( + "config_cls", + [DatabricksVolumeDeltaTableUploaderConfig, DatabricksVolumeDeltaTableStagerConfig], +) +def test_flatten_metadata_defaults_false_for_workflow_db_backcompat(config_cls): + """Old configs persisted before PLU-161 have no `flatten_metadata` field. Deserialization + must produce flatten_metadata=False so existing connectors are byte-identical.""" + kwargs = {"catalog": "c", "volume": "v"} if "Uploader" in config_cls.__name__ else {} + config = config_cls.model_validate(kwargs) + assert config.flatten_metadata is False diff --git a/unstructured_ingest/processes/connectors/databricks/volumes_table.py b/unstructured_ingest/processes/connectors/databricks/volumes_table.py index 47cdc3094..4ac0468a6 100644 --- a/unstructured_ingest/processes/connectors/databricks/volumes_table.py +++ b/unstructured_ingest/processes/connectors/databricks/volumes_table.py @@ -22,10 +22,14 @@ from unstructured_ingest.processes.connectors.databricks.volumes import DatabricksPathMixin from unstructured_ingest.processes.connectors.sql.databricks_delta_tables import ( DatabricksDeltaTablesConnectionConfig, - DatabricksDeltaTablesUploadStagerConfig, ) from unstructured_ingest.utils.constants import RECORD_ID_LABEL -from unstructured_ingest.utils.data_prep import get_enhanced_element_id, get_json_data, write_data +from unstructured_ingest.utils.data_prep import ( + flatten_dict, + get_enhanced_element_id, + get_json_data, + write_data, +) from unstructured_ingest.utils.databricks import quote_identifier CONNECTOR_TYPE = "databricks_volume_delta_tables" @@ -34,13 +38,22 @@ pass +FLATTEN_METADATA_DESCRIPTION = ( + "If true, recursively flatten the element metadata into top-level columns " + "(stops at lists, preserves the `metadata_` prefix). The destination table " + "must already exist with the desired schema — the connector will not " + "auto-create. Unknown incoming fields are dropped; missing columns stay null." +) + + class DatabricksVolumeDeltaTableUploaderConfig(UploaderConfig, DatabricksPathMixin): database: str = Field(description="Database name", default="default") table_name: Optional[str] = Field(description="Table name", default=None) + flatten_metadata: bool = Field(default=False, description=FLATTEN_METADATA_DESCRIPTION) class DatabricksVolumeDeltaTableStagerConfig(UploadStagerConfig): - pass + flatten_metadata: bool = Field(default=False, description=FLATTEN_METADATA_DESCRIPTION) @dataclass @@ -63,10 +76,22 @@ def run( output_path = output_dir / output_filename final_output_path = output_path.with_suffix(".json") data = get_json_data(path=elements_filepath) + flatten = self.upload_stager_config.flatten_metadata for element in data: element["id"] = get_enhanced_element_id(element_dict=element, file_data=file_data) element[RECORD_ID_LABEL] = file_data.identifier - element["metadata"] = json.dumps(element.get("metadata", {})) + metadata = element.pop("metadata", {}) + if flatten: + element.update( + flatten_dict( + metadata, + parent_key="metadata", + separator="_", + flatten_lists=False, + ) + ) + else: + element["metadata"] = json.dumps(metadata) write_data(path=final_output_path, data=data, indent=None) return final_output_path @@ -93,6 +118,12 @@ def create_destination( table_names = [r[1] for r in cursor.fetchall()] if table_name in table_names: return False + if self.upload_config.flatten_metadata: + raise ValueError( + f"Table {table_name!r} does not exist. With flatten_metadata=true, " + "the destination table must be pre-created — auto-create is disabled " + "to prevent silent data loss." + ) with collection_config_file.open() as schema_file: data_lines = schema_file.readlines() data_lines[0] = data_lines[0].replace("elements", table_name) @@ -173,8 +204,20 @@ def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None: f"migrating content from {catalog_path} to table {self.upload_config.table_name}" ) data = get_json_data(path=path) - columns = data[0].keys() - select_columns = ["PARSE_JSON(metadata)" if c == "metadata" else c for c in columns] + json_columns = list(data[0].keys()) + if self.upload_config.flatten_metadata: + table_columns = set(self.get_table_columns().keys()) + columns = [c for c in json_columns if c in table_columns] + dropped = [c for c in json_columns if c not in table_columns] + if dropped: + logger.info( + "Following columns from incoming data will be dropped to match " + f"the table's schema: {', '.join(sorted(dropped))}" + ) + select_columns = list(columns) + else: + columns = json_columns + select_columns = ["PARSE_JSON(metadata)" if c == "metadata" else c for c in columns] column_str = ", ".join(columns) select_column_str = ", ".join(select_columns) sql_statment = f"INSERT INTO `{self.upload_config.table_name}` ({column_str}) SELECT {select_column_str} FROM json.`{catalog_path}`" # noqa: E501 @@ -186,5 +229,5 @@ def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None: uploader=DatabricksVolumeDeltaTableUploader, uploader_config=DatabricksVolumeDeltaTableUploaderConfig, upload_stager=DatabricksVolumeDeltaTableStager, - upload_stager_config=DatabricksDeltaTablesUploadStagerConfig, + upload_stager_config=DatabricksVolumeDeltaTableStagerConfig, ) From 0253ba608f73c9d43bd16106217f46a4ededb7e9 Mon Sep 17 00:00:00 2001 From: Austin Walker Date: Wed, 20 May 2026 22:44:00 -0400 Subject: [PATCH 02/13] feat(PLU-161): coerce datetime fields on the flatten path Stringified unix epochs (e.g. metadata.data_source.date_processed = "1779329564.5102773") are rejected by Databricks's implicit string -> TIMESTAMP cast with CAST_INVALID_INPUT. Run flattened datetime fields through the SQL connector's existing parse_date_string and emit ISO format so TIMESTAMP columns coerce natively. Malformed values pass through unchanged so the table can reject them, matching the non-flatten path's semantics. The known datetime field set is imported from sql.sql._DATE_COLUMNS so this stays in sync with the rest of ingest as new datetime fields land. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../databricks/test_volumes_table.py | 61 +++++++++++++++++++ .../connectors/databricks/volumes_table.py | 30 ++++++++- 2 files changed, 90 insertions(+), 1 deletion(-) diff --git a/test/unit/connectors/databricks/test_volumes_table.py b/test/unit/connectors/databricks/test_volumes_table.py index 716f5f025..e167123e3 100644 --- a/test/unit/connectors/databricks/test_volumes_table.py +++ b/test/unit/connectors/databricks/test_volumes_table.py @@ -100,6 +100,67 @@ def test_stager_flatten_stops_at_lists(tmp_path: Path): assert "metadata_sent_to_0" not in row +def test_stager_flatten_coerces_epoch_string_datetime(tmp_path: Path): + """Stringified unix epochs in known datetime fields get coerced to ISO format so + Databricks's implicit string -> TIMESTAMP cast succeeds (PLU-161 case I).""" + elements = [ + { + "element_id": "el-1", + "text": "hello", + "metadata": { + "data_source": { + "date_created": "1779329000.0", + "date_modified": "1779329500.0", + "date_processed": "1779329564.5102773", + }, + }, + } + ] + [row] = _run_stager(tmp_path, elements, flatten_metadata=True) + + for key in ( + "metadata_data_source_date_created", + "metadata_data_source_date_modified", + "metadata_data_source_date_processed", + ): + assert isinstance(row[key], str) + assert row[key].startswith("20"), f"{key} not in ISO format: {row[key]!r}" + # confirm it round-trips through datetime.fromisoformat + from datetime import datetime as _dt + + _dt.fromisoformat(row[key]) + + +def test_stager_flatten_leaves_malformed_datetime_alone(tmp_path: Path): + """Unparseable datetime values pass through unchanged so the destination table + can reject them — matches the non-flatten path's behavior.""" + elements = [ + { + "element_id": "el-1", + "text": "hello", + "metadata": {"data_source": {"date_created": "not-a-date"}}, + } + ] + [row] = _run_stager(tmp_path, elements, flatten_metadata=True) + assert row["metadata_data_source_date_created"] == "not-a-date" + + +def test_stager_blob_mode_does_not_coerce_datetime(tmp_path: Path): + """The non-flatten path is unchanged — datetime fields stay as strings inside + the JSON blob, matching today's behavior.""" + elements = [ + { + "element_id": "el-1", + "text": "hello", + "metadata": {"data_source": {"date_processed": "1779329564.5102773"}}, + } + ] + [row] = _run_stager(tmp_path, elements, flatten_metadata=False) + assert "metadata" in row + decoded = json.loads(row["metadata"]) + assert decoded["data_source"]["date_processed"] == "1779329564.5102773" + + @pytest.mark.parametrize( "config_cls", [DatabricksVolumeDeltaTableUploaderConfig, DatabricksVolumeDeltaTableStagerConfig], diff --git a/unstructured_ingest/processes/connectors/databricks/volumes_table.py b/unstructured_ingest/processes/connectors/databricks/volumes_table.py index 4ac0468a6..0a1b0047a 100644 --- a/unstructured_ingest/processes/connectors/databricks/volumes_table.py +++ b/unstructured_ingest/processes/connectors/databricks/volumes_table.py @@ -1,6 +1,6 @@ import json import os -from contextlib import contextmanager +from contextlib import contextmanager, suppress from dataclasses import dataclass, field from pathlib import Path from typing import TYPE_CHECKING, Any, Generator, Optional @@ -23,6 +23,13 @@ from unstructured_ingest.processes.connectors.sql.databricks_delta_tables import ( DatabricksDeltaTablesConnectionConfig, ) + +# Reuse the SQL connector's canonical datetime field set + parser so the flatten +# path doesn't drift from the rest of ingest when new datetime fields land. +from unstructured_ingest.processes.connectors.sql.sql import ( + _DATE_COLUMNS, + parse_date_string, +) from unstructured_ingest.utils.constants import RECORD_ID_LABEL from unstructured_ingest.utils.data_prep import ( flatten_dict, @@ -56,6 +63,26 @@ class DatabricksVolumeDeltaTableStagerConfig(UploadStagerConfig): flatten_metadata: bool = Field(default=False, description=FLATTEN_METADATA_DESCRIPTION) +def _coerce_flattened_datetimes(row: dict[str, Any]) -> None: + """Convert stringified-epoch values on known datetime keys to ISO format in-place. + + After flattening, datetime metadata fields (e.g. `metadata_data_source_date_processed`) + arrive as stringified unix epochs like `"1779329564.5102773"`. Databricks's implicit + string → TIMESTAMP cast rejects those (`CAST_INVALID_INPUT`). Run them through the + same `parse_date_string` the SQL connector uses, then emit ISO format which + Databricks coerces natively. Malformed values pass through unchanged — same as the + non-flatten path. + """ + suffixes = tuple(f"_{col}" for col in _DATE_COLUMNS) + for key, value in list(row.items()): + if value is None or not isinstance(value, (str, int)): + continue + if not key.endswith(suffixes): + continue + with suppress(Exception): + row[key] = parse_date_string(value).isoformat(sep=" ") + + @dataclass class DatabricksVolumeDeltaTableStager(UploadStager): upload_stager_config: DatabricksVolumeDeltaTableStagerConfig = field( @@ -90,6 +117,7 @@ def run( flatten_lists=False, ) ) + _coerce_flattened_datetimes(element) else: element["metadata"] = json.dumps(metadata) write_data(path=final_output_path, data=data, indent=None) From db98886b681c28d4130a3841884a7cf763255f76 Mon Sep 17 00:00:00 2001 From: Austin Walker Date: Wed, 20 May 2026 22:47:50 -0400 Subject: [PATCH 03/13] chore(PLU-161): changelog + bump to 1.7.0 Co-Authored-By: Claude Opus 4.7 (1M context) --- CHANGELOG.md | 6 ++++++ unstructured_ingest/__version__.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a96fd9d97..f23bdb2eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## [1.7.0] + +### Enhancements + +- **feat(databricks): add `flatten_metadata` option to the Volumes Delta Tables uploader.** Opt-in, default off, so existing connectors are byte-identical. When set, the stager recursively flattens the element metadata into top-level `metadata_*` columns (stops at lists, preserves the `metadata_` prefix) using the existing `flatten_dict` helper, and the uploader skips auto-create and intersects incoming columns with the user-managed table schema, dropping unknowns with a log line. Known datetime fields (`*_date_created`, `*_date_modified`, `*_date_processed`, `*_last_modified`) are coerced from stringified epoch to ISO format so Databricks's implicit string → `TIMESTAMP` cast succeeds; the field set is imported from the SQL connector so this stays in sync as new datetime fields land. + ## [1.6.2] ### Fixes diff --git a/unstructured_ingest/__version__.py b/unstructured_ingest/__version__.py index 4e4d3296f..1042344ec 100644 --- a/unstructured_ingest/__version__.py +++ b/unstructured_ingest/__version__.py @@ -1 +1 @@ -__version__ = "1.6.2" # pragma: no cover +__version__ = "1.7.0" # pragma: no cover From 69060f78a21245f171ea561211c8664563aa606f Mon Sep 17 00:00:00 2001 From: Austin Walker Date: Wed, 20 May 2026 22:50:41 -0400 Subject: [PATCH 04/13] chore(PLU-161): drop point-in-time comments from volumes_table Co-Authored-By: Claude Opus 4.7 (1M context) --- .../connectors/databricks/volumes_table.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/unstructured_ingest/processes/connectors/databricks/volumes_table.py b/unstructured_ingest/processes/connectors/databricks/volumes_table.py index 0a1b0047a..d14eca903 100644 --- a/unstructured_ingest/processes/connectors/databricks/volumes_table.py +++ b/unstructured_ingest/processes/connectors/databricks/volumes_table.py @@ -23,9 +23,6 @@ from unstructured_ingest.processes.connectors.sql.databricks_delta_tables import ( DatabricksDeltaTablesConnectionConfig, ) - -# Reuse the SQL connector's canonical datetime field set + parser so the flatten -# path doesn't drift from the rest of ingest when new datetime fields land. from unstructured_ingest.processes.connectors.sql.sql import ( _DATE_COLUMNS, parse_date_string, @@ -64,14 +61,13 @@ class DatabricksVolumeDeltaTableStagerConfig(UploadStagerConfig): def _coerce_flattened_datetimes(row: dict[str, Any]) -> None: - """Convert stringified-epoch values on known datetime keys to ISO format in-place. + """Convert stringified-epoch datetime fields to ISO format in-place. - After flattening, datetime metadata fields (e.g. `metadata_data_source_date_processed`) - arrive as stringified unix epochs like `"1779329564.5102773"`. Databricks's implicit - string → TIMESTAMP cast rejects those (`CAST_INVALID_INPUT`). Run them through the - same `parse_date_string` the SQL connector uses, then emit ISO format which - Databricks coerces natively. Malformed values pass through unchanged — same as the - non-flatten path. + Element metadata datetime fields (e.g. `*_date_processed`) arrive as stringified + unix epochs like `"1779329564.5102773"`, which Databricks's implicit string → + `TIMESTAMP` cast rejects with `CAST_INVALID_INPUT`. Emit ISO format instead so + `TIMESTAMP` columns coerce natively. Unparseable values are left alone for the + table to reject. """ suffixes = tuple(f"_{col}" for col in _DATE_COLUMNS) for key, value in list(row.items()): From dd3dec52477784f464bdaa42e8e00aabd0c195a8 Mon Sep 17 00:00:00 2001 From: Austin Walker Date: Thu, 21 May 2026 11:33:20 -0400 Subject: [PATCH 05/13] refactor(PLU-161): drop metadata_ prefix from flattened columns Team decided to match Milvus's flatten output for consistency: column names are the metadata key path joined by `_`, with no `metadata_` prefix. Also widens the datetime coercion match so top-level keys like `date_processed` (no leading underscore once unprefixed) still get parsed. Co-Authored-By: Claude Opus 4.7 (1M context) --- CHANGELOG.md | 2 +- .../databricks/test_volumes_table.py | 42 +++++++++++-------- .../connectors/databricks/volumes_table.py | 19 ++++----- 3 files changed, 34 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f23bdb2eb..1286dba84 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ### Enhancements -- **feat(databricks): add `flatten_metadata` option to the Volumes Delta Tables uploader.** Opt-in, default off, so existing connectors are byte-identical. When set, the stager recursively flattens the element metadata into top-level `metadata_*` columns (stops at lists, preserves the `metadata_` prefix) using the existing `flatten_dict` helper, and the uploader skips auto-create and intersects incoming columns with the user-managed table schema, dropping unknowns with a log line. Known datetime fields (`*_date_created`, `*_date_modified`, `*_date_processed`, `*_last_modified`) are coerced from stringified epoch to ISO format so Databricks's implicit string → `TIMESTAMP` cast succeeds; the field set is imported from the SQL connector so this stays in sync as new datetime fields land. +- **feat(databricks): add `flatten_metadata` option to the Volumes Delta Tables uploader.** Opt-in, default off, so existing connectors are byte-identical. When set, the stager recursively flattens the element metadata into top-level columns (stops at lists) using the existing `flatten_dict` helper — column names match the metadata key path joined by `_` with no `metadata_` prefix, consistent with Milvus. The uploader skips auto-create and intersects incoming columns with the user-managed table schema, dropping unknowns with a log line. Known datetime fields (`date_created`, `date_modified`, `date_processed`, `last_modified` and any `*_` nested variants) are coerced from stringified epoch to ISO format so Databricks's implicit string → `TIMESTAMP` cast succeeds; the field set is imported from the SQL connector so this stays in sync as new datetime fields land. ## [1.6.2] diff --git a/test/unit/connectors/databricks/test_volumes_table.py b/test/unit/connectors/databricks/test_volumes_table.py index e167123e3..be536d645 100644 --- a/test/unit/connectors/databricks/test_volumes_table.py +++ b/test/unit/connectors/databricks/test_volumes_table.py @@ -64,21 +64,21 @@ def test_stager_blob_mode_is_default(tmp_path: Path): assert "metadata" in row assert isinstance(row["metadata"], str) assert json.loads(row["metadata"]) == _baseline_metadata() - assert not any(k.startswith("metadata_") for k in row) -def test_stager_flatten_preserves_metadata_prefix(tmp_path: Path): +def test_stager_flatten_drops_metadata_prefix(tmp_path: Path): elements = [{"element_id": "el-1", "text": "hello", "metadata": _baseline_metadata()}] [row] = _run_stager(tmp_path, elements, flatten_metadata=True) assert "metadata" not in row - assert row["metadata_filename"] == "example.pdf" - assert row["metadata_filetype"] == "application/pdf" - assert row["metadata_page_number"] == 1 - assert row["metadata_data_source_url"] == "s3://bucket/example.pdf" - assert row["metadata_data_source_version"] == "abc123" - assert row["metadata_data_source_record_locator_protocol"] == "s3" - assert row["metadata_data_source_record_locator_remote_file_path"] == "s3://bucket/" + assert row["filename"] == "example.pdf" + assert row["filetype"] == "application/pdf" + assert row["page_number"] == 1 + assert row["data_source_url"] == "s3://bucket/example.pdf" + assert row["data_source_version"] == "abc123" + assert row["data_source_record_locator_protocol"] == "s3" + assert row["data_source_record_locator_remote_file_path"] == "s3://bucket/" + assert not any(k.startswith("metadata_") for k in row) def test_stager_flatten_stops_at_lists(tmp_path: Path): @@ -94,20 +94,25 @@ def test_stager_flatten_stops_at_lists(tmp_path: Path): ] [row] = _run_stager(tmp_path, elements, flatten_metadata=True) - assert row["metadata_languages"] == ["eng", "fra"] - assert row["metadata_sent_to"] == ["a@example.com", "b@example.com"] - assert "metadata_languages_0" not in row - assert "metadata_sent_to_0" not in row + assert row["languages"] == ["eng", "fra"] + assert row["sent_to"] == ["a@example.com", "b@example.com"] + assert "languages_0" not in row + assert "sent_to_0" not in row def test_stager_flatten_coerces_epoch_string_datetime(tmp_path: Path): """Stringified unix epochs in known datetime fields get coerced to ISO format so - Databricks's implicit string -> TIMESTAMP cast succeeds (PLU-161 case I).""" + Databricks's implicit string -> TIMESTAMP cast succeeds (PLU-161 case I). + + Covers both top-level (`date_processed` injected by the pipeline at element root) + and nested (`data_source_*`) datetime keys. + """ elements = [ { "element_id": "el-1", "text": "hello", "metadata": { + "date_processed": "1779329600.0", "data_source": { "date_created": "1779329000.0", "date_modified": "1779329500.0", @@ -119,9 +124,10 @@ def test_stager_flatten_coerces_epoch_string_datetime(tmp_path: Path): [row] = _run_stager(tmp_path, elements, flatten_metadata=True) for key in ( - "metadata_data_source_date_created", - "metadata_data_source_date_modified", - "metadata_data_source_date_processed", + "date_processed", + "data_source_date_created", + "data_source_date_modified", + "data_source_date_processed", ): assert isinstance(row[key], str) assert row[key].startswith("20"), f"{key} not in ISO format: {row[key]!r}" @@ -142,7 +148,7 @@ def test_stager_flatten_leaves_malformed_datetime_alone(tmp_path: Path): } ] [row] = _run_stager(tmp_path, elements, flatten_metadata=True) - assert row["metadata_data_source_date_created"] == "not-a-date" + assert row["data_source_date_created"] == "not-a-date" def test_stager_blob_mode_does_not_coerce_datetime(tmp_path: Path): diff --git a/unstructured_ingest/processes/connectors/databricks/volumes_table.py b/unstructured_ingest/processes/connectors/databricks/volumes_table.py index d14eca903..74ce523d7 100644 --- a/unstructured_ingest/processes/connectors/databricks/volumes_table.py +++ b/unstructured_ingest/processes/connectors/databricks/volumes_table.py @@ -44,9 +44,9 @@ FLATTEN_METADATA_DESCRIPTION = ( "If true, recursively flatten the element metadata into top-level columns " - "(stops at lists, preserves the `metadata_` prefix). The destination table " - "must already exist with the desired schema — the connector will not " - "auto-create. Unknown incoming fields are dropped; missing columns stay null." + "(stops at lists). The destination table must already exist with the " + "desired schema — the connector will not auto-create. Unknown incoming " + "fields are dropped; missing columns stay null." ) @@ -63,17 +63,17 @@ class DatabricksVolumeDeltaTableStagerConfig(UploadStagerConfig): def _coerce_flattened_datetimes(row: dict[str, Any]) -> None: """Convert stringified-epoch datetime fields to ISO format in-place. - Element metadata datetime fields (e.g. `*_date_processed`) arrive as stringified - unix epochs like `"1779329564.5102773"`, which Databricks's implicit string → - `TIMESTAMP` cast rejects with `CAST_INVALID_INPUT`. Emit ISO format instead so - `TIMESTAMP` columns coerce natively. Unparseable values are left alone for the - table to reject. + Element metadata datetime fields (e.g. `date_processed`, `data_source_date_processed`) + arrive as stringified unix epochs like `"1779329564.5102773"`, which Databricks's + implicit string → `TIMESTAMP` cast rejects with `CAST_INVALID_INPUT`. Emit ISO + format instead so `TIMESTAMP` columns coerce natively. Unparseable values are + left alone for the table to reject. """ suffixes = tuple(f"_{col}" for col in _DATE_COLUMNS) for key, value in list(row.items()): if value is None or not isinstance(value, (str, int)): continue - if not key.endswith(suffixes): + if key not in _DATE_COLUMNS and not key.endswith(suffixes): continue with suppress(Exception): row[key] = parse_date_string(value).isoformat(sep=" ") @@ -108,7 +108,6 @@ def run( element.update( flatten_dict( metadata, - parent_key="metadata", separator="_", flatten_lists=False, ) From 8141097fdcf4c8b641ad55b3989a137de157b677 Mon Sep 17 00:00:00 2001 From: Austin Walker Date: Thu, 21 May 2026 11:43:15 -0400 Subject: [PATCH 06/13] chore(PLU-161): tighten flatten_metadata description and changelog Match the terseness of neighboring Field descriptions (AstraDB, Milvus) and the 1.5.x changelog entries. Drops the FLATTEN_METADATA_DESCRIPTION constant since the inlined string is short enough at both call sites. Co-Authored-By: Claude Opus 4.7 (1M context) --- CHANGELOG.md | 2 +- .../connectors/databricks/volumes_table.py | 24 +++++++++++-------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1286dba84..577122d8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ### Enhancements -- **feat(databricks): add `flatten_metadata` option to the Volumes Delta Tables uploader.** Opt-in, default off, so existing connectors are byte-identical. When set, the stager recursively flattens the element metadata into top-level columns (stops at lists) using the existing `flatten_dict` helper — column names match the metadata key path joined by `_` with no `metadata_` prefix, consistent with Milvus. The uploader skips auto-create and intersects incoming columns with the user-managed table schema, dropping unknowns with a log line. Known datetime fields (`date_created`, `date_modified`, `date_processed`, `last_modified` and any `*_` nested variants) are coerced from stringified epoch to ISO format so Databricks's implicit string → `TIMESTAMP` cast succeeds; the field set is imported from the SQL connector so this stays in sync as new datetime fields land. +- **feat(databricks): add `flatten_metadata` option to the Volumes Delta Tables uploader.** Opt-in, default off. When set, the stager flattens element metadata into top-level columns matching Milvus's unprefixed naming, and the uploader skips auto-create against the user-managed table, dropping unknown incoming columns with a log line. Stringified-epoch datetime fields are coerced to ISO so Databricks's implicit `TIMESTAMP` cast accepts them. ## [1.6.2] diff --git a/unstructured_ingest/processes/connectors/databricks/volumes_table.py b/unstructured_ingest/processes/connectors/databricks/volumes_table.py index 74ce523d7..300c37214 100644 --- a/unstructured_ingest/processes/connectors/databricks/volumes_table.py +++ b/unstructured_ingest/processes/connectors/databricks/volumes_table.py @@ -42,22 +42,26 @@ pass -FLATTEN_METADATA_DESCRIPTION = ( - "If true, recursively flatten the element metadata into top-level columns " - "(stops at lists). The destination table must already exist with the " - "desired schema — the connector will not auto-create. Unknown incoming " - "fields are dropped; missing columns stay null." -) - - class DatabricksVolumeDeltaTableUploaderConfig(UploaderConfig, DatabricksPathMixin): database: str = Field(description="Database name", default="default") table_name: Optional[str] = Field(description="Table name", default=None) - flatten_metadata: bool = Field(default=False, description=FLATTEN_METADATA_DESCRIPTION) + flatten_metadata: bool = Field( + default=False, + description=( + "Flatten metadata into top-level columns. Destination table must already " + "exist (no auto-create); unknown incoming fields are dropped." + ), + ) class DatabricksVolumeDeltaTableStagerConfig(UploadStagerConfig): - flatten_metadata: bool = Field(default=False, description=FLATTEN_METADATA_DESCRIPTION) + flatten_metadata: bool = Field( + default=False, + description=( + "Flatten metadata into top-level columns. Destination table must already " + "exist (no auto-create); unknown incoming fields are dropped." + ), + ) def _coerce_flattened_datetimes(row: dict[str, Any]) -> None: From df268c1cfa0eb7dd656d3a12b407b1d1385034fc Mon Sep 17 00:00:00 2001 From: Austin Walker Date: Thu, 21 May 2026 11:48:16 -0400 Subject: [PATCH 07/13] fix(PLU-161): accept float epoch values in datetime coercion Cubic flagged that the isinstance check excluded `float`, so a JSON-parsed unquoted epoch would silently bypass coercion. Today the upstream emits stringified epochs, but tightening the type tuple is cheap and matches what `parse_date_string` already accepts. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../connectors/databricks/test_volumes_table.py | 15 +++++++++++++++ .../connectors/databricks/volumes_table.py | 2 +- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/test/unit/connectors/databricks/test_volumes_table.py b/test/unit/connectors/databricks/test_volumes_table.py index be536d645..af595c3d3 100644 --- a/test/unit/connectors/databricks/test_volumes_table.py +++ b/test/unit/connectors/databricks/test_volumes_table.py @@ -137,6 +137,21 @@ def test_stager_flatten_coerces_epoch_string_datetime(tmp_path: Path): _dt.fromisoformat(row[key]) +def test_stager_flatten_coerces_float_epoch_datetime(tmp_path: Path): + """JSON-parsed numeric epochs (no quotes) arrive as Python floats; coercion + must still pick them up.""" + elements = [ + { + "element_id": "el-1", + "text": "hello", + "metadata": {"data_source": {"date_processed": 1779329564.5102773}}, + } + ] + [row] = _run_stager(tmp_path, elements, flatten_metadata=True) + assert isinstance(row["data_source_date_processed"], str) + assert row["data_source_date_processed"].startswith("20") + + def test_stager_flatten_leaves_malformed_datetime_alone(tmp_path: Path): """Unparseable datetime values pass through unchanged so the destination table can reject them — matches the non-flatten path's behavior.""" diff --git a/unstructured_ingest/processes/connectors/databricks/volumes_table.py b/unstructured_ingest/processes/connectors/databricks/volumes_table.py index 300c37214..ec64f6d40 100644 --- a/unstructured_ingest/processes/connectors/databricks/volumes_table.py +++ b/unstructured_ingest/processes/connectors/databricks/volumes_table.py @@ -75,7 +75,7 @@ def _coerce_flattened_datetimes(row: dict[str, Any]) -> None: """ suffixes = tuple(f"_{col}" for col in _DATE_COLUMNS) for key, value in list(row.items()): - if value is None or not isinstance(value, (str, int)): + if value is None or not isinstance(value, (str, int, float)): continue if key not in _DATE_COLUMNS and not key.endswith(suffixes): continue From d68c20f292d65ee30c91d60912db190fa912466c Mon Sep 17 00:00:00 2001 From: Austin Walker Date: Thu, 21 May 2026 11:56:45 -0400 Subject: [PATCH 08/13] test(PLU-161): cover uploader create_destination + run branches Six new uploader unit tests, mirroring the mock_cursor / mock_get_cursor pattern from test_databricks_delta_tables.py: - create_destination raises IngestValueError when flatten=true and the table is missing (PLU-161 case E) - create_destination still auto-creates when flatten=false and the table is missing (case C, backcompat) - create_destination is a no-op when the table already exists, under either flag value - run() with flatten=true emits raw column names in SELECT (no PARSE_JSON, no `metadata` column) - run() with flatten=false still wraps `metadata` with PARSE_JSON (regression guard) - run() with flatten=true drops unknown incoming columns and surfaces them in a single info log line Co-Authored-By: Claude Opus 4.7 (1M context) --- .../databricks/test_volumes_table.py | 185 ++++++++++++++++++ 1 file changed, 185 insertions(+) diff --git a/test/unit/connectors/databricks/test_volumes_table.py b/test/unit/connectors/databricks/test_volumes_table.py index af595c3d3..45c63a183 100644 --- a/test/unit/connectors/databricks/test_volumes_table.py +++ b/test/unit/connectors/databricks/test_volumes_table.py @@ -1,14 +1,23 @@ import json from pathlib import Path +from unittest.mock import MagicMock import pytest +from pydantic import Secret +from pytest_mock import MockerFixture from unstructured_ingest.data_types.file_data import FileData, SourceIdentifiers +from unstructured_ingest.error import ValueError as IngestValueError from unstructured_ingest.processes.connectors.databricks.volumes_table import ( DatabricksVolumeDeltaTableStager, DatabricksVolumeDeltaTableStagerConfig, + DatabricksVolumeDeltaTableUploader, DatabricksVolumeDeltaTableUploaderConfig, ) +from unstructured_ingest.processes.connectors.sql.databricks_delta_tables import ( + DatabricksDeltaTablesAccessConfig, + DatabricksDeltaTablesConnectionConfig, +) def _file_data() -> FileData: @@ -192,3 +201,179 @@ def test_flatten_metadata_defaults_false_for_workflow_db_backcompat(config_cls): kwargs = {"catalog": "c", "volume": "v"} if "Uploader" in config_cls.__name__ else {} config = config_cls.model_validate(kwargs) assert config.flatten_metadata is False + + +def _make_uploader( + flatten_metadata: bool, table_name: str = "elements" +) -> DatabricksVolumeDeltaTableUploader: + return DatabricksVolumeDeltaTableUploader( + connection_config=DatabricksDeltaTablesConnectionConfig( + access_config=Secret(DatabricksDeltaTablesAccessConfig(token="tok")), + server_hostname="example.databricks.com", + http_path="/sql/1.0/warehouses/xxx", + ), + upload_config=DatabricksVolumeDeltaTableUploaderConfig( + catalog="cat", + databricks_schema="sch", + volume="vol", + database="db", + table_name=table_name, + flatten_metadata=flatten_metadata, + ), + ) + + +@pytest.fixture +def mock_cursor(mocker: MockerFixture) -> MagicMock: + return mocker.MagicMock() + + +@pytest.fixture +def mock_get_cursor(mocker: MockerFixture, mock_cursor: MagicMock) -> MagicMock: + mock = mocker.patch( + "unstructured_ingest.processes.connectors.sql.databricks_delta_tables" + ".DatabricksDeltaTablesConnectionConfig.get_cursor", + autospec=True, + ) + mock.return_value.__enter__.return_value = mock_cursor + return mock + + +def _executed_sql(mock_cursor: MagicMock) -> list[str]: + return [c.args[0] for c in mock_cursor.execute.call_args_list] + + +def _insert_sql(mock_cursor: MagicMock) -> str: + return next(sql for sql in _executed_sql(mock_cursor) if sql.startswith("INSERT")) + + +def test_create_destination_flatten_true_missing_table_raises( + mock_cursor: MagicMock, mock_get_cursor: MagicMock +): + """PLU-161 case E: with flatten_metadata=true, missing table is a hard fail — + auto-create is disabled to prevent silent data loss.""" + uploader = _make_uploader(flatten_metadata=True, table_name="missing_table") + mock_cursor.fetchall.return_value = [] # SHOW TABLES → no rows + + with pytest.raises(IngestValueError, match="auto-create is disabled"): + uploader.create_destination() + + assert not any("CREATE TABLE" in sql for sql in _executed_sql(mock_cursor)) + + +def test_create_destination_flatten_false_missing_table_autocreates( + mock_cursor: MagicMock, mock_get_cursor: MagicMock +): + """PLU-161 case C: backcompat — flatten=false still auto-creates from the + asset schema when the destination table is missing.""" + uploader = _make_uploader(flatten_metadata=False, table_name="new_table") + mock_cursor.fetchall.return_value = [] # SHOW TABLES → no rows + + assert uploader.create_destination() is True + + create_sqls = [sql for sql in _executed_sql(mock_cursor) if "CREATE TABLE" in sql] + assert len(create_sqls) == 1 + assert "new_table" in create_sqls[0] + + +@pytest.mark.parametrize("flatten_metadata", [True, False]) +def test_create_destination_existing_table_returns_false( + mock_cursor: MagicMock, mock_get_cursor: MagicMock, flatten_metadata: bool +): + """When the table already exists, create_destination is a no-op regardless of + the flag — neither branch should attempt a CREATE TABLE.""" + uploader = _make_uploader(flatten_metadata=flatten_metadata, table_name="elements") + # `SHOW TABLES` rows are `(database, tableName, isTemporary)`; r[1] is the name. + mock_cursor.fetchall.return_value = [("sch", "elements", False)] + + assert uploader.create_destination() is False + assert not any("CREATE TABLE" in sql for sql in _executed_sql(mock_cursor)) + + +def _staged_elements(tmp_path: Path, row: dict) -> Path: + path = tmp_path / "staged.json" + path.write_text(json.dumps([row])) + return path + + +def test_run_flatten_true_select_uses_raw_columns( + tmp_path: Path, mock_cursor: MagicMock, mock_get_cursor: MagicMock +): + """Under flatten_metadata=true the SELECT clause references columns directly — + no PARSE_JSON wrapping, and no `metadata` column.""" + uploader = _make_uploader(flatten_metadata=True) + # Pre-populate the columns cache: same keys as incoming, no record_id → can_delete=false + uploader._columns = { + "element_id": "string", + "text": "string", + "filename": "string", + "data_source_url": "string", + } + path = _staged_elements( + tmp_path, + { + "element_id": "el-1", + "text": "hello", + "filename": "x.pdf", + "data_source_url": "s3://bucket/x.pdf", + }, + ) + + uploader.run(path=path, file_data=_file_data()) + + insert_sql = _insert_sql(mock_cursor) + assert "PARSE_JSON" not in insert_sql + assert "metadata" not in insert_sql + for col in ("element_id", "text", "filename", "data_source_url"): + assert col in insert_sql + + +def test_run_flatten_false_select_wraps_metadata_in_parse_json( + tmp_path: Path, mock_cursor: MagicMock, mock_get_cursor: MagicMock +): + """Regression guard for the today-default path: flatten_metadata=false must + still wrap the `metadata` column with PARSE_JSON so the VARIANT cast works.""" + uploader = _make_uploader(flatten_metadata=False) + uploader._columns = {"element_id": "string", "text": "string", "metadata": "variant"} + path = _staged_elements( + tmp_path, + {"element_id": "el-1", "text": "hello", "metadata": json.dumps({"filename": "x.pdf"})}, + ) + + uploader.run(path=path, file_data=_file_data()) + + insert_sql = _insert_sql(mock_cursor) + assert "PARSE_JSON(metadata)" in insert_sql + + +def test_run_flatten_true_drops_unknown_columns_and_logs( + tmp_path: Path, + mock_cursor: MagicMock, + mock_get_cursor: MagicMock, + caplog: pytest.LogCaptureFixture, +): + """Under flatten_metadata=true, incoming columns not present in the destination + table are dropped from the INSERT and surfaced in a single info log line.""" + uploader = _make_uploader(flatten_metadata=True) + uploader._columns = {"element_id": "string", "text": "string", "filename": "string"} + path = _staged_elements( + tmp_path, + { + "element_id": "el-1", + "text": "hello", + "filename": "x.pdf", + "unknown_col": "drop me", + "another_extra": 42, + }, + ) + + with caplog.at_level("INFO", logger="unstructured_ingest"): + uploader.run(path=path, file_data=_file_data()) + + insert_sql = _insert_sql(mock_cursor) + assert "unknown_col" not in insert_sql + assert "another_extra" not in insert_sql + drop_lines = [r for r in caplog.records if "dropped" in r.message.lower()] + assert len(drop_lines) == 1 + assert "unknown_col" in drop_lines[0].message + assert "another_extra" in drop_lines[0].message From edd7a1ccf888802e21a7dd1fb2d74708e848ae98 Mon Sep 17 00:00:00 2001 From: Austin Walker Date: Thu, 21 May 2026 12:09:08 -0400 Subject: [PATCH 09/13] chore(PLU-161): patch bump 1.6.3 instead of 1.7.0 Adding a new opt-in config field to one connector matches the patch-bump precedent set by 1.5.2 (delegated oauth_token) and 1.6.1 (refresh_token). Minor bumps in recent history (1.5.0, 1.6.0) have been reserved for ACL pass-through, which adds a new dimension of data flow. Co-Authored-By: Claude Opus 4.7 (1M context) --- CHANGELOG.md | 2 +- unstructured_ingest/__version__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 577122d8d..946ff68e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## [1.7.0] +## [1.6.3] ### Enhancements diff --git a/unstructured_ingest/__version__.py b/unstructured_ingest/__version__.py index 1042344ec..f005954bd 100644 --- a/unstructured_ingest/__version__.py +++ b/unstructured_ingest/__version__.py @@ -1 +1 @@ -__version__ = "1.7.0" # pragma: no cover +__version__ = "1.6.3" # pragma: no cover From 0c31cb23cbd1a1125e5b5994f2b7198e07a1dd85 Mon Sep 17 00:00:00 2001 From: Austin Walker Date: Fri, 22 May 2026 09:38:49 -0400 Subject: [PATCH 10/13] refactor(PLU-161): drop datetime coercion from flatten path Let metadata fields flow into Delta as their JSON-native types. Customers designing the user-managed table should declare datetime fields as STRING. Co-Authored-By: Claude Opus 4.7 (1M context) --- CHANGELOG.md | 2 +- .../databricks/test_volumes_table.py | 82 ------------------- .../connectors/databricks/volumes_table.py | 26 +----- 3 files changed, 2 insertions(+), 108 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 946ff68e0..b628c8ed8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ### Enhancements -- **feat(databricks): add `flatten_metadata` option to the Volumes Delta Tables uploader.** Opt-in, default off. When set, the stager flattens element metadata into top-level columns matching Milvus's unprefixed naming, and the uploader skips auto-create against the user-managed table, dropping unknown incoming columns with a log line. Stringified-epoch datetime fields are coerced to ISO so Databricks's implicit `TIMESTAMP` cast accepts them. +- **feat(databricks): add `flatten_metadata` option to the Volumes Delta Tables uploader.** Opt-in, default off. When set, the stager flattens element metadata into top-level columns matching Milvus's unprefixed naming, and the uploader skips auto-create against the user-managed table, dropping unknown incoming columns with a log line. ## [1.6.2] diff --git a/test/unit/connectors/databricks/test_volumes_table.py b/test/unit/connectors/databricks/test_volumes_table.py index 45c63a183..611f80e23 100644 --- a/test/unit/connectors/databricks/test_volumes_table.py +++ b/test/unit/connectors/databricks/test_volumes_table.py @@ -109,88 +109,6 @@ def test_stager_flatten_stops_at_lists(tmp_path: Path): assert "sent_to_0" not in row -def test_stager_flatten_coerces_epoch_string_datetime(tmp_path: Path): - """Stringified unix epochs in known datetime fields get coerced to ISO format so - Databricks's implicit string -> TIMESTAMP cast succeeds (PLU-161 case I). - - Covers both top-level (`date_processed` injected by the pipeline at element root) - and nested (`data_source_*`) datetime keys. - """ - elements = [ - { - "element_id": "el-1", - "text": "hello", - "metadata": { - "date_processed": "1779329600.0", - "data_source": { - "date_created": "1779329000.0", - "date_modified": "1779329500.0", - "date_processed": "1779329564.5102773", - }, - }, - } - ] - [row] = _run_stager(tmp_path, elements, flatten_metadata=True) - - for key in ( - "date_processed", - "data_source_date_created", - "data_source_date_modified", - "data_source_date_processed", - ): - assert isinstance(row[key], str) - assert row[key].startswith("20"), f"{key} not in ISO format: {row[key]!r}" - # confirm it round-trips through datetime.fromisoformat - from datetime import datetime as _dt - - _dt.fromisoformat(row[key]) - - -def test_stager_flatten_coerces_float_epoch_datetime(tmp_path: Path): - """JSON-parsed numeric epochs (no quotes) arrive as Python floats; coercion - must still pick them up.""" - elements = [ - { - "element_id": "el-1", - "text": "hello", - "metadata": {"data_source": {"date_processed": 1779329564.5102773}}, - } - ] - [row] = _run_stager(tmp_path, elements, flatten_metadata=True) - assert isinstance(row["data_source_date_processed"], str) - assert row["data_source_date_processed"].startswith("20") - - -def test_stager_flatten_leaves_malformed_datetime_alone(tmp_path: Path): - """Unparseable datetime values pass through unchanged so the destination table - can reject them — matches the non-flatten path's behavior.""" - elements = [ - { - "element_id": "el-1", - "text": "hello", - "metadata": {"data_source": {"date_created": "not-a-date"}}, - } - ] - [row] = _run_stager(tmp_path, elements, flatten_metadata=True) - assert row["data_source_date_created"] == "not-a-date" - - -def test_stager_blob_mode_does_not_coerce_datetime(tmp_path: Path): - """The non-flatten path is unchanged — datetime fields stay as strings inside - the JSON blob, matching today's behavior.""" - elements = [ - { - "element_id": "el-1", - "text": "hello", - "metadata": {"data_source": {"date_processed": "1779329564.5102773"}}, - } - ] - [row] = _run_stager(tmp_path, elements, flatten_metadata=False) - assert "metadata" in row - decoded = json.loads(row["metadata"]) - assert decoded["data_source"]["date_processed"] == "1779329564.5102773" - - @pytest.mark.parametrize( "config_cls", [DatabricksVolumeDeltaTableUploaderConfig, DatabricksVolumeDeltaTableStagerConfig], diff --git a/unstructured_ingest/processes/connectors/databricks/volumes_table.py b/unstructured_ingest/processes/connectors/databricks/volumes_table.py index ec64f6d40..28336d2c0 100644 --- a/unstructured_ingest/processes/connectors/databricks/volumes_table.py +++ b/unstructured_ingest/processes/connectors/databricks/volumes_table.py @@ -1,6 +1,6 @@ import json import os -from contextlib import contextmanager, suppress +from contextlib import contextmanager from dataclasses import dataclass, field from pathlib import Path from typing import TYPE_CHECKING, Any, Generator, Optional @@ -23,10 +23,6 @@ from unstructured_ingest.processes.connectors.sql.databricks_delta_tables import ( DatabricksDeltaTablesConnectionConfig, ) -from unstructured_ingest.processes.connectors.sql.sql import ( - _DATE_COLUMNS, - parse_date_string, -) from unstructured_ingest.utils.constants import RECORD_ID_LABEL from unstructured_ingest.utils.data_prep import ( flatten_dict, @@ -64,25 +60,6 @@ class DatabricksVolumeDeltaTableStagerConfig(UploadStagerConfig): ) -def _coerce_flattened_datetimes(row: dict[str, Any]) -> None: - """Convert stringified-epoch datetime fields to ISO format in-place. - - Element metadata datetime fields (e.g. `date_processed`, `data_source_date_processed`) - arrive as stringified unix epochs like `"1779329564.5102773"`, which Databricks's - implicit string → `TIMESTAMP` cast rejects with `CAST_INVALID_INPUT`. Emit ISO - format instead so `TIMESTAMP` columns coerce natively. Unparseable values are - left alone for the table to reject. - """ - suffixes = tuple(f"_{col}" for col in _DATE_COLUMNS) - for key, value in list(row.items()): - if value is None or not isinstance(value, (str, int, float)): - continue - if key not in _DATE_COLUMNS and not key.endswith(suffixes): - continue - with suppress(Exception): - row[key] = parse_date_string(value).isoformat(sep=" ") - - @dataclass class DatabricksVolumeDeltaTableStager(UploadStager): upload_stager_config: DatabricksVolumeDeltaTableStagerConfig = field( @@ -116,7 +93,6 @@ def run( flatten_lists=False, ) ) - _coerce_flattened_datetimes(element) else: element["metadata"] = json.dumps(metadata) write_data(path=final_output_path, data=data, indent=None) From 459ed9833a2c51f794b4421fc516867ccb33f41d Mon Sep 17 00:00:00 2001 From: Austin Walker Date: Fri, 22 May 2026 11:34:31 -0400 Subject: [PATCH 11/13] test(PLU-161): cover datetime pass-through contract in flatten + blob modes After dropping coercion in 0c31cb23 the existing tests no longer exercised how datetime metadata flows through to Delta. Pin the new contract: string and float epochs pass through unchanged in flatten mode, and the blob mode keeps them byte-identical inside the JSON column. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../databricks/test_volumes_table.py | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/test/unit/connectors/databricks/test_volumes_table.py b/test/unit/connectors/databricks/test_volumes_table.py index 611f80e23..f0df845b7 100644 --- a/test/unit/connectors/databricks/test_volumes_table.py +++ b/test/unit/connectors/databricks/test_volumes_table.py @@ -109,6 +109,67 @@ def test_stager_flatten_stops_at_lists(tmp_path: Path): assert "sent_to_0" not in row +def test_stager_flatten_passes_datetime_strings_through_unchanged(tmp_path: Path): + """Datetime fields flow into the flattened row as their JSON-native string form — + no ISO coercion. Customers declare these columns as STRING in their Delta table.""" + elements = [ + { + "element_id": "el-1", + "text": "hello", + "metadata": { + "date_processed": "1779329600.0", + "data_source": { + "date_created": "1779329000.0", + "date_modified": "1779329500.0", + "date_processed": "1779329564.5102773", + }, + }, + } + ] + [row] = _run_stager(tmp_path, elements, flatten_metadata=True) + + assert row["date_processed"] == "1779329600.0" + assert row["data_source_date_created"] == "1779329000.0" + assert row["data_source_date_modified"] == "1779329500.0" + assert row["data_source_date_processed"] == "1779329564.5102773" + + +def test_stager_flatten_passes_float_epoch_through_unchanged(tmp_path: Path): + """JSON-parsed numeric epochs arrive as Python floats and must pass through + untouched — neither converted to ISO strings nor stringified.""" + elements = [ + { + "element_id": "el-1", + "text": "hello", + "metadata": {"data_source": {"date_processed": 1779329564.5102773}}, + } + ] + [row] = _run_stager(tmp_path, elements, flatten_metadata=True) + + assert row["data_source_date_processed"] == 1779329564.5102773 + assert isinstance(row["data_source_date_processed"], float) + + +def test_stager_blob_mode_preserves_datetime_strings(tmp_path: Path): + """The non-flatten path is unchanged — datetime fields stay as strings inside + the JSON blob with byte-identical values.""" + elements = [ + { + "element_id": "el-1", + "text": "hello", + "metadata": { + "date_processed": "1779329600.0", + "data_source": {"date_processed": "1779329564.5102773"}, + }, + } + ] + [row] = _run_stager(tmp_path, elements, flatten_metadata=False) + + decoded = json.loads(row["metadata"]) + assert decoded["date_processed"] == "1779329600.0" + assert decoded["data_source"]["date_processed"] == "1779329564.5102773" + + @pytest.mark.parametrize( "config_cls", [DatabricksVolumeDeltaTableUploaderConfig, DatabricksVolumeDeltaTableStagerConfig], From ef5c24695e91e67456edb5bb06f549364b1d924f Mon Sep 17 00:00:00 2001 From: Austin Walker Date: Fri, 22 May 2026 12:54:02 -0400 Subject: [PATCH 12/13] feat(PLU-161): fail fast in precheck when flatten_metadata table is missing Adds a flatten-gated SHOW TABLES check to the Volumes Delta Tables precheck so a misconfigured workflow fails at save time instead of per-document at run time. create_destination under flatten mode becomes an early-return no-op so precheck is the single source of truth. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../databricks/test_volumes_table.py | 79 +++++++++++++++---- .../connectors/databricks/volumes_table.py | 20 +++-- 2 files changed, 78 insertions(+), 21 deletions(-) diff --git a/test/unit/connectors/databricks/test_volumes_table.py b/test/unit/connectors/databricks/test_volumes_table.py index f0df845b7..0331b7d22 100644 --- a/test/unit/connectors/databricks/test_volumes_table.py +++ b/test/unit/connectors/databricks/test_volumes_table.py @@ -226,25 +226,24 @@ def _insert_sql(mock_cursor: MagicMock) -> str: return next(sql for sql in _executed_sql(mock_cursor) if sql.startswith("INSERT")) -def test_create_destination_flatten_true_missing_table_raises( +def test_create_destination_flatten_true_is_noop( mock_cursor: MagicMock, mock_get_cursor: MagicMock ): - """PLU-161 case E: with flatten_metadata=true, missing table is a hard fail — - auto-create is disabled to prevent silent data loss.""" + """Under flatten_metadata=true the user manages the destination table; precheck + validates existence, so create_destination is a no-op and must not touch the + warehouse (no SHOW TABLES, no CREATE TABLE).""" uploader = _make_uploader(flatten_metadata=True, table_name="missing_table") - mock_cursor.fetchall.return_value = [] # SHOW TABLES → no rows - with pytest.raises(IngestValueError, match="auto-create is disabled"): - uploader.create_destination() + assert uploader.create_destination() is False - assert not any("CREATE TABLE" in sql for sql in _executed_sql(mock_cursor)) + assert _executed_sql(mock_cursor) == [] def test_create_destination_flatten_false_missing_table_autocreates( mock_cursor: MagicMock, mock_get_cursor: MagicMock ): - """PLU-161 case C: backcompat — flatten=false still auto-creates from the - asset schema when the destination table is missing.""" + """flatten=false still auto-creates from the asset schema when the destination + table is missing.""" uploader = _make_uploader(flatten_metadata=False, table_name="new_table") mock_cursor.fetchall.return_value = [] # SHOW TABLES → no rows @@ -255,13 +254,12 @@ def test_create_destination_flatten_false_missing_table_autocreates( assert "new_table" in create_sqls[0] -@pytest.mark.parametrize("flatten_metadata", [True, False]) -def test_create_destination_existing_table_returns_false( - mock_cursor: MagicMock, mock_get_cursor: MagicMock, flatten_metadata: bool +def test_create_destination_flatten_false_existing_table_returns_false( + mock_cursor: MagicMock, mock_get_cursor: MagicMock ): - """When the table already exists, create_destination is a no-op regardless of - the flag — neither branch should attempt a CREATE TABLE.""" - uploader = _make_uploader(flatten_metadata=flatten_metadata, table_name="elements") + """flatten=false with an existing table is a no-op — SHOW TABLES finds the + name and we return False without issuing a CREATE.""" + uploader = _make_uploader(flatten_metadata=False, table_name="elements") # `SHOW TABLES` rows are `(database, tableName, isTemporary)`; r[1] is the name. mock_cursor.fetchall.return_value = [("sch", "elements", False)] @@ -269,6 +267,57 @@ def test_create_destination_existing_table_returns_false( assert not any("CREATE TABLE" in sql for sql in _executed_sql(mock_cursor)) +def _precheck_fetchall(*, catalogs, databases, tables): + """SHOW CATALOGS / SHOW DATABASES / SHOW TABLES are the only fetchall calls in + precheck, in that order. SHOW TABLES rows are (database, tableName, isTemporary).""" + return [ + [(c,) for c in catalogs], + [(d,) for d in databases], + [("db", t, False) for t in tables], + ] + + +def test_precheck_flatten_true_missing_table_raises( + mock_cursor: MagicMock, mock_get_cursor: MagicMock +): + """With flatten_metadata=true, precheck must fail fast when the destination table + is missing — otherwise the per-document INSERT is the first thing that notices.""" + uploader = _make_uploader(flatten_metadata=True, table_name="missing_table") + mock_cursor.fetchall.side_effect = _precheck_fetchall( + catalogs=["cat"], databases=["db"], tables=["other_table"] + ) + + with pytest.raises(IngestValueError, match="auto-create is disabled"): + uploader.precheck() + + +def test_precheck_flatten_true_existing_table_passes( + mock_cursor: MagicMock, mock_get_cursor: MagicMock +): + uploader = _make_uploader(flatten_metadata=True, table_name="elements") + mock_cursor.fetchall.side_effect = _precheck_fetchall( + catalogs=["cat"], databases=["db"], tables=["elements"] + ) + + uploader.precheck() + + +def test_precheck_flatten_false_skips_table_check( + mock_cursor: MagicMock, mock_get_cursor: MagicMock +): + """Backcompat: flatten=false leaves table existence to create_destination / + auto-create, so precheck must not fail when the table is missing.""" + uploader = _make_uploader(flatten_metadata=False, table_name="missing_table") + mock_cursor.fetchall.side_effect = [ + [("cat",)], # SHOW CATALOGS + [("db",)], # SHOW DATABASES + ] + + uploader.precheck() + + assert not any(sql == "SHOW TABLES" for sql in _executed_sql(mock_cursor)) + + def _staged_elements(tmp_path: Path, row: dict) -> Path: path = tmp_path / "staged.json" path.write_text(json.dumps([row])) diff --git a/unstructured_ingest/processes/connectors/databricks/volumes_table.py b/unstructured_ingest/processes/connectors/databricks/volumes_table.py index 28336d2c0..1ce0c7ae8 100644 --- a/unstructured_ingest/processes/connectors/databricks/volumes_table.py +++ b/unstructured_ingest/processes/connectors/databricks/volumes_table.py @@ -114,6 +114,9 @@ def create_destination( ) -> bool: table_name = self.upload_config.table_name or destination_name self.upload_config.table_name = table_name + if self.upload_config.flatten_metadata: + # User manages the table under flatten mode; precheck validates existence. + return False connectors_dir = Path(__file__).parents[1] collection_config_file = connectors_dir / "assets" / "databricks_delta_table_schema.sql" with self.get_cursor() as cursor: @@ -121,12 +124,6 @@ def create_destination( table_names = [r[1] for r in cursor.fetchall()] if table_name in table_names: return False - if self.upload_config.flatten_metadata: - raise ValueError( - f"Table {table_name!r} does not exist. With flatten_metadata=true, " - "the destination table must be pre-created — auto-create is disabled " - "to prevent silent data loss." - ) with collection_config_file.open() as schema_file: data_lines = schema_file.readlines() data_lines[0] = data_lines[0].replace("elements", table_name) @@ -154,6 +151,17 @@ def precheck(self) -> None: self.upload_config.database, ", ".join(databases) ) ) + if self.upload_config.flatten_metadata: + cursor.execute(f"USE DATABASE {quote_identifier(self.upload_config.database)}") + cursor.execute("SHOW TABLES") + table_names = [r[1] for r in cursor.fetchall()] + table_name = self.upload_config.table_name or "unstructuredautocreated" + if table_name not in table_names: + raise ValueError( + f"Table {table_name!r} does not exist. With flatten_metadata=true, " + "the destination table must be pre-created — auto-create is " + "disabled to prevent silent data loss." + ) def get_output_path(self, file_data: FileData, suffix: str = ".json") -> str: filename = Path(file_data.source_identifiers.filename) From d6952d6bb6fdbc2e02a3c283beaff12daa5c4e93 Mon Sep 17 00:00:00 2001 From: Austin Walker Date: Fri, 22 May 2026 13:19:41 -0400 Subject: [PATCH 13/13] chore(PLU-161): tighten missing-table precheck message Co-Authored-By: Claude Opus 4.7 (1M context) --- test/unit/connectors/databricks/test_volumes_table.py | 2 +- .../processes/connectors/databricks/volumes_table.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/test/unit/connectors/databricks/test_volumes_table.py b/test/unit/connectors/databricks/test_volumes_table.py index 0331b7d22..e636f09f3 100644 --- a/test/unit/connectors/databricks/test_volumes_table.py +++ b/test/unit/connectors/databricks/test_volumes_table.py @@ -287,7 +287,7 @@ def test_precheck_flatten_true_missing_table_raises( catalogs=["cat"], databases=["db"], tables=["other_table"] ) - with pytest.raises(IngestValueError, match="auto-create is disabled"): + with pytest.raises(IngestValueError, match="must be pre-created"): uploader.precheck() diff --git a/unstructured_ingest/processes/connectors/databricks/volumes_table.py b/unstructured_ingest/processes/connectors/databricks/volumes_table.py index 1ce0c7ae8..2998cf0d0 100644 --- a/unstructured_ingest/processes/connectors/databricks/volumes_table.py +++ b/unstructured_ingest/processes/connectors/databricks/volumes_table.py @@ -159,8 +159,7 @@ def precheck(self) -> None: if table_name not in table_names: raise ValueError( f"Table {table_name!r} does not exist. With flatten_metadata=true, " - "the destination table must be pre-created — auto-create is " - "disabled to prevent silent data loss." + "the destination table must be pre-created." ) def get_output_path(self, file_data: FileData, suffix: str = ".json") -> str: