88def _microbatch_source_model_sql () -> str :
99 return """
1010{{ config(event_time='order_date') }}
11+ {% set event_time_data_type = 'datetime2' if target.type == 'sqlserver' else 'timestamp' %}
1112
1213select
1314 1 as order_id,
1415 1 as customer_id,
1516 42 as amount,
16- cast('2024-01-01 00:00:00' as timestamp ) as order_date
17+ cast('2024-01-01 00:00:00' as {{ event_time_data_type }} ) as order_date
1718union all
1819select
1920 2 as order_id,
2021 2 as customer_id,
2122 84 as amount,
22- cast('2025-01-01 00:00:00' as timestamp ) as order_date
23+ cast('2025-01-01 00:00:00' as {{ event_time_data_type }} ) as order_date
2324"""
2425
2526
@@ -30,8 +31,7 @@ def _microbatch_model_sql(source_model_name: str) -> str:
3031 "incremental_strategy": "microbatch",
3132 "event_time": "order_date",
3233 "batch_size": "year",
33- "begin": "2024-01-01",
34- "unique_key": "order_id"
34+ "begin": "2024-01-01"
3535} %}
3636{% if target.type == "bigquery" %}
3737 {% do model_config.update(
@@ -41,6 +41,9 @@ def _microbatch_model_sql(source_model_name: str) -> str:
4141{% if target.type == "athena" %}
4242 {% do model_config.update({"partitioned_by": ["order_date"]}) %}
4343{% endif %}
44+ {% if target.type != "duckdb" %}
45+ {% do model_config.update({"unique_key": "order_id"}) %}
46+ {% endif %}
4447{{ config(**model_config) }}
4548
4649select
@@ -53,17 +56,18 @@ def _microbatch_model_sql(source_model_name: str) -> str:
5356
5457
5558@contextmanager
56- def _with_microbatch_test_models (dbt_project : DbtProject , test_id : str ):
57- source_model_name = f"{ test_id } _source"
59+ def _with_microbatch_test_models (dbt_project : DbtProject , model_suffix : str ):
60+ source_model_name = f"mb_src_{ model_suffix } "
61+ target_model_name = f"mb_tgt_{ model_suffix } "
5862 source_model_path = dbt_project .tmp_models_dir_path .joinpath (f"{ source_model_name } .sql" )
59- target_model_path = dbt_project .tmp_models_dir_path .joinpath (f"{ test_id } .sql" )
63+ target_model_path = dbt_project .tmp_models_dir_path .joinpath (f"{ target_model_name } .sql" )
6064
6165 source_model_path .write_text (_microbatch_source_model_sql ())
6266 target_model_path .write_text (_microbatch_model_sql (source_model_name ))
6367 relative_source_model_path = source_model_path .relative_to (dbt_project .project_dir_path )
6468 relative_target_model_path = target_model_path .relative_to (dbt_project .project_dir_path )
6569 try :
66- yield relative_source_model_path , relative_target_model_path
70+ yield relative_source_model_path , relative_target_model_path , target_model_name
6771 finally :
6872 if source_model_path .exists ():
6973 source_model_path .unlink ()
@@ -72,17 +76,18 @@ def _with_microbatch_test_models(dbt_project: DbtProject, test_id: str):
7276
7377
7478def _run_microbatch_model_and_get_latest_success_result (
75- dbt_project : DbtProject , test_id : str
79+ dbt_project : DbtProject , model_suffix : str
7680):
77- with _with_microbatch_test_models (dbt_project , test_id ) as (
81+ with _with_microbatch_test_models (dbt_project , model_suffix ) as (
7882 source_model_path ,
7983 model_path ,
84+ target_model_name ,
8085 ):
8186 dbt_project .dbt_runner .run (
8287 select = f"{ source_model_path } { model_path } "
8388 )
8489
85- unique_id = f"model.elementary_tests.{ test_id } "
90+ unique_id = f"model.elementary_tests.{ target_model_name } "
8691 run_results = dbt_project .read_table (
8792 "dbt_run_results" ,
8893 where = f"unique_id = '{ unique_id } ' and status = 'success'" ,
@@ -113,35 +118,34 @@ def _with_microbatch_macro_file(dbt_project: DbtProject, macro_name: str):
113118 macro_path .unlink ()
114119
115120
116- @pytest .mark .skip_targets (["vertica" ])
117- @pytest .mark .skip_for_dbt_fusion
118- def test_microbatch_run_results_has_compiled_code (test_id : str , dbt_project : DbtProject ):
119- dbt_project .dbt_runner .vars ["disable_run_results" ] = False
120-
121- with _with_microbatch_macro_file (dbt_project , "get_incremental_microbatch_sql" ):
122- run_results = _run_microbatch_model_and_get_latest_success_result (
123- dbt_project , test_id
124- )
125- assert run_results , "Expected a successful run result row for microbatch model"
126- assert run_results [0 ]["compiled_code" ], (
127- "Expected compiled_code to be populated for successful microbatch model run result"
128- )
129-
130-
131- @pytest .mark .skip_targets (["vertica" ])
121+ @pytest .mark .skip_targets (["vertica" , "bigquery" , "athena" , "clickhouse" ])
132122@pytest .mark .skip_for_dbt_fusion
133- def test_microbatch_run_results_without_override_has_empty_compiled_code (
134- test_id : str , dbt_project : DbtProject
123+ @pytest .mark .parametrize (
124+ "macro_name,expected_compiled_code,model_suffix" ,
125+ [
126+ ("get_incremental_microbatch_sql" , True , "with_override" ),
127+ ("get_incremental_microbatch_sql_not_used" , False , "without_override" ),
128+ ],
129+ ids = ["with_override" , "without_override" ],
130+ )
131+ def test_microbatch_run_results_compiled_code_behavior (
132+ dbt_project : DbtProject ,
133+ macro_name : str ,
134+ expected_compiled_code : bool ,
135+ model_suffix : str ,
135136):
136137 dbt_project .dbt_runner .vars ["disable_run_results" ] = False
137138
138- with _with_microbatch_macro_file (
139- dbt_project , "get_incremental_microbatch_sql_not_used"
140- ):
139+ with _with_microbatch_macro_file (dbt_project , macro_name ):
141140 run_results = _run_microbatch_model_and_get_latest_success_result (
142- dbt_project , test_id
141+ dbt_project , model_suffix
143142 )
144143 assert run_results , "Expected a successful run result row for microbatch model"
145- assert not run_results [0 ]["compiled_code" ], (
146- "Expected compiled_code to stay empty when microbatch override macro is absent"
147- )
144+ if expected_compiled_code :
145+ assert run_results [0 ]["compiled_code" ], (
146+ "Expected compiled_code to be populated when override macro is present"
147+ )
148+ else :
149+ assert not run_results [0 ]["compiled_code" ], (
150+ "Expected compiled_code to stay empty when override macro is absent"
151+ )
0 commit comments