diff --git a/.github/workflows/cleanup-stale-schemas.yml b/.github/workflows/cleanup-stale-schemas.yml new file mode 100644 index 000000000..9498e0a01 --- /dev/null +++ b/.github/workflows/cleanup-stale-schemas.yml @@ -0,0 +1,73 @@ +name: Cleanup stale CI schemas + +on: + schedule: + # Every Sunday at 03:00 UTC + - cron: "0 3 * * 0" + workflow_dispatch: + inputs: + max-age-hours: + type: string + required: false + default: "24" + description: Drop schemas older than this many hours + +env: + TESTS_DIR: ${{ github.workspace }}/dbt-data-reliability/integration_tests + +jobs: + cleanup: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + warehouse-type: + - snowflake + - bigquery + - redshift + - databricks_catalog + - athena + steps: + - name: Checkout dbt package + uses: actions/checkout@v4 + with: + path: dbt-data-reliability + + - name: Setup Python + uses: actions/setup-python@v6 + with: + python-version: "3.10" + cache: "pip" + + - name: Install dbt + run: > + pip install + "dbt-core" + "dbt-${{ (matrix.warehouse-type == 'databricks_catalog' && 'databricks') || (matrix.warehouse-type == 'athena' && 'athena-community') || matrix.warehouse-type }}" + + - name: Write dbt profiles + env: + CI_WAREHOUSE_SECRETS: ${{ secrets.CI_WAREHOUSE_SECRETS || '' }} + run: | + # The cleanup job doesn't create schemas, but generate_profiles.py + # requires --schema-name. Use a dummy value. 
+ python "${{ github.workspace }}/dbt-data-reliability/integration_tests/profiles/generate_profiles.py" \ + --template "${{ github.workspace }}/dbt-data-reliability/integration_tests/profiles/profiles.yml.j2" \ + --output ~/.dbt/profiles.yml \ + --schema-name "cleanup_placeholder" + + - name: Install dbt deps + working-directory: ${{ env.TESTS_DIR }}/dbt_project + run: dbt deps + + - name: Symlink local elementary package + run: ln -sfn ${{ github.workspace }}/dbt-data-reliability ${{ env.TESTS_DIR }}/dbt_project/dbt_packages/elementary + + - name: Drop stale CI schemas + working-directory: ${{ env.TESTS_DIR }}/dbt_project + # Only dbt_ prefixed schemas are created in this repo's CI. + # The elementary repo has its own workflow for py_ prefixed schemas. + run: > + dbt run-operation drop_stale_ci_schemas + --args '{prefixes: ["dbt_"], max_age_hours: ${{ inputs.max-age-hours || '24' }}}' + -t "${{ matrix.warehouse-type }}" diff --git a/integration_tests/dbt_project/macros/ci_schemas_cleanup/drop_stale_ci_schemas.sql b/integration_tests/dbt_project/macros/ci_schemas_cleanup/drop_stale_ci_schemas.sql new file mode 100644 index 000000000..4642dcb98 --- /dev/null +++ b/integration_tests/dbt_project/macros/ci_schemas_cleanup/drop_stale_ci_schemas.sql @@ -0,0 +1,55 @@ +{# + drop_stale_ci_schemas – clean up timestamped CI schemas. + + Schema naming convention produced by CI: + <prefix><timestamp>_<branch>_<hash> + Examples: + dbt_260228_112345_master_abcd1234 + py_260228_112345_master_abcd1234 + dbt_260228_112345_master_abcd1234_elementary (suffixed variant) + + Call from a GitHub Actions workflow via: + dbt run-operation drop_stale_ci_schemas \ + --args '{prefixes: ["dbt_", "py_"], max_age_hours: 24}' + + Generic schema helpers (edr_list_schemas, edr_schema_exists) live in the + schema_utils/ folder. edr_drop_schema lives in clear_env.sql. CI-specific + helpers (parse_timestamp_from_ci_schema_name) live alongside this file. 
+#} + +{% macro drop_stale_ci_schemas(prefixes=none, max_age_hours=24) %} + {% if prefixes is none or prefixes is string or prefixes | length == 0 %} + {{ exceptions.raise_compiler_error( + "drop_stale_ci_schemas: 'prefixes' is required and must be a " + "non-empty list (e.g. ['dbt_', 'py_'])." + ) }} + {% endif %} + + {% set max_age_hours = max_age_hours | int %} + {% set database = elementary.target_database() %} + {% set all_schemas = edr_list_schemas(database) %} + {# utcnow() is deprecated in Python 3.12+ but modules.datetime.timezone is not + available in dbt's Jinja context. Both now and the constructed datetime are + naive, so comparisons are safe. #} + {% set now = modules.datetime.datetime.utcnow() %} + {% set max_age_seconds = max_age_hours * 3600 %} + {% set ns = namespace(dropped=0) %} + + {{ log("CI schema cleanup: scanning " ~ all_schemas | length ~ " schema(s) in database '" ~ database ~ "' for prefixes " ~ prefixes | string, info=true) }} + + {% for schema_name in all_schemas | sort %} + {% set schema_ts = parse_timestamp_from_ci_schema_name(schema_name, prefixes) %} + {% if schema_ts is not none %} + {% set age_seconds = (now - schema_ts).total_seconds() %} + {% if age_seconds > max_age_seconds %} + {{ log(" DROP " ~ schema_name ~ " (age: " ~ (age_seconds / 3600) | round(1) ~ " h)", info=true) }} + {% do edr_drop_schema(database, schema_name) %} + {% set ns.dropped = ns.dropped + 1 %} + {% else %} + {{ log(" keep " ~ schema_name ~ " (age: " ~ (age_seconds / 3600) | round(1) ~ " h)", info=true) }} + {% endif %} + {% endif %} + {% endfor %} + + {{ log("CI schema cleanup complete. 
Dropped " ~ ns.dropped ~ " stale schema(s).", info=true) }} +{% endmacro %} diff --git a/integration_tests/dbt_project/macros/ci_schemas_cleanup/parse_timestamp_from_ci_schema_name.sql b/integration_tests/dbt_project/macros/ci_schemas_cleanup/parse_timestamp_from_ci_schema_name.sql new file mode 100644 index 000000000..ea07f4429 --- /dev/null +++ b/integration_tests/dbt_project/macros/ci_schemas_cleanup/parse_timestamp_from_ci_schema_name.sql @@ -0,0 +1,32 @@ +{# + Parses a CI schema name and returns the embedded timestamp as a datetime, + or none if the name doesn't match the expected CI naming pattern. + + Schema naming convention: +
<prefix><yymmdd>_<hhmmss>_<branch>_<sha>[_<suffix>] +#} + +{% macro parse_timestamp_from_ci_schema_name(schema_name, prefixes) %} + {% set schema_lower = schema_name.lower() %} + {% for prefix in prefixes %} + {% if schema_lower.startswith(prefix.lower()) %} + {% set remainder = schema_lower[prefix | length :] %} + {% set match = modules.re.match( + '^(?P<yy>\\d{2})(?P<mm>\\d{2})(?P<dd>\\d{2})_(?P<HH>\\d{2})(?P<MI>\\d{2})(?P<SS>\\d{2})_.+',
+ remainder + ) %} + {% if match %} + {% set yy = match.group('yy') | int %} + {% set mm = match.group('mm') | int %} + {% set dd = match.group('dd') | int %} + {% set HH = match.group('HH') | int %} + {% set MI = match.group('MI') | int %} + {% set SS = match.group('SS') | int %} + {% if 1 <= mm <= 12 and 1 <= dd <= 31 and 0 <= HH <= 23 and 0 <= MI <= 59 and 0 <= SS <= 59 %} + {% do return(modules.datetime.datetime(2000 + yy, mm, dd, HH, MI, SS)) %} + {% endif %} + {% endif %} + {% endif %} + {% endfor %} + {% do return(none) %} +{% endmacro %} diff --git a/integration_tests/dbt_project/macros/ci_schemas_cleanup/test_drop_stale_ci_schemas.sql b/integration_tests/dbt_project/macros/ci_schemas_cleanup/test_drop_stale_ci_schemas.sql new file mode 100644 index 000000000..1f77f9523 --- /dev/null +++ b/integration_tests/dbt_project/macros/ci_schemas_cleanup/test_drop_stale_ci_schemas.sql @@ -0,0 +1,75 @@ +{# + Integration-test helper for drop_stale_ci_schemas. + + Creates two CI-style schemas (one with an old timestamp, one recent), + runs the cleanup macro, checks which schemas survived, cleans up, + and returns a JSON result dict. 
+#} + +{% macro test_drop_stale_ci_schemas() %} + {% set database = elementary.target_database() %} + {% set now = modules.datetime.datetime.utcnow() %} + + {# Old schema: timestamp in the past (2020-01-01 00:00:00) #} + {% set old_schema = 'dbt_200101_000000_citest_00000000' %} + {# Recent schema: timestamp = now #} + {% set recent_ts = now.strftime('%y%m%d_%H%M%S') %} + {% set recent_schema = 'dbt_' ~ recent_ts ~ '_citest_11111111' %} + + {{ log("TEST: creating old schema: " ~ old_schema, info=true) }} + {{ log("TEST: creating recent schema: " ~ recent_schema, info=true) }} + + {# ── Create both schemas ───────────────────────────────────────────── #} + {% do edr_create_schema(database, old_schema) %} + {% do edr_create_schema(database, recent_schema) %} + + {# ── Verify both exist before running cleanup ──────────────────────── #} + {% set old_exists_before = edr_schema_exists(database, old_schema) %} + {% set recent_exists_before = edr_schema_exists(database, recent_schema) %} + {{ log("TEST: old_exists_before=" ~ old_exists_before ~ ", recent_exists_before=" ~ recent_exists_before, info=true) }} + + {# ── Run cleanup with a large threshold so only the artificially old + schema (year 2020) is caught, and real CI schemas from parallel + workers are safely below the threshold. 
──────────────────────────── #} + {% do drop_stale_ci_schemas(prefixes=['dbt_'], max_age_hours=8760) %} + + {# ── Check which schemas survived ─────────────────────────────────── #} + {% set old_exists_after = edr_schema_exists(database, old_schema) %} + {% set recent_exists_after = edr_schema_exists(database, recent_schema) %} + {{ log("TEST: old_exists_after=" ~ old_exists_after ~ ", recent_exists_after=" ~ recent_exists_after, info=true) }} + + {# ── Cleanup: drop any remaining test schemas ─────────────────────── #} + {% if old_exists_after is true %} + {% do edr_drop_schema(database, old_schema) %} + {% endif %} + {% if recent_exists_after %} + {% do edr_drop_schema(database, recent_schema) %} + {% endif %} + + {# ── Return results ────────────────────────────────────────────────── #} + {% set results = { + "old_exists_before": old_exists_before, + "recent_exists_before": recent_exists_before, + "old_dropped": not old_exists_after, + "recent_kept": recent_exists_after + } %} + {% do return(results) %} +{% endmacro %} + + +{# ── Per-adapter schema creation ─────────────────────────────────────── #} + +{% macro edr_create_schema(database, schema_name) %} + {% do return(adapter.dispatch('edr_create_schema', 'elementary_tests')(database, schema_name)) %} +{% endmacro %} + +{% macro default__edr_create_schema(database, schema_name) %} + {% set schema_relation = api.Relation.create(database=database, schema=schema_name) %} + {% do dbt.create_schema(schema_relation) %} + {% do adapter.commit() %} +{% endmacro %} + +{% macro clickhouse__edr_create_schema(database, schema_name) %} + {% do run_query("CREATE DATABASE IF NOT EXISTS `" ~ schema_name ~ "`") %} + {% do adapter.commit() %} +{% endmacro %} diff --git a/integration_tests/dbt_project/macros/schema_utils/list_schemas.sql b/integration_tests/dbt_project/macros/schema_utils/list_schemas.sql new file mode 100644 index 000000000..932c826e0 --- /dev/null +++ 
b/integration_tests/dbt_project/macros/schema_utils/list_schemas.sql @@ -0,0 +1,40 @@ +{# + Per-adapter schema listing, scoped to a given database. + + Uses SQL queries instead of adapter.list_schemas() because that method + is not available in dbt's run-operation context (RuntimeDatabaseWrapper + does not expose it). +#} + +{% macro edr_list_schemas(database) %} + {% do return(adapter.dispatch('edr_list_schemas', 'elementary_tests')(database)) %} +{% endmacro %} + +{% macro default__edr_list_schemas(database) %} + {% set safe_db = database | replace("'", "''") %} + {% set results = run_query("SELECT schema_name FROM information_schema.schemata WHERE lower(catalog_name) = lower('" ~ safe_db ~ "')") %} + {% set schemas = [] %} + {% for row in results %} + {% do schemas.append(row[0]) %} + {% endfor %} + {% do return(schemas) %} +{% endmacro %} + +{% macro bigquery__edr_list_schemas(database) %} + {% set safe_db = database | replace("`", "\`") %} + {% set results = run_query("SELECT schema_name FROM `" ~ safe_db ~ "`.INFORMATION_SCHEMA.SCHEMATA") %} + {% set schemas = [] %} + {% for row in results %} + {% do schemas.append(row[0]) %} + {% endfor %} + {% do return(schemas) %} +{% endmacro %} + +{% macro clickhouse__edr_list_schemas(database) %} + {% set results = run_query('SHOW DATABASES') %} + {% set schemas = [] %} + {% for row in results %} + {% do schemas.append(row[0]) %} + {% endfor %} + {% do return(schemas) %} +{% endmacro %} diff --git a/integration_tests/dbt_project/macros/schema_utils/schema_exists.sql b/integration_tests/dbt_project/macros/schema_utils/schema_exists.sql new file mode 100644 index 000000000..dcfba0f69 --- /dev/null +++ b/integration_tests/dbt_project/macros/schema_utils/schema_exists.sql @@ -0,0 +1,31 @@ +{# + Per-adapter schema existence check, scoped to a given database. 
+ + Uses SQL queries instead of adapter.check_schema_exists() because that + method is not available in dbt's run-operation context + (RuntimeDatabaseWrapper does not expose it). +#} + +{% macro edr_schema_exists(database, schema_name) %} + {% do return(adapter.dispatch('edr_schema_exists', 'elementary_tests')(database, schema_name)) %} +{% endmacro %} + +{% macro default__edr_schema_exists(database, schema_name) %} + {% set safe_db = database | replace("'", "''") %} + {% set safe_schema = schema_name | replace("'", "''") %} + {% set result = run_query("SELECT schema_name FROM information_schema.schemata WHERE lower(catalog_name) = lower('" ~ safe_db ~ "') AND lower(schema_name) = lower('" ~ safe_schema ~ "')") %} + {% do return(result | length > 0) %} +{% endmacro %} + +{% macro bigquery__edr_schema_exists(database, schema_name) %} + {% set safe_db = database | replace("`", "\`") %} + {% set safe_schema = schema_name | replace("'", "''") %} + {% set result = run_query("SELECT schema_name FROM `" ~ safe_db ~ "`.INFORMATION_SCHEMA.SCHEMATA WHERE lower(schema_name) = lower('" ~ safe_schema ~ "')") %} + {% do return(result | length > 0) %} +{% endmacro %} + +{% macro clickhouse__edr_schema_exists(database, schema_name) %} + {% set safe_schema = schema_name | replace("'", "''") %} + {% set result = run_query("SELECT 1 FROM system.databases WHERE name = '" ~ safe_schema ~ "' LIMIT 1") %} + {% do return(result | length > 0) %} +{% endmacro %} diff --git a/integration_tests/tests/test_ci_cleanup.py b/integration_tests/tests/test_ci_cleanup.py new file mode 100644 index 000000000..702f62544 --- /dev/null +++ b/integration_tests/tests/test_ci_cleanup.py @@ -0,0 +1,21 @@ +"""Integration test for elementary.drop_stale_ci_schemas macro.""" + +import json + +import pytest +from dbt_project import DbtProject + + +@pytest.mark.skip_targets(["dremio"]) +def test_drop_stale_ci_schemas(dbt_project: DbtProject): + """Verify that old CI schemas are dropped and recent ones are kept.""" 
+ result = dbt_project.dbt_runner.run_operation( + "elementary_tests.test_drop_stale_ci_schemas", + ) + assert result, "run_operation returned no output" + data = json.loads(result[0]) + + assert data["old_exists_before"], "Setup failed: old schema was not created" + assert data["recent_exists_before"], "Setup failed: recent schema was not created" + assert data["old_dropped"], "Old schema should have been dropped by cleanup" + assert data["recent_kept"], "Recent schema should have been kept by cleanup" diff --git a/macros/utils/cross_db_utils/schema_exists.sql b/macros/utils/cross_db_utils/schema_exists.sql index 659958a71..c2876de0c 100644 --- a/macros/utils/cross_db_utils/schema_exists.sql +++ b/macros/utils/cross_db_utils/schema_exists.sql @@ -29,6 +29,11 @@ {% do return(adapter.check_schema_exists(database, schema)) %} {% endmacro %} +{% macro clickhouse__schema_exists(database, schema) %} + {% set result = run_query("SELECT 1 FROM system.databases WHERE name = '" ~ schema ~ "' LIMIT 1") %} + {% do return(result | length > 0) %} +{% endmacro %} + {% macro default__schema_exists(database, schema) %} {% do return(adapter.check_schema_exists(database, schema)) %} {% endmacro %}