-
Notifications
You must be signed in to change notification settings - Fork 2.1k
feat: implement fast FQN prefix-based bulk deletion to fix race conditions and slow cascade #27108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
a5f7bc5
b9eb147
5a0d7d2
4c2c690
6b266b8
40583bc
7fe9212
9cd10af
8343288
29cb761
9970d96
670e3d2
7181840
64735da
da2f672
170e0ed
a5625ec
f13f96a
ca4183b
9321871
a570eb1
01be3c6
7bd398b
a70140b
16db11d
682f0f6
9227427
9380afa
9c3e4d6
e13ce2e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -150,3 +150,20 @@ FROM user_entity ue, role_entity re | |||||||||||||||||||||||||||
| WHERE ue.name = 'mcpapplicationbot' | ||||||||||||||||||||||||||||
| AND re.name = 'ApplicationBotImpersonationRole' | ||||||||||||||||||||||||||||
| ON CONFLICT DO NOTHING; | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| -- Add FQN hash columns to entity_relationship to enable fast prefix-based bulk deletion. | ||||||||||||||||||||||||||||
| -- This allows deleting all relationships for an entire entity subtree in a single indexed query | ||||||||||||||||||||||||||||
| -- instead of walking the tree entity-by-entity. | ||||||||||||||||||||||||||||
| ALTER TABLE entity_relationship | ||||||||||||||||||||||||||||
| ADD COLUMN IF NOT EXISTS fromFQNHash VARCHAR(768) DEFAULT NULL, | ||||||||||||||||||||||||||||
| ADD COLUMN IF NOT EXISTS toFQNHash VARCHAR(768) DEFAULT NULL; | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| CREATE INDEX IF NOT EXISTS idx_er_from_fqn_hash ON entity_relationship (fromFQNHash); | ||||||||||||||||||||||||||||
| CREATE INDEX IF NOT EXISTS idx_er_to_fqn_hash ON entity_relationship (toFQNHash); | ||||||||||||||||||||||||||||
|
Comment on lines
+154
to
+162
|
||||||||||||||||||||||||||||
| -- Add FQN hash columns to entity_relationship to enable fast prefix-based bulk deletion. | |
| -- This allows deleting all relationships for an entire entity subtree in a single indexed query | |
| -- instead of walking the tree entity-by-entity. | |
| ALTER TABLE entity_relationship | |
| ADD COLUMN IF NOT EXISTS fromFQNHash VARCHAR(768) DEFAULT NULL, | |
| ADD COLUMN IF NOT EXISTS toFQNHash VARCHAR(768) DEFAULT NULL; | |
| CREATE INDEX IF NOT EXISTS idx_er_from_fqn_hash ON entity_relationship (fromFQNHash); | |
| CREATE INDEX IF NOT EXISTS idx_er_to_fqn_hash ON entity_relationship (toFQNHash); | |
| -- NOTE: | |
| -- Do not add new entity_relationship schema changes to this historical 1.13.0 migration. | |
| -- The fromFQNHash/toFQNHash columns and their indexes must be added in the current | |
| -- release migration so existing installations receive them during upgrade. |
Copilot
AI
Apr 11, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding new DDL to an older, already-shipped migration version (1.13.0) is unsafe for upgrades: the migration framework will not re-run 1.13.0 on databases that have already applied it, so those installations will never receive the new fromFQNHash/toFQNHash columns and indexes that prefix deletion requires. Please introduce this DDL in the latest migration version instead, and keep 1.13.0 immutable, so upgrades remain deterministic.
| -- Add FQN hash columns to entity_relationship to enable fast prefix-based bulk deletion. | |
| -- This allows deleting all relationships for an entire entity subtree in a single indexed query | |
| -- instead of walking the tree entity-by-entity. | |
| ALTER TABLE entity_relationship | |
| ADD COLUMN IF NOT EXISTS fromFQNHash VARCHAR(768) DEFAULT NULL, | |
| ADD COLUMN IF NOT EXISTS toFQNHash VARCHAR(768) DEFAULT NULL; | |
| CREATE INDEX IF NOT EXISTS idx_er_from_fqn_hash ON entity_relationship (fromFQNHash); | |
| CREATE INDEX IF NOT EXISTS idx_er_to_fqn_hash ON entity_relationship (toFQNHash); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| -- No schema changes in this version. | ||
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| -- No schema changes in this version. | ||
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,257 @@ | ||
| /* | ||
| * Copyright 2021 Collate | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.openmetadata.it.tests; | ||
|
|
||
| import java.net.URI; | ||
| import java.net.http.HttpClient; | ||
| import java.net.http.HttpRequest; | ||
| import java.net.http.HttpResponse; | ||
| import java.time.Duration; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.UUID; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.TimeUnit; | ||
| import org.awaitility.Awaitility; | ||
| import org.junit.jupiter.api.BeforeAll; | ||
| import org.junit.jupiter.api.Disabled; | ||
| import org.junit.jupiter.api.Tag; | ||
| import org.junit.jupiter.api.Test; | ||
| import org.junit.jupiter.api.extension.ExtendWith; | ||
| import org.openmetadata.it.factories.DatabaseSchemaTestFactory; | ||
| import org.openmetadata.it.factories.DatabaseServiceTestFactory; | ||
| import org.openmetadata.it.factories.DatabaseTestFactory; | ||
| import org.openmetadata.it.factories.TableTestFactory; | ||
| import org.openmetadata.it.util.SdkClients; | ||
| import org.openmetadata.it.util.TestNamespace; | ||
| import org.openmetadata.it.util.TestNamespaceExtension; | ||
| import org.openmetadata.schema.entity.data.Database; | ||
| import org.openmetadata.schema.entity.data.DatabaseSchema; | ||
| import org.openmetadata.schema.entity.services.DatabaseService; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| /** | ||
| * Benchmark comparing old recursive hard delete vs new FQN prefix hard delete. | ||
| * | ||
| * <p>Default topology: 5 databases × 5 schemas × 400 tables = 10,000 tables per service | ||
| * (~10,031 total entities including service, databases, schemas). | ||
| * | ||
| * <p>Run manually against a local stack: | ||
| * | ||
| * <pre> | ||
| * mvn verify -pl openmetadata-integration-tests \ | ||
| * -Dgroups=benchmark \ | ||
| * -Dit.test=PrefixDeletionBenchmarkIT \ | ||
| * -Dtest.databases=5 # databases per service (default: 5) | ||
| * -Dtest.schemas=5 # schemas per database (default: 5) | ||
| * -Dtest.tables=400 # tables per schema (default: 400) | ||
| * -Dtest.seedThreads=32 # parallel seed threads (default: 32) | ||
| * </pre> | ||
| * | ||
| * <p>NOTE: Setup creates entities in parallel (default 32 threads, tunable via | ||
| * -Dtest.seedThreads). At ~50ms/call and 32 threads, 10k tables seed in ~20 s. | ||
| * | ||
| * <p>Both deletions are timed end-to-end: the old delete is synchronous; the new prefix | ||
| * delete is async (202), so we poll until the service is gone before recording elapsed time. | ||
| */ | ||
| @Tag("benchmark") | ||
| @Disabled("Manual benchmark — run explicitly against a local mysql/postgres stack") | ||
| @ExtendWith(TestNamespaceExtension.class) | ||
| class PrefixDeletionBenchmarkIT { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(PrefixDeletionBenchmarkIT.class); | ||
|
|
||
| private static final int DATABASES_PER_SERVICE = Integer.getInteger("test.databases", 5); | ||
| private static final int SCHEMAS_PER_DATABASE = Integer.getInteger("test.schemas", 5); | ||
| private static final int TABLES_PER_SCHEMA = Integer.getInteger("test.tables", 400); | ||
| private static final int SEED_THREADS = Integer.getInteger("test.seedThreads", 32); | ||
|
|
||
| private static final Duration DELETE_POLL_TIMEOUT = Duration.ofMinutes(10); | ||
| private static final Duration DELETE_POLL_INTERVAL = Duration.ofSeconds(2); | ||
|
|
||
| @BeforeAll | ||
| static void setup() { | ||
| SdkClients.adminClient(); | ||
| } | ||
|
|
||
| @Test | ||
| void benchmark_oldRecursiveHardDelete_vs_newPrefixDelete(TestNamespace ns) throws Exception { | ||
| int totalTables = DATABASES_PER_SERVICE * SCHEMAS_PER_DATABASE * TABLES_PER_SCHEMA; | ||
| int totalEntities = | ||
| 1 + DATABASES_PER_SERVICE + DATABASES_PER_SERVICE * SCHEMAS_PER_DATABASE + totalTables; | ||
| LOG.info( | ||
| "Benchmark topology: {} databases × {} schemas × {} tables = {} tables, {} total entities per service", | ||
| DATABASES_PER_SERVICE, | ||
| SCHEMAS_PER_DATABASE, | ||
| TABLES_PER_SCHEMA, | ||
| totalTables, | ||
| totalEntities); | ||
|
|
||
| DatabaseService oldService = buildHierarchy(ns, "old"); | ||
| long oldMs = timeOldDelete(oldService); | ||
|
|
||
| DatabaseService newService = buildHierarchy(ns, "new"); | ||
| long newMs = timeNewDelete(newService); | ||
|
|
||
| double speedup = (double) oldMs / Math.max(newMs, 1); | ||
| LOG.info("=== Deletion Benchmark Results ({} entities per service) ===", totalEntities); | ||
| LOG.info(" Old recursive hard delete : {} ms", oldMs); | ||
| LOG.info(" New FQN prefix hard delete: {} ms", newMs); | ||
| LOG.info(" Speedup : {}x", String.format("%.2f", speedup)); | ||
| } | ||
|
|
||
| private DatabaseService buildHierarchy(TestNamespace ns, String tag) throws Exception { | ||
| DatabaseService service = DatabaseServiceTestFactory.createPostgres(ns); | ||
| int totalEntities = | ||
| 1 | ||
| + DATABASES_PER_SERVICE | ||
| + DATABASES_PER_SERVICE * SCHEMAS_PER_DATABASE | ||
| + DATABASES_PER_SERVICE * SCHEMAS_PER_DATABASE * TABLES_PER_SCHEMA; | ||
| LOG.info( | ||
| "[{}] Seeding {} entities under service {} using {} threads ...", | ||
| tag, | ||
| totalEntities, | ||
| service.getName(), | ||
| SEED_THREADS); | ||
| long seedStart = System.currentTimeMillis(); | ||
|
|
||
| ExecutorService pool = Executors.newFixedThreadPool(SEED_THREADS); | ||
| try { | ||
| List<Future<Database>> dbFutures = new ArrayList<>(); | ||
| for (int d = 0; d < DATABASES_PER_SERVICE; d++) { | ||
| final int dIdx = d; | ||
| dbFutures.add( | ||
| pool.submit( | ||
| () -> | ||
| DatabaseTestFactory.createWithName( | ||
| ns, service.getFullyQualifiedName(), tag + "db" + dIdx))); | ||
| } | ||
| List<Database> databases = new ArrayList<>(); | ||
| for (Future<Database> f : dbFutures) { | ||
| databases.add(f.get()); | ||
| } | ||
|
|
||
| List<Future<DatabaseSchema>> schemaFutures = new ArrayList<>(); | ||
| for (int d = 0; d < databases.size(); d++) { | ||
| final Database database = databases.get(d); | ||
| final int dIdx = d; | ||
| for (int s = 0; s < SCHEMAS_PER_DATABASE; s++) { | ||
| final int sIdx = s; | ||
| schemaFutures.add( | ||
| pool.submit( | ||
| () -> | ||
| DatabaseSchemaTestFactory.createWithName( | ||
| ns, database.getFullyQualifiedName(), tag + "sc" + dIdx + "x" + sIdx))); | ||
| } | ||
| } | ||
| List<DatabaseSchema> schemas = new ArrayList<>(); | ||
| for (Future<DatabaseSchema> f : schemaFutures) { | ||
| schemas.add(f.get()); | ||
| } | ||
|
|
||
| List<Future<?>> tableFutures = new ArrayList<>(); | ||
| for (int s = 0; s < schemas.size(); s++) { | ||
| final DatabaseSchema schema = schemas.get(s); | ||
| final int sIdx = s; | ||
| for (int t = 0; t < TABLES_PER_SCHEMA; t++) { | ||
| final int tIdx = t; | ||
| tableFutures.add( | ||
| pool.submit( | ||
| () -> { | ||
| TableTestFactory.createWithName( | ||
| ns, schema.getFullyQualifiedName(), tag + "tbl" + sIdx + "x" + tIdx); | ||
| return null; | ||
| })); | ||
| } | ||
| } | ||
| for (Future<?> f : tableFutures) { | ||
| f.get(); | ||
| } | ||
| } finally { | ||
| pool.shutdown(); | ||
| pool.awaitTermination(30, TimeUnit.MINUTES); | ||
| } | ||
|
|
||
| long seedMs = System.currentTimeMillis() - seedStart; | ||
| LOG.info( | ||
| "[{}] Hierarchy seeded in {} ms ({} ms/entity avg)", | ||
| tag, | ||
| seedMs, | ||
| seedMs / Math.max(totalEntities, 1)); | ||
| return service; | ||
| } | ||
|
|
||
| private long timeOldDelete(DatabaseService service) throws Exception { | ||
| LOG.info("Timing OLD recursive hard delete for service {} ...", service.getName()); | ||
| long start = System.currentTimeMillis(); | ||
|
|
||
| String url = | ||
| SdkClients.getServerUrl() | ||
| + "/v1/services/databaseServices/" | ||
| + service.getId() | ||
| + "?hardDelete=true&recursive=true"; | ||
| sendDelete(url); | ||
|
|
||
| long elapsed = System.currentTimeMillis() - start; | ||
| LOG.info("OLD recursive hard delete completed in {} ms", elapsed); | ||
| return elapsed; | ||
| } | ||
|
|
||
| private long timeNewDelete(DatabaseService service) throws Exception { | ||
| LOG.info("Timing NEW FQN prefix hard delete for service {} ...", service.getName()); | ||
| long start = System.currentTimeMillis(); | ||
|
|
||
| String url = | ||
| SdkClients.getServerUrl() + "/v1/services/databaseServices/prefix/" + service.getId(); | ||
| sendDelete(url); | ||
|
|
||
| // Prefix delete is async — poll until the service is actually gone so we measure | ||
| // real deletion time, not just the time to hand off the job to the executor. | ||
| UUID serviceId = service.getId(); | ||
| Awaitility.await("Wait for prefix deletion of " + service.getName() + " to complete") | ||
| .atMost(DELETE_POLL_TIMEOUT) | ||
| .pollInterval(DELETE_POLL_INTERVAL) | ||
| .until( | ||
| () -> { | ||
| try { | ||
| SdkClients.adminClient().databaseServices().get(serviceId.toString()); | ||
| return false; | ||
| } catch (Exception e) { | ||
| return true; | ||
| } | ||
| }); | ||
|
|
||
| long elapsed = System.currentTimeMillis() - start; | ||
| LOG.info("NEW FQN prefix hard delete completed in {} ms", elapsed); | ||
| return elapsed; | ||
| } | ||
|
Comment on lines
+198
to
+241
|
||
|
|
||
| private void sendDelete(String url) throws Exception { | ||
| HttpRequest request = | ||
| HttpRequest.newBuilder() | ||
| .uri(URI.create(url)) | ||
| .header("Authorization", "Bearer " + SdkClients.getAdminToken()) | ||
| .DELETE() | ||
| .build(); | ||
| HttpResponse<String> response = | ||
| HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); | ||
| if (response.statusCode() >= 300) { | ||
| throw new RuntimeException( | ||
| "Delete failed with status " + response.statusCode() + ": " + response.body()); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These schema changes are being appended to the existing 1.13.0 migration. Any database that has already been upgraded past 1.13.0 will never re-run that migration file, so the new columns and indexes would never be applied there.
To ensure upgrades work correctly, add this DDL to the current/new migration version (e.g. 1.14.1) rather than modifying an older, already-released migration script.