-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Expand file tree
/
Copy pathcreateBatch.server.ts
More file actions
202 lines (179 loc) · 7.63 KB
/
createBatch.server.ts
File metadata and controls
202 lines (179 loc) · 7.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import type { InitializeBatchOptions } from "@internal/run-engine";
import { type CreateBatchRequestBody, type CreateBatchResponse } from "@trigger.dev/core/v3";
import { BatchId, RunId } from "@trigger.dev/core/v3/isomorphic";
import { type BatchTaskRun, Prisma } from "@trigger.dev/database";
import { Evt } from "evt";
import { prisma, type PrismaClientOrTransaction } from "~/db.server";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
import { BatchRateLimitExceededError, getBatchLimits } from "../concerns/batchLimits.server";
import { DefaultQueueManager } from "../concerns/queues.server";
import { DefaultTriggerTaskValidator } from "../validators/triggerTaskValidator";
/**
 * Optional, per-request settings accepted by `CreateBatchService.call`.
 * Apart from `oneTimeUseToken`, every field is passed through unchanged to
 * the run engine's `initializeBatch` call.
 */
export type CreateBatchServiceOptions = {
/** Forwarded to the run engine as `triggerVersion`. */
triggerVersion?: string;
/** Trace propagation data; forwarded to the run engine as a `Record<string, unknown>`. */
traceContext?: Record<string, string | undefined | Record<string, string | undefined>>;
/** Forwarded to the run engine as `spanParentAsLink`. */
spanParentAsLink?: boolean;
/**
 * Stored on the created `BatchTaskRun` row. A unique-constraint violation on
 * this column is reported as "one-time use token ... has already been used".
 */
oneTimeUseToken?: string;
/** Forwarded to the run engine as `realtimeStreamsVersion`. */
realtimeStreamsVersion?: "v1" | "v2";
/** Forwarded to the run engine as `triggerSource`. */
triggerSource?: string;
};
/**
* Create Batch Service (Phase 1 of 2-phase batch API).
*
* This service handles Phase 1 of the streaming batch API:
* 1. Validates entitlement and enforces the batch rate limit (queue size limits are checked per item in Phase 2)
* 2. Creates BatchTaskRun in Postgres with status=PENDING, expectedCount set
* 3. For batchTriggerAndWait: blocks the parent run immediately
* 4. Initializes batch metadata in Redis
* 5. Returns batch ID - items are streamed separately via Phase 2
*
* The batch is NOT sealed until Phase 2 completes.
*/
export class CreateBatchService extends WithRunEngine {
  /** Posted with the freshly inserted `BatchTaskRun` row, right after the insert in {@link call}. */
  public onBatchTaskRunCreated: Evt<BatchTaskRun> = new Evt();

  // NOTE(review): built in the constructor but not referenced by any method of this class.
  private readonly queueConcern: DefaultQueueManager;

  /** Used by {@link call} to check that the environment is entitled to trigger runs. */
  private readonly validator: DefaultTriggerTaskValidator;

  /**
   * @param _prisma Prisma client (or transaction) used for the `BatchTaskRun` insert;
   *   defaults to the shared `prisma` client.
   */
  constructor(protected readonly _prisma: PrismaClientOrTransaction = prisma) {
    super({ prisma: _prisma });
    this.queueConcern = new DefaultQueueManager(this._prisma, this._engine);
    this.validator = new DefaultTriggerTaskValidator();
  }

