Skip to content

Commit 7d65c72

Browse files
feat: add weekly cleanup workflow for stale CI schemas (#943)
1 parent 64e62eb commit 7d65c72

8 files changed

Lines changed: 332 additions & 0 deletions

File tree

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
# Scheduled maintenance workflow: drops timestamped CI schemas left behind by
# failed or cancelled integration-test runs, once per supported warehouse.
name: Cleanup stale CI schemas

on:
  schedule:
    # Every Sunday at 03:00 UTC
    - cron: "0 3 * * 0"
  workflow_dispatch:
    inputs:
      max-age-hours:
        type: string
        required: false
        default: "24"
        description: Drop schemas older than this many hours

env:
  TESTS_DIR: ${{ github.workspace }}/dbt-data-reliability/integration_tests

jobs:
  cleanup:
    runs-on: ubuntu-latest
    strategy:
      # Keep cleaning the remaining warehouses even if one of them fails.
      fail-fast: false
      matrix:
        warehouse-type:
          - snowflake
          - bigquery
          - redshift
          - databricks_catalog
          - athena
    steps:
      - name: Checkout dbt package
        uses: actions/checkout@v4
        with:
          path: dbt-data-reliability

      - name: Setup Python
        uses: actions/setup-python@v6
        with:
          python-version: "3.10"
          cache: "pip"

      - name: Install dbt
        # Map the matrix value to the pip package name: databricks_catalog
        # installs dbt-databricks, athena installs dbt-athena-community,
        # everything else maps 1:1 to dbt-<warehouse-type>.
        run: >
          pip install
          "dbt-core"
          "dbt-${{ (matrix.warehouse-type == 'databricks_catalog' && 'databricks') || (matrix.warehouse-type == 'athena' && 'athena-community') || matrix.warehouse-type }}"

      - name: Write dbt profiles
        env:
          CI_WAREHOUSE_SECRETS: ${{ secrets.CI_WAREHOUSE_SECRETS || '' }}
        run: |
          # The cleanup job doesn't create schemas, but generate_profiles.py
          # requires --schema-name. Use a dummy value.
          python "${{ github.workspace }}/dbt-data-reliability/integration_tests/profiles/generate_profiles.py" \
            --template "${{ github.workspace }}/dbt-data-reliability/integration_tests/profiles/profiles.yml.j2" \
            --output ~/.dbt/profiles.yml \
            --schema-name "cleanup_placeholder"

      - name: Install dbt deps
        working-directory: ${{ env.TESTS_DIR }}/dbt_project
        run: dbt deps

      # Point the installed elementary package at this checkout so the cleanup
      # macro under review is the one that actually runs.
      - name: Symlink local elementary package
        run: ln -sfn ${{ github.workspace }}/dbt-data-reliability ${{ env.TESTS_DIR }}/dbt_project/dbt_packages/elementary

      - name: Drop stale CI schemas
        working-directory: ${{ env.TESTS_DIR }}/dbt_project
        # Only dbt_ prefixed schemas are created in this repo's CI.
        # The elementary repo has its own workflow for py_ prefixed schemas.
        # On scheduled runs `inputs` is empty, so the `|| '24'` fallback inside
        # the GitHub expression supplies the default before the shell runs.
        run: >
          dbt run-operation drop_stale_ci_schemas
          --args '{prefixes: ["dbt_"], max_age_hours: ${{ inputs.max-age-hours || '24' }}}'
          -t "${{ matrix.warehouse-type }}"
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
{#
  drop_stale_ci_schemas – clean up timestamped CI schemas.

  Schema naming convention produced by CI:
    <prefix><YYMMDD_HHMMSS>_<branch>_<hash>
  Examples:
    dbt_260228_112345_master_abcd1234
    py_260228_112345_master_abcd1234
    dbt_260228_112345_master_abcd1234_elementary (suffixed variant)

  Call from a GitHub Actions workflow via:
    dbt run-operation drop_stale_ci_schemas \
      --args '{prefixes: ["dbt_", "py_"], max_age_hours: 24}'

  Generic schema helpers (edr_list_schemas, edr_schema_exists) live in the
  schema_utils/ folder. edr_drop_schema lives in clear_env.sql. CI-specific
  helpers (parse_timestamp_from_ci_schema_name) live alongside this file.
#}

{% macro drop_stale_ci_schemas(prefixes=none, max_age_hours=24) %}
  {# Reject a bare string (e.g. --args '{prefixes: "dbt_"}') as well as a
     missing or empty list, so a mis-typed invocation fails loudly instead
     of silently matching nothing. #}
  {% if prefixes is none or prefixes is string or prefixes | length == 0 %}
    {{ exceptions.raise_compiler_error(
      "drop_stale_ci_schemas: 'prefixes' is required and must be a "
      "non-empty list (e.g. ['dbt_', 'py_'])."
    ) }}
  {% endif %}

  {# --args may deliver the value as a string; normalize to int once. #}
  {% set max_age_hours = max_age_hours | int %}
  {% set database = elementary.target_database() %}
  {% set all_schemas = edr_list_schemas(database) %}
  {# utcnow() is deprecated in Python 3.12+ but modules.datetime.timezone is not
     available in dbt's Jinja context. Both now and the constructed datetime are
     naive, so comparisons are safe. #}
  {% set now = modules.datetime.datetime.utcnow() %}
  {% set max_age_seconds = max_age_hours * 3600 %}
  {# namespace() so the counter survives Jinja's per-iteration loop scoping. #}
  {% set ns = namespace(dropped=0) %}

  {{ log("CI schema cleanup: scanning " ~ all_schemas | length ~ " schema(s) in database '" ~ database ~ "' for prefixes " ~ prefixes | string, info=true) }}

  {% for schema_name in all_schemas | sort %}
    {# Returns none when the name doesn't match the CI naming pattern;
       such schemas are left untouched. #}
    {% set schema_ts = parse_timestamp_from_ci_schema_name(schema_name, prefixes) %}
    {% if schema_ts is not none %}
      {% set age_seconds = (now - schema_ts).total_seconds() %}
      {% if age_seconds > max_age_seconds %}
        {{ log(" DROP " ~ schema_name ~ " (age: " ~ (age_seconds / 3600) | round(1) ~ " h)", info=true) }}
        {% do edr_drop_schema(database, schema_name) %}
        {% set ns.dropped = ns.dropped + 1 %}
      {% else %}
        {{ log(" keep " ~ schema_name ~ " (age: " ~ (age_seconds / 3600) | round(1) ~ " h)", info=true) }}
      {% endif %}
    {% endif %}
  {% endfor %}

  {{ log("CI schema cleanup complete. Dropped " ~ ns.dropped ~ " stale schema(s).", info=true) }}
{% endmacro %}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
{#
  Parses a CI schema name and returns the embedded timestamp as a datetime,
  or none if the name doesn't match the expected CI naming pattern.

  Schema naming convention:
    <prefix><YY><MM><DD>_<HH><MI><SS>_<branch>_<hash>[_<suffix>]
#}

{% macro parse_timestamp_from_ci_schema_name(schema_name, prefixes) %}
  {% set schema_lower = schema_name.lower() %}
  {% for prefix in prefixes %}
    {% if schema_lower.startswith(prefix.lower()) %}
      {% set remainder = schema_lower[prefix | length :] %}
      {% set match = modules.re.match(
        '^(?P<yy>\\d{2})(?P<mm>\\d{2})(?P<dd>\\d{2})_(?P<HH>\\d{2})(?P<MI>\\d{2})(?P<SS>\\d{2})_.+',
        remainder
      ) %}
      {% if match %}
        {% set yy = match.group('yy') | int %}
        {% set mm = match.group('mm') | int %}
        {% set dd = match.group('dd') | int %}
        {% set HH = match.group('HH') | int %}
        {% set MI = match.group('MI') | int %}
        {% set SS = match.group('SS') | int %}
        {# datetime() raises ValueError on an impossible date (e.g. Feb 30),
           and Jinja has no try/except — an unchecked value would crash the
           entire cleanup run on one malformed schema name. Validate the
           day against the actual month length before constructing. #}
        {% if mm in [1, 3, 5, 7, 8, 10, 12] %}
          {% set max_dd = 31 %}
        {% elif mm in [4, 6, 9, 11] %}
          {% set max_dd = 30 %}
        {% elif mm == 2 %}
          {# Gregorian leap-year rule for the 20yy year. #}
          {% set year = 2000 + yy %}
          {% set max_dd = 29 if (year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)) else 28 %}
        {% else %}
          {# mm == 0 or mm > 12: no valid day exists. #}
          {% set max_dd = 0 %}
        {% endif %}
        {% if 1 <= dd <= max_dd and 0 <= HH <= 23 and 0 <= MI <= 59 and 0 <= SS <= 59 %}
          {% do return(modules.datetime.datetime(2000 + yy, mm, dd, HH, MI, SS)) %}
        {% endif %}
      {% endif %}
    {% endif %}
  {% endfor %}
  {# No prefix matched, or the remainder wasn't a valid timestamp. #}
  {% do return(none) %}
{% endmacro %}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
{#
  Integration-test helper for drop_stale_ci_schemas.

  Creates two CI-style schemas (one with an old timestamp, one recent),
  runs the cleanup macro, checks which schemas survived, cleans up,
  and returns a JSON result dict.
#}

{% macro test_drop_stale_ci_schemas() %}
  {% set database = elementary.target_database() %}
  {# Naive UTC, matching the naive timestamps parsed out of schema names. #}
  {% set now = modules.datetime.datetime.utcnow() %}

  {# Old schema: timestamp in the past (2020-01-01 00:00:00) #}
  {% set old_schema = 'dbt_200101_000000_citest_00000000' %}
  {# Recent schema: timestamp = now #}
  {% set recent_ts = now.strftime('%y%m%d_%H%M%S') %}
  {% set recent_schema = 'dbt_' ~ recent_ts ~ '_citest_11111111' %}

  {{ log("TEST: creating old schema: " ~ old_schema, info=true) }}
  {{ log("TEST: creating recent schema: " ~ recent_schema, info=true) }}

  {# ── Create both schemas ───────────────────────────────────────────── #}
  {% do edr_create_schema(database, old_schema) %}
  {% do edr_create_schema(database, recent_schema) %}

  {# ── Verify both exist before running cleanup ──────────────────────── #}
  {% set old_exists_before = edr_schema_exists(database, old_schema) %}
  {% set recent_exists_before = edr_schema_exists(database, recent_schema) %}
  {{ log("TEST: old_exists_before=" ~ old_exists_before ~ ", recent_exists_before=" ~ recent_exists_before, info=true) }}

  {# ── Run cleanup with a large threshold so only the artificially old
     schema (year 2020) is caught, and real CI schemas from parallel
     workers are safely below the threshold. ──────────────────────────── #}
  {% do drop_stale_ci_schemas(prefixes=['dbt_'], max_age_hours=8760) %}

  {# ── Check which schemas survived ─────────────────────────────────── #}
  {% set old_exists_after = edr_schema_exists(database, old_schema) %}
  {% set recent_exists_after = edr_schema_exists(database, recent_schema) %}
  {{ log("TEST: old_exists_after=" ~ old_exists_after ~ ", recent_exists_after=" ~ recent_exists_after, info=true) }}

  {# ── Cleanup: drop any remaining test schemas ─────────────────────── #}
  {# Plain truthiness on both branches (previously one used `is true`, which
     would skip cleanup if an adapter helper ever returned a truthy non-bool). #}
  {% if old_exists_after %}
    {% do edr_drop_schema(database, old_schema) %}
  {% endif %}
  {% if recent_exists_after %}
    {% do edr_drop_schema(database, recent_schema) %}
  {% endif %}

  {# ── Return results ────────────────────────────────────────────────── #}
  {% set results = {
    "old_exists_before": old_exists_before,
    "recent_exists_before": recent_exists_before,
    "old_dropped": not old_exists_after,
    "recent_kept": recent_exists_after
  } %}
  {% do return(results) %}
{% endmacro %}
{# ── Per-adapter schema creation ─────────────────────────────────────── #}

{# Dispatch entry point: resolves to <adapter>__edr_create_schema within the
   'elementary_tests' package namespace, falling back to the default below. #}
{% macro edr_create_schema(database, schema_name) %}
  {% do return(adapter.dispatch('edr_create_schema', 'elementary_tests')(database, schema_name)) %}
{% endmacro %}

{# Default: build a schema relation and delegate to dbt's own create_schema,
   then commit so the DDL is visible to later statements on adapters that
   wrap run-operation work in a transaction. #}
{% macro default__edr_create_schema(database, schema_name) %}
  {% set schema_relation = api.Relation.create(database=database, schema=schema_name) %}
  {% do dbt.create_schema(schema_relation) %}
  {% do adapter.commit() %}
{% endmacro %}

{# ClickHouse: a dbt "schema" maps to a ClickHouse database, so issue
   CREATE DATABASE directly. The `database` argument is intentionally unused
   (ClickHouse has no catalog level above databases). #}
{% macro clickhouse__edr_create_schema(database, schema_name) %}
  {% do run_query("CREATE DATABASE IF NOT EXISTS `" ~ schema_name ~ "`") %}
  {% do adapter.commit() %}
{% endmacro %}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
{#
  Per-adapter schema listing, scoped to a given database.

  Uses SQL queries instead of adapter.list_schemas() because that method
  is not available in dbt's run-operation context (RuntimeDatabaseWrapper
  does not expose it).
#}

{# Dispatch entry point: resolves to <adapter>__edr_list_schemas within the
   'elementary_tests' package namespace. Returns a list of schema-name strings. #}
{% macro edr_list_schemas(database) %}
  {% do return(adapter.dispatch('edr_list_schemas', 'elementary_tests')(database)) %}
{% endmacro %}

{# Default: ANSI information_schema. Single quotes in the database name are
   doubled before interpolation; lower() on both sides makes the catalog
   match case-insensitive. #}
{% macro default__edr_list_schemas(database) %}
  {% set safe_db = database | replace("'", "''") %}
  {% set results = run_query("SELECT schema_name FROM information_schema.schemata WHERE lower(catalog_name) = lower('" ~ safe_db ~ "')") %}
  {% set schemas = [] %}
  {% for row in results %}
    {% do schemas.append(row[0]) %}
  {% endfor %}
  {% do return(schemas) %}
{% endmacro %}

{# BigQuery: INFORMATION_SCHEMA must be qualified by the project (database),
   which is a backtick-quoted identifier, so backticks are backslash-escaped
   rather than single quotes. #}
{% macro bigquery__edr_list_schemas(database) %}
  {% set safe_db = database | replace("`", "\`") %}
  {% set results = run_query("SELECT schema_name FROM `" ~ safe_db ~ "`.INFORMATION_SCHEMA.SCHEMATA") %}
  {% set schemas = [] %}
  {% for row in results %}
    {% do schemas.append(row[0]) %}
  {% endfor %}
  {% do return(schemas) %}
{% endmacro %}

{# ClickHouse: "schemas" are databases, so list them all. The `database`
   argument is intentionally unused (no catalog level to scope by). #}
{% macro clickhouse__edr_list_schemas(database) %}
  {% set results = run_query('SHOW DATABASES') %}
  {% set schemas = [] %}
  {% for row in results %}
    {% do schemas.append(row[0]) %}
  {% endfor %}
  {% do return(schemas) %}
{% endmacro %}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
{#
  Per-adapter schema existence check, scoped to a given database.

  Uses SQL queries instead of adapter.check_schema_exists() because that
  method is not available in dbt's run-operation context
  (RuntimeDatabaseWrapper does not expose it).
#}

{# Dispatch entry point: resolves to <adapter>__edr_schema_exists within the
   'elementary_tests' package namespace. Returns a bool. #}
{% macro edr_schema_exists(database, schema_name) %}
  {% do return(adapter.dispatch('edr_schema_exists', 'elementary_tests')(database, schema_name)) %}
{% endmacro %}

{# Default: ANSI information_schema. Single quotes are doubled before
   interpolation; lower() on both sides makes the match case-insensitive. #}
{% macro default__edr_schema_exists(database, schema_name) %}
  {% set safe_db = database | replace("'", "''") %}
  {% set safe_schema = schema_name | replace("'", "''") %}
  {% set result = run_query("SELECT schema_name FROM information_schema.schemata WHERE lower(catalog_name) = lower('" ~ safe_db ~ "') AND lower(schema_name) = lower('" ~ safe_schema ~ "')") %}
  {% do return(result | length > 0) %}
{% endmacro %}

{# BigQuery: INFORMATION_SCHEMA is qualified by the backtick-quoted project
   (database) identifier; the schema name is compared as a string literal. #}
{% macro bigquery__edr_schema_exists(database, schema_name) %}
  {% set safe_db = database | replace("`", "\`") %}
  {% set safe_schema = schema_name | replace("'", "''") %}
  {% set result = run_query("SELECT schema_name FROM `" ~ safe_db ~ "`.INFORMATION_SCHEMA.SCHEMATA WHERE lower(schema_name) = lower('" ~ safe_schema ~ "')") %}
  {% do return(result | length > 0) %}
{% endmacro %}

{# ClickHouse: "schemas" are databases, so look the name up in
   system.databases. The `database` argument is intentionally unused.
   Note: the comparison is case-sensitive here, unlike the other adapters. #}
{% macro clickhouse__edr_schema_exists(database, schema_name) %}
  {% set safe_schema = schema_name | replace("'", "''") %}
  {% set result = run_query("SELECT 1 FROM system.databases WHERE name = '" ~ safe_schema ~ "' LIMIT 1") %}
  {% do return(result | length > 0) %}
{% endmacro %}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
"""Integration test for elementary.drop_stale_ci_schemas macro."""

import json

import pytest
from dbt_project import DbtProject


# NOTE(review): presumably the project's custom marker for excluding
# warehouse targets — dremio is assumed unsupported here; confirm against
# the conftest that registers skip_targets.
@pytest.mark.skip_targets(["dremio"])
def test_drop_stale_ci_schemas(dbt_project: DbtProject):
    """Verify that old CI schemas are dropped and recent ones are kept."""
    # The macro does the heavy lifting (creates an old and a recent CI-style
    # schema, runs the cleanup, records what survived) and returns its result
    # dict, which run_operation surfaces as output.
    result = dbt_project.dbt_runner.run_operation(
        "elementary_tests.test_drop_stale_ci_schemas",
    )
    assert result, "run_operation returned no output"
    # assumes result[0] is the macro's return value serialized as JSON —
    # TODO confirm against dbt_runner.run_operation's contract.
    data = json.loads(result[0])

    # Preconditions: both fixture schemas must have existed before cleanup,
    # otherwise the drop/keep assertions below would be vacuous.
    assert data["old_exists_before"], "Setup failed: old schema was not created"
    assert data["recent_exists_before"], "Setup failed: recent schema was not created"
    # The actual behavior under test.
    assert data["old_dropped"], "Old schema should have been dropped by cleanup"
    assert data["recent_kept"], "Recent schema should have been kept by cleanup"

macros/utils/cross_db_utils/schema_exists.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@
2929
{% do return(adapter.check_schema_exists(database, schema)) %}
3030
{% endmacro %}
3131

{# ClickHouse: "schemas" are databases, so check system.databases for the
   name. Double single quotes before interpolating so a quote in the schema
   name cannot break (or inject into) the query. The `database` argument is
   unused — ClickHouse has no catalog level above databases. #}
{% macro clickhouse__schema_exists(database, schema) %}
  {% set safe_schema = schema | replace("'", "''") %}
  {% set result = run_query("SELECT 1 FROM system.databases WHERE name = '" ~ safe_schema ~ "' LIMIT 1") %}
  {% do return(result | length > 0) %}
{% endmacro %}
36+
3237
{% macro default__schema_exists(database, schema) %}
3338
{% do return(adapter.check_schema_exists(database, schema)) %}
3439
{% endmacro %}

0 commit comments

Comments
 (0)