From 9ab7b3b542788f03ad0c14959a0610edc0b1c51e Mon Sep 17 00:00:00 2001 From: Noritaka Sekiyama Date: Wed, 22 Apr 2026 14:16:52 +0900 Subject: [PATCH 1/3] Fix CHAR/VARCHAR length overflow by stripping column metadata Some data sources (e.g., Teradata) return CHAR(n) values with space padding via JDBC, resulting in values that exceed the declared column length. Delta enforces CHAR/VARCHAR length constraints through column metadata (__CHAR_VARCHAR_TYPE_STRING), causing writes to fail for these padded values. Strip all column metadata via col.alias(metadata={}) before writing intermediate DataFrames to Delta. This removes the constraint that Delta uses for length enforcement. Observed with Teradata via Lakehouse Federation but not with Lakebase (PostgreSQL) via Lakehouse Federation. Co-authored-by: Isaac --- .../lakebridge/reconcile/recon_capture.py | 17 ++++++ tests/unit/reconcile/test_recon_capture.py | 52 +++++++++++++++++++ 2 files changed, 69 insertions(+) create mode 100644 tests/unit/reconcile/test_recon_capture.py diff --git a/src/databricks/labs/lakebridge/reconcile/recon_capture.py b/src/databricks/labs/lakebridge/reconcile/recon_capture.py index f9425cc3a2..c168337ed1 100644 --- a/src/databricks/labs/lakebridge/reconcile/recon_capture.py +++ b/src/databricks/labs/lakebridge/reconcile/recon_capture.py @@ -8,6 +8,7 @@ from pyspark.sql import DataFrame, SparkSession from pyspark.sql.functions import col, collect_list, create_map, lit +from pyspark.sql.types import StringType from pyspark.errors import PySparkException from sqlglot import Dialect @@ -87,8 +88,23 @@ def _get_uc_volume_path(self): f"{self._metadata_config.volume}" ) + @staticmethod + def _strip_char_varchar_constraints(df: DataFrame) -> DataFrame: + """Strip CHAR(n)/VARCHAR(n) length constraints from DataFrame columns. + + When reconciling data from external sources (e.g., Teradata via Lakehouse Federation), + CHAR columns may contain space-padded values that exceed the declared length limit, + causing DELTA_EXCEED_CHAR_VARCHAR_LIMIT errors when writing to Delta. + + Delta enforces length constraints via column metadata (__CHAR_VARCHAR_TYPE_STRING), + not just the column type. This method strips all column metadata to remove those + constraints. + """ + return df.select(*[col(f.name).alias(f.name, metadata={}) for f in df.schema.fields]) + def _write_df_to_volumes(self, df: DataFrame, path: str) -> None: logger.debug(f"Writing DF on {self._format} to path: {path}") + df = self._strip_char_varchar_constraints(df) df.write.format(self._format).save(path) logger.info(f"Wrote DF on {self._format}") @@ -114,6 +130,7 @@ def write_and_read_df_with_volumes( def _write_df_to_delta(df: DataFrame, table_name: str, mode="append"): try: + df = ReconIntermediatePersist._strip_char_varchar_constraints(df) df.write.mode(mode).saveAsTable(table_name) logger.info(f"Data written to {table_name} successfully.") except Exception as e: diff --git a/tests/unit/reconcile/test_recon_capture.py b/tests/unit/reconcile/test_recon_capture.py new file mode 100644 index 0000000000..4c8c212651 --- /dev/null +++ b/tests/unit/reconcile/test_recon_capture.py @@ -0,0 +1,52 @@ +from unittest.mock import MagicMock + +from pyspark.sql.types import StructType, StructField, StringType, IntegerType + +from databricks.labs.lakebridge.reconcile.recon_capture import ReconIntermediatePersist + + +def _make_df(schema_fields): + """Create a mock DataFrame with given schema fields.""" + schema = StructType(schema_fields) + df = MagicMock() + df.schema = schema + df.columns = [f.name for f in schema_fields] + + def mock_select(*cols): + # select with alias strips metadata — return df with empty metadata on all fields + stripped_fields = [ + StructField(f.name, f.dataType, f.nullable, metadata={}) + for f in schema_fields + ] + return _make_df(stripped_fields) + + df.select = mock_select + return df + + +def test_strip_char_varchar_constraints_strips_metadata(): + """Column metadata should be stripped to remove CHAR/VARCHAR constraints.""" + df = _make_df([ + StructField("id", IntegerType(), False, metadata={}), + StructField("name", StringType(), True, metadata={"__CHAR_VARCHAR_TYPE_STRING": "char(16)"}), + ]) + + result = ReconIntermediatePersist._strip_char_varchar_constraints(df) + + assert result.schema.fields[1].metadata == {} + + +def test_strip_char_varchar_constraints_preserves_types(): + """Column types should be preserved — only metadata is stripped.""" + df = _make_df([ + StructField("id", IntegerType(), False), + StructField("name", StringType(), True), + ]) + + result = ReconIntermediatePersist._strip_char_varchar_constraints(df) + + assert result.schema.fields[0].dataType == IntegerType() + assert result.schema.fields[1].dataType == StringType() + + + From dc7938450f101dbf879fee4440f7d588533d2f91 Mon Sep 17 00:00:00 2001 From: Noritaka Sekiyama Date: Sat, 9 May 2026 09:55:29 +0900 Subject: [PATCH 2/3] Apply black/ruff formatting after main merge - black reformats list comprehension to single line in test helper - ruff removes unused StringType import (was used in main, dropped after merge) Co-authored-by: Isaac --- .../lakebridge/reconcile/recon_capture.py | 1 - tests/unit/reconcile/test_recon_capture.py | 28 +++++++++---------- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/src/databricks/labs/lakebridge/reconcile/recon_capture.py b/src/databricks/labs/lakebridge/reconcile/recon_capture.py index 9eb869e721..88e2d2b7d8 100644 --- a/src/databricks/labs/lakebridge/reconcile/recon_capture.py +++ b/src/databricks/labs/lakebridge/reconcile/recon_capture.py @@ -8,7 +8,6 @@ from pyspark.sql import DataFrame, SparkSession from pyspark.sql.functions import col, collect_list, create_map, lit -from pyspark.sql.types import StringType from pyspark.errors import PySparkException from sqlglot import Dialect diff --git a/tests/unit/reconcile/test_recon_capture.py b/tests/unit/reconcile/test_recon_capture.py index 4c8c212651..7a8697512a 100644 --- a/tests/unit/reconcile/test_recon_capture.py +++ b/tests/unit/reconcile/test_recon_capture.py @@ -14,10 +14,7 @@ def _make_df(schema_fields): def mock_select(*cols): # select with alias strips metadata — return df with empty metadata on all fields - stripped_fields = [ - StructField(f.name, f.dataType, f.nullable, metadata={}) - for f in schema_fields - ] + stripped_fields = [StructField(f.name, f.dataType, f.nullable, metadata={}) for f in schema_fields] return _make_df(stripped_fields) df.select = mock_select @@ -26,10 +23,12 @@ def mock_select(*cols): def test_strip_char_varchar_constraints_strips_metadata(): """Column metadata should be stripped to remove CHAR/VARCHAR constraints.""" - df = _make_df([ - StructField("id", IntegerType(), False, metadata={}), - StructField("name", StringType(), True, metadata={"__CHAR_VARCHAR_TYPE_STRING": "char(16)"}), - ]) + df = _make_df( + [ + StructField("id", IntegerType(), False, metadata={}), + StructField("name", StringType(), True, metadata={"__CHAR_VARCHAR_TYPE_STRING": "char(16)"}), + ] + ) result = ReconIntermediatePersist._strip_char_varchar_constraints(df) @@ -38,15 +37,14 @@ def test_strip_char_varchar_constraints_strips_metadata(): def test_strip_char_varchar_constraints_preserves_types(): """Column types should be preserved — only metadata is stripped.""" - df = _make_df([ - StructField("id", IntegerType(), False), - StructField("name", StringType(), True), - ]) + df = _make_df( + [ + StructField("id", IntegerType(), False), + StructField("name", StringType(), True), + ] + ) result = ReconIntermediatePersist._strip_char_varchar_constraints(df) assert result.schema.fields[0].dataType == IntegerType() assert result.schema.fields[1].dataType == StringType() - - - From 2ef8da0db2e352243aec083f696941ec84d78ef7 Mon Sep 17 00:00:00 2001 From: Noritaka Sekiyama Date: Sat, 9 May 2026 10:19:49 +0900 Subject: [PATCH 3/3] Make strip_char_varchar_constraints public to satisfy pylint `_write_df_to_delta` is a module-level function and accessed ReconIntermediatePersist._strip_char_varchar_constraints from outside the class, which pylint flags as protected-access. Rename to public since the helper is effectively a utility. Also rename mock_select unused arg to *_cols and fix test fn names. Co-authored-by: Isaac --- src/databricks/labs/lakebridge/reconcile/recon_capture.py | 6 +++--- tests/unit/reconcile/test_recon_capture.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/databricks/labs/lakebridge/reconcile/recon_capture.py b/src/databricks/labs/lakebridge/reconcile/recon_capture.py index 88e2d2b7d8..e5f4a7f1a0 100644 --- a/src/databricks/labs/lakebridge/reconcile/recon_capture.py +++ b/src/databricks/labs/lakebridge/reconcile/recon_capture.py @@ -91,7 +91,7 @@ def _get_uc_volume_path(self): ) @staticmethod - def _strip_char_varchar_constraints(df: DataFrame) -> DataFrame: + def strip_char_varchar_constraints(df: DataFrame) -> DataFrame: """Strip CHAR(n)/VARCHAR(n) length constraints from DataFrame columns. When reconciling data from external sources (e.g., Teradata via Lakehouse Federation), @@ -106,7 +106,7 @@ def _strip_char_varchar_constraints(df: DataFrame) -> DataFrame: def _write_df_to_volumes(self, df: DataFrame, path: str) -> None: logger.debug(f"Writing DF on {self._format} to path: {path}") - df = self._strip_char_varchar_constraints(df) + df = self.strip_char_varchar_constraints(df) df.write.format(self._format).save(path) logger.info(f"Wrote DF on {self._format}") @@ -132,7 +132,7 @@ def write_and_read_df_with_volumes( def _write_df_to_delta(df: DataFrame, table_name: str, mode="append"): try: - df = ReconIntermediatePersist._strip_char_varchar_constraints(df) + df = ReconIntermediatePersist.strip_char_varchar_constraints(df) df.write.mode(mode).saveAsTable(table_name) logger.info(f"Data written to {table_name} successfully.") except Exception as e: diff --git a/tests/unit/reconcile/test_recon_capture.py b/tests/unit/reconcile/test_recon_capture.py index 7a8697512a..4edbfca1ab 100644 --- a/tests/unit/reconcile/test_recon_capture.py +++ b/tests/unit/reconcile/test_recon_capture.py @@ -12,7 +12,7 @@ def _make_df(schema_fields): df.schema = schema df.columns = [f.name for f in schema_fields] - def mock_select(*cols): + def mock_select(*_cols): # select with alias strips metadata — return df with empty metadata on all fields stripped_fields = [StructField(f.name, f.dataType, f.nullable, metadata={}) for f in schema_fields] return _make_df(stripped_fields) @@ -30,7 +30,7 @@ def test_strip_char_varchar_constraints_strips_metadata(): ] ) - result = ReconIntermediatePersist._strip_char_varchar_constraints(df) + result = ReconIntermediatePersist.strip_char_varchar_constraints(df) assert result.schema.fields[1].metadata == {} @@ -44,7 +44,7 @@ def test_strip_char_varchar_constraints_preserves_types(): ] ) - result = ReconIntermediatePersist._strip_char_varchar_constraints(df) + result = ReconIntermediatePersist.strip_char_varchar_constraints(df) assert result.schema.fields[0].dataType == IntegerType() assert result.schema.fields[1].dataType == StringType()