Skip to content

Commit def21b2

Browse files
authored
fix(batch): retry R2 upload on transient failure in BatchPayloadProcessor (#3331)
A single "fetch failed" from the object store was aborting the entire batch stream with no retry. Added p-retry (3 attempts, 500ms-2s backoff) around ploadPacketToObjectStore so transient network errors self-heal server-side instead of propagating to the SDK.
1 parent 4f2ff3d commit def21b2

File tree

4 files changed

+155
-19
lines changed

4 files changed

+155
-19
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Fix transient R2/object store upload failures during batchTrigger() item streaming.
7+
8+
- Added p-retry (3 attempts, 500ms–2s exponential backoff) around `uploadPacketToObjectStore` in `BatchPayloadProcessor.process()` so transient network errors self-heal server-side rather than aborting the entire batch stream.
9+
- Removed `x-should-retry: false` from the 500 response on the batch items route so the SDK's existing 5xx retry path can recover if server-side retries are exhausted. Item deduplication by index makes full-stream retries safe.

apps/webapp/app/routes/api.v3.batches.$batchId.items.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
104104
return json({ error: error.message }, { status: 400 });
105105
}
106106

107-
return json(
108-
{ error: error.message },
109-
{ status: 500, headers: { "x-should-retry": "false" } }
110-
);
107+
return json({ error: error.message }, { status: 500 });
111108
}
112109

113110
return json({ error: "Something went wrong" }, { status: 500 });

apps/webapp/app/runEngine/concerns/batchPayloads.server.ts

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { type IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3";
2+
import pRetry from "p-retry";
23
import { env } from "~/env.server";
34
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
45
import { logger } from "~/services/logger.server";
@@ -103,32 +104,46 @@ export class BatchPayloadProcessor {
103104
};
104105
}
105106

106-
// Upload to object store
107+
// Upload to object store, retrying on transient network errors
108+
const { data: packetData, dataType: packetDataType } = packet;
107109
const filename = `batch_${batchId}/item_${itemIndex}/payload.json`;
108110

