fix(teradata): validate destination table schema in precheck and upload #686
Changes from all commits: 1f34202, dbdda69, 21c0e67, 853547d, a4dfe43
Version bump:

```diff
@@ -1 +1 @@
-__version__ = "1.4.20"  # pragma: no cover
+__version__ = "1.4.21"  # pragma: no cover
```
`unstructured_ingest/processes/connectors/sql/teradata.py`:

```diff
@@ -8,7 +8,11 @@
 from pydantic import Field, Secret, field_validator

 from unstructured_ingest.data_types.file_data import FileData
-from unstructured_ingest.error import DestinationConnectionError, SourceConnectionError
+from unstructured_ingest.error import (
+    DestinationConnectionError,
+    DestinationSchemaError,
+    SourceConnectionError,
+)
 from unstructured_ingest.logger import logger
 from unstructured_ingest.processes.connector_registry import (
     DestinationRegistryEntry,
```
@@ -37,6 +41,10 @@ | |
|
|
||
| CONNECTOR_TYPE = "teradata" | ||
| DEFAULT_TABLE_NAME = "unstructuredautocreated" | ||
| # Columns that must exist in the destination table for uploads to succeed. | ||
| # record_id is needed for delete-before-upsert; element_id, text, and type | ||
| # carry the core document content. | ||
| REQUIRED_DESTINATION_COLUMNS = {"id", "record_id", "element_id", "text", "type"} | ||
|
|
||
|
|
||
| def _summarize_error(host: str, raw: Exception, context: str = "") -> str: | ||
|
|
@@ -378,6 +386,40 @@ def precheck(self) -> None: | |
| logger.error(f"failed to validate connection: {e}", exc_info=True) | ||
| raise DestinationConnectionError(_summarize_error(self.connection_config.host, e)) | ||
|
|
||
| if table_name: | ||
| self._validate_table_schema(table_name) | ||
|
|
||
| def _validate_table_schema(self, table_name: str) -> None: | ||
| """Check that an existing table has the required columns for uploads.""" | ||
| try: | ||
| with self.get_cursor() as cursor: | ||
| cursor.execute("SELECT DATABASE") | ||
| current_db = cursor.fetchone()[0].strip() | ||
| cursor.execute( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we want to also check for columns in the schema which are |
||
| "SELECT ColumnName FROM DBC.ColumnsV " | ||
| "WHERE TableName = ? AND DatabaseName = ?", | ||
| [table_name, current_db], | ||
| ) | ||
| rows = cursor.fetchall() | ||
| if not rows: | ||
| # Table doesn't exist yet — create_destination will handle it. | ||
| return | ||
|
cursor[bot] marked this conversation as resolved.
|
||
| existing = {row[0].strip().lower() for row in rows} | ||
| except Exception: | ||
| # If we can't query DBC metadata, skip validation — the upload | ||
| # will fail with its own error if the schema is truly wrong. | ||
| return | ||
|
|
||
| missing = REQUIRED_DESTINATION_COLUMNS - existing | ||
| if missing: | ||
| raise DestinationSchemaError( | ||
| f"Table '{table_name}' is missing required columns: " | ||
| f"{', '.join(sorted(missing))}. " | ||
| f"Expected columns: {', '.join(sorted(REQUIRED_DESTINATION_COLUMNS))}. " | ||
| "Drop the table and let the connector recreate it, or " | ||
| "add the missing columns." | ||
| ) | ||
|
tabossert marked this conversation as resolved.
|
||
|
|
||
| def get_table_columns(self) -> list[str]: | ||
| if self._columns is None: | ||
| with self.get_cursor() as cursor: | ||
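For concreteness, here is a standalone illustration (not connector code) of the normalization and set-difference check the validator relies on; the `strip()`/`lower()` calls in the diff suggest that column names from DBC metadata can come back space-padded and in mixed case. The sample rows below are hypothetical:

```python
# Standalone sketch of the validator's core check. REQUIRED_DESTINATION_COLUMNS
# mirrors the constant added above; the metadata rows are made up for illustration.
REQUIRED_DESTINATION_COLUMNS = {"id", "record_id", "element_id", "text", "type"}


def missing_columns(metadata_rows: list[tuple]) -> set[str]:
    # Normalize names the same way _validate_table_schema does.
    existing = {row[0].strip().lower() for row in metadata_rows}
    return REQUIRED_DESTINATION_COLUMNS - existing


# A hand-created table that lacks record_id:
rows = [("ID   ",), ("Element_Id ",), ("Text ",), ("Type ",)]
print(missing_columns(rows))  # -> {'record_id'}
```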
```diff
@@ -412,6 +454,20 @@ def delete_by_record_id(self, file_data: FileData) -> None:
     def upload_dataframe(self, df: "DataFrame", file_data: FileData) -> None:
         import numpy as np

+        table_name = self.upload_config.table_name
+
+        # Validate schema before attempting any writes.
+        table_columns_lower = {col.lower() for col in self.get_table_columns()}
+        missing = REQUIRED_DESTINATION_COLUMNS - table_columns_lower
+        if missing:
+            raise DestinationSchemaError(
+                f"Table '{table_name}' is missing required columns: "
+                f"{', '.join(sorted(missing))}. "
+                f"Expected columns: {', '.join(sorted(REQUIRED_DESTINATION_COLUMNS))}. "
+                "Drop the table and let the connector recreate it, or "
+                "add the missing columns."
+            )
+
         if self.can_delete():
             self.delete_by_record_id(file_data=file_data)
         else:
```
@@ -429,14 +485,14 @@ def upload_dataframe(self, df: "DataFrame", file_data: FileData) -> None: | |
| quoted_columns = [f'"{col}"' for col in db_columns] | ||
|
|
||
| stmt = "INSERT INTO {table_name} ({columns}) VALUES({values})".format( | ||
| table_name=f'"{self.upload_config.table_name}"', | ||
| table_name=f'"{table_name}"', | ||
| columns=",".join(quoted_columns), | ||
| values=",".join([self.values_delimiter for _ in columns]), | ||
| ) | ||
| logger.info( | ||
| f"writing a total of {len(df)} elements via" | ||
| f" document batches to destination" | ||
| f" table named {self.upload_config.table_name}" | ||
| f" table named {table_name}" | ||
| f" with batch size {self.upload_config.batch_size}" | ||
| ) | ||
| for rows in split_dataframe(df=df, chunk_size=self.upload_config.batch_size): | ||
|
|
||
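As a sketch of what that template renders to, assuming `values_delimiter` is the qmark placeholder `?` used by the `teradatasql` driver (an assumption; the diff does not show its actual value):

```python
# Illustration only: rendering the INSERT template for three hypothetical columns.
columns = ["id", "record_id", "text"]
quoted_columns = [f'"{col}"' for col in columns]
values_delimiter = "?"  # assumption; defined elsewhere in the connector
stmt = "INSERT INTO {table_name} ({columns}) VALUES({values})".format(
    table_name='"my_table"',
    columns=",".join(quoted_columns),
    values=",".join([values_delimiter for _ in columns]),
)
print(stmt)  # INSERT INTO "my_table" ("id","record_id","text") VALUES(?,?,?)
```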
**Cursor Bugbot** flagged (Medium Severity): **Hardcoded `record_id` ignores configurable `record_id_key` setting**

`REQUIRED_DESTINATION_COLUMNS` hardcodes `"record_id"`, but `SQLUploaderConfig.record_id_key` is a configurable field that users can set to a different column name. Both `_validate_table_schema` and `upload_dataframe` compare against this hardcoded set, so a table with a valid custom record-ID column (e.g. `"my_record_id"`) will be falsely rejected as missing `"record_id"`. The required set needs to dynamically include `self.upload_config.record_id_key` instead.

Additional locations (2):

- `unstructured_ingest/processes/connectors/sql/teradata.py#L460-L461`
- `unstructured_ingest/processes/connectors/sql/teradata.py#L412-L413`

Reviewed by Cursor Bugbot for commit a4dfe43.
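A minimal sketch of the fix the bot describes, assuming (per the comment itself) that `SQLUploaderConfig.record_id_key` holds the configurable record-ID column name; the helper name is hypothetical, not merged code:

```python
# Hypothetical helper implementing Bugbot's suggestion: derive the required
# set per uploader so a custom record_id_key is honored instead of the
# hardcoded "record_id".
STATIC_REQUIRED_COLUMNS = {"id", "element_id", "text", "type"}


def required_destination_columns(upload_config) -> set[str]:
    # record_id_key defaults to "record_id" but is user-configurable;
    # validate against whatever column delete-before-upsert actually uses.
    return STATIC_REQUIRED_COLUMNS | {upload_config.record_id_key.lower()}
```

Both `_validate_table_schema` and the `upload_dataframe` check would then compare against this per-instance set rather than the module-level constant.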