Skip to content

Commit eb49dfc

Browse files
committed
Fix CHAR/VARCHAR length overflow when writing reconcile intermediate data
When reconciling data from external sources (e.g., Teradata via Lakehouse Federation), CHAR columns may contain space-padded values that exceed the declared VARCHAR(n) length limit, causing DELTA_EXCEED_CHAR_VARCHAR_LIMIT errors when writing intermediate DataFrames to Delta volumes or tables. Cast all CHAR(n)/VARCHAR(n) columns to STRING (unbounded) before writing to avoid length constraint violations. Co-authored-by: Isaac
1 parent df7e9f6 commit eb49dfc

2 files changed

Lines changed: 90 additions & 0 deletions

File tree

src/databricks/labs/lakebridge/reconcile/recon_capture.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from pyspark.sql import DataFrame, SparkSession
1010
from pyspark.sql.functions import col, collect_list, create_map, lit
11+
from pyspark.sql.types import StringType
1112
from pyspark.errors import PySparkException
1213
from sqlglot import Dialect
1314

@@ -87,8 +88,23 @@ def _get_uc_volume_path(self):
8788
f"{self._metadata_config.volume}"
8889
)
8990

91+
@staticmethod
92+
def _cast_char_varchar_to_string(df: DataFrame) -> DataFrame:
93+
"""Cast CHAR(n)/VARCHAR(n) columns to STRING to avoid length constraint violations.
94+
95+
When reconciling data from external sources (e.g., Teradata via Lakehouse Federation),
96+
CHAR columns may contain space-padded values that exceed the declared length limit,
97+
causing DELTA_EXCEED_CHAR_VARCHAR_LIMIT errors when writing to Delta.
98+
"""
99+
for field in df.schema.fields:
100+
type_name = field.dataType.simpleString().lower()
101+
if type_name.startswith("varchar") or type_name.startswith("char"):
102+
df = df.withColumn(field.name, col(field.name).cast(StringType()))
103+
return df
104+
90105
def _write_df_to_volumes(self, df: DataFrame, path: str) -> None:
91106
logger.debug(f"Writing DF on {self._format} to path: {path}")
107+
df = self._cast_char_varchar_to_string(df)
92108
df.write.format(self._format).save(path)
93109
logger.info(f"Wrote DF on {self._format}")
94110

@@ -114,6 +130,7 @@ def write_and_read_df_with_volumes(
114130

115131
def _write_df_to_delta(df: DataFrame, table_name: str, mode="append"):
116132
try:
133+
df = ReconIntermediatePersist._cast_char_varchar_to_string(df)
117134
df.write.mode(mode).saveAsTable(table_name)
118135
logger.info(f"Data written to {table_name} successfully.")
119136
except Exception as e:
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
from unittest.mock import MagicMock
2+
3+
import pytest
4+
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
5+
6+
from databricks.labs.lakebridge.reconcile.recon_capture import ReconIntermediatePersist
7+
8+
9+
def _make_df(schema_fields):
10+
"""Create a mock DataFrame with given schema fields."""
11+
schema = StructType(schema_fields)
12+
df = MagicMock()
13+
df.schema = schema
14+
df.columns = [f.name for f in schema_fields]
15+
16+
def mock_with_column(name, col_expr):
17+
# Return a new mock with updated schema (field cast to StringType)
18+
new_fields = []
19+
for f in df.schema.fields:
20+
if f.name == name:
21+
new_fields.append(StructField(name, StringType(), f.nullable))
22+
else:
23+
new_fields.append(f)
24+
return _make_df(new_fields)
25+
26+
df.withColumn = mock_with_column
27+
return df
28+
29+
30+
def test_cast_char_varchar_to_string_converts_varchar():
31+
"""VARCHAR(n) columns should be cast to STRING."""
32+
df = _make_df([
33+
StructField("id", IntegerType(), False),
34+
StructField("name", StringType(), True),
35+
])
36+
# Simulate VARCHAR(16) by overriding simpleString
37+
varchar_type = MagicMock()
38+
varchar_type.simpleString.return_value = "varchar(16)"
39+
df.schema.fields[1].dataType = varchar_type
40+
41+
result = ReconIntermediatePersist._cast_char_varchar_to_string(df)
42+
43+
# The result should have STRING type for the 'name' field
44+
assert result.schema.fields[1].dataType == StringType()
45+
46+
47+
def test_cast_char_varchar_to_string_converts_char():
48+
"""CHAR(n) columns should be cast to STRING."""
49+
df = _make_df([
50+
StructField("id", IntegerType(), False),
51+
StructField("code", StringType(), True),
52+
])
53+
char_type = MagicMock()
54+
char_type.simpleString.return_value = "char(10)"
55+
df.schema.fields[1].dataType = char_type
56+
57+
result = ReconIntermediatePersist._cast_char_varchar_to_string(df)
58+
59+
assert result.schema.fields[1].dataType == StringType()
60+
61+
62+
def test_cast_char_varchar_to_string_leaves_other_types():
63+
"""Non-CHAR/VARCHAR columns should not be modified."""
64+
df = _make_df([
65+
StructField("id", IntegerType(), False),
66+
StructField("name", StringType(), True),
67+
])
68+
69+
result = ReconIntermediatePersist._cast_char_varchar_to_string(df)
70+
71+
# Schema should be unchanged
72+
assert result.schema.fields[0].dataType == IntegerType()
73+
assert result.schema.fields[1].dataType == StringType()

0 commit comments

Comments
 (0)