Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/databricks/labs/lakebridge/reconcile/recon_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,23 @@ def _get_uc_volume_path(self):
f"{self._metadata_config.volume}"
)

@staticmethod
def strip_char_varchar_constraints(df: DataFrame) -> DataFrame:
    """Strip CHAR(n)/VARCHAR(n) length constraints from DataFrame columns.

    When reconciling data from external sources (e.g., Teradata via Lakehouse Federation),
    CHAR columns may contain space-padded values that exceed the declared length limit,
    causing DELTA_EXCEED_CHAR_VARCHAR_LIMIT errors when writing to Delta.

    Delta enforces length constraints via column metadata (__CHAR_VARCHAR_TYPE_STRING),
    not just the column type. This method re-projects every top-level column so that the
    resulting schema carries no column metadata at all (any other metadata, such as
    column comments, is dropped as well).

    Args:
        df: DataFrame whose top-level column metadata should be removed.

    Returns:
        A DataFrame with the same column names, column order and data types as ``df``.
        Only top-level column metadata is dropped; metadata nested inside struct, array
        or map types is not touched.
    """
    # NOTE: ``col(name).alias(name, metadata={})`` is not enough on its own. PySpark only
    # forwards ``metadata`` to the JVM/Connect plan when the dict is truthy, so an empty
    # dict degrades to a plain alias, and a plain alias of a column reference inherits
    # that column's metadata (including __CHAR_VARCHAR_TYPE_STRING).
    # Casting each column to its own type leaves values and types unchanged, but makes the
    # aliased expression a cast rather than a bare column reference, so the alias starts
    # from empty metadata.
    return df.select(*[col(f.name).cast(f.dataType).alias(f.name) for f in df.schema.fields])

def _write_df_to_volumes(self, df: DataFrame, path: str) -> None:
    """Persist ``df`` at ``path`` using the writer format held in ``self._format``.

    The DataFrame is first passed through ``strip_char_varchar_constraints`` so that
    the written data does not carry CHAR/VARCHAR column metadata.

    Args:
        df: DataFrame to persist.
        path: Destination path handed to the DataFrame writer's ``save``.
    """
    logger.debug(f"Writing DF on {self._format} to path: {path}")
    # Drop column metadata before handing the frame to the writer.
    unconstrained_df = self.strip_char_varchar_constraints(df)
    writer = unconstrained_df.write.format(self._format)
    writer.save(path)
    logger.info(f"Wrote DF on {self._format}")

Expand All @@ -117,6 +132,7 @@ def write_and_read_df_with_volumes(

def _write_df_to_delta(df: DataFrame, table_name: str, mode="append"):
try:
df = ReconIntermediatePersist.strip_char_varchar_constraints(df)
df.write.mode(mode).saveAsTable(table_name)
logger.info(f"Data written to {table_name} successfully.")
except Exception as e:
Expand Down
50 changes: 50 additions & 0 deletions tests/unit/reconcile/test_recon_capture.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from unittest.mock import MagicMock

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

from databricks.labs.lakebridge.reconcile.recon_capture import ReconIntermediatePersist


def _make_df(schema_fields):
    """Build a ``MagicMock`` standing in for a DataFrame with the given schema fields.

    The mock exposes ``schema`` (a ``StructType`` of ``schema_fields``), ``columns``
    (the field names) and a ``select`` replacement.

    Note that the ``select`` replacement ignores whatever column expressions it is
    given: it always returns a new mock whose fields keep their name, data type and
    nullability but have empty metadata. Tests built on this helper therefore exercise
    the call path only, not the column expressions passed to ``select``.
    """

    def _select_without_metadata(*_cols):
        # select with alias strips metadata — return df with empty metadata on all fields
        return _make_df(
            [StructField(field.name, field.dataType, field.nullable, metadata={}) for field in schema_fields]
        )

    mock_df = MagicMock()
    mock_df.schema = StructType(schema_fields)
    mock_df.columns = [field.name for field in schema_fields]
    mock_df.select = _select_without_metadata
    return mock_df


def test_strip_char_varchar_constraints_strips_metadata():
    """The CHAR/VARCHAR marker in a column's metadata must be gone from the result schema."""
    char_marker = {"__CHAR_VARCHAR_TYPE_STRING": "char(16)"}
    id_field = StructField("id", IntegerType(), False, metadata={})
    name_field = StructField("name", StringType(), True, metadata=char_marker)
    source_df = _make_df([id_field, name_field])

    stripped = ReconIntermediatePersist.strip_char_varchar_constraints(source_df)

    # Index 1 is the "name" column, the only one that started with metadata.
    name_metadata = stripped.schema.fields[1].metadata
    assert name_metadata == {}


def test_strip_char_varchar_constraints_preserves_types():
    """Stripping metadata must leave each column's data type as it was."""
    source_fields = [
        StructField("id", IntegerType(), False),
        StructField("name", StringType(), True),
    ]

    stripped = ReconIntermediatePersist.strip_char_varchar_constraints(_make_df(source_fields))

    result_types = [field.dataType for field in stripped.schema.fields]
    assert result_types[0] == IntegerType()
    assert result_types[1] == StringType()
Loading