-
Notifications
You must be signed in to change notification settings - Fork 130
feat: add weekly cleanup workflow for stale CI schemas #943
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 16 commits
213d12a
4605b7f
fbe4059
a124fbe
fb87fe6
f327572
c373f8a
a9f398d
fe9ff60
5c58b2e
572eeca
dcd447a
1c6b4db
f85599c
5adb93f
dbd9047
3265525
1001681
357d9a2
14a0995
6913c74
3ca9f64
45c65ef
320a250
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| name: Cleanup stale CI schemas | ||
|
|
||
| on: | ||
| schedule: | ||
| # Every Sunday at 03:00 UTC | ||
| - cron: "0 3 * * 0" | ||
| workflow_dispatch: | ||
| inputs: | ||
| max-age-hours: | ||
| type: string | ||
| required: false | ||
| default: "24" | ||
| description: Drop schemas older than this many hours | ||
|
|
||
| env: | ||
| TESTS_DIR: ${{ github.workspace }}/dbt-data-reliability/integration_tests | ||
|
|
||
| jobs: | ||
| cleanup: | ||
| runs-on: ubuntu-latest | ||
| strategy: | ||
| fail-fast: false | ||
| matrix: | ||
| warehouse-type: | ||
| - snowflake | ||
| - bigquery | ||
| - redshift | ||
| - databricks_catalog | ||
| - athena | ||
| steps: | ||
| - name: Checkout dbt package | ||
| uses: actions/checkout@v4 | ||
| with: | ||
| path: dbt-data-reliability | ||
|
|
||
| - name: Setup Python | ||
| uses: actions/setup-python@v6 | ||
| with: | ||
| python-version: "3.10" | ||
| cache: "pip" | ||
|
|
||
| - name: Install dbt | ||
| run: > | ||
| pip install | ||
| "dbt-core" | ||
| "dbt-${{ (matrix.warehouse-type == 'databricks_catalog' && 'databricks') || (matrix.warehouse-type == 'athena' && 'athena-community') || matrix.warehouse-type }}" | ||
|
|
||
| - name: Write dbt profiles | ||
| env: | ||
| CI_WAREHOUSE_SECRETS: ${{ secrets.CI_WAREHOUSE_SECRETS || '' }} | ||
| run: | | ||
| # The cleanup job doesn't create schemas, but generate_profiles.py | ||
| # requires --schema-name. Use a dummy value. | ||
| python "${{ github.workspace }}/dbt-data-reliability/integration_tests/profiles/generate_profiles.py" \ | ||
| --template "${{ github.workspace }}/dbt-data-reliability/integration_tests/profiles/profiles.yml.j2" \ | ||
| --output ~/.dbt/profiles.yml \ | ||
| --schema-name "cleanup_placeholder" | ||
|
|
||
| - name: Install dbt deps | ||
| working-directory: ${{ env.TESTS_DIR }}/dbt_project | ||
| run: dbt deps | ||
|
|
||
| - name: Symlink local elementary package | ||
| run: ln -sfn ${{ github.workspace }}/dbt-data-reliability ${{ env.TESTS_DIR }}/dbt_project/dbt_packages/elementary | ||
|
|
||
| - name: Drop stale CI schemas | ||
| working-directory: ${{ env.TESTS_DIR }}/dbt_project | ||
| # Only dbt_ prefixed schemas are created in this repo's CI. | ||
| # The elementary repo has its own workflow for py_ prefixed schemas. | ||
| run: > | ||
| dbt run-operation elementary.drop_stale_ci_schemas | ||
| --args '{prefixes: ["dbt_"], max_age_hours: ${{ inputs.max-age-hours || '24' }}}' | ||
| -t "${{ matrix.warehouse-type }}" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| {# | ||
| Integration-test helper for elementary.drop_stale_ci_schemas. | ||
|
|
||
| Creates two CI-style schemas (one with an old timestamp, one recent), | ||
| runs the cleanup macro, checks which schemas survived, cleans up, | ||
| and returns a JSON result dict. | ||
| #} | ||
|
|
||
| {% macro test_drop_stale_ci_schemas() %} | ||
| {% set database = elementary.target_database() %} | ||
| {% set now = modules.datetime.datetime.utcnow() %} | ||
|
|
||
| {# Old schema: timestamp in the past (2020-01-01 00:00:00) #} | ||
| {% set old_schema = 'dbt_200101_000000_citest_00000000' %} | ||
| {# Recent schema: timestamp = now #} | ||
| {% set recent_ts = now.strftime('%y%m%d_%H%M%S') %} | ||
| {% set recent_schema = 'dbt_' ~ recent_ts ~ '_citest_11111111' %} | ||
|
|
||
| {{ log("TEST: creating old schema: " ~ old_schema, info=true) }} | ||
| {{ log("TEST: creating recent schema: " ~ recent_schema, info=true) }} | ||
|
|
||
| {# ── Create both schemas ───────────────────────────────────────────── #} | ||
| {% do elementary_tests.edr_create_schema(database, old_schema) %} | ||
| {% do elementary_tests.edr_create_schema(database, recent_schema) %} | ||
|
|
||
| {# ── Verify both exist before running cleanup ──────────────────────── #} | ||
| {% set old_exists_before = elementary.ci_schema_exists(database, old_schema) %} | ||
| {% set recent_exists_before = elementary.ci_schema_exists(database, recent_schema) %} | ||
| {{ log("TEST: old_exists_before=" ~ old_exists_before ~ ", recent_exists_before=" ~ recent_exists_before, info=true) }} | ||
|
|
||
| {# ── Run cleanup with a large threshold so only the artificially old | ||
| schema (year 2020) is caught, and real CI schemas from parallel | ||
| workers are safely below the threshold. ──────────────────────────── #} | ||
| {% do elementary.drop_stale_ci_schemas(prefixes=['dbt_'], max_age_hours=8760) %} | ||
|
|
||
| {# ── Check which schemas survived ─────────────────────────────────── #} | ||
| {% set old_exists_after = elementary.ci_schema_exists(database, old_schema) %} | ||
| {% set recent_exists_after = elementary.ci_schema_exists(database, recent_schema) %} | ||
| {{ log("TEST: old_exists_after=" ~ old_exists_after ~ ", recent_exists_after=" ~ recent_exists_after, info=true) }} | ||
|
|
||
| {# ── Cleanup: drop any remaining test schemas ─────────────────────── #} | ||
| {% if old_exists_after is true %} | ||
| {% do elementary.drop_ci_schema(database, old_schema) %} | ||
| {% endif %} | ||
| {% if recent_exists_after %} | ||
| {% do elementary.drop_ci_schema(database, recent_schema) %} | ||
| {% endif %} | ||
|
|
||
| {# ── Return results ────────────────────────────────────────────────── #} | ||
| {% set results = { | ||
| "old_exists_before": old_exists_before, | ||
| "recent_exists_before": recent_exists_before, | ||
| "old_dropped": not old_exists_after, | ||
| "recent_kept": recent_exists_after | ||
| } %} | ||
| {% do return(results) %} | ||
| {% endmacro %} | ||
|
|
||
|
|
||
| {# ── Per-adapter schema creation ─────────────────────────────────────── #} | ||
|
|
||
| {% macro edr_create_schema(database, schema_name) %} | ||
| {% do return(adapter.dispatch('edr_create_schema', 'elementary_tests')(database, schema_name)) %} | ||
| {% endmacro %} | ||
|
|
||
| {% macro default__edr_create_schema(database, schema_name) %} | ||
| {% set schema_relation = api.Relation.create(database=database, schema=schema_name) %} | ||
| {% do dbt.create_schema(schema_relation) %} | ||
| {% do adapter.commit() %} | ||
| {% endmacro %} | ||
|
|
||
| {% macro clickhouse__edr_create_schema(database, schema_name) %} | ||
| {% do run_query("CREATE DATABASE IF NOT EXISTS `" ~ schema_name ~ "`") %} | ||
| {% do adapter.commit() %} | ||
| {% endmacro %} | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| """Integration test for elementary.drop_stale_ci_schemas macro.""" | ||
|
|
||
| import json | ||
|
|
||
| import pytest | ||
| from dbt_project import DbtProject | ||
|
|
||
|
|
||
| @pytest.mark.skip_targets(["dremio"]) | ||
| def test_drop_stale_ci_schemas(dbt_project: DbtProject): | ||
| """Verify that old CI schemas are dropped and recent ones are kept.""" | ||
| result = dbt_project.dbt_runner.run_operation( | ||
| "elementary_tests.test_drop_stale_ci_schemas", | ||
| ) | ||
| assert result, "run_operation returned no output" | ||
| data = json.loads(result[0]) | ||
|
|
||
| assert data["old_exists_before"], "Setup failed: old schema was not created" | ||
| assert data["recent_exists_before"], "Setup failed: recent schema was not created" | ||
| assert data["old_dropped"], "Old schema should have been dropped by cleanup" | ||
| assert data["recent_kept"], "Recent schema should have been kept by cleanup" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,156 @@ | ||
| {# | ||
| drop_stale_ci_schemas – clean up timestamped CI schemas. | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could this macro be in the integration tests project?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done — moved the entire macro to |
||
|
|
||
| Schema naming convention produced by CI: | ||
| <prefix><YYMMDD_HHMMSS>_<branch>_<hash> | ||
| Examples: | ||
| dbt_260228_112345_master_abcd1234 | ||
| py_260228_112345_master_abcd1234 | ||
| dbt_260228_112345_master_abcd1234_elementary (suffixed variant) | ||
|
|
||
| Call from a GitHub Actions workflow via: | ||
| dbt run-operation elementary.drop_stale_ci_schemas \ | ||
| --args '{prefixes: ["dbt_", "py_"], max_age_hours: 24}' | ||
| #} | ||
|
|
||
| {% macro drop_stale_ci_schemas(prefixes=None, max_age_hours=24) %} | ||
|
haritamar marked this conversation as resolved.
Outdated
|
||
| {% if prefixes is none %} | ||
| {% set prefixes = ['dbt_', 'py_'] %} | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's make prefixes a mandatory input, and fail if it is not specified
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done — |
||
| {% endif %} | ||
|
|
||
| {% set max_age_hours = max_age_hours | int %} | ||
| {% set database = elementary.target_database() %} | ||
| {% set all_schemas = elementary.list_ci_schemas(database) %} | ||
| {# utcnow() is deprecated in Python 3.12+ but modules.datetime.timezone is not | ||
| available in dbt's Jinja context. Both now and strptime produce naive datetimes | ||
| so comparisons are safe. #} | ||
| {% set now = modules.datetime.datetime.utcnow() %} | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🌐 Web query:
💡 Result: In Python 3.12, Replacementfrom datetime import datetime, UTC
now_utc = datetime.now(UTC)If you need compatibility with older Python versionsfrom datetime import datetime, timezone
now_utc = datetime.now(timezone.utc)Related:
|
||
| {% set max_age_seconds = max_age_hours * 3600 %} | ||
| {% set ns = namespace(dropped=0) %} | ||
|
|
||
| {{ log("CI schema cleanup: scanning " ~ all_schemas | length ~ " schema(s) in database '" ~ database ~ "' for prefixes " ~ prefixes | string, info=true) }} | ||
|
|
||
| {% for schema_name in all_schemas | sort %} | ||
| {% set schema_lower = schema_name.lower() %} | ||
| {% for prefix in prefixes %} | ||
| {% if schema_lower.startswith(prefix.lower()) %} | ||
| {% set remainder = schema_lower[prefix | length :] %} | ||
| {# Timestamp format: YYMMDD_HHMMSS (13 chars) followed by _ #} | ||
| {% if remainder | length >= 14 and remainder[6:7] == '_' and remainder[13:14] == '_' %} | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done — replaced the manual character-by-character validation with Named it |
||
| {% set ts_str = remainder[:13] %} | ||
| {# Validate: positions 0-5 and 7-12 must be digits #} | ||
| {% set digits = ts_str[:6] ~ ts_str[7:] %} | ||
| {% set ns_valid = namespace(ok=true) %} | ||
| {% for c in digits %} | ||
| {% if c not in '0123456789' %} | ||
| {% set ns_valid.ok = false %} | ||
| {% endif %} | ||
| {% endfor %} | ||
| {% if ns_valid.ok %} | ||
| {# Validate date component ranges before strptime to avoid ValueError #} | ||
| {% set mm = ts_str[2:4] | int %} | ||
| {% set dd = ts_str[4:6] | int %} | ||
| {% set hh = ts_str[7:9] | int %} | ||
| {% set mi = ts_str[9:11] | int %} | ||
| {% set ss = ts_str[11:13] | int %} | ||
| {% if 1 <= mm <= 12 and 1 <= dd <= 31 and 0 <= hh <= 23 and 0 <= mi <= 59 and 0 <= ss <= 59 %} | ||
| {% set schema_ts = modules.datetime.datetime.strptime(ts_str, '%y%m%d_%H%M%S') %} | ||
| {% set age_seconds = (now - schema_ts).total_seconds() %} | ||
| {% if age_seconds > max_age_seconds %} | ||
| {{ log(" DROP " ~ schema_name ~ " (age: " ~ (age_seconds / 3600) | round(1) ~ " h)", info=true) }} | ||
| {% do elementary.drop_ci_schema(database, schema_name) %} | ||
| {% set ns.dropped = ns.dropped + 1 %} | ||
| {% else %} | ||
| {{ log(" keep " ~ schema_name ~ " (age: " ~ (age_seconds / 3600) | round(1) ~ " h)", info=true) }} | ||
| {% endif %} | ||
| {% else %} | ||
| {{ log(" skip " ~ schema_name ~ " (invalid timestamp: " ~ ts_str ~ ")", info=true) }} | ||
| {% endif %} | ||
| {% endif %} | ||
| {% endif %} | ||
| {% endif %} | ||
| {% endfor %} | ||
| {% endfor %} | ||
|
|
||
| {{ log("CI schema cleanup complete. Dropped " ~ ns.dropped ~ " stale schema(s).", info=true) }} | ||
| {% endmacro %} | ||
|
|
||
|
|
||
| {# ── Per-adapter schema drop ─────────────────────────────────────────── #} | ||
|
|
||
| {# ── Per-adapter schema listing ─────────────────────────────────────── #} | ||
|
|
||
| {% macro list_ci_schemas(database) %} | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe this can be in its own file? (also in the integration tests project)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done — |
||
| {% do return(adapter.dispatch('list_ci_schemas', 'elementary')(database)) %} | ||
| {% endmacro %} | ||
|
|
||
| {# adapter.list_schemas() is not available in run-operation context | ||
| (RuntimeDatabaseWrapper does not expose it). Use information_schema instead. #} | ||
| {% macro default__list_ci_schemas(database) %} | ||
| {% set results = run_query("SELECT schema_name FROM information_schema.schemata") %} | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As a safety, can we just list schemas in the CI db? (the one in the profiles)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done — added |
||
| {% set schemas = [] %} | ||
| {% for row in results %} | ||
| {% do schemas.append(row[0]) %} | ||
| {% endfor %} | ||
| {% do return(schemas) %} | ||
| {% endmacro %} | ||
|
|
||
| {% macro bigquery__list_ci_schemas(database) %} | ||
| {% set results = run_query("SELECT schema_name FROM `" ~ database ~ "`.INFORMATION_SCHEMA.SCHEMATA") %} | ||
| {% set schemas = [] %} | ||
| {% for row in results %} | ||
| {% do schemas.append(row[0]) %} | ||
| {% endfor %} | ||
| {% do return(schemas) %} | ||
| {% endmacro %} | ||
|
|
||
| {% macro clickhouse__list_ci_schemas(database) %} | ||
| {% set results = run_query('SHOW DATABASES') %} | ||
| {% set schemas = [] %} | ||
| {% for row in results %} | ||
| {% do schemas.append(row[0]) %} | ||
| {% endfor %} | ||
| {% do return(schemas) %} | ||
| {% endmacro %} | ||
|
|
||
|
|
||
| {# ── Per-adapter schema drop ─────────────────────────────────────────── #} | ||
|
|
||
| {% macro drop_ci_schema(database, schema_name) %} | ||
| {% do return(adapter.dispatch('drop_ci_schema', 'elementary')(database, schema_name)) %} | ||
| {% endmacro %} | ||
|
|
||
| {% macro default__drop_ci_schema(database, schema_name) %} | ||
| {% set schema_relation = api.Relation.create(database=database, schema=schema_name) %} | ||
| {% do dbt.drop_schema(schema_relation) %} | ||
| {% do adapter.commit() %} | ||
| {% endmacro %} | ||
|
|
||
| {% macro clickhouse__drop_ci_schema(database, schema_name) %} | ||
| {% do run_query("DROP DATABASE IF EXISTS `" ~ schema_name ~ "`") %} | ||
| {% do adapter.commit() %} | ||
| {% endmacro %} | ||
|
|
||
|
|
||
| {# ── Per-adapter schema existence check (run-operation safe) ──────── #} | ||
| {# adapter.check_schema_exists() is not available in run-operation context. | ||
| This is only used by the integration test; the main macro does not need it. #} | ||
|
|
||
| {% macro ci_schema_exists(database, schema_name) %} | ||
| {% do return(adapter.dispatch('ci_schema_exists', 'elementary')(database, schema_name)) %} | ||
| {% endmacro %} | ||
|
|
||
| {% macro default__ci_schema_exists(database, schema_name) %} | ||
| {% set result = run_query("SELECT schema_name FROM information_schema.schemata WHERE lower(schema_name) = lower('" ~ schema_name ~ "')") %} | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. likewise, only consider schemas in the CI database.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done — added |
||
| {% do return(result | length > 0) %} | ||
| {% endmacro %} | ||
|
|
||
| {% macro bigquery__ci_schema_exists(database, schema_name) %} | ||
| {% set result = run_query("SELECT schema_name FROM `" ~ database ~ "`.INFORMATION_SCHEMA.SCHEMATA WHERE lower(schema_name) = lower('" ~ schema_name ~ "')") %} | ||
| {% do return(result | length > 0) %} | ||
| {% endmacro %} | ||
|
|
||
| {% macro clickhouse__ci_schema_exists(database, schema_name) %} | ||
| {% set result = run_query("SELECT 1 FROM system.databases WHERE name = '" ~ schema_name ~ "' LIMIT 1") %} | ||
| {% do return(result | length > 0) %} | ||
| {% endmacro %} | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: elementary-data/dbt-data-reliability
Length of output: 149
🌐 Web query:
Python 3.12 datetime.utcnow() deprecation removal💡 Result:
datetime.datetime.utcnow()(andutcfromtimestamp()) were deprecated in Python 3.12 and now emit aDeprecationWarningsaying they’re “scheduled for removal in a future version.” The official replacement is timezone-aware UTC datetimes, e.g.datetime.datetime.now(tz=datetime.UTC). [1]utcnow()is still present (still marked “Deprecated since version 3.12”), which indicates it has not been removed yet, and no specific removal version is stated in the docs. [2]Recommended migration:
Sources: [1] [2] [3]
🏁 Script executed:
cat -n ./integration_tests/dbt_project/macros/test_drop_stale_ci_schemas.sql | head -30Repository: elementary-data/dbt-data-reliability
Length of output: 1691
🏁 Script executed:
Repository: elementary-data/dbt-data-reliability
Length of output: 818
🌐 Web query:
dbt Jinja modules.datetime.timezone Python version compatibility💡 Result:
modules.datetime.timezoneis part of Python’s standard librarydatetimemodule, and it’s available in all modern Python 3 versions that dbt supports—so it’s not a dbt/Python-version incompatibility issue in practice.modulesobject exposes Python stdlib modules, andmodules.datetimeis commonly used for date/time logic in macros (e.g.,modules.datetime.datetime.now(),modules.datetime.timedelta(...)). That same object should also exposemodules.datetime.timezone(e.g.,modules.datetime.timezone.utc). [3]datetime.timezonewill be present. [1]datetime.timezoneexists. [2]If you’re seeing an error like “
timezoneis undefined” or “has no attributetimezone”, it’s more likely due to how the Jinja context is being executed (e.g., a non-dbt Jinja renderer, older tooling that doesn’t providemodules, or a linter/templater mismatch), not Python’sdatetimemodule.Sources: [1] [2] [3]
Use timezone-aware UTC instead of
utcnow()in the test macro.The
datetime.utcnow()method is deprecated in Python 3.12+ and will be removed in a future version. Replace it withdatetime.now(timezone.utc)to maintain forward compatibility and avoid runtime warnings.♻️ Proposed fix
📝 Committable suggestion
🤖 Prompt for AI Agents