Skip to content

Commit cf534c2

Browse files
authored
Feat!: Make Environment Suffix Target Configurable (#1220)
* wip - add environment suffix target * remove bad import * properly cleanup table suffix views * fix return type * add test covering basic promotion functionality * test cleanup expired views * add context janitor test * only demote if there are removed values * fix migration script * add integration test * update Airflow tests * document environment suffix target
1 parent 79b1183 commit cf534c2

38 files changed

Lines changed: 747 additions & 231 deletions

docs/reference/configuration.md

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,21 @@ adhere to at your organization and therefore need more control over the schema n
223223

224224
### View Schema Override
225225

226-
Coming soon. Please message us on [slack](https://tobikodata.com/slack) if you are interested in this feature so we can better understand your
227-
use case and make sure the new feature satisfies your needs.
226+
By default SQLMesh appends the environment name to the schema name when creating new environments. This can be changed
227+
to append a suffix to the end of the table name instead. This means that new environment views will be created in the
228+
same schema as production but will be differentiated by having their names end with `__<env>`.
229+
230+
Config example:
231+
232+
```yaml linenums="1"
233+
environment_suffix_target: table
234+
```
235+
236+
If you had a model name of `db.users`, and you were creating a `dev` environment, then the view would be created as `db.users__dev` instead of the default behavior of `db__dev.users`.
237+
238+
The default behavior of appending the suffix to schemas is recommended because it leaves production with a single clean
239+
interface for accessing the views. However, if you are deploying SQLMesh in an environment with tight restrictions on
240+
schema creation then this can be a useful way of reducing the number of schemas that need to be created.
228241

229242
## Additional details
230243

examples/sushi/config.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
CategorizerConfig,
77
Config,
88
DuckDBConnectionConfig,
9+
EnvironmentSuffixTarget,
910
GatewayConfig,
1011
ModelDefaultsConfig,
1112
SparkConnectionConfig,
@@ -104,3 +105,10 @@
104105
],
105106
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
106107
)
108+
109+
110+
environment_suffix_config = Config(
111+
default_connection=DuckDBConnectionConfig(),
112+
model_defaults=ModelDefaultsConfig(dialect="duckdb"),
113+
environment_suffix_target=EnvironmentSuffixTarget.TABLE,
114+
)

