-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Expand file tree
/
Copy pathenqueueSystem.ts
More file actions
123 lines (113 loc) · 3.7 KB
/
enqueueSystem.ts
File metadata and controls
123 lines (113 loc) · 3.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import {
Prisma,
PrismaClientOrTransaction,
TaskRun,
TaskRunExecutionStatus,
} from "@trigger.dev/database";
import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/isomorphic";
import { MinimalAuthenticatedEnvironment } from "../../shared/index.js";
import { ExecutionSnapshotSystem } from "./executionSnapshotSystem.js";
import { SystemResources } from "./systems.js";
/** Dependencies passed to the `EnqueueSystem` constructor. */
export type EnqueueSystemOptions = {
/** Shared resources; `EnqueueSystem` reads `prisma`, `runLock` and `runQueue` from this object. */
resources: SystemResources;
/** Collaborator whose `createExecutionSnapshot` is called each time a run is enqueued. */
executionSnapshotSystem: ExecutionSnapshotSystem;
};
/**
 * Writes an execution snapshot for a run and then hands the run to the run queue.
 */
export class EnqueueSystem {
  private readonly $: SystemResources;
  private readonly executionSnapshotSystem: ExecutionSnapshotSystem;

  constructor(private readonly options: EnqueueSystemOptions) {
    const { resources, executionSnapshotSystem } = options;
    this.$ = resources;
    this.executionSnapshotSystem = executionSnapshotSystem;
  }

  /**
   * Creates an execution snapshot for `run` and enqueues a message for it on the run queue.
   *
   * The work is passed to `runLock.lockIf` with `!skipRunLock` as the condition, the name
   * "enqueueRun" and the run id as the only resource (presumably the lock is only taken
   * when `skipRunLock` is not set; confirm against the lock implementation).
   *
   * Snapshot fields fall back to status "QUEUED" and description "Run was QUEUED" when the
   * caller does not provide them. Database access uses `tx` when given, otherwise the
   * shared Prisma client from the system resources.
   *
   * @returns Whatever `runLock.lockIf` resolves to; the callback given to it returns the
   *   snapshot produced by `createExecutionSnapshot`.
   */
  public async enqueueRun({
    run,
    env,
    tx,
    snapshot,
    previousSnapshotId,
    batchId,
    checkpointId,
    completedWaitpoints,
    workerId,
    runnerId,
    skipRunLock,
    includeTtl = false,
    enableFastPath = false,
  }: {
    run: TaskRun;
    env: MinimalAuthenticatedEnvironment;
    tx?: PrismaClientOrTransaction;
    snapshot?: {
      status?: Extract<TaskRunExecutionStatus, "QUEUED" | "QUEUED_EXECUTING">;
      description?: string;
      metadata?: Prisma.JsonValue;
    };
    previousSnapshotId?: string;
    batchId?: string;
    checkpointId?: string;
    completedWaitpoints?: {
      id: string;
      index?: number;
    }[];
    workerId?: string;
    runnerId?: string;
    skipRunLock?: boolean;
    /** Set to true only for the first enqueue from trigger so the queued message carries a TTL. Defaults to false. */
    includeTtl?: boolean;
    /** Set to true to let the queue push straight to the worker queue when concurrency is available. */
    enableFastPath?: boolean;
  }) {
    // A caller-supplied transaction wins over the shared client.
    const db = tx ?? this.$.prisma;

    const snapshotThenEnqueue = async () => {
      // Record the snapshot first; the queue message is only sent once this has resolved.
      const createdSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(db, {
        run,
        snapshot: {
          executionStatus: snapshot?.status ?? "QUEUED",
          description: snapshot?.description ?? "Run was QUEUED",
          metadata: snapshot?.metadata ?? undefined,
        },
        previousSnapshotId,
        batchId,
        environmentId: env.id,
        environmentType: env.type,
        projectId: env.project.id,
        organizationId: env.organization.id,
        checkpointId,
        completedWaitpoints,
        workerId,
        runnerId,
      });

      // Development runs are always routed to a worker queue named after the environment id.
      const isDevelopment = env.type === "DEVELOPMENT";
      const targetWorkerQueue = isDevelopment ? env.id : run.workerQueue;

      // Message timestamp: the queue timestamp (or creation time if absent) moved earlier by priorityMs.
      const queuedAt = run.queueTimestamp ?? run.createdAt;
      const messageTimestamp = queuedAt.getTime() - run.priorityMs;

      // TTL is attached only on explicit request (first enqueue from trigger). Re-enqueues
      // (waitpoint, checkpoint, delayed, pending version) must leave it unset.
      const ttlDeadline =
        includeTtl && run.ttl ? parseNaturalLanguageDuration(run.ttl) : undefined;
      const ttlExpiresAt = ttlDeadline ? ttlDeadline.getTime() : undefined;

      await this.$.runQueue.enqueueMessage({
        env,
        workerQueue: targetWorkerQueue,
        enableFastPath,
        message: {
          runId: run.id,
          taskIdentifier: run.taskIdentifier,
          orgId: env.organization.id,
          projectId: env.project.id,
          environmentId: env.id,
          environmentType: env.type,
          queue: run.queue,
          concurrencyKey: run.concurrencyKey ?? undefined,
          timestamp: messageTimestamp,
          attempt: 0,
          ttlExpiresAt,
        },
      });

      return createdSnapshot;
    };

    return await this.$.runLock.lockIf(
      !skipRunLock,
      "enqueueRun",
      [run.id],
      snapshotThenEnqueue
    );
  }
}