feat(ingestion): incremental metadata extraction for Unity Catalog #28380
ulixius9 wants to merge 1 commit into
Detect changed tables via information_schema.tables.last_altered and deleted tables via system.access.audit deleteTable events, mirroring the Snowflake/BigQuery incremental pattern. Degrades to no-op delete detection when the audit schema is unavailable. Tables present in both the changed and deleted sets (dropped and recreated within the window) are kept rather than deleted.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
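The dropped-and-recreated rule in the commit message can be sketched with plain set arithmetic. This is a minimal illustration and not the connector's code: the function name `tables_to_delete` and the sample table names are hypothetical.

```python
from typing import Set


def tables_to_delete(changed: Set[str], deleted: Set[str]) -> Set[str]:
    """Return the tables that should be marked as deleted.

    A table present in both sets was dropped and recreated inside the
    incremental window, so it still exists and must be kept.
    """
    return deleted - changed


changed = {"sales.orders", "sales.customers"}
deleted = {"sales.orders", "sales.legacy_orders"}

# "sales.orders" was dropped and recreated, so only the legacy table is removed.
print(tables_to_delete(changed, deleted))  # {'sales.legacy_orders'}
```

Keeping the recreated table avoids deleting an entity that the same run is about to update.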
AND action_name = 'deleteTable'
AND event_date >= date(timestamp_millis({start_timestamp}))
AND event_time >= timestamp_millis({start_timestamp})
AND request_params.full_name_arg LIKE '{catalog}.%%'
⚠️ Bug: SQL LIKE treats _ in catalog name as single-char wildcard
In UNITY_CATALOG_GET_DELETED_TABLES, the filter request_params.full_name_arg LIKE '{catalog}.%%' uses the catalog name directly in a LIKE pattern. Since _ is a single-character wildcard in SQL, a catalog named e.g. my_catalog will also match myXcatalog.schema.table or any other catalog where the underscore position has an arbitrary character. This could cause tables from other catalogs to be incorrectly marked as deleted.
Unity Catalog catalog names commonly contain underscores, so this is a realistic scenario.
Fix 1: Add an ESCAPE clause. This only helps if _ and % in the catalog name are also escaped at format time.
AND request_params.full_name_arg LIKE '{catalog}.%%' ESCAPE '\'
Fix 2: Use an exact match on the first segment of the dot-delimited name instead of LIKE. This avoids wildcard issues entirely and is clearer in intent.
AND split(request_params.full_name_arg, '.')[0] = '{catalog}'
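Both fixes can be illustrated in Python. The sketch below is not part of the PR: `escape_like_literal`, `like_matches`, and `catalog_of` are hypothetical helpers, and an in-memory SQLite database stands in for Databricks SQL purely to show the wildcard semantics of LIKE (escape-character handling in Databricks string literals may differ and should be checked separately).

```python
import sqlite3


def escape_like_literal(value: str, escape_char: str = "\\") -> str:
    """Escape LIKE metacharacters so the value only matches itself."""
    # Escape the escape character first, then the two wildcards.
    for char in (escape_char, "%", "_"):
        value = value.replace(char, escape_char + char)
    return value


def like_matches(candidate: str, pattern: str) -> bool:
    """Evaluate candidate LIKE pattern, with backslash as the ESCAPE character."""
    conn = sqlite3.connect(":memory:")
    try:
        row = conn.execute(
            "SELECT ? LIKE ? ESCAPE '\\'", (candidate, pattern)
        ).fetchone()
        return bool(row[0])
    finally:
        conn.close()


def catalog_of(full_name: str) -> str:
    """Fix 2 in Python terms: take the first dot-delimited segment."""
    return full_name.split(".", 1)[0]


other = "myXcatalog.sales.orders"

# Unescaped: "_" acts as a wildcard, so the wrong catalog matches.
print(like_matches(other, "my_catalog.%"))  # True

# Escaped: "_" is literal, so the wrong catalog no longer matches.
print(like_matches(other, escape_like_literal("my_catalog") + ".%"))  # False

# Exact comparison on the first segment avoids LIKE entirely.
print(catalog_of(other) == "my_catalog")  # False
```

The exact-match approach needs no escaping logic at all, which is why it is the less error-prone of the two options.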
Pull request overview
This PR adds incremental metadata extraction support to the Unity Catalog ingestion connector, aligning it with the existing incremental framework used by other database connectors. After an initial full run, the source can fetch only tables changed since the last successful workflow run and explicitly mark dropped tables as deleted.
Changes:
- Add Unity Catalog incremental discovery path that fetches only changed tables (via information_schema.tables.last_altered) and tracks deleted tables (via system.access.audit deleteTable events).
- Introduce a dedicated UnityCatalogIncrementalTableProcessor to populate changed/deleted table maps per catalog.
- Add unit tests covering changed/deleted detection, graceful degradation when system schemas aren’t available, and incremental vs full-path behavior.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py | Wires IncrementalConfig into the Unity Catalog source, adds incremental table listing + explicit delete marking. |
| ingestion/src/metadata/ingestion/source/database/unitycatalog/queries.py | Adds SQL templates to detect changed tables and deleted tables since the incremental watermark. |
| ingestion/src/metadata/ingestion/source/database/unitycatalog/incremental_table_processor.py | New helper to execute the incremental queries and bucket results into per-schema changed/deleted sets. |
| ingestion/tests/unit/topology/database/test_unitycatalog_incremental.py | New unit tests validating incremental processor parsing, degradation, and source incremental flow. |
self.status.failed(
    StackTraceError(
        name=table.name,
        error=f"Unexpected exception to get table [{table.name}]: {exc}",
table_map: SchemaToTables = {}
try:
    rows = self.connection.execute(text(query.format(catalog=catalog, start_timestamp=start_timestamp)))
    for row in rows or []:
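The bucketing and graceful-degradation behavior described in the overview can be sketched as follows. This is an assumption-laden illustration, not the processor's implementation: the `SchemaToTables` alias is defined here as a guess at its shape, and `bucket_rows`, `fetch_table_map`, and the `run_query` callable are hypothetical names that stand in for the real connection call.

```python
import logging
from collections import defaultdict
from typing import Callable, Dict, Iterable, Set, Tuple

logger = logging.getLogger(__name__)

# Assumed shape: schema name -> set of table names.
SchemaToTables = Dict[str, Set[str]]


def bucket_rows(rows: Iterable[Tuple[str, str]]) -> SchemaToTables:
    """Group (schema, table) rows into a per-schema set of table names."""
    table_map = defaultdict(set)
    for schema_name, table_name in rows or []:
        table_map[schema_name].add(table_name)
    return dict(table_map)


def fetch_table_map(
    run_query: Callable[[], Iterable[Tuple[str, str]]]
) -> SchemaToTables:
    """Run a query and bucket its rows; fall back to an empty map on failure."""
    try:
        return bucket_rows(run_query())
    except Exception as exc:  # e.g. the system schema is not available
        logger.warning("Incremental query failed, skipping it: %s", exc)
        return {}


def unavailable() -> Iterable[Tuple[str, str]]:
    """Simulate a workspace without access to the audit schema."""
    raise RuntimeError("system.access schema is not available")


print(fetch_table_map(lambda: [("sales", "orders"), ("sales", "customers")]))
print(fetch_table_map(unavailable))  # {}
```

Returning an empty map on failure means delete detection becomes a no-op rather than aborting the whole run.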
UNITY_CATALOG_GET_CHANGED_TABLES = textwrap.dedent(
    """
    SELECT
        table_schema,
        table_name
    FROM `{catalog}`.information_schema.tables
    WHERE last_altered >= timestamp_millis({start_timestamp})
    """
)


UNITY_CATALOG_GET_DELETED_TABLES = textwrap.dedent(
    """
    SELECT DISTINCT request_params.full_name_arg AS table_full_name
    FROM system.access.audit
    WHERE service_name = 'unityCatalog'
        AND action_name = 'deleteTable'
        AND event_date >= date(timestamp_millis({start_timestamp}))
        AND event_time >= timestamp_millis({start_timestamp})
        AND request_params.full_name_arg LIKE '{catalog}.%%'
    """
)
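To show how templates like these might be rendered and how the audit rows might be consumed, here is a small sketch. It assumes the watermark is passed as epoch milliseconds (which is what `timestamp_millis` expects) and that full names have exactly three dot-separated parts. `to_epoch_millis`, `split_full_name`, and the shortened `CHANGED_TABLES_TEMPLATE` are hypothetical and not taken from the PR.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Shortened stand-in for the changed-tables template above.
CHANGED_TABLES_TEMPLATE = (
    "SELECT table_schema, table_name "
    "FROM `{catalog}`.information_schema.tables "
    "WHERE last_altered >= timestamp_millis({start_timestamp})"
)


def to_epoch_millis(moment: datetime) -> int:
    """Convert a timezone-aware datetime to whole epoch milliseconds."""
    return (moment - EPOCH) // timedelta(milliseconds=1)


def split_full_name(full_name: str, catalog: str) -> Optional[Tuple[str, str]]:
    """Split 'catalog.schema.table'; return None unless the catalog matches exactly.

    Assumes names contain no embedded dots.
    """
    parts = full_name.split(".")
    if len(parts) != 3 or parts[0] != catalog:
        return None
    return parts[1], parts[2]


watermark = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(
    CHANGED_TABLES_TEMPLATE.format(
        catalog="main", start_timestamp=to_epoch_millis(watermark)
    )
)
print(split_full_name("main.sales.orders", "main"))   # ('sales', 'orders')
print(split_full_name("other.sales.orders", "main"))  # None
```

Checking the catalog again on the Python side is a cheap guard against rows from other catalogs slipping through a loose SQL filter.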
🟡 Playwright Results: all passed (16 flaky)
✅ 4239 passed · ❌ 0 failed · 🟡 16 flaky · ⏭️ 87 skipped
🟡 16 flaky test(s) (passed on retry)
How to debug locally:
# Download playwright-test-results-<shard> artifact and unzip
npx playwright show-trace path/to/trace.zip # view trace


Description
Adds incremental metadata extraction to the Unity Catalog connector, matching the existing Snowflake/BigQuery pattern. After a first full run, only tables changed since the last successful run are processed, and dropped tables are detected explicitly.
- Changed tables are detected via information_schema.tables.last_altered, then each changed table is fetched individually via the Databricks SDK (tables.get), which avoids enumerating the whole catalog every run.
- Deleted tables are detected via system.access.audit deleteTable events (request_params.full_name_arg). Degrades gracefully (warns and skips delete detection) when the system.access schema is not available.
- information_schema only lists tables that currently exist.
- Uses the existing IncrementalConfig framework; no JSON schema change (the incremental config already applies to every database connector via sourceConfig.config.incremental).

Type of change
How was this tested
New unit tests in test_unitycatalog_incremental.py (17 cases) covering the processor's parsing and graceful degradation, the incremental vs. full discovery path, delete handling, the dropped-and-recreated exclusion, the mark_tables_as_deleted incremental/full branches, and create() wiring. Full Unity Catalog + incremental suite is green (42 tests).

🤖 Generated with Claude Code