Skip to content

Commit adab225

Browse files
committed
Large waitpoints now get offloaded
1 parent f44843e commit adab225

File tree

5 files changed

+170
-21
lines changed

5 files changed

+170
-21
lines changed

apps/webapp/app/routes/api.v1.waitpoints.tokens.$waitpointFriendlyId.callback.$hash.ts

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
2-
import {
3-
type CompleteWaitpointTokenResponseBody,
4-
conditionallyExportPacket,
5-
stringifyIO,
6-
} from "@trigger.dev/core/v3";
2+
import { type CompleteWaitpointTokenResponseBody, stringifyIO } from "@trigger.dev/core/v3";
73
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
84
import { z } from "zod";
95
import { $replica } from "~/db.server";
106
import { env } from "~/env.server";
7+
import { processWaitpointCompletionPacket } from "~/runEngine/concerns/waitpointCompletionPacket.server";
8+
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
119
import { verifyHttpCallbackHash } from "~/services/httpCallback.server";
1210
import { logger } from "~/services/logger.server";
1311
import { engine } from "~/v3/runEngine.server";
@@ -41,8 +39,10 @@ export async function action({ request, params }: ActionFunctionArgs) {
4139
},
4240
include: {
4341
environment: {
44-
select: {
45-
apiKey: true,
42+
include: {
43+
project: true,
44+
organization: true,
45+
orgMember: true,
4646
parentEnvironment: {
4747
select: {
4848
apiKey: true,
@@ -77,9 +77,10 @@ export async function action({ request, params }: ActionFunctionArgs) {
7777
const body = await request.json().catch(() => ({}));
7878

7979
const stringifiedData = await stringifyIO(body);
80-
const finalData = await conditionallyExportPacket(
80+
const finalData = await processWaitpointCompletionPacket(
8181
stringifiedData,
82-
`${waitpointId}/waitpoint/http-callback`
82+
waitpoint.environment,
83+
`${WaitpointId.toFriendlyId(waitpointId)}/http-callback`
8384
);
8485

8586
const result = await engine.completeWaitpoint({

apps/webapp/app/routes/api.v1.waitpoints.tokens.$waitpointFriendlyId.complete.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@ import { json } from "@remix-run/server-runtime";
22
import {
33
CompleteWaitpointTokenRequestBody,
44
type CompleteWaitpointTokenResponseBody,
5-
conditionallyExportPacket,
65
stringifyIO,
76
} from "@trigger.dev/core/v3";
87
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
98
import { z } from "zod";
109
import { $replica } from "~/db.server";
1110
import { env } from "~/env.server";
1211
import { logger } from "~/services/logger.server";
12+
import { processWaitpointCompletionPacket } from "~/runEngine/concerns/waitpointCompletionPacket.server";
1313
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
1414
import { engine } from "~/v3/runEngine.server";
1515

@@ -52,9 +52,10 @@ const { action, loader } = createActionApiRoute(
5252
}
5353

5454
const stringifiedData = await stringifyIO(body.data);
55-
const finalData = await conditionallyExportPacket(
55+
const finalData = await processWaitpointCompletionPacket(
5656
stringifiedData,
57-
`${waitpointId}/waitpoint/token`
57+
authentication.environment,
58+
`${WaitpointId.toFriendlyId(waitpointId)}/token`
5859
);
5960

6061
const result = await engine.completeWaitpoint({

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.waitpoints.$waitpointFriendlyId.complete/route.tsx

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,7 @@ import { env } from "~/env.server";
22
import { parse } from "@conform-to/zod";
33
import { Form, useLocation, useNavigation, useSubmit } from "@remix-run/react";
44
import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
5-
import {
6-
conditionallyExportPacket,
7-
IOPacket,
8-
stringifyIO,
9-
timeoutError,
10-
WaitpointTokenStatus,
11-
} from "@trigger.dev/core/v3";
5+
import { stringifyIO, timeoutError, WaitpointTokenStatus } from "@trigger.dev/core/v3";
126
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
137
import type { Waitpoint } from "@trigger.dev/database";
148
import { useCallback, useRef } from "react";
@@ -24,6 +18,8 @@ import { $replica } from "~/db.server";
2418
import { useOrganization } from "~/hooks/useOrganizations";
2519
import { useProject } from "~/hooks/useProject";
2620
import { redirectWithErrorMessage, redirectWithSuccessMessage } from "~/models/message.server";
21+
import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server";
22+
import { processWaitpointCompletionPacket } from "~/runEngine/concerns/waitpointCompletionPacket.server";
2723
import { logger } from "~/services/logger.server";
2824
import { requireUserId } from "~/services/session.server";
2925
import { EnvironmentParamSchema, ProjectParamSchema, v3RunsPath } from "~/utils/pathBuilder";
@@ -86,6 +82,7 @@ export const action = async ({ request, params }: ActionFunctionArgs) => {
8682
const waitpoint = await $replica.waitpoint.findFirst({
8783
select: {
8884
projectId: true,
85+
environmentId: true,
8986
},
9087
where: {
9188
id: waitpointId,
@@ -150,11 +147,29 @@ export const action = async ({ request, params }: ActionFunctionArgs) => {
150147
);
151148
}
152149

150+
const environment = await findEnvironmentBySlug(project.id, envParam, userId);
151+
if (!environment) {
152+
return redirectWithErrorMessage(
153+
submission.value.failureRedirect,
154+
request,
155+
"Environment not found"
156+
);
157+
}
158+
159+
if (environment.id !== waitpoint.environmentId) {
160+
return redirectWithErrorMessage(
161+
submission.value.failureRedirect,
162+
request,
163+
"No waitpoint found"
164+
);
165+
}
166+
153167
const data = submission.value.payload ? JSON.parse(submission.value.payload) : {};
154168
const stringifiedData = await stringifyIO(data);
155-
const finalData = await conditionallyExportPacket(
169+
const finalData = await processWaitpointCompletionPacket(
156170
stringifiedData,
157-
`${waitpointId}/waitpoint/token`
171+
environment,
172+
`${WaitpointId.toFriendlyId(waitpointId)}/token`
158173
);
159174

160175
const result = await engine.completeWaitpoint({
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import { IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3";
2+
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
3+
import { env } from "~/env.server";
4+
import { uploadPacketToObjectStore } from "~/v3/objectStore.server";
5+
import { ServiceValidationError } from "~/v3/services/common.server";
6+
import { startActiveSpan } from "~/v3/tracer.server";
7+
8+
function packetExtensionForDataType(dataType: string): string {
9+
switch (dataType) {
10+
case "application/json":
11+
case "application/super+json":
12+
return "json";
13+
case "text/plain":
14+
return "txt";
15+
default:
16+
return "txt";
17+
}
18+
}
19+
20+
/**
21+
* Offloads large waitpoint completion payloads to object store (same threshold and
22+
* upload path pattern as DefaultPayloadProcessor). Object key prefix should use the
23+
* waitpoint friendly id folder, e.g. `${WaitpointId.toFriendlyId(internalId)}/token`.
24+
* Replaces no-op conditionallyExportPacket usage in webapp routes where apiClientManager is unset.
25+
*/
26+
export async function processWaitpointCompletionPacket(
27+
packet: IOPacket,
28+
environment: AuthenticatedEnvironment,
29+
pathPrefix: string
30+
): Promise<IOPacket> {
31+
return await startActiveSpan("processWaitpointCompletionPacket()", async (span) => {
32+
if (!packet.data) {
33+
return packet;
34+
}
35+
36+
const { needsOffloading, size } = packetRequiresOffloading(
37+
packet,
38+
env.TASK_PAYLOAD_OFFLOAD_THRESHOLD
39+
);
40+
41+
span.setAttribute("needsOffloading", needsOffloading);
42+
span.setAttribute("size", size);
43+
44+
if (!needsOffloading) {
45+
return packet;
46+
}
47+
48+
const filename = `${pathPrefix}.${packetExtensionForDataType(packet.dataType)}`;
49+
50+
const [uploadError, uploadedFilename] = await tryCatch(
51+
uploadPacketToObjectStore(
52+
filename,
53+
packet.data,
54+
packet.dataType,
55+
environment,
56+
env.OBJECT_STORE_DEFAULT_PROTOCOL
57+
)
58+
);
59+
60+
if (uploadError) {
61+
throw new ServiceValidationError("Failed to upload large waitpoint to object store", 500);
62+
}
63+
64+
return {
65+
data: uploadedFilename!,
66+
dataType: "application/store",
67+
};
68+
});
69+
}

apps/webapp/test/objectStore.test.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { type IOPacket } from "@trigger.dev/core/v3";
33
import { type PrismaClient } from "@trigger.dev/database";
44
import { afterAll, describe, expect, it, vi } from "vitest";
55
import { env } from "~/env.server";
6+
import { processWaitpointCompletionPacket } from "~/runEngine/concerns/waitpointCompletionPacket.server";
67
import {
78
downloadPacketFromObjectStore,
89
formatStorageUri,
@@ -65,6 +66,7 @@ const originalEnvObj = {
6566
OBJECT_STORE_SECRET_ACCESS_KEY: env.OBJECT_STORE_SECRET_ACCESS_KEY,
6667
OBJECT_STORE_REGION: env.OBJECT_STORE_REGION,
6768
OBJECT_STORE_DEFAULT_PROTOCOL: env.OBJECT_STORE_DEFAULT_PROTOCOL,
69+
TASK_PAYLOAD_OFFLOAD_THRESHOLD: env.TASK_PAYLOAD_OFFLOAD_THRESHOLD,
6870
};
6971

7072
describe("Object Storage", () => {
@@ -344,6 +346,66 @@ describe("Object Storage", () => {
344346
}
345347
);
346348

349+
postgresAndMinioTest(
350+
"processWaitpointCompletionPacket offloads above threshold and round-trips download",
351+
async ({ minioConfig, prisma }) => {
352+
env.OBJECT_STORE_BASE_URL = minioConfig.baseUrl;
353+
env.OBJECT_STORE_ACCESS_KEY_ID = minioConfig.accessKeyId;
354+
env.OBJECT_STORE_SECRET_ACCESS_KEY = minioConfig.secretAccessKey;
355+
env.OBJECT_STORE_REGION = minioConfig.region;
356+
env.OBJECT_STORE_DEFAULT_PROTOCOL = undefined;
357+
358+
const savedThreshold = env.TASK_PAYLOAD_OFFLOAD_THRESHOLD;
359+
env.TASK_PAYLOAD_OFFLOAD_THRESHOLD = 256;
360+
361+
const environment = await createTestEnvironment(prisma);
362+
const pathPrefix = `waitpoint_completiontest/token`;
363+
364+
try {
365+
const smallPacket: IOPacket = {
366+
data: JSON.stringify({ ok: true }),
367+
dataType: "application/json",
368+
};
369+
const smallResult = await processWaitpointCompletionPacket(
370+
smallPacket,
371+
environment as any,
372+
pathPrefix
373+
);
374+
expect(smallResult).toEqual(smallPacket);
375+
376+
const largeBody = "x".repeat(400);
377+
const largePacket: IOPacket = {
378+
data: largeBody,
379+
dataType: "text/plain",
380+
};
381+
const largeResult = await processWaitpointCompletionPacket(
382+
largePacket,
383+
environment as any,
384+
pathPrefix
385+
);
386+
387+
expect(largeResult.dataType).toBe("application/store");
388+
expect(largeResult.data).toBe(`${pathPrefix}.txt`);
389+
390+
const downloadedPacket = await downloadPacketFromObjectStore(
391+
{
392+
data: largeResult.data,
393+
dataType: "application/store",
394+
},
395+
environment as any
396+
);
397+
398+
expect(downloadedPacket.data).toBe(largeBody);
399+
} finally {
400+
env.TASK_PAYLOAD_OFFLOAD_THRESHOLD = savedThreshold;
401+
402+
await prisma.runtimeEnvironment.delete({ where: { id: environment.id } });
403+
await prisma.project.delete({ where: { id: environment.projectId } });
404+
await prisma.organization.delete({ where: { id: environment.organizationId } });
405+
}
406+
}
407+
);
408+
347409
describe("hasObjectStoreClient", () => {
348410
it("returns false when no store is configured", () => {
349411
env.OBJECT_STORE_BASE_URL = undefined;
@@ -582,5 +644,6 @@ describe("Object Storage", () => {
582644
env.OBJECT_STORE_SECRET_ACCESS_KEY = originalEnvObj.OBJECT_STORE_SECRET_ACCESS_KEY;
583645
env.OBJECT_STORE_REGION = originalEnvObj.OBJECT_STORE_REGION;
584646
env.OBJECT_STORE_DEFAULT_PROTOCOL = originalEnvObj.OBJECT_STORE_DEFAULT_PROTOCOL;
647+
env.TASK_PAYLOAD_OFFLOAD_THRESHOLD = originalEnvObj.TASK_PAYLOAD_OFFLOAD_THRESHOLD;
585648
});
586649
});

0 commit comments

Comments
 (0)