Skip to content

Commit d45e070

Browse files
authored
Feat: Introduce an additional 'incremental_unmanaged' model kind to support existing incremental mode in dbt projects (#1173)
1 parent 3306d74 commit d45e070

15 files changed

Lines changed: 204 additions & 53 deletions

File tree

docs/integrations/dbt.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,16 @@ To enable incremental_by_time_range incrementality, the model configuration shou
7777

7878
### Incremental logic
7979

80-
SQLMesh will ignore dbt's incremental jinja block `{% if is_incremental() %}` and requires a new jinja block gated by `{% if sqlmesh_incremental is defined %}`. The new block should contain the `WHERE` clause selecting the time interval.
80+
SQLMesh requires a new jinja block gated by `{% if sqlmesh_incremental is defined %}`. The new block should contain the `WHERE` clause selecting the time interval and take precedence over the existing `{% if is_incremental() %}` block, which can be kept as an `{% elif is_incremental() %}` fallback.
8181

8282
For example, the SQL `WHERE` clause with the "ds" column goes in a new jinja block gated by `{% if sqlmesh_incremental is defined %}` as follows:
8383

8484
```bash
8585
> {% if sqlmesh_incremental is defined %}
8686
> WHERE
8787
> ds BETWEEN '{{ start_ds }}' AND '{{ end_ds }}'
88+
> {% elif is_incremental() %}
89+
> ; < your existing is_incremental block >
8890
> {% endif %}
8991
```
9092

sqlmesh/core/engine_adapter/base.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -592,6 +592,16 @@ def _insert_append_pandas_df(
592592
):
593593
self.execute(exp.insert(expression, table_name, columns=column_names))
594594

595+
def insert_overwrite(
596+
self,
597+
table_name: TableName,
598+
query_or_df: QueryOrDF,
599+
columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
600+
) -> None:
601+
self._insert_overwrite_by_condition(
602+
table_name, query_or_df, columns_to_types=columns_to_types
603+
)
604+
595605
def insert_overwrite_by_time_partition(
596606
self,
597607
table_name: TableName,

sqlmesh/core/model/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from sqlmesh.core.model.kind import (
1515
IncrementalByTimeRangeKind,
1616
IncrementalByUniqueKeyKind,
17+
IncrementalUnmanagedKind,
1718
ModelKind,
1819
ModelKindMixin,
1920
ModelKindName,

sqlmesh/core/model/common.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ def parse_bool(v: t.Any) -> bool:
6767
"blocking",
6868
"forward_only",
6969
"disable_restatement",
70+
"insert_overwrite",
7071
pre=True,
7172
allow_reuse=True,
7273
check_fields=False,

sqlmesh/core/model/definition.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
IncrementalByUniqueKeyKind,
3030
ModelKindName,
3131
SeedKind,
32-
_Incremental,
3332
)
3433
from sqlmesh.core.model.meta import ModelMeta
3534
from sqlmesh.core.model.seed import Seed, create_seed
@@ -574,11 +573,11 @@ def depends_on_past(self) -> bool:
574573

575574
@property
576575
def forward_only(self) -> bool:
577-
return isinstance(self.kind, _Incremental) and self.kind.forward_only
576+
return getattr(self.kind, "forward_only", False)
578577

579578
@property
580579
def disable_restatement(self) -> bool:
581-
return isinstance(self.kind, _Incremental) and self.kind.disable_restatement
580+
return getattr(self.kind, "disable_restatement", False)
582581

583582
def validate_definition(self) -> None:
584583
"""Validates the model's definition.
@@ -620,7 +619,17 @@ def validate_definition(self) -> None:
620619

621620
if self.kind.is_incremental_by_time_range and not self.time_column:
622621
raise_config_error(
623-
"Incremental by time range models must have a time_column field.",
622+
"Incremental by time range models must have a time_column field",
623+
self._path,
624+
)
625+
626+
if (
627+
self.kind.is_incremental_unmanaged
628+
and getattr(self.kind, "insert_overwrite", False)
629+
and not self.partitioned_by_
630+
):
631+
raise_config_error(
632+
"Unmanaged incremental models with insert / overwrite enabled must specify the partitioned_by field",
624633
self._path,
625634
)
626635

sqlmesh/core/model/kind.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import sys
34
import typing as t
45
from enum import Enum
56

@@ -13,6 +14,11 @@
1314
from sqlmesh.utils.errors import ConfigError
1415
from sqlmesh.utils.pydantic import PydanticModel
1516

17+
if sys.version_info >= (3, 9):
18+
from typing import Literal
19+
else:
20+
from typing_extensions import Literal
21+
1622

1723
class ModelKindMixin:
1824
@property
@@ -28,6 +34,10 @@ def is_incremental_by_time_range(self) -> bool:
2834
def is_incremental_by_unique_key(self) -> bool:
2935
return self.model_kind_name == ModelKindName.INCREMENTAL_BY_UNIQUE_KEY
3036

37+
@property
38+
def is_incremental_unmanaged(self) -> bool:
39+
return self.model_kind_name == ModelKindName.INCREMENTAL_UNMANAGED
40+
3141
@property
3242
def is_full(self) -> bool:
3343
return self.model_kind_name == ModelKindName.FULL
@@ -60,14 +70,15 @@ def is_materialized(self) -> bool:
6070
@property
6171
def only_latest(self) -> bool:
6272
"""Whether or not this model only cares about latest date to render."""
63-
return self.is_view or self.is_full
73+
return self.is_view or self.is_full or self.is_incremental_unmanaged
6474

6575

6676
class ModelKindName(str, ModelKindMixin, Enum):
6777
"""The kind of model, determining how this data is computed and stored in the warehouse."""
6878

6979
INCREMENTAL_BY_TIME_RANGE = "INCREMENTAL_BY_TIME_RANGE"
7080
INCREMENTAL_BY_UNIQUE_KEY = "INCREMENTAL_BY_UNIQUE_KEY"
81+
INCREMENTAL_UNMANAGED = "INCREMENTAL_UNMANAGED"
7182
FULL = "FULL"
7283
VIEW = "VIEW"
7384
EMBEDDED = "EMBEDDED"
@@ -246,6 +257,14 @@ def _parse_unique_key(cls, v: t.Any) -> t.List[str]:
246257
return [i.this if isinstance(i, exp.Identifier) else str(i) for i in v]
247258

248259

260+
class IncrementalUnmanagedKind(ModelKind):
261+
name: ModelKindName = Field(ModelKindName.INCREMENTAL_UNMANAGED, const=True)
262+
insert_overwrite: bool = False
263+
disable_restatement: Literal[True] = True
264+
265+
_bool_validator = bool_validator
266+
267+
249268
class ViewKind(ModelKind):
250269
name: ModelKindName = Field(ModelKindName.VIEW, const=True)
251270
materialized: bool = False

sqlmesh/core/plan/definition.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -441,13 +441,18 @@ def _add_restatements(self, restate_models: t.Iterable[str]) -> None:
441441

442442
snapshots = self.context_diff.snapshots
443443
downstream = [
444-
d
445-
for d in downstream
446-
if snapshots[d].is_materialized
447-
and (not snapshots[d].model.disable_restatement or self.is_dev)
448-
and not snapshots[d].is_seed
444+
d for d in downstream if snapshots[d].is_materialized and not snapshots[d].is_seed
449445
]
450446

447+
if not self.is_dev:
448+
models_with_disabled_restatement = [
449+
f"'{d}'" for d in downstream if snapshots[d].model.disable_restatement
450+
]
451+
if models_with_disabled_restatement:
452+
raise PlanError(
453+
f"Restatement is disabled for models: {', '.join(models_with_disabled_restatement)}."
454+
)
455+
451456
if not downstream:
452457
raise PlanError(
453458
f"Cannot restate from '{table}'. Either such model doesn't exist, no other materialized model references it, or restatement was disabled fror this model."

sqlmesh/core/snapshot/evaluator.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
from sqlmesh.core.audit import BUILT_IN_AUDITS, AuditResult
3535
from sqlmesh.core.engine_adapter import EngineAdapter, TransactionType
3636
from sqlmesh.core.engine_adapter.base import InsertOverwriteStrategy
37-
from sqlmesh.core.model import Model, ViewKind
37+
from sqlmesh.core.model import IncrementalUnmanagedKind, Model, ViewKind
3838
from sqlmesh.core.snapshot import (
3939
QualifiedViewName,
4040
Snapshot,
@@ -130,6 +130,7 @@ def apply(query_or_df: QueryOrDF, index: int = 0) -> None:
130130
start=start,
131131
end=end,
132132
latest=latest,
133+
has_intervals=bool(snapshot.intervals),
133134
**kwargs,
134135
)
135136

@@ -488,6 +489,8 @@ def _evaluation_strategy(snapshot: SnapshotInfoLike, adapter: EngineAdapter) ->
488489
klass = IncrementalByTimeRangeStrategy
489490
elif snapshot.is_incremental_by_unique_key:
490491
klass = IncrementalByUniqueKeyStrategy
492+
elif snapshot.is_incremental_unmanaged:
493+
klass = IncrementalUnmanagedStrategy
491494
elif snapshot.is_view:
492495
klass = ViewStrategy
493496
else:
@@ -786,6 +789,24 @@ def append(
786789
)
787790

788791

792+
class IncrementalUnmanagedStrategy(MaterializableStrategy):
793+
def insert(
794+
self,
795+
model: Model,
796+
name: str,
797+
query_or_df: QueryOrDF,
798+
snapshots: t.Dict[str, Snapshot],
799+
is_dev: bool,
800+
**kwargs: t.Any,
801+
) -> None:
802+
if isinstance(model.kind, IncrementalUnmanagedKind) and model.kind.insert_overwrite:
803+
self.adapter.insert_overwrite(
804+
name, query_or_df, columns_to_types=model.columns_to_types
805+
)
806+
else:
807+
self.append(model, name, query_or_df, snapshots, is_dev, **kwargs)
808+
809+
789810
class FullRefreshStrategy(MaterializableStrategy):
790811
def insert(
791812
self,

sqlmesh/dbt/builtin.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,10 +145,6 @@ def env_var(name: str, default: t.Optional[str] = None) -> t.Optional[str]:
145145
return os.environ.get(name, default)
146146

147147

148-
def is_incremental() -> bool:
149-
return False
150-
151-
152148
def log(msg: str, info: bool = False) -> str:
153149
print(msg)
154150
return ""
@@ -264,7 +260,6 @@ def _try_literal_eval(value: str) -> t.Any:
264260
"flags": Flags(),
265261
"fromjson": from_json,
266262
"fromyaml": from_yaml,
267-
"is_incremental": is_incremental,
268263
"log": no_log,
269264
"modules": Modules(),
270265
"print": no_log,
@@ -320,6 +315,9 @@ def create_builtin_globals(
320315
if variables is not None:
321316
builtin_globals["var"] = generate_var(variables)
322317

318+
is_incremental = jinja_globals.pop("has_intervals", False)
319+
builtin_globals["is_incremental"] = lambda: is_incremental
320+
323321
builtin_globals["builtins"] = AttributeDict(
324322
{k: builtin_globals.get(k) for k in ("ref", "source", "config")}
325323
)

sqlmesh/dbt/model.py

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import logging
44
import typing as t
55

6-
from pydantic import Field, validator
6+
from pydantic import validator
77
from sqlglot import exp
88
from sqlglot.helper import ensure_list
99

@@ -12,6 +12,7 @@
1212
from sqlmesh.core.model import (
1313
IncrementalByTimeRangeKind,
1414
IncrementalByUniqueKeyKind,
15+
IncrementalUnmanagedKind,
1516
Model,
1617
ModelKind,
1718
ModelKindName,
@@ -66,7 +67,7 @@ class ModelConfig(BaseModelConfig):
6667

6768
# sqlmesh fields
6869
sql: SqlStr = SqlStr("")
69-
time_column_: t.Optional[str] = Field(None, alias="time_column")
70+
time_column: t.Optional[str] = None
7071
cron: t.Optional[str] = None
7172
dialect: t.Optional[str] = None
7273
batch_size: t.Optional[int] = None
@@ -115,22 +116,10 @@ def _validate_partition_by(cls, v: t.Any) -> t.Union[t.List[str], t.Dict[str, t.
115116
**BaseModelConfig._FIELD_UPDATE_STRATEGY,
116117
**{
117118
"sql": UpdateStrategy.IMMUTABLE,
118-
"time_column_": UpdateStrategy.IMMUTABLE,
119+
"time_column": UpdateStrategy.IMMUTABLE,
119120
},
120121
}
121122

122-
@property
123-
def time_column(self) -> t.Optional[str]:
124-
if self.time_column_:
125-
return self.time_column_
126-
if (
127-
isinstance(self.partition_by, dict)
128-
and self.partition_by["data_type"] != "int64"
129-
and self.incremental_strategy in INCREMENTAL_BY_TIME_STRATEGIES
130-
):
131-
return self.partition_by["field"]
132-
return None
133-
134123
@property
135124
def model_dialect(self) -> t.Optional[str]:
136125
return self.dialect or self.meta.get("dialect", None)
@@ -165,7 +154,7 @@ def model_kind(self, target: TargetConfig) -> ModelKind:
165154
is_supported = True
166155
if strategy not in INCREMENTAL_BY_TIME_STRATEGIES:
167156
logger.warning(
168-
"SQLMesh IncrementalByTime is not compatible with '%s' incremental strategy in model '%s'. Supported strategies include %s.",
157+
"SQLMesh incremental by time strategy is not compatible with '%s' incremental strategy in model '%s'. Supported strategies include %s.",
169158
strategy,
170159
self.sql_name,
171160
collection_to_str(INCREMENTAL_BY_TIME_STRATEGIES),
@@ -178,6 +167,7 @@ def model_kind(self, target: TargetConfig) -> ModelKind:
178167
forward_only=not is_supported,
179168
disable_restatement=not is_supported,
180169
)
170+
181171
if self.unique_key:
182172
strategy = self.incremental_strategy or target.default_incremental_strategy(
183173
IncrementalByUniqueKeyKind
@@ -187,16 +177,21 @@ def model_kind(self, target: TargetConfig) -> ModelKind:
187177
and strategy not in INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES
188178
):
189179
raise ConfigError(
190-
f"{self.sql_name}: SQLMesh IncrementalByUniqueKey is not compatible with '{strategy}'"
180+
f"{self.sql_name}: SQLMesh incremental by unique key strategy is not compatible with '{strategy}'"
191181
f" incremental strategy. Supported strategies include {collection_to_str(INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES)}."
192182
)
193183
return IncrementalByUniqueKeyKind(unique_key=self.unique_key, **incremental_kwargs)
194184

195-
raise ConfigError(
196-
f"{self.sql_name}: Incremental materialization requires either a "
197-
f"time_column ({collection_to_str(INCREMENTAL_BY_TIME_STRATEGIES)}) or a "
198-
f"unique_key ({collection_to_str(INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES.union(['none']))}) configuration."
185+
logger.warning(
186+
"Using unmanaged incremental materialization for model '%s'. Some features might not be available. Consider adding either a time_column (%s) or a unique_key (%s) configuration to mitigate this",
187+
self.sql_name,
188+
collection_to_str(INCREMENTAL_BY_TIME_STRATEGIES),
189+
collection_to_str(INCREMENTAL_BY_UNIQUE_KEY_STRATEGIES.union(["none"])),
190+
)
191+
strategy = self.incremental_strategy or target.default_incremental_strategy(
192+
IncrementalUnmanagedKind
199193
)
194+
return IncrementalUnmanagedKind(insert_overwrite=strategy == "insert_overwrite")
200195
if materialization == Materialization.EPHEMERAL:
201196
return ModelKind(name=ModelKindName.EMBEDDED)
202197
raise ConfigError(f"{materialization.value} materialization not supported.")

0 commit comments

Comments
 (0)