-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathchangeCurrentDeployment.server.ts
More file actions
234 lines (208 loc) · 7.6 KB
/
Copy pathchangeCurrentDeployment.server.ts
File metadata and controls
234 lines (208 loc) · 7.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import { BackgroundWorkerMetadata, tryCatch } from "@trigger.dev/core/v3";
import { CURRENT_DEPLOYMENT_LABEL } from "@trigger.dev/core/v3/isomorphic";
import { PrismaClientOrTransaction, WorkerDeployment } from "@trigger.dev/database";
import { logger } from "~/services/logger.server";
import { syncTaskIdentifiers } from "~/services/taskIdentifierRegistry.server";
import {
type TaskMetadataCache,
type TaskMetadataEntry,
} from "~/services/taskMetadataCache.server";
import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server";
import { BaseService, ServiceValidationError } from "./baseService.server";
import { syncDeclarativeSchedules } from "./createBackgroundWorker.server";
import { ExecuteTasksWaitingForDeployService } from "./executeTasksWaitingForDeploy";
import { compareDeploymentVersions } from "../utils/deploymentVersions";
export type ChangeCurrentDeploymentDirection = "promote" | "rollback";
export class ChangeCurrentDeploymentService extends BaseService {
private readonly _taskMetaCache: TaskMetadataCache;
constructor(
prisma?: PrismaClientOrTransaction,
replica?: PrismaClientOrTransaction,
taskMetaCache: TaskMetadataCache = taskMetadataCacheInstance
) {
super(prisma, replica);
this._taskMetaCache = taskMetaCache;
}
public async call(
deployment: WorkerDeployment,
direction: ChangeCurrentDeploymentDirection,
disableVersionCheck?: boolean
) {
if (!deployment.workerId) {
throw new ServiceValidationError(
direction === "promote"
? "Deployment is not associated with a worker and cannot be promoted."
: "Deployment is not associated with a worker and cannot be rolled back."
);
}
if (deployment.status !== "DEPLOYED") {
throw new ServiceValidationError(
direction === "promote"
? "Deployment must be in the DEPLOYED state to be promoted."
: "Deployment must be in the DEPLOYED state to be rolled back."
);
}
const currentPromotion = await this._prisma.workerDeploymentPromotion.findFirst({
where: {
environmentId: deployment.environmentId,
label: CURRENT_DEPLOYMENT_LABEL,
},
select: {
deployment: {
select: { id: true, version: true },
},
},
});
if (currentPromotion) {
if (currentPromotion.deployment.id === deployment.id) {
throw new ServiceValidationError("Deployment is already the current deployment.");
}
// if there is a current promotion, we have to validate we are moving in the right direction based on the deployment versions
if (!disableVersionCheck) {
switch (direction) {
case "promote": {
if (
compareDeploymentVersions(currentPromotion.deployment.version, deployment.version) >=
0
) {
throw new ServiceValidationError(
"Cannot promote a deployment that is older than the current deployment."
);
}
break;
}
case "rollback": {
if (
compareDeploymentVersions(currentPromotion.deployment.version, deployment.version) <=
0
) {
throw new ServiceValidationError(
"Cannot rollback to a deployment that is newer than the current deployment."
);
}
break;
}
}
}
}
//set this deployment as the current deployment for this environment
await this._prisma.workerDeploymentPromotion.upsert({
where: {
environmentId_label: {
environmentId: deployment.environmentId,
label: CURRENT_DEPLOYMENT_LABEL,
},
},
create: {
deploymentId: deployment.id,
environmentId: deployment.environmentId,
label: CURRENT_DEPLOYMENT_LABEL,
},
update: {
deploymentId: deployment.id,
},
});
const [fetchTasksError, tasks] = await tryCatch(
this._prisma.backgroundWorkerTask.findMany({
where: { workerId: deployment.workerId! },
select: {
slug: true,
triggerSource: true,
ttl: true,
queue: { select: { id: true, name: true } },
},
})
);
if (fetchTasksError) {
logger.error("Error fetching worker tasks on deployment change", {
error: fetchTasksError,
});
}
if (tasks) {
// Side effect 1: refresh the `TaskIdentifier` table and the existing
// `tids:` Redis cache so the task-listing UI reflects the new deploy.
const [syncIdentifiersError] = await tryCatch(
syncTaskIdentifiers(
deployment.environmentId,
deployment.projectId,
deployment.workerId!,
tasks.map((t) => ({ id: t.slug, triggerSource: t.triggerSource }))
)
);
if (syncIdentifiersError) {
logger.error("Error syncing task identifiers on deployment change", {
error: syncIdentifiersError,
});
}
// Side effect 2: refresh the `task-meta:` cache that the queue resolver
// reads from. Independent of side effect 1 — if `syncTaskIdentifiers`
// throws, the queue resolver still gets a warm cache for the new worker.
const metadataEntries: TaskMetadataEntry[] = tasks.map((t) => ({
slug: t.slug,
ttl: t.ttl,
triggerSource: t.triggerSource,
queueId: t.queue?.id ?? null,
queueName: t.queue?.name ?? "",
}));
// Cache calls log+swallow internally.
await this._taskMetaCache.populateByCurrentWorker(
deployment.environmentId,
deployment.workerId!,
metadataEntries
);
}
const [scheduleSyncError] = await tryCatch(this.#syncSchedulesForDeployment(deployment));
if (scheduleSyncError) {
logger.error("Error syncing declarative schedules on deployment change", {
error: scheduleSyncError,
});
}
// Only V1 engine workers need the WAITING_FOR_DEPLOY drain — V2 runs sit
// in PENDING_VERSION and are handled out of band, so enqueuing here for V2
// just produces empty scans of the TaskRun status index.
const worker = await this._prisma.backgroundWorker.findFirst({
where: { id: deployment.workerId },
select: { engine: true },
});
if (worker?.engine === "V1") {
await ExecuteTasksWaitingForDeployService.enqueue(deployment.workerId);
}
}
async #syncSchedulesForDeployment(deployment: WorkerDeployment) {
const worker = await this._prisma.backgroundWorker.findFirst({
where: { id: deployment.workerId! },
});
if (!worker) {
logger.error("Worker not found for deployment schedule sync", {
deploymentId: deployment.id,
workerId: deployment.workerId,
});
return;
}
const parsed = BackgroundWorkerMetadata.safeParse(worker.metadata);
if (!parsed.success) {
logger.error("Failed to parse worker metadata for schedule sync", {
deploymentId: deployment.id,
workerId: deployment.workerId,
error: parsed.error,
});
return;
}
const environment = await this._prisma.runtimeEnvironment.findFirst({
where: { id: deployment.environmentId },
include: {
project: true,
organization: true,
orgMember: true,
},
});
if (!environment) {
logger.error("Environment not found for deployment schedule sync", {
deploymentId: deployment.id,
environmentId: deployment.environmentId,
});
return;
}
await syncDeclarativeSchedules(parsed.data.tasks, worker, environment, this._prisma);
}
}