Skip to content

Commit b69bd96

Browse files
CarinaWolliCarinaWollianikdhabal
authored
fix: duplicate workflow reminders caused by scanWorkflowBody (calcom#21767)
* don't scan if there is newer task * create seperate tasks per workflowStepId * revert changed constant * fix comment * add back missing import * code clean up * fix PR feedback * change function to hasNewerScanTaskForStepId --------- Co-authored-by: CarinaWolli <wollencarina@gmail.com> Co-authored-by: Anik Dhabal Babu <81948346+anikdhabal@users.noreply.github.com>
1 parent 09a49dd commit b69bd96

3 files changed

Lines changed: 63 additions & 12 deletions

File tree

packages/features/tasker/repository.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { type Prisma } from "@prisma/client";
33
import db from "@calcom/prisma";
44

55
import { type TaskTypes } from "./tasker";
6+
import { scanWorkflowBodySchema } from "./tasks/scanWorkflowBody";
67

78
const whereSucceeded: Prisma.TaskWhereInput = {
89
succeededAt: { not: null },
@@ -187,4 +188,24 @@ export class Task {
187188
// },
188189
// });
189190
}
191+
192+
static async hasNewerScanTaskForStepId(workflowStepId: number, createdAt: string) {
193+
const tasks = await db.$queryRaw<{ payload: string }[]>`
194+
SELECT "payload"
195+
FROM "Task"
196+
WHERE "type" = 'scanWorkflowBody'
197+
AND "succeededAt" IS NULL
198+
AND (payload::jsonb ->> 'workflowStepId')::int = ${workflowStepId}
199+
`;
200+
201+
return tasks.some((task) => {
202+
try {
203+
const parsed = scanWorkflowBodySchema.parse(JSON.parse(task.payload));
204+
if (!parsed.createdAt) return false;
205+
return new Date(parsed.createdAt) > new Date(createdAt);
206+
} catch {
207+
return false;
208+
}
209+
});
210+
}
190211
}

packages/features/tasker/tasks/scanWorkflowBody.ts

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import z from "zod";
22

33
import { getTemplateBodyForAction } from "@calcom/features/ee/workflows/lib/actionHelperFunctions";
44
import compareReminderBodyToTemplate from "@calcom/features/ee/workflows/lib/compareReminderBodyToTemplate";
5+
import { Task } from "@calcom/features/tasker/repository";
56
import { lockUser, LockReason } from "@calcom/lib/autoLock";
67
import logger from "@calcom/lib/logger";
78
import { getTranslation } from "@calcom/lib/server/i18n";
@@ -11,18 +12,39 @@ import { scheduleWorkflowNotifications } from "@calcom/trpc/server/routers/viewe
1112

1213
export const scanWorkflowBodySchema = z.object({
1314
userId: z.number(),
14-
workflowStepIds: z.array(z.number()),
15+
// deprecated: use workflowStepId instead
16+
workflowStepIds: z.array(z.number()).optional(),
17+
workflowStepId: z.number().optional(),
18+
createdAt: z.string().optional(),
1519
});
1620

1721
const log = logger.getSubLogger({ prefix: ["[tasker] scanWorkflowBody"] });
1822

1923
export async function scanWorkflowBody(payload: string) {
20-
const { workflowStepIds, userId } = scanWorkflowBodySchema.parse(JSON.parse(payload));
24+
const { workflowStepIds, userId, createdAt, workflowStepId } = scanWorkflowBodySchema.parse(
25+
JSON.parse(payload)
26+
);
27+
28+
const stepIdsToScan: number[] = workflowStepIds ? workflowStepIds : [];
29+
30+
if (workflowStepId && !workflowStepIds) {
31+
if (createdAt) {
32+
const hasNewerTask = await Task.hasNewerScanTaskForStepId(workflowStepId, createdAt);
33+
34+
if (hasNewerTask) return;
35+
36+
stepIdsToScan.push(workflowStepId);
37+
} else {
38+
stepIdsToScan.push(workflowStepId);
39+
}
40+
}
41+
42+
if (stepIdsToScan.length === 0) return;
2143

2244
const workflowSteps = await prisma.workflowStep.findMany({
2345
where: {
2446
id: {
25-
in: workflowStepIds,
47+
in: stepIdsToScan,
2648
},
2749
},
2850
include: {
@@ -116,7 +138,7 @@ export async function scanWorkflowBody(payload: string) {
116138
await prisma.workflowStep.updateMany({
117139
where: {
118140
id: {
119-
in: workflowStepIds,
141+
in: stepIdsToScan,
120142
},
121143
},
122144
data: {
@@ -130,7 +152,7 @@ export async function scanWorkflowBody(payload: string) {
130152
steps: {
131153
some: {
132154
id: {
133-
in: workflowStepIds,
155+
in: stepIdsToScan,
134156
},
135157
},
136158
},
@@ -142,7 +164,7 @@ export async function scanWorkflowBody(payload: string) {
142164
});
143165

144166
if (!workflow) {
145-
log.warn(`Workflow with steps ${workflowStepIds} not found`);
167+
log.warn(`Workflow with steps ${stepIdsToScan} not found`);
146168
return;
147169
}
148170

packages/trpc/server/routers/viewer/workflows/update.handler.ts

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,11 @@ export const updateHandler = async ({ ctx, input }: UpdateOptions) => {
392392
});
393393

394394
if (SCANNING_WORKFLOW_STEPS && didBodyChange) {
395-
await tasker.create("scanWorkflowBody", { workflowStepIds: [oldStep.id], userId: ctx.user.id });
395+
await tasker.create("scanWorkflowBody", {
396+
workflowStepId: oldStep.id,
397+
userId: ctx.user.id,
398+
createdAt: new Date().toISOString(),
399+
});
396400
} else {
397401
// schedule notifications for edited steps
398402
await scheduleWorkflowNotifications({
@@ -467,11 +471,15 @@ export const updateHandler = async ({ ctx, input }: UpdateOptions) => {
467471
);
468472

469473
if (SCANNING_WORKFLOW_STEPS) {
470-
// workflows are scanned then scheduled in the task
471-
await tasker.create("scanWorkflowBody", {
472-
workflowStepIds: createdSteps.map((step) => step.id),
473-
userId: ctx.user.id,
474-
});
474+
await Promise.all(
475+
createdSteps.map((step) =>
476+
tasker.create("scanWorkflowBody", {
477+
workflowStepId: step.id,
478+
userId: ctx.user.id,
479+
createdAt: new Date().toISOString(),
480+
})
481+
)
482+
);
475483
} else {
476484
// schedule notification for new step
477485
await scheduleWorkflowNotifications({

0 commit comments

Comments
 (0)