
Commit 7a2b542

feat: add BigQuery partitioning and clustering to data_monitoring_metrics (#972)
- Rename get_default_partition_by -> get_partition_by(column='created_at') with a column parameter so models can specify their partition column. A backward-compatible alias get_default_partition_by is kept.
- Create new get_cluster_by(columns) macro with the adapter dispatch pattern. Returns the columns list on BigQuery, none on other adapters. Controlled by a separate bigquery_disable_clustering flag.
- Apply partition_by (on bucket_end) and cluster_by (full_table_name, metric_name) to data_monitoring_metrics.
- Update existing callers (dbt_run_results, dbt_invocations) to use the renamed get_partition_by macro.
- Add integration tests for data_monitoring_metrics partitioning and clustering on BigQuery.

Resolves ELE-5300

Co-authored-by: Itamar Hartstein <haritamar@gmail.com>
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent f820e11 commit 7a2b542
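In practice the two macros are meant to be called from a model's config block; a minimal usage sketch, mirroring the data_monitoring_metrics change below (other config keys omitted):

{{
  config(
    partition_by=elementary.get_partition_by(column="bucket_end"),
    cluster_by=elementary.get_cluster_by(columns=["full_table_name", "metric_name"]),
  )
}}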

File tree: 7 files changed, +65 −7 lines


integration_tests/tests/test_dbt_artifacts/test_artifacts.py

Lines changed: 37 additions & 1 deletion
@@ -222,7 +222,7 @@ def test_run_results_partitioned(dbt_project: DbtProject):
     )
     assert len(results) >= 1
 
-    # Verify the partition column is created_at in BigQuery
+    # Verify the partition column is created_at in BigQuery (uses get_partition_by default)
     partition_cols = dbt_project.run_query(
         "SELECT column_name "
         "FROM `{{ ref('dbt_run_results').database }}.{{ ref('dbt_run_results').schema }}.INFORMATION_SCHEMA.COLUMNS` "
@@ -255,3 +255,39 @@ def test_dbt_invocations_partitioned(dbt_project: DbtProject):
     assert [row["column_name"] for row in partition_cols] == [
         "created_at"
     ], "dbt_invocations should be partitioned by created_at in BigQuery"
+
+
+@pytest.mark.only_on_targets(["bigquery"])
+def test_data_monitoring_metrics_partitioned(dbt_project: DbtProject):
+    # data_monitoring_metrics is partitioned by bucket_end on BigQuery.
+    # Full-refresh to ensure the table is created with partitioning.
+    dbt_project.dbt_runner.run(select="data_monitoring_metrics", full_refresh=True)
+
+    partition_cols = dbt_project.run_query(
+        "SELECT column_name "
+        "FROM `{{ ref('data_monitoring_metrics').database }}.{{ ref('data_monitoring_metrics').schema }}.INFORMATION_SCHEMA.COLUMNS` "
+        "WHERE table_name = '{{ ref('data_monitoring_metrics').identifier }}' "
+        "AND is_partitioning_column = 'YES'"
+    )
+    assert [row["column_name"] for row in partition_cols] == [
+        "bucket_end"
+    ], "data_monitoring_metrics should be partitioned by bucket_end in BigQuery"
+
+
+@pytest.mark.only_on_targets(["bigquery"])
+def test_data_monitoring_metrics_clustered(dbt_project: DbtProject):
+    # data_monitoring_metrics is clustered by full_table_name and metric_name on BigQuery.
+    # Full-refresh to ensure the table is created with clustering.
+    dbt_project.dbt_runner.run(select="data_monitoring_metrics", full_refresh=True)
+
+    clustering_cols = dbt_project.run_query(
+        "SELECT column_name "
+        "FROM `{{ ref('data_monitoring_metrics').database }}.{{ ref('data_monitoring_metrics').schema }}.INFORMATION_SCHEMA.COLUMNS` "
+        "WHERE table_name = '{{ ref('data_monitoring_metrics').identifier }}' "
+        "AND clustering_ordinal_position IS NOT NULL "
+        "ORDER BY clustering_ordinal_position"
+    )
+    assert [row["column_name"] for row in clustering_cols] == [
+        "full_table_name",
+        "metric_name",
+    ], "data_monitoring_metrics should be clustered by full_table_name, metric_name in BigQuery"

macros/edr/system/system_utils/get_config_var.sql

Lines changed: 1 addition & 0 deletions
@@ -144,6 +144,7 @@
     "disable_samples_on_pii_tags": false,
     "pii_tags": ["pii"],
     "bigquery_disable_partitioning": false,
+    "bigquery_disable_clustering": false,
     "upload_only_current_project_artifacts": false,
 } %}
 {{- return(default_config) -}}
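As with the existing bigquery_disable_partitioning flag, this entry only supplies the default. A minimal sketch of opting out in dbt_project.yml, assuming these defaults are superseded by dbt vars the way elementary's other config flags are:

vars:
  bigquery_disable_partitioning: true  # keep elementary tables unpartitioned on BigQuery
  bigquery_disable_clustering: true    # keep elementary tables unclustered on BigQuery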
Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+{% macro get_cluster_by(columns) %}
+    {% do return(adapter.dispatch("get_cluster_by", "elementary")(columns)) %}
+{% endmacro %}
+
+{%- macro bigquery__get_cluster_by(columns) %}
+    {% if not elementary.get_config_var("bigquery_disable_clustering") %}
+        {% do return(columns) %}
+    {% endif %}
+    {% do return(none) %}
+{% endmacro %}
+
+{% macro default__get_cluster_by(columns) %} {% do return(none) %} {% endmacro %}
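Because the entry point goes through adapter.dispatch, every adapter without its own implementation falls back to default__get_cluster_by and gets none. Supporting another warehouse with native clustering would only need an adapter-prefixed macro; a hypothetical sketch, not part of this commit, for Snowflake:

{# Hypothetical — illustrates the dispatch pattern, not code in this commit: #}
{% macro snowflake__get_cluster_by(columns) %}
    {% do return(columns) %}
{% endmacro %}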
Lines changed: 9 additions & 4 deletions
@@ -1,12 +1,17 @@
+{% macro get_partition_by(column="created_at") %}
+    {% do return(adapter.dispatch("get_partition_by", "elementary")(column)) %}
+{% endmacro %}
+
+{# Backward-compatible alias so existing user overrides / references keep working. #}
 {% macro get_default_partition_by() %}
-    {% do return(adapter.dispatch("get_default_partition_by", "elementary")()) %}
+    {% do return(elementary.get_partition_by()) %}
 {% endmacro %}
 
-{%- macro bigquery__get_default_partition_by() %}
+{%- macro bigquery__get_partition_by(column) %}
     {% if not elementary.get_config_var("bigquery_disable_partitioning") %}
         {% do return(
             {
-                "field": "created_at",
+                "field": column,
                 "data_type": "timestamp",
                 "granularity": "day",
             }
@@ -15,4 +20,4 @@
     {% do return(none) %}
 {% endmacro %}
 
-{% macro default__get_default_partition_by() %} {% do return(none) %} {% endmacro %}
+{% macro default__get_partition_by(column) %} {% do return(none) %} {% endmacro %}
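The alias keeps old call sites equivalent to the new default; a quick sketch of the equivalences implied by the macros above:

{# All three calls resolve to the same partition config:
   elementary.get_default_partition_by()
   elementary.get_partition_by()
   elementary.get_partition_by(column="created_at")
#}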

models/edr/data_monitoring/data_monitoring/data_monitoring_metrics.sql

Lines changed: 4 additions & 0 deletions
@@ -13,6 +13,10 @@
         "timestamp_column": "created_at",
         "prev_timestamp_column": "updated_at",
     },
+    partition_by=elementary.get_partition_by(column="bucket_end"),
+    cluster_by=elementary.get_cluster_by(
+        columns=["full_table_name", "metric_name"]
+    ),
     table_type=elementary.get_default_table_type(),
     incremental_strategy=elementary.get_default_incremental_strategy(),
 )
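The dict returned by get_partition_by follows dbt-bigquery's partition_by config shape (field / data_type / granularity), so on BigQuery the table created for this model should come out with DDL along these lines (illustrative, with placeholder project and dataset names):

-- Illustrative DDL; <project>.<dataset> are placeholders:
create table `<project>.<dataset>.data_monitoring_metrics`
partition by timestamp_trunc(bucket_end, day)
cluster by full_table_name, metric_name
as (select ...);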

models/edr/dbt_artifacts/dbt_invocations.sql

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@
     transient=False,
     unique_key="invocation_id",
     on_schema_change="append_new_columns",
-    partition_by=elementary.get_default_partition_by(),
+    partition_by=elementary.get_partition_by(),
     full_refresh=elementary.get_config_var("elementary_full_refresh"),
     meta={
         "timestamp_column": "created_at",

models/edr/dbt_artifacts/dbt_run_results.sql

Lines changed: 1 addition & 1 deletion
@@ -13,7 +13,7 @@
         if target.type == "postgres"
         else []
     ),
-    partition_by=elementary.get_default_partition_by(),
+    partition_by=elementary.get_partition_by(),
     full_refresh=elementary.get_config_var("elementary_full_refresh"),
     meta={
         "dedup_by_column": "model_execution_id",
