feat: add weekly cleanup workflow for stale CI schemas #943
Changes from 4 commits
@@ -0,0 +1,71 @@

```yaml
name: Cleanup stale CI schemas

on:
  schedule:
    # Every Sunday at 03:00 UTC
    - cron: "0 3 * * 0"
  workflow_dispatch:
    inputs:
      max-age-hours:
        type: string
        required: false
        default: "24"
        description: Drop schemas older than this many hours

env:
  TESTS_DIR: ${{ github.workspace }}/dbt-data-reliability/integration_tests

jobs:
  cleanup:
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
      matrix:
        warehouse-type:
          - snowflake
          - bigquery
          - redshift
          - databricks_catalog
          - athena
    steps:
      - name: Checkout dbt package
        uses: actions/checkout@v4
        with:
          path: dbt-data-reliability

      - name: Setup Python
        uses: actions/setup-python@v6
        with:
          python-version: "3.10"
          cache: "pip"

      - name: Install dbt
        run: >
          pip install
          "dbt-core"
          "dbt-${{ (matrix.warehouse-type == 'databricks_catalog' && 'databricks') || (matrix.warehouse-type == 'athena' && 'athena-community') || matrix.warehouse-type }}"

      - name: Write dbt profiles
        env:
          CI_WAREHOUSE_SECRETS: ${{ secrets.CI_WAREHOUSE_SECRETS || '' }}
        run: |
          # The cleanup job doesn't create schemas, but generate_profiles.py
          # requires --schema-name. Use a dummy value.
          python "${{ github.workspace }}/dbt-data-reliability/integration_tests/profiles/generate_profiles.py" \
            --template "${{ github.workspace }}/dbt-data-reliability/integration_tests/profiles/profiles.yml.j2" \
            --output ~/.dbt/profiles.yml \
            --schema-name "cleanup_placeholder"

      - name: Install dbt deps
        working-directory: ${{ env.TESTS_DIR }}/dbt_project
        run: dbt deps

      - name: Symlink local elementary package
        run: ln -sfn ${{ github.workspace }}/dbt-data-reliability ${{ env.TESTS_DIR }}/dbt_project/dbt_packages/elementary

      - name: Drop stale CI schemas
        working-directory: ${{ env.TESTS_DIR }}/dbt_project
        run: >
          dbt run-operation elementary.drop_stale_ci_schemas
          --args '{prefixes: ["dbt_"], max_age_hours: ${{ inputs.max-age-hours || '24' }}}'
          -t "${{ matrix.warehouse-type }}"
```
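The conditional expression in the "Install dbt" step maps each matrix value to a pip package name, special-casing the two warehouses whose adapter package doesn't follow the `dbt-<warehouse>` pattern. As a Python sketch (the function name is mine, not part of the workflow):

```python
def dbt_adapter_package(warehouse_type: str) -> str:
    """Mirror the workflow's ${{ ... }} expression: special-case the matrix
    values whose pip package name differs from the warehouse name."""
    special = {
        "databricks_catalog": "dbt-databricks",
        "athena": "dbt-athena-community",
    }
    return special.get(warehouse_type, f"dbt-{warehouse_type}")


print(dbt_adapter_package("snowflake"))           # dbt-snowflake
print(dbt_adapter_package("databricks_catalog"))  # dbt-databricks
print(dbt_adapter_package("athena"))              # dbt-athena-community
```

GitHub Actions expressions have no ternary operator, which is why the workflow chains `&&`/`||`: each `(cond && value)` term yields `value` when the condition holds, and the final `|| matrix.warehouse-type` is the fallback.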
@@ -0,0 +1,73 @@

```jinja
{#
  Integration-test helper for elementary.drop_stale_ci_schemas.

  Creates two CI-style schemas (one with an old timestamp, one recent),
  runs the cleanup macro, checks which schemas survived, cleans up,
  and returns a JSON result dict.
#}

{% macro test_drop_stale_ci_schemas() %}
  {% set database = elementary.target_database() %}
  {% set now = modules.datetime.datetime.utcnow() %}

  {# Old schema: timestamp in the past (2020-01-01 00:00:00) #}
  {% set old_schema = 'dbt_200101_000000_citest_00000000' %}
  {# Recent schema: timestamp = now #}
  {% set recent_ts = now.strftime('%y%m%d_%H%M%S') %}
  {% set recent_schema = 'dbt_' ~ recent_ts ~ '_citest_11111111' %}

  {{ log("TEST: creating old schema: " ~ old_schema, info=true) }}
  {{ log("TEST: creating recent schema: " ~ recent_schema, info=true) }}

  {# ── Create both schemas ───────────────────────────────────────────── #}
  {% do elementary_tests.edr_create_schema(database, old_schema) %}
  {% do elementary_tests.edr_create_schema(database, recent_schema) %}

  {# ── Verify both exist before running cleanup ──────────────────────── #}
  {% set old_exists_before = adapter.check_schema_exists(database, old_schema) %}
  {% set recent_exists_before = adapter.check_schema_exists(database, recent_schema) %}
  {{ log("TEST: old_exists_before=" ~ old_exists_before ~ ", recent_exists_before=" ~ recent_exists_before, info=true) }}

  {# ── Run cleanup with 1-hour threshold ─────────────────────────────── #}
  {% do elementary.drop_stale_ci_schemas(prefixes=['dbt_'], max_age_hours=1) %}

  {# ── Check which schemas survived ──────────────────────────────────── #}
  {% set old_exists_after = adapter.check_schema_exists(database, old_schema) %}
  {% set recent_exists_after = adapter.check_schema_exists(database, recent_schema) %}
  {{ log("TEST: old_exists_after=" ~ old_exists_after ~ ", recent_exists_after=" ~ recent_exists_after, info=true) }}

  {# ── Cleanup: drop any remaining test schemas ──────────────────────── #}
  {% if old_exists_after %}
    {% do elementary.drop_ci_schema(database, old_schema) %}
  {% endif %}
  {% if recent_exists_after %}
    {% do elementary.drop_ci_schema(database, recent_schema) %}
  {% endif %}

  {# ── Return results ────────────────────────────────────────────────── #}
  {% set results = {
    "old_exists_before": old_exists_before,
    "recent_exists_before": recent_exists_before,
    "old_dropped": not old_exists_after,
    "recent_kept": recent_exists_after
  } %}
  {% do return(tojson(results)) %}
{% endmacro %}


{# ── Per-adapter schema creation ─────────────────────────────────────── #}

{% macro edr_create_schema(database, schema_name) %}
  {% do return(adapter.dispatch('edr_create_schema', 'elementary_tests')(database, schema_name)) %}
{% endmacro %}

{% macro default__edr_create_schema(database, schema_name) %}
  {% set schema_relation = api.Relation.create(database=database, schema=schema_name) %}
  {% do dbt.create_schema(schema_relation) %}
  {% do adapter.commit() %}
{% endmacro %}

{% macro clickhouse__edr_create_schema(database, schema_name) %}
  {% do run_query("CREATE DATABASE IF NOT EXISTS " ~ schema_name) %}
  {% do adapter.commit() %}
{% endmacro %}
```
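The test macro builds its fixture names following the CI convention described elsewhere in this PR (`<prefix><YYMMDD_HHMMSS>_<label>_<hash>`). A hypothetical Python helper showing the same construction (names and signature are mine):

```python
from datetime import datetime


def make_ci_schema_name(ts: datetime, label: str, suffix: str, prefix: str = "dbt_") -> str:
    # Same format the test macro concatenates: prefix + YYMMDD_HHMMSS + label + suffix
    return f"{prefix}{ts.strftime('%y%m%d_%H%M%S')}_{label}_{suffix}"


old = make_ci_schema_name(datetime(2020, 1, 1), "citest", "00000000")
print(old)  # dbt_200101_000000_citest_00000000
```

This makes explicit why the macro hard-codes `dbt_200101_000000_citest_00000000` as the "old" fixture: the embedded timestamp, 2020-01-01 00:00:00, is guaranteed to be older than any reasonable `max_age_hours` threshold.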
@@ -0,0 +1,23 @@

```python
"""Integration test for elementary.drop_stale_ci_schemas macro."""

import json

import pytest
from dbt_project import DbtProject


# Dremio does not support DROP SCHEMA reliably; skip it.
# ClickHouse uses a custom dispatch (CREATE/DROP DATABASE) which is tested implicitly.
@pytest.mark.skip_targets(["dremio"])
def test_drop_stale_ci_schemas(dbt_project: DbtProject):
    """Verify that old CI schemas are dropped and recent ones are kept."""
    result = dbt_project.dbt_runner.run_operation(
        "elementary_tests.test_drop_stale_ci_schemas",
    )
    assert result, "run_operation returned no output"
    data = json.loads(result[0])

    assert data["old_exists_before"], "Setup failed: old schema was not created"
    assert data["recent_exists_before"], "Setup failed: recent schema was not created"
    assert data["old_dropped"], "Old schema should have been dropped by cleanup"
    assert data["recent_kept"], "Recent schema should have been kept by cleanup"
```

Review discussion on the Dremio skip:

**Collaborator:** I see that Dremio has "DROP FOLDER", should we perhaps use that?

**Author:** Good point! I looked into it — Dremio's […] Looking at the existing codebase, […] The skip was a conservative choice since I wasn't sure if Dremio could create arbitrary schemas via […]

**Collaborator:** Yes, let's try.

**Author:** Done — removed the Dremio skip in fe9ff60. CI will tell us if it works. Also fixed two other issues in the same push:
- Double-encoding bug (test macro was returning […])
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,80 @@ | ||||||||||||||||
| {# | ||||||||||||||||
| drop_stale_ci_schemas – clean up timestamped CI schemas. | ||||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could this macro be in the integration tests project?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done — moved the entire macro to |
||||||||||||||||
|
|
||||||||||||||||
| Schema naming convention produced by CI: | ||||||||||||||||
| <prefix><YYMMDD_HHMMSS>_<branch>_<hash> | ||||||||||||||||
| Examples: | ||||||||||||||||
| dbt_260228_112345_master_abcd1234 | ||||||||||||||||
| py_260228_112345_master_abcd1234 | ||||||||||||||||
| dbt_260228_112345_master_abcd1234_elementary (suffixed variant) | ||||||||||||||||
|
|
||||||||||||||||
| Call from a GitHub Actions workflow via: | ||||||||||||||||
| dbt run-operation elementary.drop_stale_ci_schemas \ | ||||||||||||||||
| --args '{prefixes: ["dbt_", "py_"], max_age_hours: 24}' | ||||||||||||||||
| #} | ||||||||||||||||
|
|
||||||||||||||||
| {% macro drop_stale_ci_schemas(prefixes=None, max_age_hours=24) %} | ||||||||||||||||
|
haritamar marked this conversation as resolved.
Outdated
|
||||||||||||||||
| {% if prefixes is none %} | ||||||||||||||||
| {% set prefixes = ['dbt_', 'py_'] %} | ||||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's make prefixes a mandatory input, and fail if it is not specified
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done — |
||||||||||||||||
| {% endif %} | ||||||||||||||||
|
|
||||||||||||||||
| {% set database = elementary.target_database() %} | ||||||||||||||||
| {% set all_schemas = adapter.list_schemas(database) %} | ||||||||||||||||
| {% set now = modules.datetime.datetime.utcnow() %} | ||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🌐 Web query:
💡 Result: In Python 3.12, Replacementfrom datetime import datetime, UTC
now_utc = datetime.now(UTC)If you need compatibility with older Python versionsfrom datetime import datetime, timezone
now_utc = datetime.now(timezone.utc)Related:
|
||||||||||||||||
| {% set max_age_seconds = max_age_hours * 3600 %} | ||||||||||||||||
| {% set ns = namespace(dropped=0) %} | ||||||||||||||||
|
|
||||||||||||||||
| {{ log("CI schema cleanup: scanning " ~ all_schemas | length ~ " schema(s) in database '" ~ database ~ "' for prefixes " ~ prefixes | string, info=true) }} | ||||||||||||||||
|
|
||||||||||||||||
| {% for schema_name in all_schemas | sort %} | ||||||||||||||||
| {% set schema_lower = schema_name.lower() %} | ||||||||||||||||
| {% for prefix in prefixes %} | ||||||||||||||||
| {% if schema_lower.startswith(prefix.lower()) %} | ||||||||||||||||
| {% set remainder = schema_lower[prefix | length :] %} | ||||||||||||||||
| {# Timestamp format: YYMMDD_HHMMSS (13 chars) followed by _ #} | ||||||||||||||||
| {% if remainder | length >= 14 and remainder[6:7] == '_' and remainder[13:14] == '_' %} | ||||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done — replaced the manual character-by-character validation with Named it |
||||||||||||||||
| {% set ts_str = remainder[:13] %} | ||||||||||||||||
| {# Validate: positions 0-5 and 7-12 must be digits #} | ||||||||||||||||
| {% set digits = ts_str[:6] ~ ts_str[7:] %} | ||||||||||||||||
| {% set ns_valid = namespace(ok=true) %} | ||||||||||||||||
| {% for c in digits %} | ||||||||||||||||
| {% if c not in '0123456789' %} | ||||||||||||||||
| {% set ns_valid.ok = false %} | ||||||||||||||||
| {% endif %} | ||||||||||||||||
| {% endfor %} | ||||||||||||||||
| {% if ns_valid.ok %} | ||||||||||||||||
| {% set schema_ts = modules.datetime.datetime.strptime(ts_str, '%y%m%d_%H%M%S') %} | ||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The digit validation ensures characters are numeric but doesn't validate actual date ranges. For example, Consider wrapping in a try/except to skip malformed schemas gracefully: 🛡️ Suggested defensive handling {% if ns_valid.ok %}
- {% set schema_ts = modules.datetime.datetime.strptime(ts_str, '%y%m%d_%H%M%S') %}
- {% set age_seconds = (now - schema_ts).total_seconds() %}
- {% if age_seconds > max_age_seconds %}
+ {% set schema_ts = none %}
+ {% try %}
+ {% set schema_ts = modules.datetime.datetime.strptime(ts_str, '%y%m%d_%H%M%S') %}
+ {% except %}
+ {{ log(" skip " ~ schema_name ~ " (invalid timestamp: " ~ ts_str ~ ")", info=true) }}
+ {% endtry %}
+ {% if schema_ts is not none %}
+ {% set age_seconds = (now - schema_ts).total_seconds() %}
+ {% if age_seconds > max_age_seconds %}Note: Jinja2 doesn't have native try/except. You may need to validate month (01-12) and day (01-31) ranges explicitly, or accept the current behavior given CI schemas are auto-generated with valid timestamps. 🤖 Prompt for AI Agents |
||||||||||||||||
| {% set age_seconds = (now - schema_ts).total_seconds() %} | ||||||||||||||||
| {% if age_seconds > max_age_seconds %} | ||||||||||||||||
| {{ log(" DROP " ~ schema_name ~ " (age: " ~ (age_seconds / 3600) | round(1) ~ " h)", info=true) }} | ||||||||||||||||
| {% do elementary.drop_ci_schema(database, schema_name) %} | ||||||||||||||||
| {% set ns.dropped = ns.dropped + 1 %} | ||||||||||||||||
| {% else %} | ||||||||||||||||
| {{ log(" keep " ~ schema_name ~ " (age: " ~ (age_seconds / 3600) | round(1) ~ " h)", info=true) }} | ||||||||||||||||
| {% endif %} | ||||||||||||||||
| {% endif %} | ||||||||||||||||
| {% endif %} | ||||||||||||||||
| {% endif %} | ||||||||||||||||
| {% endfor %} | ||||||||||||||||
| {% endfor %} | ||||||||||||||||
|
|
||||||||||||||||
| {{ log("CI schema cleanup complete. Dropped " ~ ns.dropped ~ " stale schema(s).", info=true) }} | ||||||||||||||||
| {% endmacro %} | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| {# ── Per-adapter schema drop ─────────────────────────────────────────── #} | ||||||||||||||||
|
|
||||||||||||||||
| {% macro drop_ci_schema(database, schema_name) %} | ||||||||||||||||
| {% do return(adapter.dispatch('drop_ci_schema', 'elementary')(database, schema_name)) %} | ||||||||||||||||
| {% endmacro %} | ||||||||||||||||
|
|
||||||||||||||||
| {% macro default__drop_ci_schema(database, schema_name) %} | ||||||||||||||||
| {% set schema_relation = api.Relation.create(database=database, schema=schema_name) %} | ||||||||||||||||
| {% do dbt.drop_schema(schema_relation) %} | ||||||||||||||||
| {% do adapter.commit() %} | ||||||||||||||||
| {% endmacro %} | ||||||||||||||||
|
|
||||||||||||||||
| {% macro clickhouse__drop_ci_schema(database, schema_name) %} | ||||||||||||||||
| {% do run_query("DROP DATABASE IF EXISTS " ~ schema_name) %} | ||||||||||||||||
| {% do adapter.commit() %} | ||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Quote the ClickHouse database identifier before executing Line 100 concatenates ♻️ Proposed fix {% macro clickhouse__drop_ci_schema(database, schema_name) %}
- {% do run_query("DROP DATABASE IF EXISTS " ~ schema_name) %}
+ {% do run_query("DROP DATABASE IF EXISTS " ~ adapter.quote(schema_name)) %}
{% do adapter.commit() %}
{% endmacro %}📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||
| {% endmacro %} | ||||||||||||||||
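The macro's parse-and-compare logic can be sketched in plain Python (helper name and signature are mine). Note, per the bot comment about date ranges, that `strptime` raises `ValueError` for digit strings that don't encode a real date:

```python
from datetime import datetime


def is_stale(ts_str: str, now: datetime, max_age_hours: float) -> bool:
    """Parse a YYMMDD_HHMMSS schema timestamp and report whether it is
    older than the cutoff. Raises ValueError for impossible dates."""
    ts = datetime.strptime(ts_str, "%y%m%d_%H%M%S")
    return (now - ts).total_seconds() > max_age_hours * 3600


now = datetime(2026, 2, 28, 12, 0, 0)
print(is_stale("200101_000000", now, 24))  # True  (years old)
print(is_stale("260228_110000", now, 24))  # False (one hour old)

try:
    is_stale("261301_000000", now, 24)  # month 13: all digits, not a date
except ValueError:
    print("invalid timestamp rejected")
```

This is why the Jinja digit check alone is not sufficient to guarantee `strptime` succeeds: `261301_000000` passes the digit validation but still raises.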
**Bot (coderabbitai):** Use timezone-aware UTC instead of `utcnow()` in the test macro as well. `datetime.utcnow()` is deprecated since Python 3.12 (it still exists but emits a `DeprecationWarning` and is scheduled for removal). The replacement is `datetime.now(timezone.utc)`, or `datetime.now(datetime.UTC)` on Python 3.11+. dbt's Jinja `modules` object exposes the full stdlib `datetime` module, so `modules.datetime.timezone.utc` is available in macros across the Python versions dbt supports.
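Following the review note above, a minimal comparison of the deprecated call and its timezone-aware replacement. One subtlety for the macros: naive and aware datetimes cannot be mixed in arithmetic, so the "now" value and the parsed schema timestamp must be switched together:

```python
from datetime import datetime, timezone

# Deprecated since Python 3.12: returns a *naive* UTC timestamp.
naive_now = datetime.utcnow()

# Replacement: timezone-aware UTC timestamp.
aware_now = datetime.now(timezone.utc)
print(aware_now.tzinfo)  # UTC

# strptime also returns a naive datetime; it must be made aware before
# subtracting it from aware_now, or Python raises TypeError.
parsed = datetime.strptime("260228_110000", "%y%m%d_%H%M%S").replace(tzinfo=timezone.utc)
age = aware_now - parsed  # timedelta; safe because both sides are aware
```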