Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
213d12a
feat: add weekly cleanup workflow for stale CI schemas
devin-ai-integration[bot] Feb 28, 2026
4605b7f
fix: remove unused ns.errors variable
devin-ai-integration[bot] Feb 28, 2026
fbe4059
test: add integration test for drop_stale_ci_schemas macro
devin-ai-integration[bot] Feb 28, 2026
a124fbe
fix: guard recent schema cleanup with existence check
devin-ai-integration[bot] Feb 28, 2026
fb87fe6
Merge remote-tracking branch 'origin/master' into devin/1772278180-cl…
devin-ai-integration[bot] Feb 28, 2026
f327572
fix: add ClickHouse-compatible list_ci_schemas dispatch and use eleme…
devin-ai-integration[bot] Feb 28, 2026
c373f8a
fix: add clickhouse__schema_exists implementation
devin-ai-integration[bot] Feb 28, 2026
a9f398d
fix: return dict instead of tojson (double-encoding) and use exact-ma…
devin-ai-integration[bot] Feb 28, 2026
fe9ff60
test: remove Dremio skip to let CI validate schema cleanup
devin-ai-integration[bot] Feb 28, 2026
5c58b2e
fix: address CodeRabbit review comments
devin-ai-integration[bot] Feb 28, 2026
572eeca
fix: revert to utcnow() - modules.datetime.timezone not available in …
devin-ai-integration[bot] Feb 28, 2026
dcd447a
fix: use SQL queries instead of adapter methods in run-operation context
devin-ai-integration[bot] Feb 28, 2026
1c6b4db
fix: add BigQuery-specific list_ci_schemas and ci_schema_exists imple…
devin-ai-integration[bot] Feb 28, 2026
f85599c
test: re-add Dremio skip - DremioRelation path model incompatible wit…
devin-ai-integration[bot] Feb 28, 2026
5adb93f
fix: increase test max_age_hours to 8760 to avoid race with parallel …
devin-ai-integration[bot] Feb 28, 2026
dbd9047
docs: fix comment - year 2020, not year 2000
devin-ai-integration[bot] Feb 28, 2026
3265525
refactor: move cleanup macros to integration tests, use regex parser,…
devin-ai-integration[bot] Feb 28, 2026
1001681
refactor: split helpers into schema_utils/, named regex groups, renam…
devin-ai-integration[bot] Feb 28, 2026
357d9a2
rename: schema_utils/ -> ci_schema_utils/ for clarity
devin-ai-integration[bot] Feb 28, 2026
14a0995
refactor: split into schema_utils/ (generic) + ci_schemas_cleanup/ (C…
devin-ai-integration[bot] Feb 28, 2026
6913c74
fix: rename macros to edr_ prefix to avoid shadowing dbt-databricks i…
devin-ai-integration[bot] Feb 28, 2026
3ca9f64
fix: remove duplicate edr_drop_schema - already exists in clear_env.sql
devin-ai-integration[bot] Feb 28, 2026
45c65ef
fix: add SQL escaping for defense-in-depth (CodeRabbit suggestion)
devin-ai-integration[bot] Feb 28, 2026
320a250
fix: add BigQuery backtick escaping for database identifiers (CodeRab…
devin-ai-integration[bot] Feb 28, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions .github/workflows/cleanup-stale-schemas.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
name: Cleanup stale CI schemas

on:
  schedule:
    # Every Sunday at 03:00 UTC
    - cron: "0 3 * * 0"
  workflow_dispatch:
    inputs:
      max-age-hours:
        type: string
        required: false
        default: "24"
        description: Drop schemas older than this many hours

env:
  TESTS_DIR: ${{ github.workspace }}/dbt-data-reliability/integration_tests

jobs:
  cleanup:
    runs-on: ubuntu-latest
    strategy:
      # Keep cleaning the other warehouses even if one target fails.
      fail-fast: false
      matrix:
        warehouse-type:
          - snowflake
          - bigquery
          - redshift
          - databricks_catalog
          - athena
    steps:
      - name: Checkout dbt package
        uses: actions/checkout@v4
        with:
          path: dbt-data-reliability

      - name: Setup Python
        uses: actions/setup-python@v6
        with:
          python-version: "3.10"
          cache: "pip"

      - name: Install dbt
        # Map the matrix value to the pip package name
        # (databricks_catalog -> dbt-databricks, athena -> dbt-athena-community).
        run: >
          pip install
          "dbt-core"
          "dbt-${{ (matrix.warehouse-type == 'databricks_catalog' && 'databricks') || (matrix.warehouse-type == 'athena' && 'athena-community') || matrix.warehouse-type }}"

      - name: Write dbt profiles
        env:
          CI_WAREHOUSE_SECRETS: ${{ secrets.CI_WAREHOUSE_SECRETS || '' }}
        run: |
          # The cleanup job doesn't create schemas, but generate_profiles.py
          # requires --schema-name. Use a dummy value.
          python "${{ github.workspace }}/dbt-data-reliability/integration_tests/profiles/generate_profiles.py" \
            --template "${{ github.workspace }}/dbt-data-reliability/integration_tests/profiles/profiles.yml.j2" \
            --output ~/.dbt/profiles.yml \
            --schema-name "cleanup_placeholder"

      - name: Install dbt deps
        working-directory: ${{ env.TESTS_DIR }}/dbt_project
        run: dbt deps

      - name: Symlink local elementary package
        run: ln -sfn ${{ github.workspace }}/dbt-data-reliability ${{ env.TESTS_DIR }}/dbt_project/dbt_packages/elementary

      - name: Drop stale CI schemas
        working-directory: ${{ env.TESTS_DIR }}/dbt_project
        # Only dbt_ prefixed schemas are created in this repo's CI.
        # The elementary repo has its own workflow for py_ prefixed schemas.
        # The `inputs` context is empty on scheduled runs, so `|| '24'`
        # supplies the default there.
        run: >
          dbt run-operation drop_stale_ci_schemas
          --args '{prefixes: ["dbt_"], max_age_hours: ${{ inputs.max-age-hours || '24' }}}'
          -t "${{ matrix.warehouse-type }}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{#
drop_stale_ci_schemas – clean up timestamped CI schemas.

Schema naming convention produced by CI:
<prefix><YYMMDD_HHMMSS>_<branch>_<hash>
Examples:
dbt_260228_112345_master_abcd1234
py_260228_112345_master_abcd1234
dbt_260228_112345_master_abcd1234_elementary (suffixed variant)

Call from a GitHub Actions workflow via:
dbt run-operation drop_stale_ci_schemas \
--args '{prefixes: ["dbt_", "py_"], max_age_hours: 24}'

Generic schema helpers (edr_list_schemas, edr_schema_exists) live in the
schema_utils/ folder. edr_drop_schema lives in clear_env.sql. CI-specific
helpers (parse_timestamp_from_ci_schema_name) live alongside this file.
#}

{% macro drop_stale_ci_schemas(prefixes=none, max_age_hours=24) %}
  {# Drops timestamped CI schemas older than `max_age_hours` in the target
     database. `prefixes` must be a non-empty list; a bare string is rejected
     explicitly because it would otherwise be iterated character by character. #}
  {% if prefixes is none or prefixes is string or prefixes | length == 0 %}
    {{ exceptions.raise_compiler_error(
        "drop_stale_ci_schemas: 'prefixes' is required and must be a "
        "non-empty list (e.g. ['dbt_', 'py_'])."
    ) }}
  {% endif %}

  {% set max_age_hours = max_age_hours | int %}
  {% set database = elementary.target_database() %}
  {% set all_schemas = edr_list_schemas(database) %}
  {# utcnow() is deprecated in Python 3.12+ but modules.datetime.timezone is not
     available in dbt's Jinja context. Both operands of the subtraction below
     are naive, so the age arithmetic is safe. #}
  {% set current_time = modules.datetime.datetime.utcnow() %}
  {% set age_limit_seconds = max_age_hours * 3600 %}
  {% set counters = namespace(dropped=0) %}

  {{ log("CI schema cleanup: scanning " ~ all_schemas | length ~ " schema(s) in database '" ~ database ~ "' for prefixes " ~ prefixes | string, info=true) }}

  {% for schema_name in all_schemas | sort %}
    {# Schemas whose names don't carry a CI timestamp are skipped entirely. #}
    {% set schema_ts = parse_timestamp_from_ci_schema_name(schema_name, prefixes) %}
    {% if schema_ts is not none %}
      {% set age_seconds = (current_time - schema_ts).total_seconds() %}
      {% if age_seconds > age_limit_seconds %}
        {{ log(" DROP " ~ schema_name ~ " (age: " ~ (age_seconds / 3600) | round(1) ~ " h)", info=true) }}
        {% do edr_drop_schema(database, schema_name) %}
        {% set counters.dropped = counters.dropped + 1 %}
      {% else %}
        {{ log(" keep " ~ schema_name ~ " (age: " ~ (age_seconds / 3600) | round(1) ~ " h)", info=true) }}
      {% endif %}
    {% endif %}
  {% endfor %}

  {{ log("CI schema cleanup complete. Dropped " ~ counters.dropped ~ " stale schema(s).", info=true) }}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{#
Parses a CI schema name and returns the embedded timestamp as a datetime,
or none if the name doesn't match the expected CI naming pattern.

Schema naming convention:
<prefix><YY><MM><DD>_<HH><MI><SS>_<branch>_<hash>[_<suffix>]
#}

{% macro parse_timestamp_from_ci_schema_name(schema_name, prefixes) %}
  {# Returns the naive datetime embedded in a CI schema name
     (<prefix><YYMMDD>_<HHMMSS>_<rest>), or none when the name doesn't match.
     Day-of-month is validated against the actual month length (incl. leap
     years): dbt's Jinja has no try/except, so an impossible date such as
     "260431" (April 31) would otherwise make datetime() raise and abort the
     entire cleanup run. #}
  {% set schema_lower = schema_name.lower() %}
  {% for prefix in prefixes %}
    {% if schema_lower.startswith(prefix.lower()) %}
      {% set remainder = schema_lower[prefix | length :] %}
      {% set match = modules.re.match(
          '^(?P<yy>\\d{2})(?P<mm>\\d{2})(?P<dd>\\d{2})_(?P<HH>\\d{2})(?P<MI>\\d{2})(?P<SS>\\d{2})_.+',
          remainder
      ) %}
      {% if match %}
        {% set yy = match.group('yy') | int %}
        {% set mm = match.group('mm') | int %}
        {% set dd = match.group('dd') | int %}
        {% set HH = match.group('HH') | int %}
        {% set MI = match.group('MI') | int %}
        {% set SS = match.group('SS') | int %}
        {# Two-digit years are anchored to the 2000s by the CI convention. #}
        {% set year = 2000 + yy %}
        {% set is_leap = (year % 4 == 0 and year % 100 != 0) or (year % 400 == 0) %}
        {% set month_lengths = [31, 29 if is_leap else 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31] %}
        {# The month check short-circuits before the month_lengths lookup, so an
           out-of-range mm can never index the list. #}
        {% if 1 <= mm <= 12 and 0 <= HH <= 23 and 0 <= MI <= 59 and 0 <= SS <= 59 and 1 <= dd <= month_lengths[mm - 1] %}
          {% do return(modules.datetime.datetime(year, mm, dd, HH, MI, SS)) %}
        {% endif %}
      {% endif %}
    {% endif %}
  {% endfor %}
  {% do return(none) %}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
{#
Integration-test helper for drop_stale_ci_schemas.

Creates two CI-style schemas (one with an old timestamp, one recent),
runs the cleanup macro, checks which schemas survived, cleans up,
and returns a JSON result dict.
#}

{% macro test_drop_stale_ci_schemas() %}
  {# Integration-test helper for drop_stale_ci_schemas: creates one CI-style
     schema with an old timestamp and one with a current timestamp, runs the
     cleanup, records which schemas survived, removes any leftovers, and
     returns a result dict for the Python test to assert on. #}
  {% set database = elementary.target_database() %}
  {% set now = modules.datetime.datetime.utcnow() %}

  {# Old schema: timestamp in the past (2020-01-01 00:00:00). #}
  {% set old_schema = 'dbt_200101_000000_citest_00000000' %}
  {# Recent schema: timestamp = now. #}
  {% set recent_ts = now.strftime('%y%m%d_%H%M%S') %}
  {% set recent_schema = 'dbt_' ~ recent_ts ~ '_citest_11111111' %}

  {{ log("TEST: creating old schema: " ~ old_schema, info=true) }}
  {{ log("TEST: creating recent schema: " ~ recent_schema, info=true) }}

  {# Create both schemas. #}
  {% do edr_create_schema(database, old_schema) %}
  {% do edr_create_schema(database, recent_schema) %}

  {# Verify both exist before running cleanup. #}
  {% set old_exists_before = edr_schema_exists(database, old_schema) %}
  {% set recent_exists_before = edr_schema_exists(database, recent_schema) %}
  {{ log("TEST: old_exists_before=" ~ old_exists_before ~ ", recent_exists_before=" ~ recent_exists_before, info=true) }}

  {# Run cleanup with a large threshold (one year) so only the artificially old
     schema (year 2020) is caught, and real CI schemas from parallel workers
     are safely below the threshold. #}
  {% do drop_stale_ci_schemas(prefixes=['dbt_'], max_age_hours=8760) %}

  {# Check which schemas survived. #}
  {% set old_exists_after = edr_schema_exists(database, old_schema) %}
  {% set recent_exists_after = edr_schema_exists(database, recent_schema) %}
  {{ log("TEST: old_exists_after=" ~ old_exists_after ~ ", recent_exists_after=" ~ recent_exists_after, info=true) }}

  {# Cleanup: drop any remaining test schemas. Both guards use plain
     truthiness for consistency (edr_schema_exists returns a boolean; the
     previous `is true` strict check on only one branch was inconsistent). #}
  {% if old_exists_after %}
    {% do edr_drop_schema(database, old_schema) %}
  {% endif %}
  {% if recent_exists_after %}
    {% do edr_drop_schema(database, recent_schema) %}
  {% endif %}

  {# Return results as a dict (serialized by the caller, not here, to avoid
     double JSON encoding). #}
  {% set results = {
    "old_exists_before": old_exists_before,
    "recent_exists_before": recent_exists_before,
    "old_dropped": not old_exists_after,
    "recent_kept": recent_exists_after
  } %}
  {% do return(results) %}
{% endmacro %}


{# ── Per-adapter schema creation ─────────────────────────────────────── #}

{% macro edr_create_schema(database, schema_name) %}
  {# Dispatch schema creation to the adapter-specific implementation in the
     elementary_tests namespace. #}
  {% set impl = adapter.dispatch('edr_create_schema', 'elementary_tests') %}
  {% do return(impl(database, schema_name)) %}
{% endmacro %}

{% macro default__edr_create_schema(database, schema_name) %}
  {# Build a schema-level relation and let dbt's adapter create it. #}
  {% set relation = api.Relation.create(database=database, schema=schema_name) %}
  {% do dbt.create_schema(relation) %}
  {# Explicit commit so the new schema is visible to subsequent queries
     (NOTE(review): presumably needed because run-operation contexts don't
     auto-commit on all adapters — confirm). #}
  {% do adapter.commit() %}
{% endmacro %}

{% macro clickhouse__edr_create_schema(database, schema_name) %}
  {# In ClickHouse a dbt "schema" is a database, so the `database` argument is
     intentionally unused (there is no higher catalog level). #}
  {# Escape backticks so the identifier cannot break out of its backtick
     quoting — defense-in-depth, consistent with the bigquery helpers; CI
     schema names are machine-generated. #}
  {% set safe_schema = schema_name | replace("`", "\`") %}
  {% do run_query("CREATE DATABASE IF NOT EXISTS `" ~ safe_schema ~ "`") %}
  {% do adapter.commit() %}
{% endmacro %}
Comment on lines +72 to +75
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# First, locate the file and read the specific lines
find . -name "test_drop_stale_ci_schemas.sql" -type f | head -1

Repository: elementary-data/dbt-data-reliability

Length of output: 149


🏁 Script executed:

# Check the actual content around lines 70-73
if [ -f "integration_tests/dbt_project/macros/test_drop_stale_ci_schemas.sql" ]; then
  cat -n "integration_tests/dbt_project/macros/test_drop_stale_ci_schemas.sql" | sed -n '65,80p'
fi

Repository: elementary-data/dbt-data-reliability

Length of output: 498


🏁 Script executed:

# Search for other ClickHouse macro implementations to understand patterns
rg "clickhouse__" --type sql -A 3 | head -100

Repository: elementary-data/dbt-data-reliability

Length of output: 7839


🏁 Script executed:

# Check for adapter.quote usage in ClickHouse macros
rg "adapter\.quote" --type sql -B 2 -A 2 | head -80

Repository: elementary-data/dbt-data-reliability

Length of output: 707


🏁 Script executed:

# Search for CREATE DATABASE usage in the codebase
rg "CREATE DATABASE" --type sql -B 2 -A 2

Repository: elementary-data/dbt-data-reliability

Length of output: 597


🏁 Script executed:

# Find the default implementation of edr_create_schema
rg "macro edr_create_schema" --type sql -A 5 | grep -v "clickhouse__" | head -20

Repository: elementary-data/dbt-data-reliability

Length of output: 807


🏁 Script executed:

# Check the database parameter usage in similar default macros
rg "edr_drop_schema|edr_create_schema" --type sql -B 1 -A 4 | grep -E "(macro|database)" | head -20

Repository: elementary-data/dbt-data-reliability

Length of output: 2536


Quote the identifier and align with the interface contract.

The schema_name parameter is directly concatenated into the SQL query without quoting. While this works in the test context with programmatically generated names, it's fragile if names contain special characters. More importantly, the database parameter is accepted but unused, inconsistent with the default implementation which properly uses it via api.Relation.create().

♻️ Proposed fix
 {% macro clickhouse__edr_create_schema(database, schema_name) %}
-  {% do run_query("CREATE DATABASE IF NOT EXISTS " ~ schema_name) %}
+  {% do run_query("CREATE DATABASE IF NOT EXISTS " ~ adapter.quote(schema_name)) %}
   {% do adapter.commit() %}
 {% endmacro %}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@integration_tests/dbt_project/macros/test_drop_stale_ci_schemas.sql` around
lines 70 - 73, The macro clickhouse__edr_create_schema currently concatenates
schema_name into SQL and ignores the database param; instead construct a dbt
Relation via api.Relation.create(database=database, schema=schema_name) (or at
minimum use adapter.quote_identifier on both database and schema_name) and use
that quoted identifier in the run_query call, then call adapter.commit(); update
clickhouse__edr_create_schema to build the relation with
api.Relation.create(database=..., schema=...) and use the adapter’s quoting
helpers (adapter.quote_identifier or relation rendering) for the CREATE DATABASE
IF NOT EXISTS statement before committing.

40 changes: 40 additions & 0 deletions integration_tests/dbt_project/macros/schema_utils/list_schemas.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{#
Per-adapter schema listing, scoped to a given database.

Uses SQL queries instead of adapter.list_schemas() because that method
is not available in dbt's run-operation context (RuntimeDatabaseWrapper
does not expose it).
#}

{% macro edr_list_schemas(database) %}
  {# Dispatch schema listing to the adapter-specific implementation in the
     elementary_tests namespace. #}
  {% set impl = adapter.dispatch('edr_list_schemas', 'elementary_tests') %}
  {% do return(impl(database)) %}
{% endmacro %}

{% macro default__edr_list_schemas(database) %}
  {# Lists all schemas in `database` via information_schema — SQL is used
     because adapter.list_schemas() is unavailable in run-operation contexts.
     Single-quote escaping is defense-in-depth; the database name comes from
     the dbt target. #}
  {% set safe_db = database | replace("'", "''") %}
  {% set rows = run_query("SELECT schema_name FROM information_schema.schemata WHERE lower(catalog_name) = lower('" ~ safe_db ~ "')") %}
  {% set schema_names = [] %}
  {% for record in rows %}
    {% do schema_names.append(record[0]) %}
  {% endfor %}
  {% do return(schema_names) %}
{% endmacro %}

{% macro bigquery__edr_list_schemas(database) %}
{# Lists datasets via the project-scoped INFORMATION_SCHEMA.SCHEMATA, since
   BigQuery has no cross-project catalog_name filter like the default
   implementation uses. Backtick escaping is defense-in-depth for the quoted
   project identifier.
   NOTE(review): in Jinja, "\`" is a literal backslash+backtick — confirm this
   matches BigQuery's identifier-escaping rules. #}
{% set safe_db = database | replace("`", "\`") %}
{% set results = run_query("SELECT schema_name FROM `" ~ safe_db ~ "`.INFORMATION_SCHEMA.SCHEMATA") %}
{% set schemas = [] %}
{% for row in results %}
{% do schemas.append(row[0]) %}
{% endfor %}
{# Returns a plain list of schema (dataset) name strings. #}
{% do return(schemas) %}
{% endmacro %}

{% macro clickhouse__edr_list_schemas(database) %}
{# ClickHouse has no catalog level — each dbt "schema" is a ClickHouse
   database — so the `database` argument is intentionally ignored and
   SHOW DATABASES returns every candidate. #}
{% set results = run_query('SHOW DATABASES') %}
{% set schemas = [] %}
{% for row in results %}
{% do schemas.append(row[0]) %}
{% endfor %}
{# Returns a plain list of database name strings. #}
{% do return(schemas) %}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{#
Per-adapter schema existence check, scoped to a given database.

Uses SQL queries instead of adapter.check_schema_exists() because that
method is not available in dbt's run-operation context
(RuntimeDatabaseWrapper does not expose it).
#}
Comment thread
haritamar marked this conversation as resolved.

{% macro edr_schema_exists(database, schema_name) %}
  {# Dispatch the existence check to the adapter-specific implementation in
     the elementary_tests namespace. #}
  {% set impl = adapter.dispatch('edr_schema_exists', 'elementary_tests') %}
  {% do return(impl(database, schema_name)) %}
{% endmacro %}

{% macro default__edr_schema_exists(database, schema_name) %}
  {# Case-insensitive existence probe via information_schema; SQL is used
     because adapter.check_schema_exists() is unavailable in run-operation
     contexts. Quote escaping is defense-in-depth. #}
  {% set escaped_db = database | replace("'", "''") %}
  {% set escaped_schema = schema_name | replace("'", "''") %}
  {% set probe_sql = "SELECT schema_name FROM information_schema.schemata WHERE lower(catalog_name) = lower('" ~ escaped_db ~ "') AND lower(schema_name) = lower('" ~ escaped_schema ~ "')" %}
  {% set matching_rows = run_query(probe_sql) %}
  {% do return(matching_rows | length > 0) %}
{% endmacro %}

{% macro bigquery__edr_schema_exists(database, schema_name) %}
  {# Existence probe against the project-scoped INFORMATION_SCHEMA.SCHEMATA.
     Backtick-escapes the project identifier and quote-escapes the schema
     string literal (defense-in-depth; both values are machine-generated in
     CI). Returns true when a dataset with that name exists. #}
  {% set safe_db = database | replace("`", "\`") %}
  {% set safe_schema = schema_name | replace("'", "''") %}
  {% set result = run_query("SELECT schema_name FROM `" ~ safe_db ~ "`.INFORMATION_SCHEMA.SCHEMATA WHERE lower(schema_name) = lower('" ~ safe_schema ~ "')") %}
  {% do return(result | length > 0) %}
{% endmacro %}

{% macro clickhouse__edr_schema_exists(database, schema_name) %}
{# ClickHouse database names are case-sensitive, so this deliberately uses an
   exact match (unlike the lower()-based default implementation). The
   `database` argument is unused: ClickHouse has no catalog level above
   databases. Single-quote escaping is defense-in-depth. #}
{% set safe_schema = schema_name | replace("'", "''") %}
{% set result = run_query("SELECT 1 FROM system.databases WHERE name = '" ~ safe_schema ~ "' LIMIT 1") %}
{% do return(result | length > 0) %}
{% endmacro %}
21 changes: 21 additions & 0 deletions integration_tests/tests/test_ci_cleanup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""Integration test for elementary.drop_stale_ci_schemas macro."""

import json

import pytest
from dbt_project import DbtProject


@pytest.mark.skip_targets(["dremio"])
def test_drop_stale_ci_schemas(dbt_project: DbtProject):
    """Verify that old CI schemas are dropped and recent ones are kept.

    Delegates setup/teardown to the ``elementary_tests.test_drop_stale_ci_schemas``
    macro, which creates one stale and one fresh CI-style schema, runs the
    cleanup macro, and reports which schemas survived as a dict.
    """
    result = dbt_project.dbt_runner.run_operation(
        "elementary_tests.test_drop_stale_ci_schemas",
    )
    assert result, "run_operation returned no output"
    # Assumes run_operation surfaces the macro's return value as a list whose
    # first element is its JSON serialization — TODO confirm against the
    # DbtProject runner implementation.
    data = json.loads(result[0])

    assert data["old_exists_before"], "Setup failed: old schema was not created"
    assert data["recent_exists_before"], "Setup failed: recent schema was not created"
    assert data["old_dropped"], "Old schema should have been dropped by cleanup"
    assert data["recent_kept"], "Recent schema should have been kept by cleanup"
5 changes: 5 additions & 0 deletions macros/utils/cross_db_utils/schema_exists.sql
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
{% do return(adapter.check_schema_exists(database, schema)) %}
{% endmacro %}

{% macro clickhouse__schema_exists(database, schema) %}
  {# ClickHouse treats a dbt schema as a database; `database` is unused (no
     catalog level). Escape single quotes so the schema name cannot break out
     of the SQL string literal — matches the escaping used by the other
     ClickHouse helpers in this package. #}
  {% set safe_schema = schema | replace("'", "''") %}
  {% set result = run_query("SELECT 1 FROM system.databases WHERE name = '" ~ safe_schema ~ "' LIMIT 1") %}
  {% do return(result | length > 0) %}
{% endmacro %}

{% macro default__schema_exists(database, schema) %}
{# Default implementation: delegate to the adapter's native schema check. #}
{% do return(adapter.check_schema_exists(database, schema)) %}
{% endmacro %}