55from dbt_project import DbtProject
66
77
8- def _microbatch_model_sql () -> str :
8+ def _microbatch_source_model_sql () -> str :
9+ return """
10+ {{ config(event_time='order_date') }}
11+
12+ select
13+ 1 as order_id,
14+ 1 as customer_id,
15+ 42 as amount,
16+ cast('2024-01-01 00:00:00' as timestamp) as order_date
17+ union all
18+ select
19+ 2 as order_id,
20+ 2 as customer_id,
21+ 84 as amount,
22+ cast('2025-01-01 00:00:00' as timestamp) as order_date
23+ """
24+
25+
26+ def _microbatch_model_sql (source_model_name : str ) -> str :
927 return """
1028{% set model_config = {
1129 "materialized": "incremental",
1230 "incremental_strategy": "microbatch",
1331 "event_time": "order_date",
1432 "batch_size": "year",
15- "begin": "2025-03 -01",
33+ "begin": "2024-01 -01",
1634 "unique_key": "order_id"
1735} %}
1836{% if target.type == "bigquery" %}
@@ -26,20 +44,36 @@ def _microbatch_model_sql() -> str:
2644{{ config(**model_config) }}
2745
2846select
29- 1 as order_id,
30- 1 as customer_id,
31- 42 as amount,
32- {{ dbt.current_timestamp() }} as order_date
33- from {{ ref('one') }}
34- """
47+ order_id,
48+ customer_id,
49+ amount,
50+ order_date
51+ from {{ ref('__MICROBATCH_SOURCE_MODEL__') }}
52+ """ .replace ("__MICROBATCH_SOURCE_MODEL__" , source_model_name )
53+
54+
55+ @contextmanager
56+ def _with_microbatch_test_models (dbt_project : DbtProject , test_id : str ):
57+ source_model_name = f"{ test_id } _source"
58+ source_model_path = dbt_project .tmp_models_dir_path .joinpath (f"{ source_model_name } .sql" )
59+ target_model_path = dbt_project .tmp_models_dir_path .joinpath (f"{ test_id } .sql" )
60+
61+ source_model_path .write_text (_microbatch_source_model_sql ())
62+ target_model_path .write_text (_microbatch_model_sql (source_model_name ))
63+ relative_target_model_path = target_model_path .relative_to (dbt_project .project_dir_path )
64+ try :
65+ yield relative_target_model_path
66+ finally :
67+ if source_model_path .exists ():
68+ source_model_path .unlink ()
69+ if target_model_path .exists ():
70+ target_model_path .unlink ()
3571
3672
3773def _run_microbatch_model_and_get_latest_success_result (
3874 dbt_project : DbtProject , test_id : str
3975):
40- with dbt_project .create_temp_model_for_existing_table (
41- test_id , raw_code = _microbatch_model_sql ()
42- ) as model_path :
76+ with _with_microbatch_test_models (dbt_project , test_id ) as model_path :
4377 dbt_project .dbt_runner .run (select = str (model_path ))
4478
4579 unique_id = f"model.elementary_tests.{ test_id } "
0 commit comments