Skip to content

Commit 15c8788

Browse files
treyspeakmanrq
andauthored
Fix(mssql): execute within transactions to prevent hanging (#1532)
* Remove transactiontype * Wrap execute in a transaction * disable transactions duckdb * Remove duckdb transaction test * Fix _transaction typing --------- Co-authored-by: eakmanrq <6326532+eakmanrq@users.noreply.github.com>
1 parent a7e021f commit 15c8788

14 files changed

Lines changed: 39 additions & 120 deletions

File tree

sqlmesh/core/engine_adapter/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
from sqlmesh.core.engine_adapter.mysql import MySQLEngineAdapter
1414
from sqlmesh.core.engine_adapter.postgres import PostgresEngineAdapter
1515
from sqlmesh.core.engine_adapter.redshift import RedshiftEngineAdapter
16-
from sqlmesh.core.engine_adapter.shared import TransactionType
1716
from sqlmesh.core.engine_adapter.snowflake import SnowflakeEngineAdapter
1817
from sqlmesh.core.engine_adapter.spark import SparkEngineAdapter
1918

sqlmesh/core/engine_adapter/base.py

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
from sqlglot.optimizer.qualify_columns import quote_identifiers
2727

2828
from sqlmesh.core.dialect import add_table, select_from_values_for_batch_range
29-
from sqlmesh.core.engine_adapter.shared import DataObject, TransactionType
29+
from sqlmesh.core.engine_adapter.shared import DataObject
3030
from sqlmesh.core.model.kind import TimeColumn
3131
from sqlmesh.core.schema_diff import SchemaDiffer
3232
from sqlmesh.utils import double_escape
@@ -117,6 +117,7 @@ class EngineAdapter:
117117
DEFAULT_BATCH_SIZE = 10000
118118
DEFAULT_SQL_GEN_KWARGS: t.Dict[str, str | bool | int] = {}
119119
ESCAPE_JSON = False
120+
SUPPORTS_TRANSACTIONS = True
120121
SUPPORTS_INDEXES = False
121122
INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT
122123
SUPPORTS_MATERIALIZED_VIEWS = False
@@ -538,7 +539,7 @@ def alter_table(
538539
"""
539540
Performs the required alter statements to change the current table into the structure of the target table.
540541
"""
541-
with self.transaction(TransactionType.DDL):
542+
with self.transaction():
542543
for alter_expression in self.SCHEMA_DIFFER.compare_columns(
543544
current_table_name,
544545
self.columns(current_table_name),
@@ -1216,13 +1217,12 @@ def fetch_pyspark_df(
12161217
@contextlib.contextmanager
12171218
def transaction(
12181219
self,
1219-
transaction_type: TransactionType = TransactionType.DML,
12201220
condition: t.Optional[bool] = None,
12211221
) -> t.Iterator[None]:
12221222
"""A transaction context manager."""
12231223
if (
12241224
self._connection_pool.is_transaction_active
1225-
or not self.supports_transactions(transaction_type)
1225+
or not self.SUPPORTS_TRANSACTIONS
12261226
or (condition is not None and not condition)
12271227
):
12281228
yield
@@ -1236,10 +1236,6 @@ def transaction(
12361236
else:
12371237
self._connection_pool.commit()
12381238

1239-
def supports_transactions(self, transaction_type: TransactionType) -> bool:
1240-
"""Whether or not the engine adapter supports transactions for the given transaction type."""
1241-
return True
1242-
12431239
@contextlib.contextmanager
12441240
def session(self) -> t.Iterator[None]:
12451241
"""A session context manager."""
@@ -1275,14 +1271,15 @@ def execute(
12751271
{"unsupported_level": ErrorLevel.IGNORE} if ignore_unsupported_errors else {}
12761272
)
12771273

1278-
for e in ensure_list(expressions):
1279-
sql = (
1280-
self._to_sql(e, quote=quote_identifiers, **to_sql_kwargs)
1281-
if isinstance(e, exp.Expression)
1282-
else e
1283-
)
1284-
logger.debug(f"Executing SQL:\n{sql}")
1285-
self.cursor.execute(sql, **kwargs)
1274+
with self.transaction():
1275+
for e in ensure_list(expressions):
1276+
sql = (
1277+
self._to_sql(e, quote=quote_identifiers, **to_sql_kwargs)
1278+
if isinstance(e, exp.Expression)
1279+
else e
1280+
)
1281+
logger.debug(f"Executing SQL:\n{sql}")
1282+
self.cursor.execute(sql, **kwargs)
12861283

12871284
@contextlib.contextmanager
12881285
def temp_table(
@@ -1307,7 +1304,7 @@ def temp_table(
13071304
source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
13081305
query_or_df, columns_to_types=columns_to_types, target_table=name
13091306
)
1310-
with self.transaction(TransactionType.DDL):
1307+
with self.transaction():
13111308
table = self._get_temp_table(name)
13121309
if table.db:
13131310
self.create_schema(table.db)

sqlmesh/core/engine_adapter/base_postgres.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,16 @@
44

55
from sqlglot import exp
66

7-
from sqlmesh.core.engine_adapter.mixins import CommitOnExecuteMixin
8-
from sqlmesh.core.engine_adapter.shared import (
9-
DataObject,
10-
DataObjectType,
11-
TransactionType,
12-
)
7+
from sqlmesh.core.engine_adapter.mixins import EngineAdapter
8+
from sqlmesh.core.engine_adapter.shared import DataObject, DataObjectType
139
from sqlmesh.utils.errors import SQLMeshError
1410

1511
if t.TYPE_CHECKING:
1612
from sqlmesh.core._typing import TableName
1713
from sqlmesh.core.engine_adapter.base import QueryOrDF
1814

1915

20-
class BasePostgresEngineAdapter(CommitOnExecuteMixin):
16+
class BasePostgresEngineAdapter(EngineAdapter):
2117
COLUMNS_TABLE = "information_schema.columns"
2218

2319
def columns(
@@ -85,7 +81,7 @@ def create_view(
8581
8682
Reference: https://www.postgresql.org/docs/current/sql-createview.html
8783
"""
88-
with self.transaction(TransactionType.DDL):
84+
with self.transaction():
8985
if replace:
9086
self.drop_view(view_name, materialized=materialized)
9187
super().create_view(

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,7 @@
1111

1212
from sqlmesh.core.engine_adapter.base import SourceQuery
1313
from sqlmesh.core.engine_adapter.mixins import InsertOverwriteWithMergeMixin
14-
from sqlmesh.core.engine_adapter.shared import (
15-
DataObject,
16-
DataObjectType,
17-
TransactionType,
18-
)
14+
from sqlmesh.core.engine_adapter.shared import DataObject, DataObjectType
1915
from sqlmesh.core.node import IntervalUnit
2016
from sqlmesh.core.schema_diff import SchemaDiffer
2117
from sqlmesh.utils.date import to_datetime
@@ -45,6 +41,7 @@ class BigQueryEngineAdapter(InsertOverwriteWithMergeMixin):
4541
DIALECT = "bigquery"
4642
DEFAULT_BATCH_SIZE = 1000
4743
ESCAPE_JSON = True
44+
SUPPORTS_TRANSACTIONS = False
4845
SUPPORTS_MATERIALIZED_VIEWS = True
4946
SUPPORTS_CLONING = True
5047

@@ -478,9 +475,6 @@ def create_state_table(
478475
columns_to_types,
479476
)
480477

481-
def supports_transactions(self, transaction_type: TransactionType) -> bool:
482-
return False
483-
484478
def _db_call(self, func: t.Callable[..., t.Any], *args: t.Any, **kwargs: t.Any) -> t.Any:
485479
return func(
486480
retry=self.__retry,

sqlmesh/core/engine_adapter/duckdb.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
class DuckDBEngineAdapter(LogicalMergeMixin):
1818
DIALECT = "duckdb"
19+
SUPPORTS_TRANSACTIONS = False
1920

2021
def _df_to_source_queries(
2122
self,

sqlmesh/core/engine_adapter/mixins.py

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from sqlglot import exp
77
from sqlglot.optimizer.qualify_columns import quote_identifiers
88

9-
from sqlmesh.core.engine_adapter.base import EngineAdapter, SourceQuery, TransactionType
9+
from sqlmesh.core.engine_adapter.base import EngineAdapter, SourceQuery
1010
from sqlmesh.utils.errors import SQLMeshError
1111

1212
if t.TYPE_CHECKING:
@@ -42,7 +42,7 @@ def merge(
4242
unique_exp = exp.func("CONCAT_WS", "'__SQLMESH_DELIM__'", *unique_key)
4343
column_names = list(columns_to_types or [])
4444

45-
with self.transaction(TransactionType.DML):
45+
with self.transaction():
4646
self.ctas(temp_table, source_table, columns_to_types=columns_to_types, exists=False)
4747
self.execute(
4848
exp.delete(target_table).where(
@@ -77,7 +77,7 @@ def replace_query(
7777

7878
if not self.table_exists(table_name):
7979
return self.ctas(table_name, query_or_df, columns_to_types, exists=False, **kwargs)
80-
with self.transaction(TransactionType.DDL):
80+
with self.transaction():
8181
# TODO: remove quote_identifiers when sqlglot has an expression to represent TRUNCATE
8282
source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
8383
query_or_df, columns_to_types, table_name
@@ -125,30 +125,6 @@ def _fetch_native_df(
125125
return read_sql_query(sql, self._connection_pool.get())
126126

127127

128-
class CommitOnExecuteMixin(EngineAdapter):
129-
def execute(
130-
self,
131-
expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]],
132-
ignore_unsupported_errors: bool = False,
133-
quote_identifiers: bool = True,
134-
**kwargs: t.Any,
135-
) -> None:
136-
"""
137-
To make sure that inserts and updates take effect we need to commit explicitly unless the
138-
statement is executed as part of an active transaction.
139-
140-
Reference: https://www.psycopg.org/psycopg3/docs/basic/transactions.html
141-
"""
142-
super().execute(
143-
expressions,
144-
ignore_unsupported_errors=ignore_unsupported_errors,
145-
quote_identifiers=quote_identifiers,
146-
**kwargs,
147-
)
148-
if not self._connection_pool.is_transaction_active:
149-
self._connection_pool.commit()
150-
151-
152128
class InsertOverwriteWithMergeMixin(EngineAdapter):
153129
def _insert_overwrite_by_condition(
154130
self,

sqlmesh/core/engine_adapter/mysql.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import typing as t
44

55
from sqlmesh.core.engine_adapter.mixins import (
6-
CommitOnExecuteMixin,
76
LogicalMergeMixin,
87
LogicalReplaceQueryMixin,
98
PandasNativeFetchDFSupportMixin,
@@ -15,7 +14,6 @@
1514

1615

1716
class MySQLEngineAdapter(
18-
CommitOnExecuteMixin,
1917
LogicalMergeMixin,
2018
LogicalReplaceQueryMixin,
2119
PandasNativeFetchDFSupportMixin,
@@ -54,8 +52,8 @@ def _get_data_objects(
5452
null AS catalog_name,
5553
table_name AS name,
5654
table_schema AS schema_name,
57-
CASE
58-
WHEN table_type = 'BASE TABLE' THEN 'table'
55+
CASE
56+
WHEN table_type = 'BASE TABLE' THEN 'table'
5957
WHEN table_type = 'VIEW' THEN 'view'
6058
ELSE table_type
6159
END AS type

sqlmesh/core/engine_adapter/shared.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,6 @@
88
from sqlmesh.utils.pydantic import PydanticModel
99

1010

11-
class TransactionType(str, Enum):
12-
DDL = "DDL"
13-
DML = "DML"
14-
15-
@property
16-
def is_ddl(self) -> bool:
17-
return self == TransactionType.DDL
18-
19-
@property
20-
def is_dml(self) -> bool:
21-
return self == TransactionType.DML
22-
23-
2411
class DataObjectType(str, Enum):
2512
UNKNOWN = "unknown"
2613
TABLE = "table"

sqlmesh/core/engine_adapter/spark.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,7 @@
1212
InsertOverwriteStrategy,
1313
SourceQuery,
1414
)
15-
from sqlmesh.core.engine_adapter.shared import (
16-
DataObject,
17-
DataObjectType,
18-
TransactionType,
19-
)
15+
from sqlmesh.core.engine_adapter.shared import DataObject, DataObjectType
2016
from sqlmesh.utils import classproperty, nullsafe_join
2117
from sqlmesh.utils.errors import SQLMeshError
2218

@@ -40,6 +36,7 @@
4036
class SparkEngineAdapter(EngineAdapter):
4137
DIALECT = "spark"
4238
ESCAPE_JSON = True
39+
SUPPORTS_TRANSACTIONS = False
4340
INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.INSERT_OVERWRITE
4441

4542
@property
@@ -401,6 +398,3 @@ def __table_properties_to_expressions(
401398
return [
402399
exp.Property(this=key, value=value.copy()) for key, value in table_properties.items()
403400
]
404-
405-
def supports_transactions(self, transaction_type: TransactionType) -> bool:
406-
return False

sqlmesh/core/snapshot/evaluator.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from sqlglot.executor import execute
3333

3434
from sqlmesh.core.audit import Audit, AuditResult
35-
from sqlmesh.core.engine_adapter import EngineAdapter, TransactionType
35+
from sqlmesh.core.engine_adapter import EngineAdapter
3636
from sqlmesh.core.engine_adapter.base import InsertOverwriteStrategy
3737
from sqlmesh.core.model import IncrementalUnmanagedKind, Model, SCDType2Kind, ViewKind
3838
from sqlmesh.core.snapshot import (
@@ -160,11 +160,7 @@ def apply(query_or_df: QueryOrDF, index: int = 0) -> None:
160160
**common_render_kwargs,
161161
)
162162

163-
with self.adapter.transaction(
164-
transaction_type=TransactionType.DDL
165-
if model.kind.is_view or model.kind.is_full
166-
else TransactionType.DML
167-
), self.adapter.session():
163+
with self.adapter.transaction(), self.adapter.session():
168164
if not limit:
169165
self.adapter.execute(model.render_pre_statements(**render_statements_kwargs))
170166

@@ -418,7 +414,7 @@ def _create_snapshot(
418414

419415
evaluation_strategy = _evaluation_strategy(snapshot, self.adapter)
420416

421-
with self.adapter.transaction(TransactionType.DDL), self.adapter.session():
417+
with self.adapter.transaction(), self.adapter.session():
422418
self.adapter.execute(snapshot.model.render_pre_statements(**render_kwargs))
423419

424420
if is_dev and not snapshot.previous_versions:

0 commit comments

Comments
 (0)