diff --git a/src/databricks/labs/lakebridge/reconcile/recon_capture.py b/src/databricks/labs/lakebridge/reconcile/recon_capture.py index 4ef602c3da..e5f4a7f1a0 100644 --- a/src/databricks/labs/lakebridge/reconcile/recon_capture.py +++ b/src/databricks/labs/lakebridge/reconcile/recon_capture.py @@ -90,8 +90,23 @@ def _get_uc_volume_path(self): f"{self._metadata_config.volume}" ) + @staticmethod + def strip_char_varchar_constraints(df: DataFrame) -> DataFrame: + """Strip CHAR(n)/VARCHAR(n) length constraints from DataFrame columns. + + When reconciling data from external sources (e.g., Teradata via Lakehouse Federation), + CHAR columns may contain space-padded values that exceed the declared length limit, + causing DELTA_EXCEED_CHAR_VARCHAR_LIMIT errors when writing to Delta. + + Delta enforces length constraints via column metadata (__CHAR_VARCHAR_TYPE_STRING), + not just the column type. This method strips all column metadata to remove those + constraints. + """ + return df.select(*[col(f.name).alias(f.name, metadata={}) for f in df.schema.fields]) + def _write_df_to_volumes(self, df: DataFrame, path: str) -> None: logger.debug(f"Writing DF on {self._format} to path: {path}") + df = self.strip_char_varchar_constraints(df) df.write.format(self._format).save(path) logger.info(f"Wrote DF on {self._format}") @@ -117,6 +132,7 @@ def write_and_read_df_with_volumes( def _write_df_to_delta(df: DataFrame, table_name: str, mode="append"): try: + df = ReconIntermediatePersist.strip_char_varchar_constraints(df) df.write.mode(mode).saveAsTable(table_name) logger.info(f"Data written to {table_name} successfully.") except Exception as e: diff --git a/tests/unit/reconcile/test_recon_capture.py b/tests/unit/reconcile/test_recon_capture.py new file mode 100644 index 0000000000..4edbfca1ab --- /dev/null +++ b/tests/unit/reconcile/test_recon_capture.py @@ -0,0 +1,50 @@ +from unittest.mock import MagicMock + +from pyspark.sql.types import StructType, StructField, StringType, IntegerType + +from databricks.labs.lakebridge.reconcile.recon_capture import ReconIntermediatePersist + + +def _make_df(schema_fields): + """Create a mock DataFrame with given schema fields.""" + schema = StructType(schema_fields) + df = MagicMock() + df.schema = schema + df.columns = [f.name for f in schema_fields] + + def mock_select(*_cols): + # select with alias strips metadata — return df with empty metadata on all fields + stripped_fields = [StructField(f.name, f.dataType, f.nullable, metadata={}) for f in schema_fields] + return _make_df(stripped_fields) + + df.select = mock_select + return df + + +def test_strip_char_varchar_constraints_strips_metadata(): + """Column metadata should be stripped to remove CHAR/VARCHAR constraints.""" + df = _make_df( + [ + StructField("id", IntegerType(), False, metadata={}), + StructField("name", StringType(), True, metadata={"__CHAR_VARCHAR_TYPE_STRING": "char(16)"}), + ] + ) + + result = ReconIntermediatePersist.strip_char_varchar_constraints(df) + + assert result.schema.fields[1].metadata == {} + + +def test_strip_char_varchar_constraints_preserves_types(): + """Column types should be preserved — only metadata is stripped.""" + df = _make_df( + [ + StructField("id", IntegerType(), False), + StructField("name", StringType(), True), + ] + ) + + result = ReconIntermediatePersist.strip_char_varchar_constraints(df) + + assert result.schema.fields[0].dataType == IntegerType() + assert result.schema.fields[1].dataType == StringType()