Skip to content

Commit 1be7852

Browse files
giovaborgognoclaude
andcommitted
feat(events): stale subscription cleanup — daily cron job in admin worker
Add CleanupStaleSubscriptionsService that finds disabled EventSubscriptions whose associated task no longer exists in any active worker, and deletes them. Runs daily at 3 AM UTC via the admin worker cron. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 63a4bd3 commit 1be7852

File tree

2 files changed

+71
-0
lines changed

2 files changed

+71
-0
lines changed

apps/webapp/app/v3/services/adminWorker.server.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import { singleton } from "~/utils/singleton";
88
import { tracer } from "../tracer.server";
99
import { $replica } from "~/db.server";
1010
import { RunsBackfillerService } from "../../services/runsBackfiller.server";
11+
import { CleanupStaleSubscriptionsService } from "./events/cleanupStaleSubscriptions.server";
12+
import { prisma } from "~/db.server";
1113

1214
function initializeWorker() {
1315
const redisOptions = {
@@ -26,6 +28,13 @@ function initializeWorker() {
2628
name: "admin-worker",
2729
redisOptions,
2830
catalog: {
31+
"admin.cleanupStaleSubscriptions": {
32+
schema: z.object({}),
33+
visibilityTimeoutMs: 60_000 * 5, // 5 minutes
34+
retry: { maxAttempts: 3 },
35+
cron: "0 3 * * *", // Daily at 3 AM UTC
36+
jitterInMs: 60_000, // 1 minute jitter
37+
},
2938
"admin.backfillRunsToReplication": {
3039
schema: z.object({
3140
from: z.coerce.date(),
@@ -50,6 +59,10 @@ function initializeWorker() {
5059
shutdownTimeoutMs: env.ADMIN_WORKER_SHUTDOWN_TIMEOUT_MS,
5160
logger: new Logger("AdminWorker", env.ADMIN_WORKER_LOG_LEVEL),
5261
jobs: {
62+
"admin.cleanupStaleSubscriptions": async () => {
63+
const service = new CleanupStaleSubscriptionsService(prisma);
64+
await service.call();
65+
},
5366
"admin.backfillRunsToReplication": async ({ payload, id }) => {
5467
if (!runsReplicationInstance) {
5568
logger.error("Runs replication instance not found");
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import { PrismaClientOrTransaction } from "~/db.server";
2+
import { logger } from "~/services/logger.server";
3+
4+
/**
5+
* Cleans up stale EventSubscriptions — disabled subscriptions whose associated
6+
* task no longer exists in any active worker for that environment.
7+
*/
8+
export class CleanupStaleSubscriptionsService {
9+
constructor(private readonly _prisma: PrismaClientOrTransaction) {}
10+
11+
async call(): Promise<{ deletedCount: number; scannedCount: number }> {
12+
// Find all disabled subscriptions
13+
const disabledSubscriptions = await this._prisma.eventSubscription.findMany({
14+
where: { enabled: false },
15+
select: {
16+
id: true,
17+
taskSlug: true,
18+
projectId: true,
19+
environmentId: true,
20+
},
21+
});
22+
23+
if (disabledSubscriptions.length === 0) {
24+
return { deletedCount: 0, scannedCount: 0 };
25+
}
26+
27+
// For each disabled subscription, check if ANY active worker still has that task
28+
const idsToDelete: string[] = [];
29+
30+
for (const sub of disabledSubscriptions) {
31+
const taskExists = await this._prisma.backgroundWorkerTask.findFirst({
32+
where: {
33+
slug: sub.taskSlug,
34+
projectId: sub.projectId,
35+
runtimeEnvironmentId: sub.environmentId,
36+
},
37+
select: { id: true },
38+
});
39+
40+
if (!taskExists) {
41+
idsToDelete.push(sub.id);
42+
}
43+
}
44+
45+
if (idsToDelete.length > 0) {
46+
await this._prisma.eventSubscription.deleteMany({
47+
where: { id: { in: idsToDelete } },
48+
});
49+
}
50+
51+
logger.info("Cleaned up stale event subscriptions", {
52+
deletedCount: idsToDelete.length,
53+
scannedCount: disabledSubscriptions.length,
54+
});
55+
56+
return { deletedCount: idsToDelete.length, scannedCount: disabledSubscriptions.length };
57+
}
58+
}

0 commit comments

Comments
 (0)