-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Expand file tree
/
Copy pathbatchPayloads.server.ts
More file actions
177 lines (157 loc) · 5.61 KB
/
batchPayloads.server.ts
File metadata and controls
177 lines (157 loc) · 5.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import { type IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3";
import { env } from "~/env.server";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { hasObjectStoreClient, uploadPacketToObjectStore } from "~/v3/objectStore.server";
import { startActiveSpan } from "~/v3/tracer.server";
/**
 * Outcome of running a single batch item payload through
 * `BatchPayloadProcessor.process()`.
 */
export type BatchPayloadProcessResult = {
  /** True only when the payload was uploaded to object storage (R2). */
  wasOffloaded: boolean;
  /**
   * Content type of `payload`: "application/store" when the payload was
   * offloaded, otherwise the payload type that was passed in.
   */
  payloadType: string;
  /**
   * The value to store for the item: the untouched original payload, or the
   * object-store path returned by the upload when it was offloaded.
   */
  payload: unknown;
  /**
   * Payload size in bytes as reported by the offloading check; 0 when no
   * packet data could be produced for the payload.
   */
  size: number;
};
/**
* BatchPayloadProcessor handles payload offloading for batch items.
*
* When a batch item's payload exceeds the configured threshold, it's uploaded
* to object storage (R2) and the payload is replaced with the storage path.
* This aligns with how single task triggers work via DefaultPayloadProcessor.
*
* Path format: batch_{batchId}/item_{index}/payload.json
*/
export class BatchPayloadProcessor {
  /**
   * Check if object storage is available for payload offloading.
   *
   * Delegates to `hasObjectStoreClient()`. When this returns false,
   * `process()` leaves oversized payloads inline (which may fail downstream
   * for very large payloads).
   */
  isObjectStoreAvailable(): boolean {
    return hasObjectStoreClient();
  }

  /**
   * Process a batch item payload, offloading to R2 if it exceeds the threshold.
   *
   * The payload is returned unchanged (with `wasOffloaded: false`) when:
   * - no packet data could be produced for it (size is reported as 0),
   * - it does not exceed the threshold, or
   * - it exceeds the threshold but no object store is available (a warning is logged).
   *
   * Otherwise it is uploaded under `batch_{batchId}/item_{itemIndex}/payload.json`
   * and the result carries the uploaded path with payload type "application/store".
   *
   * @param payload - The raw payload from the batch item
   * @param payloadType - The payload type (e.g., "application/json")
   * @param batchId - The batch ID (internal format)
   * @param itemIndex - The item index within the batch
   * @param environment - The authenticated environment, forwarded to the upload
   * @returns The processed result with potentially offloaded payload
   * @throws Error if the upload fails or settles without returning a storage
   *   path; the item is failed so the caller can retry.
   */
  async process(
    payload: unknown,
    payloadType: string,
    batchId: string,
    itemIndex: number,
    environment: AuthenticatedEnvironment
  ): Promise<BatchPayloadProcessResult> {
    return startActiveSpan("BatchPayloadProcessor.process()", async (span) => {
      span.setAttribute("batchId", batchId);
      span.setAttribute("itemIndex", itemIndex);
      span.setAttribute("payloadType", payloadType);

      // Every non-offloaded outcome returns the caller's payload untouched.
      const keepInline = (size: number): BatchPayloadProcessResult => ({
        payload,
        payloadType,
        wasOffloaded: false,
        size,
      });

      // Create the packet for size checking
      const packet = this.#createPayloadPacket(payload, payloadType);

      // No data (unserializable, undefined, or empty string): nothing to measure or upload.
      if (!packet.data) {
        return keepInline(0);
      }

      // Batch-specific threshold wins; fall back to the single-task threshold.
      const threshold = env.BATCH_PAYLOAD_OFFLOAD_THRESHOLD ?? env.TASK_PAYLOAD_OFFLOAD_THRESHOLD;
      const { needsOffloading, size } = packetRequiresOffloading(packet, threshold);

      span.setAttribute("payloadSize", size);
      span.setAttribute("needsOffloading", needsOffloading);
      span.setAttribute("threshold", threshold);

      if (!needsOffloading) {
        return keepInline(size);
      }

      // Check if object store is available
      if (!this.isObjectStoreAvailable()) {
        logger.warn("Payload exceeds threshold but object store is not available", {
          batchId,
          itemIndex,
          size,
          threshold,
        });

        // Return without offloading - the payload will be stored inline
        // This may fail downstream for very large payloads
        return keepInline(size);
      }

      // Upload to object store
      const filename = `batch_${batchId}/item_${itemIndex}/payload.json`;
      const [uploadError, uploadedFilename] = await tryCatch(
        uploadPacketToObjectStore(
          filename,
          packet.data,
          packet.dataType,
          environment,
          env.OBJECT_STORE_DEFAULT_PROTOCOL
        )
      );

      if (uploadError) {
        const reason = uploadError instanceof Error ? uploadError.message : String(uploadError);

        logger.error("Failed to upload batch item payload to object store", {
          batchId,
          itemIndex,
          error: reason,
        });

        // Throw to fail this item - SDK can retry
        throw new Error(`Failed to upload large payload to object store: ${reason}`);
      }

      // Runtime guard instead of a non-null assertion: if the upload settled
      // without an error but also without a path (e.g. a falsy rejection value),
      // fail the item rather than emit an "application/store" payload that
      // points nowhere.
      if (!uploadedFilename) {
        const reason = "upload returned no storage path";

        logger.error("Failed to upload batch item payload to object store", {
          batchId,
          itemIndex,
          error: reason,
        });

        throw new Error(`Failed to upload large payload to object store: ${reason}`);
      }

      logger.debug("Batch item payload offloaded to object store", {
        batchId,
        itemIndex,
        filename: uploadedFilename,
        size,
      });

      span.setAttribute("wasOffloaded", true);
      span.setAttribute("offloadPath", uploadedFilename);

      return {
        payload: uploadedFilename,
        payloadType: "application/store",
        wasOffloaded: true,
        size,
      };
    });
  }

  /**
   * Create an IOPacket from payload for size checking.
   *
   * String payloads are used as-is; anything else is passed through
   * `JSON.stringify`. The returned packet has no `data` when serialization of
   * a non-JSON payload type throws, and `data` may also be undefined at
   * runtime when `JSON.stringify` yields undefined (e.g. an undefined payload).
   */
  #createPayloadPacket(payload: unknown, payloadType: string): IOPacket {
    // Payload from SDK is already serialized as a string - use directly
    if (typeof payload === "string") {
      return { data: payload, dataType: payloadType };
    }

    if (payloadType === "application/json") {
      // Non-string payloads (e.g., direct API calls with objects) need serialization.
      // NOTE(review): serialization errors propagate here, unlike the branch
      // below which swallows them - confirm the asymmetry is intended.
      return { data: JSON.stringify(payload), dataType: "application/json" };
    }

    // For other types, try to stringify
    try {
      return { data: JSON.stringify(payload), dataType: payloadType };
    } catch {
      return { dataType: payloadType };
    }
  }
}