Skip to content

Commit 7687d97

Browse files
committed
Object Storage seamless migration
This allows seamless migration to different object storage.
1 parent 97d2f72 commit 7687d97

File tree

15 files changed

+729
-403
lines changed

15 files changed

+729
-403
lines changed

.env.example

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,26 @@ POSTHOG_PROJECT_KEY=
7777
# DEPOT_TOKEN=<Depot org token>
7878
# DEV_OTEL_EXPORTER_OTLP_ENDPOINT="http://0.0.0.0:4318"
7979
# These are needed for the object store (for handling large payloads/outputs)
80+
# Default provider (backward compatible - no protocol prefix)
8081
# OBJECT_STORE_BASE_URL="https://{bucket}.{accountId}.r2.cloudflarestorage.com"
8182
# OBJECT_STORE_ACCESS_KEY_ID=
8283
# OBJECT_STORE_SECRET_ACCESS_KEY=
84+
# OBJECT_STORE_REGION=auto
85+
# OBJECT_STORE_SERVICE=s3
86+
# OBJECT_STORE_DEFAULT_PROTOCOL=s3 # Optional: protocol to use for new uploads (e.g., "s3", "r2")
87+
#
88+
# Named providers (protocol-prefixed data) - optional for multi-provider support
89+
# OBJECT_STORE_S3_BASE_URL=https://s3.amazonaws.com
90+
# OBJECT_STORE_S3_ACCESS_KEY_ID=
91+
# OBJECT_STORE_S3_SECRET_ACCESS_KEY=
92+
# OBJECT_STORE_S3_REGION=us-east-1
93+
# OBJECT_STORE_S3_SERVICE=s3
94+
#
95+
# OBJECT_STORE_R2_BASE_URL=https://{bucket}.{accountId}.r2.cloudflarestorage.com
96+
# OBJECT_STORE_R2_ACCESS_KEY_ID=
97+
# OBJECT_STORE_R2_SECRET_ACCESS_KEY=
98+
# OBJECT_STORE_R2_REGION=auto
99+
# OBJECT_STORE_R2_SERVICE=s3
83100
# CHECKPOINT_THRESHOLD_IN_MS=10000
84101

85102
# These control the server-side internal telemetry

apps/webapp/app/env.server.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,12 @@ const EnvironmentSchema = z
349349
OBJECT_STORE_REGION: z.string().optional(),
350350
OBJECT_STORE_SERVICE: z.string().default("s3"),
351351

352+
// Protocol to use for new uploads (e.g., "s3", "r2"). Data without protocol uses default provider above.
353+
// If specified, you must configure the corresponding provider using OBJECT_STORE_{PROTOCOL}_* env vars.
354+
// Example: OBJECT_STORE_DEFAULT_PROTOCOL=s3 requires OBJECT_STORE_S3_BASE_URL, OBJECT_STORE_S3_ACCESS_KEY_ID, etc.
355+
// Enables zero-downtime migration between providers (old data keeps working, new data uses new provider).
356+
OBJECT_STORE_DEFAULT_PROTOCOL: z.string().regex(/^[a-z0-9]+$/).optional(),
357+
352358
ARTIFACTS_OBJECT_STORE_BUCKET: z.string().optional(),
353359
ARTIFACTS_OBJECT_STORE_BASE_URL: z.string().optional(),
354360
ARTIFACTS_OBJECT_STORE_ACCESS_KEY_ID: z.string().optional(),

apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import assertNever from "assert-never";
1515
import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions";
1616
import { $replica, prisma } from "~/db.server";
1717
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
18-
import { generatePresignedUrl } from "~/v3/r2.server";
18+
import { generatePresignedUrl } from "~/v3/objectStore.server";
1919
import { tracer } from "~/v3/tracer.server";
2020
import { startSpanWithEnv } from "~/v3/tracing.server";
2121

apps/webapp/app/routes/api.v1.packets.$.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { json } from "@remix-run/server-runtime";
33
import { z } from "zod";
44
import { authenticateApiRequest } from "~/services/apiAuth.server";
55
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
6-
import { generatePresignedUrl } from "~/v3/r2.server";
6+
import { generatePresignedUrl } from "~/v3/objectStore.server";
77

