diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2d154716f..791086848 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+## [1.4.21]
+
+### Fixes
+
+- **fix(teradata):** validate destination table schema in precheck and upload — reject tables missing required columns with a clear error message instead of failing silently at runtime
+
 ## [1.4.20]
 
 ### Fixes
diff --git a/test/unit/connectors/sql/test_teradata.py b/test/unit/connectors/sql/test_teradata.py
index 50b4eec09..b027140e8 100644
--- a/test/unit/connectors/sql/test_teradata.py
+++ b/test/unit/connectors/sql/test_teradata.py
@@ -6,7 +6,11 @@
 from pytest_mock import MockerFixture
 
 from unstructured_ingest.data_types.file_data import FileData, SourceIdentifiers
-from unstructured_ingest.error import DestinationConnectionError, SourceConnectionError
+from unstructured_ingest.error import (
+    DestinationConnectionError,
+    DestinationSchemaError,
+    SourceConnectionError,
+)
 from unstructured_ingest.processes.connectors.sql.teradata import (
     DEFAULT_TABLE_NAME,
     TeradataAccessConfig,
@@ -340,10 +344,11 @@ def test_teradata_uploader_upload_dataframe_quotes_column_names(
             "text": ["text1", "text2"],
             "type": ["Title", "NarrativeText"],
             "record_id": ["file1", "file1"],
+            "element_id": ["e1", "e2"],
         }
     )
 
-    teradata_uploader._columns = ["id", "text", "type", "record_id"]
+    teradata_uploader._columns = ["id", "text", "type", "record_id", "element_id"]
     mocker.patch.object(teradata_uploader, "_fit_to_schema", return_value=df)
     mocker.patch.object(teradata_uploader, "can_delete", return_value=False)
 
@@ -529,11 +534,19 @@ def test_teradata_uploader_precheck_success(
     teradata_uploader: TeradataUploader,
     mock_get_cursor: MagicMock,
 ):
-    """Test that uploader precheck only validates connection, not table existence."""
+    """Test that uploader precheck validates connection and schema."""
+    # Table exists with all required columns.
+    # fetchone: SELECT DATABASE → ("testdb",)
+    # fetchall: SELECT ColumnName FROM DBC.ColumnsV → column rows
+    mock_cursor.fetchone.return_value = ("testdb",)
+    mock_cursor.fetchall.return_value = [
+        (col,) for col in ["id", "record_id", "element_id", "text", "type", "metadata"]
+    ]
     teradata_uploader.precheck()
 
-    assert mock_cursor.execute.call_count == 1
-    assert mock_cursor.execute.call_args[0][0] == "SELECT 1"
+    calls = [call[0][0] for call in mock_cursor.execute.call_args_list]
+    assert "SELECT 1" in calls
+    assert any("DatabaseName" in c for c in calls)
 
 
 def test_teradata_uploader_precheck_connection_failure(
@@ -552,17 +565,21 @@ def test_teradata_uploader_precheck_connection_failure(
     assert mock_cursor.execute.call_count == 1
 
 
-def test_teradata_uploader_precheck_does_not_check_table(
+def test_teradata_uploader_precheck_skips_schema_check_when_table_missing(
     mock_cursor: MagicMock,
     teradata_uploader: TeradataUploader,
     mock_get_cursor: MagicMock,
 ):
-    """Precheck never checks table existence; create_destination handles missing tables."""
+    """Precheck skips schema validation when the table doesn't exist yet."""
+    # fetchone: SELECT DATABASE → ("testdb",)
+    # fetchall: DBC.ColumnsV → [] (table doesn't exist)
+    mock_cursor.fetchone.return_value = ("testdb",)
+    mock_cursor.fetchall.return_value = []
     teradata_uploader.precheck()
 
     calls = [call[0][0] for call in mock_cursor.execute.call_args_list]
-    assert calls == ["SELECT 1"]
-    assert not any("SELECT TOP" in c for c in calls)
+    assert "SELECT 1" in calls
+    assert any("DBC.ColumnsV" in c for c in calls)
 
 
 def test_teradata_uploader_precheck_rejects_dashes_in_table_name(
@@ -697,10 +714,11 @@ def test_teradata_uploader_upload_dataframe_uses_db_case_in_sql(
             "text": ["text1", "text2"],
             "type": ["Title", "NarrativeText"],
             "record_id": ["file1", "file1"],
+            "element_id": ["e1", "e2"],
         }
     )
 
-    teradata_uploader._columns = ["ID", "TEXT", "TYPE", "RECORD_ID"]
+    teradata_uploader._columns = ["ID", "TEXT", "TYPE", "RECORD_ID", "ELEMENT_ID"]
     mocker.patch.object(teradata_uploader, "_fit_to_schema", return_value=df)
     mocker.patch.object(teradata_uploader, "can_delete", return_value=False)
 
@@ -1103,3 +1121,89 @@ def test_teradata_uploader_create_destination_rejects_dashes_in_destination_name
     """create_destination raises when destination_name contains dashes."""
     with pytest.raises(DestinationConnectionError, match="cannot contain dashes"):
         teradata_uploader_auto_create.create_destination(destination_name="my-bad-table")
+
+
+def test_teradata_uploader_precheck_rejects_missing_columns(
+    mock_cursor: MagicMock,
+    teradata_uploader: TeradataUploader,
+    mock_get_cursor: MagicMock,
+):
+    """Precheck raises DestinationSchemaError when table is missing required columns."""
+    # Table exists but only has id and text
+    mock_cursor.fetchone.return_value = ("testdb",)
+    mock_cursor.fetchall.return_value = [("id",), ("text",)]
+
+    with pytest.raises(DestinationSchemaError, match="missing required columns"):
+        teradata_uploader.precheck()
+
+
+def test_teradata_uploader_precheck_schema_check_skipped_when_table_name_none(
+    mock_cursor: MagicMock,
+    teradata_uploader_auto_create: TeradataUploader,
+    mock_get_cursor: MagicMock,
+):
+    """Precheck skips schema validation when table_name is None (auto-create mode)."""
+    teradata_uploader_auto_create.precheck()
+
+    calls = [call[0][0] for call in mock_cursor.execute.call_args_list]
+    assert calls == ["SELECT 1"]
+
+
+def test_teradata_uploader_precheck_schema_check_tolerates_extra_columns(
+    mock_cursor: MagicMock,
+    teradata_uploader: TeradataUploader,
+    mock_get_cursor: MagicMock,
+):
+    """Precheck passes when table has all required columns plus extras."""
+    mock_cursor.fetchone.return_value = ("testdb",)
+    mock_cursor.fetchall.return_value = [
+        (col,)
+        for col in ["id", "record_id", "element_id", "text", "type", "metadata", "custom_col"]
+    ]
+    # Should not raise
+    teradata_uploader.precheck()
+
+
+def test_teradata_uploader_precheck_schema_check_case_insensitive(
+    mock_cursor: MagicMock,
+    teradata_uploader: TeradataUploader,
+    mock_get_cursor: MagicMock,
+):
+    """Precheck schema validation is case-insensitive (Teradata may store uppercase)."""
+    mock_cursor.fetchone.side_effect = [("testdb",), (1,)]
+    mock_cursor.fetchall.return_value = [
+        (col,)
+        for col in ["ID", "RECORD_ID", "ELEMENT_ID", "TEXT", "TYPE", "METADATA"]
+    ]
+    # Should not raise
+    teradata_uploader.precheck()
+
+
+def test_teradata_uploader_upload_dataframe_rejects_missing_columns(
+    mock_cursor: MagicMock,
+    teradata_uploader: TeradataUploader,
+    mock_get_cursor: MagicMock,
+):
+    """upload_dataframe raises DestinationSchemaError when table schema is incomplete."""
+    # Table only has id and text columns
+    mock_cursor.description = [("id",), ("text",)]
+    mock_cursor.fetchall.return_value = []
+
+    df = pd.DataFrame(
+        {
+            "id": ["1"],
+            "record_id": ["r1"],
+            "element_id": ["e1"],
+            "text": ["hello"],
+            "type": ["NarrativeText"],
+            "metadata": ["{}"],
+        }
+    )
+    file_data = FileData(
+        identifier="test",
+        connector_type="teradata",
+        source_identifiers=SourceIdentifiers(filename="test.txt", fullpath="test.txt"),
+    )
+
+    with pytest.raises(DestinationSchemaError, match="missing required columns"):
+        teradata_uploader.upload_dataframe(df=df, file_data=file_data)
diff --git a/unstructured_ingest/__version__.py b/unstructured_ingest/__version__.py
index 743cb66a8..8837bc2b4 100644
--- a/unstructured_ingest/__version__.py
+++ b/unstructured_ingest/__version__.py
@@ -1 +1 @@
-__version__ = "1.4.20"  # pragma: no cover
+__version__ = "1.4.21"  # pragma: no cover
diff --git a/unstructured_ingest/error.py b/unstructured_ingest/error.py
index e04af8188..1f7b2f41e 100644
--- a/unstructured_ingest/error.py
+++ b/unstructured_ingest/error.py
@@ -47,6 +47,11 @@ class DestinationConnectionError(ConnectionError):
     status_code: Optional[int] = 400
 
 
+class DestinationSchemaError(UnstructuredIngestError):
+    error_string = "Destination table schema error: {}"
+    status_code: Optional[int] = 400
+
+
 class EmbeddingEncoderConnectionError(ConnectionError):
     error_string = "Error in connecting to the embedding model provider: {}"
     status_code: Optional[int] = 400
@@ -148,6 +153,7 @@ class IcebergCommitFailedException(UnstructuredIngestError):
     SourceConnectionError,
     SourceConnectionNetworkError,
     DestinationConnectionError,
+    DestinationSchemaError,
     EmbeddingEncoderConnectionError,
 ]
 
diff --git a/unstructured_ingest/processes/connectors/sql/teradata.py b/unstructured_ingest/processes/connectors/sql/teradata.py
index b880b140d..70fdf14d6 100644
--- a/unstructured_ingest/processes/connectors/sql/teradata.py
+++ b/unstructured_ingest/processes/connectors/sql/teradata.py
@@ -8,7 +8,11 @@
 from pydantic import Field, Secret, field_validator
 
 from unstructured_ingest.data_types.file_data import FileData
-from unstructured_ingest.error import DestinationConnectionError, SourceConnectionError
+from unstructured_ingest.error import (
+    DestinationConnectionError,
+    DestinationSchemaError,
+    SourceConnectionError,
+)
 from unstructured_ingest.logger import logger
 from unstructured_ingest.processes.connector_registry import (
     DestinationRegistryEntry,
@@ -37,6 +41,10 @@
 
 CONNECTOR_TYPE = "teradata"
 DEFAULT_TABLE_NAME = "unstructuredautocreated"
+# Columns that must exist in the destination table for uploads to succeed.
+# record_id is needed for delete-before-upsert; element_id, text, and type
+# carry the core document content.
+REQUIRED_DESTINATION_COLUMNS = {"id", "record_id", "element_id", "text", "type"}
 
 
 def _summarize_error(host: str, raw: Exception, context: str = "") -> str:
@@ -378,6 +386,40 @@ def precheck(self) -> None:
             logger.error(f"failed to validate connection: {e}", exc_info=True)
             raise DestinationConnectionError(_summarize_error(self.connection_config.host, e))
 
+        if table_name:
+            self._validate_table_schema(table_name)
+
+    def _validate_table_schema(self, table_name: str) -> None:
+        """Check that an existing table has the required columns for uploads."""
+        try:
+            with self.get_cursor() as cursor:
+                cursor.execute("SELECT DATABASE")
+                current_db = cursor.fetchone()[0].strip()
+                cursor.execute(
+                    "SELECT ColumnName FROM DBC.ColumnsV "
+                    "WHERE TableName = ? AND DatabaseName = ?",
+                    [table_name, current_db],
+                )
+                rows = cursor.fetchall()
+                if not rows:
+                    # Table doesn't exist yet — create_destination will handle it.
+                    return
+                existing = {row[0].strip().lower() for row in rows}
+        except Exception:
+            # If we can't query DBC metadata, skip validation — the upload
+            # will fail with its own error if the schema is truly wrong.
+            return
+
+        missing = REQUIRED_DESTINATION_COLUMNS - existing
+        if missing:
+            raise DestinationSchemaError(
+                f"Table '{table_name}' is missing required columns: "
+                f"{', '.join(sorted(missing))}. "
+                f"Expected columns: {', '.join(sorted(REQUIRED_DESTINATION_COLUMNS))}. "
+                "Drop the table and let the connector recreate it, or "
+                "add the missing columns."
+            )
+
     def get_table_columns(self) -> list[str]:
         if self._columns is None:
             with self.get_cursor() as cursor:
@@ -412,6 +454,20 @@ def delete_by_record_id(self, file_data: FileData) -> None:
     def upload_dataframe(self, df: "DataFrame", file_data: FileData) -> None:
         import numpy as np
 
+        table_name = self.upload_config.table_name
+
+        # Validate schema before attempting any writes.
+        table_columns_lower = {col.lower() for col in self.get_table_columns()}
+        missing = REQUIRED_DESTINATION_COLUMNS - table_columns_lower
+        if missing:
+            raise DestinationSchemaError(
+                f"Table '{table_name}' is missing required columns: "
+                f"{', '.join(sorted(missing))}. "
+                f"Expected columns: {', '.join(sorted(REQUIRED_DESTINATION_COLUMNS))}. "
+                "Drop the table and let the connector recreate it, or "
+                "add the missing columns."
+            )
+
         if self.can_delete():
             self.delete_by_record_id(file_data=file_data)
         else:
@@ -429,14 +485,14 @@ def upload_dataframe(self, df: "DataFrame", file_data: FileData) -> None:
         quoted_columns = [f'"{col}"' for col in db_columns]
 
         stmt = "INSERT INTO {table_name} ({columns}) VALUES({values})".format(
-            table_name=f'"{self.upload_config.table_name}"',
+            table_name=f'"{table_name}"',
            columns=",".join(quoted_columns),
             values=",".join([self.values_delimiter for _ in columns]),
         )
         logger.info(
             f"writing a total of {len(df)} elements via"
             f" document batches to destination"
-            f" table named {self.upload_config.table_name}"
+            f" table named {table_name}"
             f" with batch size {self.upload_config.batch_size}"
         )
         for rows in split_dataframe(df=df, chunk_size=self.upload_config.batch_size):
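
Note on the validation logic (not part of the patch above): in both `precheck` and `upload_dataframe`, the schema check reduces to a case-insensitive set difference against `REQUIRED_DESTINATION_COLUMNS`. The sketch below mirrors that set arithmetic only; `missing_required_columns` is a hypothetical helper for illustration, not the connector's actual API.

```python
# Illustrative sketch: column names are lower-cased before comparison because
# Teradata's DBC.ColumnsV may report them in uppercase (see the
# case-insensitive test in the patch).
REQUIRED_DESTINATION_COLUMNS = {"id", "record_id", "element_id", "text", "type"}


def missing_required_columns(existing_columns: list[str]) -> set[str]:
    # Same normalization as the patch: strip whitespace, lower-case, then diff.
    existing = {col.strip().lower() for col in existing_columns}
    return REQUIRED_DESTINATION_COLUMNS - existing


print(sorted(missing_required_columns(["ID", "TEXT"])))
# ['element_id', 'record_id', 'type']
print(sorted(missing_required_columns(["id", "record_id", "element_id", "text", "type", "metadata"])))
# []
```

When the difference is non-empty, the connector raises `DestinationSchemaError` with the "missing required columns" message asserted in the new tests; extra columns such as `metadata` or `custom_col` are tolerated.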