Skip to content

Commit 43ab1d5

Browse files
izeigermanz3z1ma
andauthored
Feat!: Use pre / post SQL statements instead of hooks (#910)
* feat: improve pre and post hook impl * Drop support for hooks. Rely on pre/post SQL statements instead --------- Co-authored-by: z3z1ma <butler.alex2010@gmail.com>
1 parent 9884cc3 commit 43ab1d5

34 files changed

Lines changed: 671 additions & 585 deletions

docs/concepts/hooks.md

Lines changed: 0 additions & 3 deletions
This file was deleted.

docs/concepts/models/overview.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,22 @@ SELECT UDF(x)::int AS x
132132
FROM y
133133
```
134134

135+
Additional statements can also be provided **after** the main query, in which case they will run after each evaluation of the SELECT query.
136+
137+
```sql linenums="1" hl_lines="5-7"
138+
MODEL (
139+
...
140+
);
141+
142+
...
143+
144+
SELECT UDF(x)::int AS x
145+
FROM y;
146+
147+
-- Cleanup statements
148+
DROP TABLE temp_table;
149+
```
150+
135151
## Time column
136152
Models that are loaded incrementally require a time column to partition data.
137153

examples/sushi/hooks/hooks.py

Lines changed: 0 additions & 15 deletions
This file was deleted.

examples/sushi/models/customers.sql

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ MODEL (
33
kind FULL,
44
owner jen,
55
cron '@daily',
6-
pre NOOP(x=1),
7-
post (noop(), noop(y=['a', 2])),
86
tags (pii, fact)
97
);
108

sqlmesh/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
from sqlmesh.core.config import Config
1717
from sqlmesh.core.context import Context, ExecutionContext
1818
from sqlmesh.core.engine_adapter import EngineAdapter
19-
from sqlmesh.core.hooks import hook
2019
from sqlmesh.core.macros import macro
2120
from sqlmesh.core.model import Model, model
2221
from sqlmesh.core.snapshot import Snapshot

sqlmesh/core/constants.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
"""The default maximum row limit that is used when evaluating a model."""
3333

3434
AUDITS = "audits"
35-
HOOKS = "hooks"
3635
MACROS = "macros"
3736
MODELS = "models"
3837
TESTS = "tests"

sqlmesh/core/context.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
from sqlmesh.core.dialect import format_model_expressions, pandas_to_sql, parse
5353
from sqlmesh.core.engine_adapter import EngineAdapter
5454
from sqlmesh.core.environment import Environment
55-
from sqlmesh.core.hooks import hook
5655
from sqlmesh.core.loader import Loader, SqlMeshLoader, update_model_schemas
5756
from sqlmesh.core.macros import ExecutableOrMacro
5857
from sqlmesh.core.model import Model
@@ -216,7 +215,6 @@ def __init__(
216215
self._audits: UniqueKeyDict[str, Audit] = UniqueKeyDict("audits")
217216
self._macros: UniqueKeyDict[str, ExecutableOrMacro] = UniqueKeyDict("macros")
218217
self._jinja_macros = JinjaMacroRegistry()
219-
self._hooks: UniqueKeyDict[str, hook] = UniqueKeyDict("hooks")
220218

221219
self.path, self.config = t.cast(t.Tuple[Path, Config], next(iter(self.configs.items())))
222220
self.gateway = gateway
@@ -342,7 +340,6 @@ def load(self, update_schemas: bool = True) -> Context:
342340
"""Load all files in the context's path."""
343341
with sys_path(*self.configs):
344342
project = self._loader.load(self, update_schemas)
345-
self._hooks = project.hooks
346343
self._macros = project.macros
347344
self._models = project.models
348345
self._audits = project.audits

sqlmesh/core/engine_adapter/base.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import pandas as pd
1818
from sqlglot import Dialect, exp
1919
from sqlglot.errors import ErrorLevel
20+
from sqlglot.helper import ensure_list
2021

2122
from sqlmesh.core.dialect import pandas_to_sql
2223
from sqlmesh.core.engine_adapter.shared import DataObject, TransactionType
@@ -766,17 +767,19 @@ def supports_transactions(self, transaction_type: TransactionType) -> bool:
766767

767768
def execute(
768769
self,
769-
sql: t.Union[str, exp.Expression],
770+
expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]],
770771
ignore_unsupported_errors: bool = False,
771772
**kwargs: t.Any,
772773
) -> None:
773774
"""Execute a sql query."""
774775
to_sql_kwargs = (
775776
{"unsupported_level": ErrorLevel.IGNORE} if ignore_unsupported_errors else {}
776777
)
777-
sql = self._to_sql(sql, **to_sql_kwargs) if isinstance(sql, exp.Expression) else sql
778-
logger.debug(f"Executing SQL:\n{sql}")
779-
self.cursor.execute(sql, **kwargs)
778+
779+
for e in ensure_list(expressions):
780+
sql = self._to_sql(e, **to_sql_kwargs) if isinstance(e, exp.Expression) else e
781+
logger.debug(f"Executing SQL:\n{sql}")
782+
self.cursor.execute(sql, **kwargs)
780783

781784
def _create_table_properties(
782785
self,

sqlmesh/core/engine_adapter/base_postgres.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ def create_view(
8888

8989
def execute(
9090
self,
91-
sql: t.Union[str, exp.Expression],
91+
expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]],
9292
ignore_unsupported_errors: bool = False,
9393
**kwargs: t.Any,
9494
) -> None:
@@ -98,7 +98,7 @@ def execute(
9898
9999
Reference: https://www.psycopg.org/psycopg3/docs/basic/transactions.html
100100
"""
101-
super().execute(sql, ignore_unsupported_errors=ignore_unsupported_errors, **kwargs)
101+
super().execute(expressions, ignore_unsupported_errors=ignore_unsupported_errors, **kwargs)
102102
if not self._connection_pool.is_transaction_active:
103103
self._connection_pool.commit()
104104

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import pandas as pd
88
from sqlglot import exp
99
from sqlglot.errors import ErrorLevel
10+
from sqlglot.helper import ensure_list
1011
from sqlglot.transforms import remove_precision_parameterized_types
1112

1213
from sqlmesh.core.engine_adapter.base import EngineAdapter
@@ -383,7 +384,7 @@ def _retryable_execute(
383384

384385
def execute(
385386
self,
386-
sql: t.Union[str, exp.Expression],
387+
expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]],
387388
ignore_unsupported_errors: bool = False,
388389
**kwargs: t.Any,
389390
) -> None:
@@ -393,14 +394,16 @@ def execute(
393394
to_sql_kwargs = (
394395
{"unsupported_level": ErrorLevel.IGNORE} if ignore_unsupported_errors else {}
395396
)
396-
sql = self._to_sql(sql, **to_sql_kwargs) if isinstance(sql, exp.Expression) else sql
397-
logger.debug(f"Executing SQL:\n{sql}")
398-
retry.retry_target(
399-
target=functools.partial(self._retryable_execute, sql=sql),
400-
predicate=_ErrorCounter(self._extra_config["job_retries"]).should_retry,
401-
sleep_generator=retry.exponential_sleep_generator(initial=1.0, maximum=3.0),
402-
deadline=self._extra_config.get("job_retry_deadline_seconds"),
403-
)
397+
398+
for e in ensure_list(expressions):
399+
sql = self._to_sql(e, **to_sql_kwargs) if isinstance(e, exp.Expression) else e
400+
logger.debug(f"Executing SQL:\n{sql}")
401+
retry.retry_target(
402+
target=functools.partial(self._retryable_execute, sql=sql),
403+
predicate=_ErrorCounter(self._extra_config["job_retries"]).should_retry,
404+
sleep_generator=retry.exponential_sleep_generator(initial=1.0, maximum=3.0),
405+
deadline=self._extra_config.get("job_retry_deadline_seconds"),
406+
)
404407

405408
def _get_data_objects(
406409
self, schema_name: str, catalog_name: t.Optional[str] = None

0 commit comments

Comments
 (0)