-
Notifications
You must be signed in to change notification settings - Fork 378
Expand file tree
/
Copy pathtest_dbt.py
More file actions
102 lines (82 loc) · 3.48 KB
/
test_dbt.py
File metadata and controls
102 lines (82 loc) · 3.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from __future__ import annotations
import typing as t
import pytest
import time_machine
from sqlmesh.core.context import Context
from sqlmesh.core.model import (
IncrementalUnmanagedKind,
)
from sqlmesh.core.snapshot import (
DeployabilityIndex,
)
# Placeholder for imports needed only by static type checkers; currently empty.
if t.TYPE_CHECKING:
    pass
# Module-level pytest marker: applies the `slow` mark to every test in this file.
pytestmark = pytest.mark.slow
@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_dbt_is_incremental_table_is_missing(sushi_test_dbt_context: Context):
    """Evaluating a snapshot whose table was dropped must recreate the table.

    The model ``sushi.waiter_revenue_by_day_v2`` is switched to
    ``IncrementalUnmanagedKind`` with a fixed start date, planned and applied
    to ``prod``, and then its table is dropped by hand. A subsequent
    evaluation of the snapshot is expected to bring the table back.
    """
    ctx = sushi_test_dbt_context
    model_name = "sushi.waiter_revenue_by_day_v2"

    # Re-register the model as an unmanaged incremental starting 2023-01-01.
    incremental_model = ctx.get_model(model_name).copy(
        update={"kind": IncrementalUnmanagedKind(), "start": "2023-01-01"}
    )
    ctx.upsert_model(incremental_model)

    # NOTE(review): presumably keeps the standalone audit's start aligned with
    # the overridden model start — confirm against the fixture project.
    ctx._standalone_audits["sushi.test_top_waiters"].start = "2023-01-01"

    ctx.plan("prod", auto_apply=True, no_prompts=True, skip_tests=True)

    snapshot = ctx.get_snapshot(model_name)
    assert snapshot

    # Simulate the table going missing behind the context's back.
    ctx.engine_adapter.drop_table(snapshot.table_name())

    snapshots_by_name = {snap.name: snap for snap in ctx.snapshots.values()}
    ctx.snapshot_evaluator.evaluate(
        snapshot,
        start="2023-01-01",
        end="2023-01-08",
        execution_time="2023-01-08 15:00:00",
        snapshots=snapshots_by_name,
        deployability_index=DeployabilityIndex.all_deployable(),
    )

    # The evaluation above must have recreated the dropped table.
    assert ctx.engine_adapter.table_exists(snapshot.table_name())
def test_model_attr(sushi_test_dbt_context: Context, assert_exp_eq):
    """Check the rendered query of ``sushi.top_waiters`` against the expected SQL.

    The comparison is delegated to the ``assert_exp_eq`` fixture, which
    receives the rendered query and the expected SQL text.
    """
    # Expected SQL, kept verbatim (the text is part of the assertion).
    expected_sql = """
SELECT
CAST("waiter_id" AS INT) AS "waiter_id",
CAST("revenue" AS DOUBLE) AS "revenue",
3 AS "model_columns"
FROM "memory"."sushi"."waiter_revenue_by_day_v2" AS "waiter_revenue_by_day_v2"
WHERE
"ds" = (
SELECT
MAX("ds")
FROM "memory"."sushi"."waiter_revenue_by_day_v2" AS "waiter_revenue_by_day_v2"
)
ORDER BY
"revenue" DESC NULLS FIRST
LIMIT 10
"""
    top_waiters = sushi_test_dbt_context.get_model("sushi.top_waiters")
    assert_exp_eq(top_waiters.render_query(), expected_sql)
@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_dbt_requirements(sushi_dbt_context: Context):
    """The context's requirements must be exactly dbt-core and dbt-duckdb, both 1.x."""
    requirements = sushi_dbt_context.requirements
    expected_packages = ("dbt-core", "dbt-duckdb")

    # No packages beyond the two expected ones may be present.
    assert set(requirements) == set(expected_packages)

    # Each requirement value must begin with the "1." major-version prefix.
    for package in expected_packages:
        assert requirements[package].startswith("1.")
@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_dbt_dialect_with_normalization_strategy(init_and_plan_context: t.Callable):
    """The default dialect must carry the configured normalization strategy."""
    project_path = "tests/fixtures/dbt/sushi_test"
    config_name = "test_config_with_normalization_strategy"

    # The fixture returns a pair; only the context is needed here.
    context, _plan = init_and_plan_context(project_path, config=config_name)

    expected_dialect = "duckdb,normalization_strategy=LOWERCASE"
    assert context.default_dialect == expected_dialect
@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_dbt_before_all_with_var_ref_source(init_and_plan_context: t.Callable):
    """The plan's first environment statement must render the expected before_all SQL."""
    # The fixture returns a pair; only the plan is needed here.
    _context, plan = init_and_plan_context(
        "tests/fixtures/dbt/sushi_test",
        config="test_config_with_normalization_strategy",
    )

    environment_statements = plan.to_evaluatable().environment_statements
    assert environment_statements

    expected_before_all = [
        "CREATE TABLE IF NOT EXISTS analytic_stats (physical_table TEXT, evaluation_time TEXT)",
        "CREATE TABLE IF NOT EXISTS to_be_executed_last (col TEXT)",
        "SELECT 1 AS var, 'items' AS src, 'waiters' AS ref",
    ]

    # Render every statement (as before), then compare only the first result.
    rendered_before_all = [
        statement.render_before_all(dialect="duckdb")
        for statement in environment_statements
    ]
    assert rendered_before_all[0] == expected_before_all