Skip to content

Commit 5f5a123

Browse files
harshachclaude
andauthored
feat(dataRetention): sweep orphan time-series rows for per-type tables (#28384)
* feat(dataRetention): sweep orphan time-series rows for per-type tables Adds `cleanOrphanedTimeSeriesRows()` to `DataRetention` alongside the existing orphan-relationship and orphan-tag sweeps. Each of the five affected DAOs gets a `deleteOrphanedRecords(int limit)` query (MySQL + PostgreSQL) that left-joins to its parent and deletes rows the parent no longer covers: - `testCaseResolutionStatus`: parent link via `entity_relationship` PARENT_OF - `agentExecution`: `agentId` → `ai_application_entity.id` - `mcpExecution`: `serverId` → `mcp_server_entity.id` - `profile_data`: `entityFQNHash` → `table_entity.fqnHash` - `query_cost_time_series`: `entityFQNHash` → `query_entity.fqnHash` The sweep runs after `cleanOrphanedRelationshipsAndHierarchies()` so the PARENT_OF check sees the post-cleanup `entity_relationship` state. Pairs with PR #28367, where the bulk hard-delete cascade now skips time-series children and relies on `DataRetention` to reclaim them out-of-band. Adds `OrphanedTimeSeriesCleanupIT` covering all five per-type queries: inserts a real-parent row and a bogus-parent row through the existing DAO `insert(...)` paths, runs `deleteOrphanedRecords(BATCH)`, asserts the orphan is gone and the valid row is preserved. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * test(dataRetention): fix orphan-cleanup IT name lengths and McpExecution table arg - McpExecutionDAO.insertWithoutExtension uses `<table>` placeholder; the test was passing `null`, which made Jdbi fail to render the SQL template. Pass the literal table name `mcp_execution_entity`. - `ns.prefix(...)` embeds class + method names, so chaining it through database -> schema -> table -> auto-created test_suite pushed the test_suite `name` column past its VARCHAR(256) bound. Use `ns.uniqueShortId()` for the hierarchy components and shorten the test method names so the resulting FQN stays well under the column limit. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * review(dataRetention): bound profile-data orphan delete by rows + clarify PARENT_OF=9 Addresses two PR review findings: - `profiler_data_time_series.deleteOrphanedRecords` previously LIMITed distinct `entityFQNHash` values, then deleted every row for each hash — a batch could delete tens of millions of rows if many orphan hashes each had thousands of rows. Switch to row-level limiting: single-table `DELETE ... WHERE NOT EXISTS (...) LIMIT N` on MySQL, and `ctid IN (SELECT ... LIMIT N)` on PostgreSQL (the table has no `id` column, so we use Postgres ctid for the inner subquery). This matches the row-count cap used by the other four orphan-cleanup queries. - Annotate `er.relation = 9` in the testCaseResolutionStatus query with a `// 9 = Relationship.PARENT_OF` inline comment plus a leading block comment noting the ordinal is stable because the enum appends new values. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 65149e1 commit 5f5a123

3 files changed

Lines changed: 554 additions & 0 deletions

File tree

Lines changed: 362 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,362 @@
1+
/*
2+
* Copyright 2025 Collate
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*/
13+
package org.openmetadata.it.tests;
14+
15+
import static org.junit.jupiter.api.Assertions.assertEquals;
16+
import static org.junit.jupiter.api.Assertions.assertNotNull;
17+
import static org.junit.jupiter.api.Assertions.assertTrue;
18+
19+
import com.fasterxml.jackson.databind.DeserializationFeature;
20+
import com.fasterxml.jackson.databind.ObjectMapper;
21+
import java.util.List;
22+
import java.util.UUID;
23+
import org.junit.jupiter.api.BeforeAll;
24+
import org.junit.jupiter.api.Test;
25+
import org.junit.jupiter.api.extension.ExtendWith;
26+
import org.junit.jupiter.api.parallel.Execution;
27+
import org.junit.jupiter.api.parallel.ExecutionMode;
28+
import org.openmetadata.it.bootstrap.SharedEntities;
29+
import org.openmetadata.it.bootstrap.TestSuiteBootstrap;
30+
import org.openmetadata.it.util.SdkClients;
31+
import org.openmetadata.it.util.TestNamespace;
32+
import org.openmetadata.it.util.TestNamespaceExtension;
33+
import org.openmetadata.schema.api.ai.CreateMcpServer;
34+
import org.openmetadata.schema.api.data.CreateDatabase;
35+
import org.openmetadata.schema.api.data.CreateDatabaseSchema;
36+
import org.openmetadata.schema.api.data.CreateQuery;
37+
import org.openmetadata.schema.api.data.CreateTable;
38+
import org.openmetadata.schema.api.services.CreateMcpService;
39+
import org.openmetadata.schema.api.tests.CreateTestCase;
40+
import org.openmetadata.schema.api.tests.CreateTestCaseResolutionStatus;
41+
import org.openmetadata.schema.entity.ai.AIApplication;
42+
import org.openmetadata.schema.entity.ai.ApplicationType;
43+
import org.openmetadata.schema.entity.ai.McpServer;
44+
import org.openmetadata.schema.entity.ai.McpServerType;
45+
import org.openmetadata.schema.entity.ai.McpTransportType;
46+
import org.openmetadata.schema.entity.data.Database;
47+
import org.openmetadata.schema.entity.data.DatabaseSchema;
48+
import org.openmetadata.schema.entity.data.Query;
49+
import org.openmetadata.schema.entity.data.Table;
50+
import org.openmetadata.schema.tests.TestCase;
51+
import org.openmetadata.schema.tests.type.TestCaseResolutionStatus;
52+
import org.openmetadata.schema.tests.type.TestCaseResolutionStatusTypes;
53+
import org.openmetadata.schema.type.Column;
54+
import org.openmetadata.schema.type.ColumnDataType;
55+
import org.openmetadata.sdk.client.OpenMetadataClient;
56+
import org.openmetadata.sdk.fluent.AIApplications;
57+
import org.openmetadata.sdk.models.ListParams;
58+
import org.openmetadata.sdk.network.HttpMethod;
59+
import org.openmetadata.sdk.network.RequestOptions;
60+
import org.openmetadata.service.Entity;
61+
import org.openmetadata.service.jdbi3.CollectionDAO;
62+
import org.openmetadata.service.util.FullyQualifiedName;
63+
64+
/**
65+
* Integration tests for {@link CollectionDAO} per-type orphan time-series cleanup queries used by
66+
* {@code DataRetention.cleanOrphanedTimeSeriesRows()}.
67+
*
68+
* <p>Each test inserts one valid row (referencing a real parent entity) and one orphan row
69+
* (referencing a non-existent parent), invokes {@code deleteOrphanedRecords(limit)} on the
70+
* corresponding DAO, and verifies that only the orphan is deleted.
71+
*/
72+
@Execution(ExecutionMode.CONCURRENT)
73+
@ExtendWith(TestNamespaceExtension.class)
74+
public class OrphanedTimeSeriesCleanupIT {
75+
76+
private static final int BATCH = 10_000;
77+
private static final String MCP_SERVICE_NAME = "mcp-orphan-cleanup-svc";
78+
79+
private static final ObjectMapper MAPPER =
80+
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
81+
82+
@BeforeAll
83+
public static void setup() throws Exception {
84+
AIApplications.setDefaultClient(SdkClients.adminClient());
85+
86+
CreateMcpService createService =
87+
new CreateMcpService()
88+
.withName(MCP_SERVICE_NAME)
89+
.withServiceType(CreateMcpService.McpServiceType.Mcp);
90+
SdkClients.adminClient()
91+
.getHttpClient()
92+
.executeForString(
93+
HttpMethod.PUT,
94+
"/v1/services/mcpServices",
95+
createService,
96+
RequestOptions.builder().build());
97+
}
98+
99+
@Test
100+
void agentExecutionOrphans(TestNamespace ns) {
101+
AIApplication app =
102+
AIApplications.create()
103+
.name("agentExecOrph_" + ns.uniqueShortId())
104+
.withApplicationType(ApplicationType.Chatbot)
105+
.withDescription("Parent app for orphan cleanup test")
106+
.execute();
107+
assertNotNull(app.getId());
108+
109+
UUID validId = UUID.randomUUID();
110+
UUID orphanId = UUID.randomUUID();
111+
UUID orphanAgentId = UUID.randomUUID();
112+
113+
insertAgentExecution(validId, app.getId());
114+
insertAgentExecution(orphanId, orphanAgentId);
115+
116+
int deleted = Entity.getCollectionDAO().agentExecutionDAO().deleteOrphanedRecords(BATCH);
117+
118+
assertTrue(deleted >= 1, "Expected at least the inserted orphan row to be deleted");
119+
assertEquals(0, countRowsById("agent_execution_entity", orphanId.toString()));
120+
assertEquals(1, countRowsById("agent_execution_entity", validId.toString()));
121+
}
122+
123+
@Test
124+
void mcpExecutionOrphans(TestNamespace ns) throws Exception {
125+
CreateMcpServer createServer =
126+
new CreateMcpServer()
127+
.withName("mcpOrph-" + ns.uniqueShortId())
128+
.withService(MCP_SERVICE_NAME)
129+
.withServerType(McpServerType.DataAccess)
130+
.withTransportType(McpTransportType.Stdio)
131+
.withDescription("Parent MCP server for orphan cleanup test");
132+
String response =
133+
SdkClients.adminClient()
134+
.getHttpClient()
135+
.executeForString(
136+
HttpMethod.POST, "/v1/mcpServers", createServer, RequestOptions.builder().build());
137+
McpServer server = MAPPER.readValue(response, McpServer.class);
138+
assertNotNull(server.getId());
139+
140+
UUID validId = UUID.randomUUID();
141+
UUID orphanId = UUID.randomUUID();
142+
UUID orphanServerId = UUID.randomUUID();
143+
144+
insertMcpExecution(validId, server.getId());
145+
insertMcpExecution(orphanId, orphanServerId);
146+
147+
int deleted = Entity.getCollectionDAO().mcpExecutionDAO().deleteOrphanedRecords(BATCH);
148+
149+
assertTrue(deleted >= 1, "Expected at least the inserted orphan row to be deleted");
150+
assertEquals(0, countRowsById("mcp_execution_entity", orphanId.toString()));
151+
assertEquals(1, countRowsById("mcp_execution_entity", validId.toString()));
152+
}
153+
154+
@Test
155+
void testCaseResolutionStatusOrphans(TestNamespace ns) throws Exception {
156+
OpenMetadataClient client = SdkClients.adminClient();
157+
Table table = createTable(ns, "tcrs");
158+
TestCase testCase = createTestCase(table, "tcrsCase_" + ns.uniqueShortId());
159+
160+
CreateTestCaseResolutionStatus createStatus =
161+
new CreateTestCaseResolutionStatus()
162+
.withTestCaseResolutionStatusType(TestCaseResolutionStatusTypes.New)
163+
.withTestCaseReference(testCase.getFullyQualifiedName());
164+
TestCaseResolutionStatus validStatus = client.testCaseResolutionStatuses().create(createStatus);
165+
assertNotNull(validStatus.getId());
166+
167+
UUID orphanId = UUID.randomUUID();
168+
UUID orphanStateId = UUID.randomUUID();
169+
insertResolutionStatus(orphanId, orphanStateId, testCase);
170+
171+
int deleted =
172+
Entity.getCollectionDAO()
173+
.testCaseResolutionStatusTimeSeriesDao()
174+
.deleteOrphanedRecords(BATCH);
175+
176+
assertTrue(deleted >= 1, "Expected at least the inserted orphan row to be deleted");
177+
assertEquals(0, countRowsById("test_case_resolution_status_time_series", orphanId.toString()));
178+
assertEquals(
179+
1,
180+
countRowsById("test_case_resolution_status_time_series", validStatus.getId().toString()));
181+
}
182+
183+
@Test
184+
void profilerDataOrphans(TestNamespace ns) throws Exception {
185+
Table table = createTable(ns, "prof");
186+
String validFqn = table.getFullyQualifiedName();
187+
String orphanFqn = "orphanTbl_" + ns.uniqueShortId() + ".profile";
188+
189+
String validJson =
190+
String.format("{\"timestamp\":%d,\"rowCount\":42}", System.currentTimeMillis());
191+
String orphanJson =
192+
String.format("{\"timestamp\":%d,\"rowCount\":7}", System.currentTimeMillis());
193+
194+
Entity.getCollectionDAO()
195+
.profilerDataTimeSeriesDao()
196+
.insert(validFqn, "table.tableProfile", "tableProfile", validJson);
197+
Entity.getCollectionDAO()
198+
.profilerDataTimeSeriesDao()
199+
.insert(orphanFqn, "table.tableProfile", "tableProfile", orphanJson);
200+
201+
int deleted =
202+
Entity.getCollectionDAO().profilerDataTimeSeriesDao().deleteOrphanedRecords(BATCH);
203+
204+
assertTrue(deleted >= 1, "Expected at least the inserted orphan row to be deleted");
205+
assertEquals(
206+
0,
207+
countRowsByFqnHash("profiler_data_time_series", FullyQualifiedName.buildHash(orphanFqn)));
208+
assertTrue(
209+
countRowsByFqnHash("profiler_data_time_series", FullyQualifiedName.buildHash(validFqn))
210+
>= 1,
211+
"Valid profiler row must be preserved");
212+
}
213+
214+
@Test
215+
void queryCostOrphans(TestNamespace ns) throws Exception {
216+
Query query = createQuery(ns, "qc");
217+
String validFqn = query.getFullyQualifiedName();
218+
String orphanFqn = "orphanQc_" + ns.uniqueShortId();
219+
220+
String validJson =
221+
String.format(
222+
"{\"id\":\"%s\",\"timestamp\":%d,\"cost\":1.5,\"count\":3}",
223+
UUID.randomUUID(), System.currentTimeMillis());
224+
UUID orphanRowId = UUID.randomUUID();
225+
String orphanJson =
226+
String.format(
227+
"{\"id\":\"%s\",\"timestamp\":%d,\"cost\":2.5,\"count\":1}",
228+
orphanRowId, System.currentTimeMillis());
229+
230+
Entity.getCollectionDAO()
231+
.queryCostRecordTimeSeriesDAO()
232+
.insert(validFqn, "queryCostRecord", validJson);
233+
Entity.getCollectionDAO()
234+
.queryCostRecordTimeSeriesDAO()
235+
.insert(orphanFqn, "queryCostRecord", orphanJson);
236+
237+
int deleted =
238+
Entity.getCollectionDAO().queryCostRecordTimeSeriesDAO().deleteOrphanedRecords(BATCH);
239+
240+
assertTrue(deleted >= 1, "Expected at least the inserted orphan row to be deleted");
241+
assertEquals(0, countRowsById("query_cost_time_series", orphanRowId.toString()));
242+
assertTrue(
243+
countRowsByFqnHash("query_cost_time_series", FullyQualifiedName.buildHash(validFqn)) >= 1,
244+
"Valid query-cost row must be preserved");
245+
}
246+
247+
private void insertAgentExecution(UUID id, UUID agentId) {
248+
String json =
249+
String.format(
250+
"{\"id\":\"%s\",\"agentId\":\"%s\",\"timestamp\":%d,\"status\":\"Success\","
251+
+ "\"agent\":{\"id\":\"%s\",\"type\":\"aiApplication\"}}",
252+
id, agentId, System.currentTimeMillis(), agentId);
253+
Entity.getCollectionDAO().agentExecutionDAO().insertWithoutExtension(null, "", "", json);
254+
}
255+
256+
private void insertMcpExecution(UUID id, UUID serverId) {
257+
String json =
258+
String.format(
259+
"{\"id\":\"%s\",\"serverId\":\"%s\",\"timestamp\":%d,\"status\":\"Success\","
260+
+ "\"server\":{\"id\":\"%s\",\"type\":\"mcpServer\"}}",
261+
id, serverId, System.currentTimeMillis(), serverId);
262+
Entity.getCollectionDAO()
263+
.mcpExecutionDAO()
264+
.insertWithoutExtension("mcp_execution_entity", "", "", json);
265+
}
266+
267+
private void insertResolutionStatus(UUID id, UUID stateId, TestCase testCase) {
268+
String json =
269+
String.format(
270+
"{\"id\":\"%s\",\"stateId\":\"%s\",\"timestamp\":%d,"
271+
+ "\"testCaseResolutionStatusType\":\"New\","
272+
+ "\"testCaseReference\":{\"id\":\"%s\",\"type\":\"testCase\","
273+
+ "\"fullyQualifiedName\":\"%s\"}}",
274+
id,
275+
stateId,
276+
System.currentTimeMillis(),
277+
testCase.getId(),
278+
testCase.getFullyQualifiedName());
279+
Entity.getCollectionDAO()
280+
.testCaseResolutionStatusTimeSeriesDao()
281+
.insert(testCase.getFullyQualifiedName(), Entity.TEST_CASE_RESOLUTION_STATUS, json);
282+
}
283+
284+
private Table createTable(TestNamespace ns, String prefix) throws Exception {
285+
OpenMetadataClient client = SdkClients.adminClient();
286+
String id = ns.uniqueShortId();
287+
Database database =
288+
client
289+
.databases()
290+
.create(
291+
new CreateDatabase()
292+
.withName(prefix + "Db_" + id)
293+
.withService(SharedEntities.get().MYSQL_SERVICE.getFullyQualifiedName()));
294+
DatabaseSchema schema =
295+
client
296+
.databaseSchemas()
297+
.create(
298+
new CreateDatabaseSchema()
299+
.withName(prefix + "Sc_" + id)
300+
.withDatabase(database.getFullyQualifiedName()));
301+
return client
302+
.tables()
303+
.create(
304+
new CreateTable()
305+
.withName(prefix + "Tb_" + id)
306+
.withDatabaseSchema(schema.getFullyQualifiedName())
307+
.withColumns(
308+
List.of(new Column().withName("id").withDataType(ColumnDataType.BIGINT))));
309+
}
310+
311+
private TestCase createTestCase(Table table, String name) throws Exception {
312+
OpenMetadataClient client = SdkClients.adminClient();
313+
String testDefFqn =
314+
client
315+
.testDefinitions()
316+
.list(new ListParams().withLimit(1))
317+
.getData()
318+
.get(0)
319+
.getFullyQualifiedName();
320+
CreateTestCase createTestCase =
321+
new CreateTestCase()
322+
.withName(name)
323+
.withEntityLink("<#E::table::" + table.getFullyQualifiedName() + "::columns::id>")
324+
.withTestDefinition(testDefFqn);
325+
return client.testCases().create(createTestCase);
326+
}
327+
328+
private Query createQuery(TestNamespace ns, String prefix) throws Exception {
329+
OpenMetadataClient client = SdkClients.adminClient();
330+
Table table = createTable(ns, prefix);
331+
return client
332+
.queries()
333+
.create(
334+
new CreateQuery()
335+
.withName(prefix + "Q_" + ns.uniqueShortId())
336+
.withQuery("SELECT 1")
337+
.withService(SharedEntities.get().MYSQL_SERVICE.getFullyQualifiedName())
338+
.withQueryUsedIn(List.of(table.getEntityReference())));
339+
}
340+
341+
private int countRowsById(String table, String id) {
342+
return TestSuiteBootstrap.getJdbi()
343+
.withHandle(
344+
handle ->
345+
handle
346+
.createQuery("SELECT COUNT(*) FROM " + table + " WHERE id = :id")
347+
.bind("id", id)
348+
.mapTo(Integer.class)
349+
.one());
350+
}
351+
352+
private int countRowsByFqnHash(String table, String fqnHash) {
353+
return TestSuiteBootstrap.getJdbi()
354+
.withHandle(
355+
handle ->
356+
handle
357+
.createQuery("SELECT COUNT(*) FROM " + table + " WHERE entityFQNHash = :h")
358+
.bind("h", fqnHash)
359+
.mapTo(Integer.class)
360+
.one());
361+
}
362+
}

0 commit comments

Comments
 (0)