-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathperformBulkAction.server.ts
More file actions
125 lines (108 loc) · 3.18 KB
/
performBulkAction.server.ts
File metadata and controls
125 lines (108 loc) · 3.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import assertNever from "assert-never";
import { PrismaClientOrTransaction } from "~/db.server";
import { workerQueue } from "~/services/worker.server";
import { BaseService } from "../baseService.server";
import { CancelTaskRunService } from "../cancelTaskRun.server";
import { ReplayTaskRunService } from "../replayTaskRun.server";
/**
 * Performs bulk actions (replaying or cancelling task runs) for a bulk action group.
 *
 * `call` fans a group out into one `v3.performBulkActionItem` worker job per item;
 * `performBulkActionItem` applies the group's action to a single item.
 * NOTE(review): the worker handler that maps those jobs back onto
 * `performBulkActionItem` lives outside this file — confirm the wiring there.
 */
export class PerformBulkActionService extends BaseService {
  /**
   * Applies the owning group's action (`REPLAY` or `CANCEL`) to one bulk action item
   * and records the outcome on that item.
   *
   * Does nothing when the item does not exist or is no longer `PENDING`.
   * After the item is processed, the group is marked `COMPLETED` once none of its
   * items are still `PENDING`.
   *
   * @param bulkActionItemId - Id of the bulk action item to process.
   */
  public async performBulkActionItem(bulkActionItemId: string): Promise<void> {
    const item = await this._prisma.bulkActionItem.findFirst({
      where: { id: bulkActionItemId },
      include: {
        group: true,
        sourceRun: true,
        destinationRun: true,
      },
    });

    if (!item) {
      return;
    }

    // Only pending items are processed; anything else has already been handled.
    if (item.status !== "PENDING") {
      return;
    }

    switch (item.group.type) {
      case "REPLAY": {
        const service = new ReplayTaskRunService(this._prisma);
        const result = await service.call(item.sourceRun, { triggerSource: "dashboard" });

        // A falsy result is recorded as a failure on the item.
        await this._prisma.bulkActionItem.update({
          where: { id: item.id },
          data: {
            destinationRunId: result?.id,
            status: result ? "COMPLETED" : "FAILED",
            error: result ? undefined : "Failed to replay task run",
          },
        });

        break;
      }
      case "CANCEL": {
        const service = new CancelTaskRunService(this._prisma);
        const result = await service.call(item.sourceRun);

        // For cancellations the destination run is the source run itself.
        await this._prisma.bulkActionItem.update({
          where: { id: item.id },
          data: {
            destinationRunId: item.sourceRun.id,
            status: result ? "COMPLETED" : "FAILED",
            error: result ? undefined : "Task wasn't cancelable",
          },
        });

        break;
      }
      default: {
        // Compile-time exhaustiveness check over the group's action type.
        assertNever(item.group.type);
      }
    }

    const groupItems = await this._prisma.bulkActionItem.findMany({
      where: { groupId: item.groupId },
      select: {
        status: true,
      },
    });

    const isGroupCompleted = groupItems.every((groupItem) => groupItem.status !== "PENDING");

    if (isGroupCompleted) {
      // Mark the GROUP as completed. Previously this updated the item again, which
      // overwrote a FAILED item with COMPLETED and never touched the group.
      // NOTE(review): assumes bulkActionGroup has a `status` field accepting
      // "COMPLETED" — confirm against the Prisma schema.
      await this._prisma.bulkActionGroup.update({
        where: { id: item.groupId },
        data: {
          status: "COMPLETED",
        },
      });
    }
  }

  /**
   * Enqueues a `v3.performBulkActionItem` worker job for a single item, using a
   * job key derived from the item id.
   *
   * @param bulkActionItemId - Id of the bulk action item to enqueue.
   * @param groupId - Id of the owning group. Currently unused; kept so existing
   *   callers keep working.
   */
  public async enqueueBulkActionItem(bulkActionItemId: string, groupId: string): Promise<void> {
    await workerQueue.enqueue(
      "v3.performBulkActionItem",
      {
        bulkActionItemId,
      },
      {
        jobKey: `performBulkActionItem:${bulkActionItemId}`,
      }
    );
  }

  /**
   * Loads a bulk action group with its items and enqueues one job per item,
   * one after another. Does nothing when the group does not exist.
   *
   * @param bulkActionGroupId - Id of the bulk action group to fan out.
   */
  public async call(bulkActionGroupId: string): Promise<void> {
    const actionGroup = await this._prisma.bulkActionGroup.findFirst({
      include: {
        items: true,
      },
      where: { id: bulkActionGroupId },
    });

    if (!actionGroup) {
      return;
    }

    for (const item of actionGroup.items) {
      await this.enqueueBulkActionItem(item.id, bulkActionGroupId);
    }
  }

  /**
   * Enqueues a `v3.performBulkAction` worker job for a whole group, using a job
   * key derived from the group id.
   *
   * @param bulkActionGroupId - Id of the bulk action group to process.
   * @param tx - Prisma client or transaction forwarded to the queue as `tx`.
   * @param runAt - Optional date forwarded to the queue as `runAt`
   *   (presumably the earliest time the job should run — confirm with the queue implementation).
   * @returns Whatever `workerQueue.enqueue` resolves to.
   */
  static async enqueue(bulkActionGroupId: string, tx: PrismaClientOrTransaction, runAt?: Date) {
    return await workerQueue.enqueue(
      "v3.performBulkAction",
      {
        bulkActionGroupId,
      },
      {
        tx,
        runAt,
        jobKey: `performBulkAction:${bulkActionGroupId}`,
      }
    );
  }
}