  /**
   * Create a batch for 2-phase processing.
   * Items will be streamed separately via the StreamBatchItemsService.
   *
   * Steps, in order: entitlement check, rate-limit check, `BatchTaskRun` insert
   * (status `PENDING`, unsealed), `onBatchTaskRunCreated` notification, optional
   * blocking of the parent run, and batch initialization in the run engine.
   *
   * @param environment Authenticated environment the batch belongs to.
   * @param body Request payload; `runCount` is used as both `runCount` and `expectedCount`.
   * @param options Optional tracing / token / version settings (see `CreateBatchServiceOptions`).
   * @returns The batch's friendly id, its run count, `isCached: false`, and the idempotency key.
   * @throws The error carried by a failed entitlement validation.
   * @throws BatchRateLimitExceededError when the rate limiter rejects the request.
   * @throws ServiceValidationError when the insert hits a unique constraint (Prisma `P2002`).
   */
  public async call(
    environment: AuthenticatedEnvironment,
    body: CreateBatchRequestBody,
    options: CreateBatchServiceOptions = {}
  ): Promise<CreateBatchResponse> {
    try {
      return await this.traceWithEnv<CreateBatchResponse>(
        "createBatch()",
        environment,
        async (span) => {
          const { id, friendlyId } = BatchId.generate();

          span.setAttribute("batchId", friendlyId);
          span.setAttribute("runCount", body.runCount);

          // Validate entitlement
          const entitlementValidation = await this.validator.validateEntitlement({
            environment,
          });

          if (!entitlementValidation.ok) {
            throw entitlementValidation.error;
          }

          // Extract plan type from entitlement validation for billing tracking
          const planType = entitlementValidation.plan?.type;

          // Get batch limits for this organization
          const { config, rateLimiter } = await getBatchLimits(environment.organization);

          // Check rate limit BEFORE creating the batch
          // This prevents burst creation of batches that exceed the rate limit
          const rateResult = await rateLimiter.limit(environment.id, body.runCount);

          if (!rateResult.success) {
            throw new BatchRateLimitExceededError(
              rateResult.limit,
              rateResult.remaining,
              new Date(rateResult.reset),
              body.runCount
            );
          }

          // Note: Queue size limits are validated per-queue when batch items are processed,
          // since we don't know which queues items will go to until they're streamed.

          // Create BatchTaskRun in Postgres with PENDING status
          // The batch will be sealed (status -> PROCESSING) when items are streamed
          const batch = await this._prisma.batchTaskRun.create({
            data: {
              id,
              friendlyId,
              runtimeEnvironmentId: environment.id,
              status: "PENDING",
              runCount: body.runCount,
              expectedCount: body.runCount,
              runIds: [],
              batchVersion: "runengine:v2", // 2-phase streaming batch API
              oneTimeUseToken: options.oneTimeUseToken,
              idempotencyKey: body.idempotencyKey,
              // Not sealed yet - will be sealed when items stream completes
              sealed: false,
            },
          });

          this.onBatchTaskRunCreated.post(batch);

          // Block parent run if this is a batchTriggerAndWait
          if (body.parentRunId && body.resumeParentOnCompletion) {
            await this._engine.blockRunWithCreatedBatch({
              runId: RunId.fromFriendlyId(body.parentRunId),
              batchId: batch.id,
              environmentId: environment.id,
              projectId: environment.projectId,
              organizationId: environment.organizationId,
            });
          }

          // Initialize batch metadata in Redis (without items)
          const initOptions: InitializeBatchOptions = {
            batchId: id,
            friendlyId,
            environmentId: environment.id,
            environmentType: environment.type,
            organizationId: environment.organizationId,
            projectId: environment.projectId,
            runCount: body.runCount,
            parentRunId: body.parentRunId,
            resumeParentOnCompletion: body.resumeParentOnCompletion,
            triggerVersion: options.triggerVersion,
            traceContext: options.traceContext as Record<string, unknown> | undefined,
            spanParentAsLink: options.spanParentAsLink,
            realtimeStreamsVersion: options.realtimeStreamsVersion,
            idempotencyKey: body.idempotencyKey,
            processingConcurrency: config.processingConcurrency,
            planType,
            triggerSource: options.triggerSource,
          };

          await this._engine.initializeBatch(initOptions);

          logger.info("Batch created", {
            batchId: friendlyId,
            runCount: body.runCount,
            envId: environment.id,
            projectId: environment.projectId,
            parentRunId: body.parentRunId,
            resumeParentOnCompletion: body.resumeParentOnCompletion,
            processingConcurrency: config.processingConcurrency,
          });

          return {
            id: friendlyId,
            runCount: body.runCount,
            isCached: false,
            idempotencyKey: body.idempotencyKey,
          };
        }
      );
    } catch (error) {
      throw this.translateCreateError(error);
    }
  }

  /**
   * Maps an error raised while creating a batch to the error that should be thrown.
   *
   * Prisma known-request errors are logged at debug level. A unique-constraint
   * violation (`P2002`) becomes a `ServiceValidationError`: the one-time-token
   * message when the violated constraint mentions `oneTimeUseToken`, otherwise
   * the idempotency-key message. Every other error is returned unchanged.
   */
  private translateCreateError(error: unknown): unknown {
    if (!(error instanceof Prisma.PrismaClientKnownRequestError)) {
      return error;
    }

    logger.debug("CreateBatchService: Prisma error", {
      code: error.code,
      message: error.message,
      meta: error.meta,
    });

    // Handle Prisma unique constraint violations
    if (error.code !== "P2002") {
      return error;
    }

    if (CreateBatchService.constraintTargetIncludes(error.meta?.target, "oneTimeUseToken")) {
      return new ServiceValidationError(
        "Cannot create batch with a one-time use token as it has already been used."
      );
    }

    return new ServiceValidationError(
      "Cannot create batch as it has already been created with the same idempotency key."
    );
  }

  /**
   * Reports whether a `P2002` `meta.target` value mentions `field`.
   *
   * Accepts both shapes the target can take: a list of names (every string
   * entry is checked, not only the first) or a single string such as a
   * constraint/index name. Any other value yields `false`.
   */
  private static constraintTargetIncludes(target: unknown, field: string): boolean {
    if (typeof target === "string") {
      return target.includes(field);
    }

    return (
      Array.isArray(target) &&
      target.some((entry) => typeof entry === "string" && entry.includes(field))
    );
  }
}