6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,9 @@
## [1.4.21]

### Fixes

- **fix(teradata):** validate destination table schema in precheck and upload — reject tables missing required columns with a clear error message instead of failing silently at runtime

## [1.4.20]

### Fixes
124 changes: 114 additions & 10 deletions test/unit/connectors/sql/test_teradata.py
@@ -6,7 +6,11 @@
from pytest_mock import MockerFixture

from unstructured_ingest.data_types.file_data import FileData, SourceIdentifiers
from unstructured_ingest.error import DestinationConnectionError, SourceConnectionError
from unstructured_ingest.error import (
DestinationConnectionError,
DestinationSchemaError,
SourceConnectionError,
)
from unstructured_ingest.processes.connectors.sql.teradata import (
DEFAULT_TABLE_NAME,
TeradataAccessConfig,
@@ -340,10 +344,11 @@ def test_teradata_uploader_upload_dataframe_quotes_column_names(
"text": ["text1", "text2"],
"type": ["Title", "NarrativeText"],
"record_id": ["file1", "file1"],
"element_id": ["e1", "e2"],
}
)

teradata_uploader._columns = ["id", "text", "type", "record_id"]
teradata_uploader._columns = ["id", "text", "type", "record_id", "element_id"]
mocker.patch.object(teradata_uploader, "_fit_to_schema", return_value=df)
mocker.patch.object(teradata_uploader, "can_delete", return_value=False)

@@ -529,11 +534,19 @@ def test_teradata_uploader_precheck_success(
teradata_uploader: TeradataUploader,
mock_get_cursor: MagicMock,
):
"""Test that uploader precheck only validates connection, not table existence."""
"""Test that uploader precheck validates connection and schema."""
# Table exists with all required columns.
# fetchone: SELECT DATABASE → ("testdb",)
# fetchall: SELECT ColumnName FROM DBC.ColumnsV → column rows
mock_cursor.fetchone.return_value = ("testdb",)
mock_cursor.fetchall.return_value = [
(col,) for col in ["id", "record_id", "element_id", "text", "type", "metadata"]
]
teradata_uploader.precheck()

assert mock_cursor.execute.call_count == 1
assert mock_cursor.execute.call_args[0][0] == "SELECT 1"
calls = [call[0][0] for call in mock_cursor.execute.call_args_list]
assert "SELECT 1" in calls
assert any("DatabaseName" in c for c in calls)


def test_teradata_uploader_precheck_connection_failure(
@@ -552,17 +565,21 @@ def test_teradata_uploader_precheck_connection_failure(
assert mock_cursor.execute.call_count == 1


def test_teradata_uploader_precheck_does_not_check_table(
def test_teradata_uploader_precheck_skips_schema_check_when_table_missing(
mock_cursor: MagicMock,
teradata_uploader: TeradataUploader,
mock_get_cursor: MagicMock,
):
"""Precheck never checks table existence; create_destination handles missing tables."""
"""Precheck skips schema validation when the table doesn't exist yet."""
# fetchone: SELECT DATABASE → ("testdb",)
# fetchall: DBC.ColumnsV → [] (table doesn't exist)
mock_cursor.fetchone.return_value = ("testdb",)
mock_cursor.fetchall.return_value = []
teradata_uploader.precheck()

calls = [call[0][0] for call in mock_cursor.execute.call_args_list]
assert calls == ["SELECT 1"]
assert not any("SELECT TOP" in c for c in calls)
assert "SELECT 1" in calls
assert any("DBC.ColumnsV" in c for c in calls)


def test_teradata_uploader_precheck_rejects_dashes_in_table_name(
@@ -697,10 +714,11 @@ def test_teradata_uploader_upload_dataframe_uses_db_case_in_sql(
"text": ["text1", "text2"],
"type": ["Title", "NarrativeText"],
"record_id": ["file1", "file1"],
"element_id": ["e1", "e2"],
}
)

teradata_uploader._columns = ["ID", "TEXT", "TYPE", "RECORD_ID"]
teradata_uploader._columns = ["ID", "TEXT", "TYPE", "RECORD_ID", "ELEMENT_ID"]
mocker.patch.object(teradata_uploader, "_fit_to_schema", return_value=df)
mocker.patch.object(teradata_uploader, "can_delete", return_value=False)

@@ -1103,3 +1121,89 @@ def test_teradata_uploader_create_destination_rejects_dashes_in_destination_name
"""create_destination raises when destination_name contains dashes."""
with pytest.raises(DestinationConnectionError, match="cannot contain dashes"):
teradata_uploader_auto_create.create_destination(destination_name="my-bad-table")


def test_teradata_uploader_precheck_rejects_missing_columns(
mock_cursor: MagicMock,
teradata_uploader: TeradataUploader,
mock_get_cursor: MagicMock,
):
"""Precheck raises DestinationSchemaError when table is missing required columns."""
# Table exists but only has id and text
mock_cursor.fetchone.return_value = ("testdb",)
mock_cursor.fetchall.return_value = [("id",), ("text",)]

with pytest.raises(DestinationSchemaError, match="missing required columns"):
teradata_uploader.precheck()


def test_teradata_uploader_precheck_schema_check_skipped_when_table_name_none(
mock_cursor: MagicMock,
teradata_uploader_auto_create: TeradataUploader,
mock_get_cursor: MagicMock,
):
"""Precheck skips schema validation when table_name is None (auto-create mode)."""
teradata_uploader_auto_create.precheck()

calls = [call[0][0] for call in mock_cursor.execute.call_args_list]
assert calls == ["SELECT 1"]


def test_teradata_uploader_precheck_schema_check_tolerates_extra_columns(
mock_cursor: MagicMock,
teradata_uploader: TeradataUploader,
mock_get_cursor: MagicMock,
):
"""Precheck passes when table has all required columns plus extras."""
mock_cursor.fetchone.return_value = ("testdb",)
mock_cursor.fetchall.return_value = [
(col,)
for col in ["id", "record_id", "element_id", "text", "type", "metadata", "custom_col"]
]
# Should not raise
teradata_uploader.precheck()


def test_teradata_uploader_precheck_schema_check_case_insensitive(
mock_cursor: MagicMock,
teradata_uploader: TeradataUploader,
mock_get_cursor: MagicMock,
):
"""Precheck schema validation is case-insensitive (Teradata may store uppercase)."""
mock_cursor.fetchone.side_effect = [("testdb",), (1,)]
mock_cursor.fetchall.return_value = [
(col,)
for col in ["ID", "RECORD_ID", "ELEMENT_ID", "TEXT", "TYPE", "METADATA"]
]
# Should not raise
teradata_uploader.precheck()


def test_teradata_uploader_upload_dataframe_rejects_missing_columns(
mock_cursor: MagicMock,
teradata_uploader: TeradataUploader,
mock_get_cursor: MagicMock,
):
"""upload_dataframe raises DestinationSchemaError when table schema is incomplete."""
# Table only has id and text columns
mock_cursor.description = [("id",), ("text",)]
mock_cursor.fetchall.return_value = []

df = pd.DataFrame(
{
"id": ["1"],
"record_id": ["r1"],
"element_id": ["e1"],
"text": ["hello"],
"type": ["NarrativeText"],
"metadata": ["{}"],
}
)
file_data = FileData(
identifier="test",
connector_type="teradata",
source_identifiers=SourceIdentifiers(filename="test.txt", fullpath="test.txt"),
)

with pytest.raises(DestinationSchemaError, match="missing required columns"):
teradata_uploader.upload_dataframe(df=df, file_data=file_data)
2 changes: 1 addition & 1 deletion unstructured_ingest/__version__.py
@@ -1 +1 @@
__version__ = "1.4.20" # pragma: no cover
__version__ = "1.4.21" # pragma: no cover
6 changes: 6 additions & 0 deletions unstructured_ingest/error.py
@@ -47,6 +47,11 @@ class DestinationConnectionError(ConnectionError):
status_code: Optional[int] = 400


class DestinationSchemaError(UnstructuredIngestError):
error_string = "Destination table schema error: {}"
status_code: Optional[int] = 400


class EmbeddingEncoderConnectionError(ConnectionError):
error_string = "Error in connecting to the embedding model provider: {}"
status_code: Optional[int] = 400
@@ -148,6 +153,7 @@ class IcebergCommitFailedException(UnstructuredIngestError):
SourceConnectionError,
SourceConnectionNetworkError,
DestinationConnectionError,
DestinationSchemaError,
EmbeddingEncoderConnectionError,
]

62 changes: 59 additions & 3 deletions unstructured_ingest/processes/connectors/sql/teradata.py
@@ -8,7 +8,11 @@
from pydantic import Field, Secret, field_validator

from unstructured_ingest.data_types.file_data import FileData
from unstructured_ingest.error import DestinationConnectionError, SourceConnectionError
from unstructured_ingest.error import (
DestinationConnectionError,
DestinationSchemaError,
SourceConnectionError,
)
from unstructured_ingest.logger import logger
from unstructured_ingest.processes.connector_registry import (
DestinationRegistryEntry,
@@ -37,6 +41,10 @@

CONNECTOR_TYPE = "teradata"
DEFAULT_TABLE_NAME = "unstructuredautocreated"
# Columns that must exist in the destination table for uploads to succeed.
# record_id is needed for delete-before-upsert; element_id, text, and type
# carry the core document content.
REQUIRED_DESTINATION_COLUMNS = {"id", "record_id", "element_id", "text", "type"}
Cursor Bugbot comment (Medium Severity): Hardcoded record_id ignores the configurable record_id_key setting

REQUIRED_DESTINATION_COLUMNS hardcodes "record_id", but SQLUploaderConfig.record_id_key is a configurable field that users can set to a different column name. Both _validate_table_schema and upload_dataframe compare against this hardcoded set, so a table with a valid custom record-ID column (e.g. "my_record_id") will be falsely rejected as missing "record_id". The required set needs to dynamically include self.upload_config.record_id_key instead.

Reviewed by Cursor Bugbot for commit a4dfe43.
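
A minimal sketch of the adjustment the bot suggests, assuming the uploader exposes a configurable upload_config.record_id_key as the comment describes; the helper name and base-set constant below are hypothetical and not part of this diff:

# Hypothetical helper: build the required column set from the configured
# record-ID column instead of hardcoding "record_id".
_BASE_REQUIRED_COLUMNS = {"id", "element_id", "text", "type"}

def required_destination_columns(record_id_key: str = "record_id") -> set[str]:
    # The configurable key takes the place of the hardcoded "record_id" entry.
    return _BASE_REQUIRED_COLUMNS | {record_id_key.lower()}

# _validate_table_schema and upload_dataframe would then compare against
# required_destination_columns(self.upload_config.record_id_key).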



def _summarize_error(host: str, raw: Exception, context: str = "") -> str:
@@ -378,6 +386,40 @@ def precheck(self) -> None:
logger.error(f"failed to validate connection: {e}", exc_info=True)
raise DestinationConnectionError(_summarize_error(self.connection_config.host, e))

if table_name:
self._validate_table_schema(table_name)

def _validate_table_schema(self, table_name: str) -> None:
"""Check that an existing table has the required columns for uploads."""
try:
with self.get_cursor() as cursor:
cursor.execute("SELECT DATABASE")
current_db = cursor.fetchone()[0].strip()
cursor.execute(
Contributor comment:
Do we want to also check for columns in the schema which are NOT NULL if we don't have a way to fill them? It's not a perfectly easy check because there could be NOT NULL with a default specified also, in which case we don't want to fail (this pattern is common with last_updated columns, for example).

"SELECT ColumnName FROM DBC.ColumnsV "
"WHERE TableName = ? AND DatabaseName = ?",
[table_name, current_db],
)
rows = cursor.fetchall()
if not rows:
# Table doesn't exist yet — create_destination will handle it.
return
existing = {row[0].strip().lower() for row in rows}
except Exception:
# If we can't query DBC metadata, skip validation — the upload
# will fail with its own error if the schema is truly wrong.
return

missing = REQUIRED_DESTINATION_COLUMNS - existing
if missing:
raise DestinationSchemaError(
f"Table '{table_name}' is missing required columns: "
f"{', '.join(sorted(missing))}. "
f"Expected columns: {', '.join(sorted(REQUIRED_DESTINATION_COLUMNS))}. "
"Drop the table and let the connector recreate it, or "
"add the missing columns."
)
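
A hedged sketch of the extra check the contributor raises above: flag NOT NULL columns without defaults that the uploader has no way to fill. It assumes DBC.ColumnsV exposes Nullable ('Y'/'N') and DefaultValue fields, which matches Teradata's data dictionary but is not confirmed by this diff; the helper name is illustrative only:

def _not_null_columns_without_defaults(cursor, table_name: str, database: str) -> set[str]:
    # Columns declared NOT NULL with no default: inserts that omit them will fail.
    cursor.execute(
        "SELECT ColumnName, Nullable, DefaultValue FROM DBC.ColumnsV "
        "WHERE TableName = ? AND DatabaseName = ?",
        [table_name, database],
    )
    return {
        row[0].strip().lower()
        for row in cursor.fetchall()
        # Nullable = 'N' means NOT NULL; a non-null DefaultValue means the
        # database can fill the column itself, so skip those.
        if row[1].strip() == "N" and row[2] is None
    }

# The caller would subtract the columns the uploader actually writes before
# treating anything left over as a schema error.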

def get_table_columns(self) -> list[str]:
if self._columns is None:
with self.get_cursor() as cursor:
Expand Down Expand Up @@ -412,6 +454,20 @@ def delete_by_record_id(self, file_data: FileData) -> None:
def upload_dataframe(self, df: "DataFrame", file_data: FileData) -> None:
import numpy as np

table_name = self.upload_config.table_name

# Validate schema before attempting any writes.
table_columns_lower = {col.lower() for col in self.get_table_columns()}
missing = REQUIRED_DESTINATION_COLUMNS - table_columns_lower
if missing:
raise DestinationSchemaError(
f"Table '{table_name}' is missing required columns: "
f"{', '.join(sorted(missing))}. "
f"Expected columns: {', '.join(sorted(REQUIRED_DESTINATION_COLUMNS))}. "
"Drop the table and let the connector recreate it, or "
"add the missing columns."
)

if self.can_delete():
self.delete_by_record_id(file_data=file_data)
else:
@@ -429,14 +485,14 @@ def upload_dataframe(self, df: "DataFrame", file_data: FileData) -> None:
quoted_columns = [f'"{col}"' for col in db_columns]

stmt = "INSERT INTO {table_name} ({columns}) VALUES({values})".format(
table_name=f'"{self.upload_config.table_name}"',
table_name=f'"{table_name}"',
columns=",".join(quoted_columns),
values=",".join([self.values_delimiter for _ in columns]),
)
logger.info(
f"writing a total of {len(df)} elements via"
f" document batches to destination"
f" table named {self.upload_config.table_name}"
f" table named {table_name}"
f" with batch size {self.upload_config.batch_size}"
)
for rows in split_dataframe(df=df, chunk_size=self.upload_config.batch_size):