109111
const [uploadError, uploadedFilename] = await tryCatch(
110-
uploadPacketToObjectStore(
111-
filename,
112-
packet.data,
113-
packet.dataType,
114-
environment,
115-
env.OBJECT_STORE_DEFAULT_PROTOCOL
112+
pRetry(
113+
() =>
114+
uploadPacketToObjectStore(
115+
filename,
116+
packetData,
117+
packetDataType,
118+
environment,
119+
env.OBJECT_STORE_DEFAULT_PROTOCOL
120+
),
121+
{
122+
retries: 3,
123+
minTimeout: 500,
124+
maxTimeout: 2000,
125+
factor: 2,
126+
onFailedAttempt: (error) => {
127+
logger.warn("Batch item payload upload to object store failed, retrying", {
128+
batchId,
129+
itemIndex,
130+
attempt: error.attemptNumber,
131+
retriesLeft: error.retriesLeft,
132+
error: error.message,
133+
});
134+
},
135+
}
116136
)
117137
);
118138

119139
if (uploadError) {
120-
logger.error("Failed to upload batch item payload to object store", {
140+
logger.error("Failed to upload batch item payload to object store after retries", {
121141
batchId,
122142
itemIndex,
123-
error: uploadError instanceof Error ? uploadError.message : String(uploadError),
143+
error: uploadError.message,
124144
});
125145

126-
// Throw to fail this item - SDK can retry
127-
throw new Error(
128-
`Failed to upload large payload to object store: ${
129-
uploadError instanceof Error ? uploadError.message : String(uploadError)
130-
}`
131-
);
146+
throw new Error(`Failed to upload large payload to object store: ${uploadError.message}`);
132147
}
133148

134149
logger.debug("Batch item payload offloaded to object store", {
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
2+
3+
// --- Module mocks (must come before imports) ---
4+
5+
vi.mock("~/v3/objectStore.server", () => ({
6+
hasObjectStoreClient: vi.fn().mockReturnValue(true),
7+
uploadPacketToObjectStore: vi.fn(),
8+
}));
9+
10+
// Threshold of 10 bytes so any non-trivial payload triggers offloading
11+
vi.mock("~/env.server", () => ({
12+
env: {
13+
BATCH_PAYLOAD_OFFLOAD_THRESHOLD: 10,
14+
TASK_PAYLOAD_OFFLOAD_THRESHOLD: 10,
15+
OBJECT_STORE_DEFAULT_PROTOCOL: undefined,
16+
},
17+
}));
18+
19+
// Execute the span callback synchronously without real OTel
20+
vi.mock("~/v3/tracer.server", () => ({
21+
startActiveSpan: vi.fn(async (_name: string, fn: (span: any) => any) =>
22+
fn({ setAttribute: vi.fn() })
23+
),
24+
}));
25+
26+
import { BatchPayloadProcessor } from "../../app/runEngine/concerns/batchPayloads.server";
27+
import * as objectStore from "~/v3/objectStore.server";
28+
29+
vi.setConfig({ testTimeout: 30_000 });
30+
31+
// Minimal AuthenticatedEnvironment shape required by BatchPayloadProcessor
32+
const mockEnvironment = {
33+
id: "env-test",
34+
slug: "production",
35+
project: { externalRef: "proj-ext-ref" },
36+
} as any;
37+
38+
describe("BatchPayloadProcessor", () => {
39+
let mockUpload: ReturnType<typeof vi.mocked<typeof objectStore.uploadPacketToObjectStore>>;
40+
41+
beforeEach(() => {
42+
mockUpload = vi.mocked(objectStore.uploadPacketToObjectStore);
43+
mockUpload.mockReset();
44+
});
45+
46+
it("offloads a large payload successfully on first attempt", async () => {
47+
mockUpload.mockResolvedValueOnce("batch_abc/item_0/payload.json");
48+
49+
const processor = new BatchPayloadProcessor();
50+
const result = await processor.process(
51+
'{"message":"hello world"}',
52+
"application/json",
53+
"batch-internal-abc",
54+
0,
55+
mockEnvironment
56+
);
57+
58+
expect(result.wasOffloaded).toBe(true);
59+
expect(result.payloadType).toBe("application/store");
60+
expect(result.payload).toBe("batch_abc/item_0/payload.json");
61+
expect(mockUpload).toHaveBeenCalledTimes(1);
62+
});
63+
64+
it("retries on transient fetch failure and succeeds on third attempt", async () => {
65+
mockUpload
66+
.mockRejectedValueOnce(new Error("fetch failed"))
67+
.mockRejectedValueOnce(new Error("fetch failed"))
68+
.mockResolvedValueOnce("batch_abc/item_0/payload.json");
69+
70+
const processor = new BatchPayloadProcessor();
71+
const result = await processor.process(
72+
'{"message":"hello world"}',
73+
"application/json",
74+
"batch-internal-abc",
75+
0,
76+
mockEnvironment
77+
);
78+
79+
expect(result.wasOffloaded).toBe(true);
80+
expect(mockUpload).toHaveBeenCalledTimes(3);
81+
});
82+
83+
it("throws after exhausting all retry attempts", async () => {
84+
mockUpload.mockRejectedValue(new Error("fetch failed"));
85+
86+
const processor = new BatchPayloadProcessor();
87+
88+
await expect(
89+
processor.process(
90+
'{"message":"hello world"}',
91+
"application/json",
92+
"batch-internal-abc",
93+
0,
94+
mockEnvironment
95+
)
96+
).rejects.toThrow("Failed to upload large payload to object store: fetch failed");
97+
98+
// 1 initial attempt + 3 retries = 4 total calls
99+
expect(mockUpload).toHaveBeenCalledTimes(4);
100+
});
101+
102+
it("does not offload when there is no payload data", async () => {
103+
const processor = new BatchPayloadProcessor();
104+
const result = await processor.process(
105+
undefined,
106+
"application/json",
107+
"batch-internal-abc",
108+
0,
109+
mockEnvironment
110+
);
111+
112+
expect(result.wasOffloaded).toBe(false);
113+
expect(mockUpload).not.toHaveBeenCalled();
114+
});
115+
});

0 commit comments

Comments
 (0)