Skip to content

Commit ee0d9f8

Browse files
committed
Fix MySQL DELETE using CONCAT_WS preventing index usage (#5711)
Override _replace_by_key in MySQLEngineAdapter to use a JOIN-based DELETE for composite keys instead of CONCAT_WS('__SQLMESH_DELIM__', ...). MySQL/MariaDB cannot use indexes on CONCAT_WS expressions, causing full table scans on every incremental run. The JOIN-based approach allows the engine to use existing composite indexes, reducing query times from hours to seconds on large tables. Single-key DELETEs continue to use the efficient IN-based approach.
1 parent c787415 commit ee0d9f8

File tree

2 files changed

+157
-1
lines changed

2 files changed

+157
-1
lines changed

sqlmesh/core/engine_adapter/mysql.py

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import logging
44
import typing as t
5+
from functools import reduce
56

67
from sqlglot import exp, parse_one
78

@@ -21,7 +22,7 @@
2122
)
2223

2324
if t.TYPE_CHECKING:
24-
from sqlmesh.core._typing import SchemaName, TableName
25+
from sqlmesh.core._typing import QueryOrDF, SchemaName, TableName
2526

2627
logger = logging.getLogger(__name__)
2728

@@ -186,5 +187,84 @@ def _create_table_like(
186187
)
187188
)
188189

190+
def _replace_by_key(
191+
self,
192+
target_table: TableName,
193+
source_table: QueryOrDF,
194+
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]],
195+
key: t.Sequence[exp.Expr],
196+
is_unique_key: bool,
197+
source_columns: t.Optional[t.List[str]] = None,
198+
) -> None:
199+
if len(key) <= 1:
200+
return super()._replace_by_key(
201+
target_table, source_table, target_columns_to_types, key, is_unique_key, source_columns
202+
)
203+
204+
if target_columns_to_types is None:
205+
target_columns_to_types = self.columns(target_table)
206+
207+
temp_table = self._get_temp_table(target_table)
208+
column_names = list(target_columns_to_types or [])
209+
210+
target_alias = "_target"
211+
temp_alias = "_temp"
212+
213+
with self.transaction():
214+
self.ctas(
215+
temp_table,
216+
source_table,
217+
target_columns_to_types=target_columns_to_types,
218+
exists=False,
219+
source_columns=source_columns,
220+
)
221+
222+
try:
223+
# Build a JOIN-based DELETE instead of using CONCAT_WS.
224+
# CONCAT_WS prevents MySQL/MariaDB from using indexes, causing full table scans.
225+
on_condition = reduce(
226+
lambda a, b: exp.And(this=a, expression=b),
227+
[
228+
self._qualify_columns(k, target_alias).eq(
229+
self._qualify_columns(k, temp_alias)
230+
)
231+
for k in key
232+
],
233+
)
234+
235+
target_table_aliased = exp.to_table(target_table).as_(target_alias, quoted=True)
236+
temp_table_aliased = exp.to_table(temp_table).as_(temp_alias, quoted=True)
237+
238+
join = exp.Join(this=temp_table_aliased, kind="INNER", on=on_condition)
239+
target_table_aliased.append("joins", join)
240+
241+
delete_stmt = exp.Delete(
242+
tables=[exp.to_table(target_alias)],
243+
this=target_table_aliased,
244+
)
245+
self.execute(delete_stmt)
246+
247+
insert_query = self._select_columns(target_columns_to_types).from_(temp_table)
248+
if is_unique_key:
249+
insert_query = insert_query.distinct(*key)
250+
251+
insert_statement = exp.insert(
252+
insert_query,
253+
target_table,
254+
columns=column_names,
255+
)
256+
self.execute(insert_statement, track_rows_processed=True)
257+
finally:
258+
self.drop_table(temp_table)
259+
260+
@staticmethod
261+
def _qualify_columns(expr: exp.Expr, table_alias: str) -> exp.Expr:
262+
"""Qualify unqualified column references in an expression with a table alias."""
263+
expr = expr.copy()
264+
for col in expr.find_all(exp.Column):
265+
if not col.table:
266+
col.set("table", exp.to_identifier(table_alias, quoted=True))
267+
return expr
268+
189269
def ping(self) -> None:
190270
self._connection_pool.get().ping(reconnect=False)

tests/core/engine_adapter/test_mysql.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# type: ignore
22
import typing as t
3+
from unittest.mock import call
34