sqlmesh/core/config/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from sqlmesh.core.config.categorizer import AutoCategorizationMode, CategorizerConfig
2+
from sqlmesh.core.config.common import EnvironmentSuffixTarget
23
from sqlmesh.core.config.connection import (
34
BigQueryConnectionConfig,
45
ConnectionConfig,

sqlmesh/core/config/common.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,31 @@
11
from __future__ import annotations
22

33
import typing as t
4+
from enum import Enum
45

56
from pydantic import validator
67

8+
from sqlmesh.utils import classproperty
79
from sqlmesh.utils.errors import ConfigError
810

911

12+
class EnvironmentSuffixTarget(str, Enum):
13+
SCHEMA = "schema"
14+
TABLE = "table"
15+
16+
@property
17+
def is_schema(self) -> bool:
18+
return self == EnvironmentSuffixTarget.SCHEMA
19+
20+
@property
21+
def is_table(self) -> bool:
22+
return self == EnvironmentSuffixTarget.TABLE
23+
24+
@classproperty
25+
def default(cls) -> EnvironmentSuffixTarget:
26+
return EnvironmentSuffixTarget.SCHEMA
27+
28+
1029
def _concurrent_tasks_validator(v: t.Any) -> int:
1130
if isinstance(v, str):
1231
v = int(v)

sqlmesh/core/config/root.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22

33
import typing as t
44

5-
from pydantic import root_validator, validator
5+
from pydantic import Field, root_validator, validator
66

77
from sqlmesh.core import constants as c
8+
from sqlmesh.core.config import EnvironmentSuffixTarget
89
from sqlmesh.core.config.base import BaseConfig, UpdateStrategy
910
from sqlmesh.core.config.categorizer import CategorizerConfig
1011
from sqlmesh.core.config.connection import ConnectionConfig, DuckDBConnectionConfig
@@ -41,6 +42,7 @@ class Config(BaseConfig):
4142
pinned_environments: A list of development environment names that should not be deleted by the janitor task.
4243
model_defaults: Default values for model definitions.
4344
include_unmodified: Indicates whether to include unmodified models in the target development environment.
45+
environment_suffix_target: Indicates whether to append the environment name to the schema or table name.
4446
"""
4547

4648
gateways: t.Union[t.Dict[str, GatewayConfig], GatewayConfig] = GatewayConfig()
@@ -63,6 +65,9 @@ class Config(BaseConfig):
6365
username: str = ""
6466
include_unmodified: bool = False
6567
physical_schema_override: t.Dict[str, str] = {}
68+
environment_suffix_target: EnvironmentSuffixTarget = Field(
69+
default=EnvironmentSuffixTarget.default
70+
)
6671

6772
_FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
6873
"gateways": UpdateStrategy.KEY_UPDATE,

sqlmesh/core/console.py

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from rich.table import Table
2424
from rich.tree import Tree
2525

26-
from sqlmesh.core import constants as c
26+
from sqlmesh.core.environment import EnvironmentNamingInfo
2727
from sqlmesh.core.snapshot import Snapshot, SnapshotChangeCategory
2828
from sqlmesh.core.test import ModelTest
2929
from sqlmesh.utils import rich as srich
@@ -54,7 +54,11 @@ class Console(abc.ABC):
5454
with them when their input is needed."""
5555

5656
@abc.abstractmethod
57-
def start_evaluation_progress(self, batches: t.Dict[Snapshot, int], environment: str) -> None:
57+
def start_evaluation_progress(
58+
self,
59+
batches: t.Dict[Snapshot, int],
60+
environment_naming_info: EnvironmentNamingInfo,
61+
) -> None:
5862
"""Indicates that a new snapshot evaluation progress has begun."""
5963

6064
@abc.abstractmethod
@@ -180,7 +184,7 @@ def __init__(self, console: t.Optional[RichConsole] = None, **kwargs: t.Any) ->
180184
self.evaluation_model_progress: t.Optional[Progress] = None
181185
self.evaluation_model_tasks: t.Dict[str, TaskID] = {}
182186
self.evaluation_model_batches: t.Dict[Snapshot, int] = {}
183-
self.evaluation_environment: str = c.PROD
187+
self.evaluation_environment_naming_info = EnvironmentNamingInfo()
184188

185189
self.creation_progress: t.Optional[Progress] = None
186190
self.creation_task: t.Optional[TaskID] = None
@@ -202,7 +206,11 @@ def _prompt(self, message: str, **kwargs: t.Any) -> t.Any:
202206
def _confirm(self, message: str, **kwargs: t.Any) -> bool:
203207
return Confirm.ask(message, console=self.console, **kwargs)
204208

205-
def start_evaluation_progress(self, batches: t.Dict[Snapshot, int], enviornment: str) -> None:
209+
def start_evaluation_progress(
210+
self,
211+
batches: t.Dict[Snapshot, int],
212+
environment_naming_info: EnvironmentNamingInfo,
213+
) -> None:
206214
"""Indicates that a new snapshot evaluation progress has begun."""
207215
if not self.evaluation_progress_live:
208216
self.evaluation_total_progress = Progress(
@@ -234,11 +242,13 @@ def start_evaluation_progress(self, batches: t.Dict[Snapshot, int], enviornment:
234242
)
235243

236244
self.evaluation_model_batches = batches
237-
self.evaluation_environment = enviornment
245+
self.evaluation_environment_naming_info = environment_naming_info
238246

239247
def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
240248
if self.evaluation_model_progress and snapshot.name not in self.evaluation_model_tasks:
241-
view_name = snapshot.qualified_view_name.for_environment(self.evaluation_environment)
249+
view_name = snapshot.qualified_view_name.for_environment(
250+
self.evaluation_environment_naming_info
251+
)
242252
self.evaluation_model_tasks[snapshot.name] = self.evaluation_model_progress.add_task(
243253
f"Evaluating {view_name}...",
244254
view_name=view_name,
@@ -273,7 +283,7 @@ def stop_evaluation_progress(self, success: bool = True) -> None:
273283
self.evaluation_model_progress = None
274284
self.evaluation_model_tasks = {}
275285
self.evaluation_model_batches = {}
276-
self.evaluation_environment = c.PROD
286+
self.evaluation_environment_naming_info = EnvironmentNamingInfo()
277287

278288
def start_creation_progress(self, total_tasks: int) -> None:
279289
"""Indicates that a new creation progress has begun."""
@@ -504,7 +514,7 @@ def _show_missing_dates(self, plan: Plan) -> None:
504514
backfill = Tree("[bold]Models needing backfill (missing dates):")
505515
for missing in plan.missing_intervals:
506516
snapshot = plan.context_diff.snapshots[missing.snapshot_name]
507-
view_name = snapshot.qualified_view_name.for_environment(plan.environment_name)
517+
view_name = snapshot.qualified_view_name.for_environment(plan.environment_naming_info)
508518
backfill.add(f"{view_name}: {missing.format_intervals(snapshot.model.interval_unit)}")
509519
self._print(backfill)
510520

@@ -1079,7 +1089,7 @@ def _show_missing_dates(self, plan: Plan) -> None:
10791089
self._print("**Models needing backfill (missing dates):**\n\n")
10801090
for missing in plan.missing_intervals:
10811091
snapshot = plan.context_diff.snapshots[missing.snapshot_name]
1082-
view_name = snapshot.qualified_view_name.for_environment(plan.environment_name)
1092+
view_name = snapshot.qualified_view_name.for_environment(plan.environment_naming_info)
10831093
self._print(
10841094
f"* `{view_name}`: {missing.format_intervals(snapshot.model.interval_unit)}\n"
10851095
)
@@ -1149,13 +1159,19 @@ def _confirm(self, message: str, **kwargs: t.Any) -> bool:
11491159
self._print(message)
11501160
return super()._confirm("", **kwargs)
11511161

1152-
def start_evaluation_progress(self, batches: t.Dict[Snapshot, int], environment: str) -> None:
1162+
def start_evaluation_progress(
1163+
self,
1164+
batches: t.Dict[Snapshot, int],
1165+
environment_naming_info: EnvironmentNamingInfo,
1166+
) -> None:
11531167
self.evaluation_batches = batches
1154-
self.evaluation_environment = environment
1168+
self.evaluation_environment_naming_info = environment_naming_info
11551169

11561170
def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
11571171
if not self.evaluation_batch_progress.get(snapshot.name):
1158-
view_name = snapshot.qualified_view_name.for_environment(self.evaluation_environment)
1172+
view_name = snapshot.qualified_view_name.for_environment(
1173+
self.evaluation_environment_naming_info
1174+
)
11591175
self.evaluation_batch_progress[snapshot.name] = (view_name, 0)
11601176
print(f"Starting '{view_name}', Total batches: {self.evaluation_batches[snapshot]}")
11611177

sqlmesh/core/context.py

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
parse,
5858
)
5959
from sqlmesh.core.engine_adapter import EngineAdapter
60-
from sqlmesh.core.environment import Environment
60+
from sqlmesh.core.environment import Environment, EnvironmentNamingInfo
6161
from sqlmesh.core.loader import Loader, SqlMeshLoader, update_model_schemas
6262
from sqlmesh.core.macros import ExecutableOrMacro
6363
from sqlmesh.core.metric import Metric
@@ -77,7 +77,12 @@
7777
SnapshotFingerprint,
7878
to_table_mapping,
7979
)
80-
from sqlmesh.core.state_sync import CachingStateSync, StateReader, StateSync
80+
from sqlmesh.core.state_sync import (
81+
CachingStateSync,
82+
StateReader,
83+
StateSync,
84+
cleanup_expired_views,
85+
)
8186
from sqlmesh.core.table_diff import TableDiff
8287
from sqlmesh.core.test import get_all_model_tests, run_model_tests, run_tests
8388
from sqlmesh.core.user import User
@@ -406,7 +411,11 @@ def run(
406411
)
407412
try:
408413
self.scheduler(environment=environment).run(
409-
environment, start, end, execution_time, ignore_cron=ignore_cron
414+
environment,
415+
start=start,
416+
end=end,
417+
execution_time=execution_time,
418+
ignore_cron=ignore_cron,
410419
)
411420
except Exception as e:
412421
self.notification_target_manager.notify(
@@ -773,6 +782,7 @@ def plan(
773782
is_dev=environment != c.PROD,
774783
forward_only=forward_only,
775784
environment_ttl=environment_ttl,
785+
environment_suffix_target=self.config.environment_suffix_target,
776786
categorizer_config=self.auto_categorize_changes,
777787
auto_categorization_enabled=not no_auto_categorization,
778788
effective_from=effective_from,
@@ -1138,7 +1148,11 @@ def _model_tables(self) -> t.Dict[str, str]:
11381148
return {
11391149
name: snapshot.table_name()
11401150
if snapshot.version
1141-
else snapshot.qualified_view_name.for_environment(c.PROD)
1151+
else snapshot.qualified_view_name.for_environment(
1152+
EnvironmentNamingInfo(
1153+
name=c.PROD, suffix_target=self.config.environment_suffix_target
1154+
)
1155+
)
11421156
for name, snapshot in self.snapshots.items()
11431157
}
11441158

@@ -1185,14 +1199,7 @@ def _load_configs(
11851199

11861200
def _run_janitor(self) -> None:
11871201
expired_environments = self.state_sync.delete_expired_environments()
1188-
expired_schemas = {
1189-
snapshot.qualified_view_name.schema_for_environment(expired_environment.name)
1190-
for expired_environment in expired_environments
1191-
for snapshot in expired_environment.snapshots
1192-
}
1193-
for expired_schema in expired_schemas:
1194-
self.engine_adapter.drop_schema(expired_schema, ignore_if_not_exists=True, cascade=True)
1195-
1202+
cleanup_expired_views(self.engine_adapter, expired_environments)
11961203
expired_snapshots = self.state_sync.delete_expired_snapshots()
11971204
self.snapshot_evaluator.cleanup(expired_snapshots)
11981205

0 commit comments

Comments
 (0)