88
const ParamsSchema = z.object({
99
"*": z.string(),

apps/webapp/app/routes/resources.packets.$environmentId.$.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { basename } from "node:path";
33
import { z } from "zod";
44
import { prisma } from "~/db.server";
55
import { requireUserId } from "~/services/session.server";
6-
import { generatePresignedRequest } from "~/v3/r2.server";
6+
import { generatePresignedRequest } from "~/v3/objectStore.server";
77

88
const ParamSchema = z.object({
99
environmentId: z.string(),

apps/webapp/app/runEngine/concerns/batchPayloads.server.ts

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
import { IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3";
1+
import { type IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3";
22
import { env } from "~/env.server";
3-
import { startActiveSpan } from "~/v3/tracer.server";
4-
import { uploadPacketToObjectStore, r2 } from "~/v3/r2.server";
53
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
64
import { logger } from "~/services/logger.server";
5+
import { hasObjectStoreClient, uploadPacketToObjectStore } from "~/v3/objectStore.server";
6+
import { startActiveSpan } from "~/v3/tracer.server";
77

88
export type BatchPayloadProcessResult = {
99
/** The processed payload - either the original or an R2 path */
@@ -31,7 +31,7 @@ export class BatchPayloadProcessor {
3131
* If not available, large payloads will be stored inline (which may fail for very large payloads).
3232
*/
3333
isObjectStoreAvailable(): boolean {
34-
return r2 !== undefined && env.OBJECT_STORE_BASE_URL !== undefined;
34+
return hasObjectStoreClient();
3535
}
3636

3737
/**
@@ -103,11 +103,17 @@ export class BatchPayloadProcessor {
103103
};
104104
}
105105

106-
// Upload to R2
106+
// Upload to object store
107107
const filename = `batch_${batchId}/item_${itemIndex}/payload.json`;
108108

109-
const [uploadError] = await tryCatch(
110-
uploadPacketToObjectStore(filename, packet.data, packet.dataType, environment)
109+
const [uploadError, uploadedFilename] = await tryCatch(
110+
uploadPacketToObjectStore(
111+
filename,
112+
packet.data,
113+
packet.dataType,
114+
environment,
115+
env.OBJECT_STORE_DEFAULT_PROTOCOL
116+
)
111117
);
112118

113119
if (uploadError) {
@@ -125,18 +131,18 @@ export class BatchPayloadProcessor {
125131
);
126132
}
127133

128-
logger.debug("Batch item payload offloaded to R2", {
134+
logger.debug("Batch item payload offloaded to object store", {
129135
batchId,
130136
itemIndex,
131-
filename,
137+
filename: uploadedFilename,
132138
size,
133139
});
134140

135141
span.setAttribute("wasOffloaded", true);
136-
span.setAttribute("offloadPath", filename);
142+
span.setAttribute("offloadPath", uploadedFilename);
137143

138144
return {
139-
payload: filename,
145+
payload: uploadedFilename!,
140146
payloadType: "application/store",
141147
wasOffloaded: true,
142148
size,

apps/webapp/app/runEngine/concerns/payloads.server.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/
22
import { PayloadProcessor, TriggerTaskRequest } from "../types";
33
import { env } from "~/env.server";
44
import { startActiveSpan } from "~/v3/tracer.server";
5-
import { uploadPacketToObjectStore } from "~/v3/r2.server";
5+
import { uploadPacketToObjectStore } from "~/v3/objectStore.server";
66
import { ServiceValidationError } from "~/v3/services/common.server";
77

88
export class DefaultPayloadProcessor implements PayloadProcessor {
@@ -31,16 +31,16 @@ export class DefaultPayloadProcessor implements PayloadProcessor {
3131

3232
const filename = `${request.friendlyId}/payload.json`;
3333

34-
const [uploadError] = await tryCatch(
35-
uploadPacketToObjectStore(filename, packet.data, packet.dataType, request.environment)
34+
const [uploadError, uploadedFilename] = await tryCatch(
35+
uploadPacketToObjectStore(filename, packet.data, packet.dataType, request.environment, env.OBJECT_STORE_DEFAULT_PROTOCOL)
3636
);
3737

3838
if (uploadError) {
3939
throw new ServiceValidationError("Failed to upload large payload to object store", 500); // This is retryable
4040
}
4141

4242
return {
43-
data: filename,
43+
data: uploadedFilename!,
4444
dataType: "application/store",
4545
};
4646
});

apps/webapp/app/runEngine/services/batchTrigger.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import { env } from "~/env.server";
1616
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1717
import { logger } from "~/services/logger.server";
1818
import { batchTriggerWorker } from "~/v3/batchTriggerWorker.server";
19-
import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../../v3/r2.server";
19+
import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../../v3/objectStore.server";
2020
import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
2121
import { TriggerTaskService } from "../../v3/services/triggerTask.server";
2222
import { startActiveSpan } from "../../v3/tracer.server";

0 commit comments

Comments
 (0)