45
from pytest_mock.plugin import MockerFixture
56
from sqlglot import exp, parse_one
@@ -84,3 +85,78 @@ def test_create_table_like(make_mocked_engine_adapter: t.Callable):
8485
adapter.cursor.execute.assert_called_once_with(
8586
"CREATE TABLE IF NOT EXISTS `target_table` LIKE `source_table`"
8687
)
88+
89+
90+
def test_replace_by_key_composite_uses_join_delete(
91+
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture
92+
):
93+
"""Composite key DELETE uses JOIN instead of CONCAT_WS to allow index usage."""
94+
adapter = make_mocked_engine_adapter(MySQLEngineAdapter)
95+
temp_table_mock = mocker.patch(
96+
"sqlmesh.core.engine_adapter.base.EngineAdapter._get_temp_table"
97+
)
98+
temp_table_mock.return_value = exp.to_table("temporary")
99+
100+
adapter.merge(
101+
target_table="target",
102+
source_table=t.cast(exp.Select, parse_one("SELECT id, ts, val FROM source")),
103+
target_columns_to_types={
104+
"id": exp.DataType(this=exp.DataType.Type.INT),
105+
"ts": exp.DataType(this=exp.DataType.Type.TIMESTAMP),
106+
"val": exp.DataType(this=exp.DataType.Type.INT),
107+
},
108+
unique_key=[parse_one("id"), parse_one("ts")],
109+
)
110+
111+
sql_calls = to_sql_calls(adapter)
112+
113+
# The DELETE should use a JOIN instead of CONCAT_WS
114+
assert any("CONCAT_WS" in s for s in sql_calls) is False, (
115+
"DELETE should not use CONCAT_WS for composite keys"
116+
)
117+
assert any("INNER JOIN" in s for s in sql_calls) is True, (
118+
"DELETE should use INNER JOIN for composite keys"
119+
)
120+
121+
# Verify the full sequence of SQL calls
122+
adapter.cursor.execute.assert_has_calls(
123+
[
124+
call(
125+
"CREATE TABLE `temporary` AS SELECT CAST(`id` AS SIGNED) AS `id`, CAST(`ts` AS DATETIME) AS `ts`, CAST(`val` AS SIGNED) AS `val` FROM (SELECT `id`, `ts`, `val` FROM `source`) AS `_subquery`"
126+
),
127+
call(
128+
"DELETE `_target` FROM `target` AS `_target` INNER JOIN `temporary` AS `_temp` ON `_target`.`id` = `_temp`.`id` AND `_target`.`ts` = `_temp`.`ts`"
129+
),
130+
call(
131+
"INSERT INTO `target` (`id`, `ts`, `val`) SELECT `id`, `ts`, `val` FROM (SELECT `id` AS `id`, `ts` AS `ts`, `val` AS `val`, ROW_NUMBER() OVER (PARTITION BY `id`, `ts` ORDER BY `id`, `ts`) AS _row_number FROM `temporary`) AS _t WHERE _row_number = 1"
132+
),
133+
call("DROP TABLE IF EXISTS `temporary`"),
134+
]
135+
)
136+
137+
138+
def test_replace_by_key_single_key_uses_in(
139+
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture
140+
):
141+
"""Single key DELETE still uses the IN-based approach (indexes work fine for single column)."""
142+
adapter = make_mocked_engine_adapter(MySQLEngineAdapter)
143+
temp_table_mock = mocker.patch(
144+
"sqlmesh.core.engine_adapter.base.EngineAdapter._get_temp_table"
145+
)
146+
temp_table_mock.return_value = exp.to_table("temporary")
147+
148+
adapter.merge(
149+
target_table="target",
150+
source_table=t.cast(exp.Select, parse_one("SELECT id, val FROM source")),
151+
target_columns_to_types={
152+
"id": exp.DataType(this=exp.DataType.Type.INT),
153+
"val": exp.DataType(this=exp.DataType.Type.INT),
154+
},
155+
unique_key=[parse_one("id")],
156+
)
157+
158+
sql_calls = to_sql_calls(adapter)
159+
160+
# Single key should use IN-based approach, not JOIN
161+
assert any("IN" in s and "DELETE" in s for s in sql_calls) is True
162+
assert any("INNER JOIN" in s for s in sql_calls) is False

0 commit comments

Comments
 (0)