-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathbatchPayloads.test.ts
More file actions
115 lines (94 loc) · 3.43 KB
/
batchPayloads.test.ts
File metadata and controls
115 lines (94 loc) · 3.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
// --- Module mocks (must come before imports) ---
vi.mock("~/v3/objectStore.server", () => {
  // Always report an object store client as present, and expose a bare spy for
  // uploads so each test can program its own resolve/reject sequence.
  const hasObjectStoreClient = vi.fn().mockReturnValue(true);
  const uploadPacketToObjectStore = vi.fn();

  return { hasObjectStoreClient, uploadPacketToObjectStore };
});
// Both offload thresholds are mocked to 10 bytes so that any non-trivial
// payload is large enough to trigger offloading.
vi.mock("~/env.server", () => {
  const env = {
    BATCH_PAYLOAD_OFFLOAD_THRESHOLD: 10,
    TASK_PAYLOAD_OFFLOAD_THRESHOLD: 10,
    OBJECT_STORE_DEFAULT_PROTOCOL: undefined,
  };

  return { env };
});
// Stand-in tracer without real OTel: the span callback is invoked right away
// with a stub span that only provides setAttribute, and its result is returned.
vi.mock("~/v3/tracer.server", () => {
  const startActiveSpan = vi.fn(async (_name: string, fn: (span: any) => any) => {
    const stubSpan = { setAttribute: vi.fn() };
    return fn(stubSpan);
  });

  return { startActiveSpan };
});
import { BatchPayloadProcessor } from "../../app/runEngine/concerns/batchPayloads.server";
import * as objectStore from "~/v3/objectStore.server";
// Per-test timeout in milliseconds applied via vi.setConfig.
// NOTE(review): presumably generous to leave room for retry delays — confirm.
const TEST_TIMEOUT_MS = 30_000;
vi.setConfig({ testTimeout: TEST_TIMEOUT_MS });
// Bare-bones stand-in for the AuthenticatedEnvironment handed to
// BatchPayloadProcessor. Only the fields below are supplied; it is typed as
// `any` so the full environment type does not have to be constructed here.
const mockEnvironment: any = {
  id: "env-test",
  slug: "production",
  project: { externalRef: "proj-ext-ref" },
};
describe("BatchPayloadProcessor", () => {
  // JSON body fed to the processor; at 25 characters it is longer than the
  // 10-byte thresholds mocked for ~/env.server.
  const samplePayload = '{"message":"hello world"}';
  // Value the mocked upload resolves with when it succeeds.
  const storedPath = "batch_abc/item_0/payload.json";

  // Typed handle on the mocked upload function, refreshed before every test.
  let mockUpload: ReturnType<typeof vi.mocked<typeof objectStore.uploadPacketToObjectStore>>;

  // Builds a fresh rejection reason for the mocked upload.
  const fetchFailure = () => new Error("fetch failed");

  // Runs a brand-new processor over `payload` with the fixed arguments that
  // every test in this suite uses.
  const processItem = (payload: string | undefined) => {
    const processor = new BatchPayloadProcessor();
    return processor.process(
      payload,
      "application/json",
      "batch-internal-abc",
      0,
      mockEnvironment
    );
  };

  beforeEach(() => {
    // Re-acquire the typed mock, then clear its call history and queued results.
    mockUpload = vi.mocked(objectStore.uploadPacketToObjectStore);
    mockUpload.mockReset();
  });

  it("offloads a large payload successfully on first attempt", async () => {
    mockUpload.mockResolvedValueOnce(storedPath);

    const outcome = await processItem(samplePayload);

    expect(outcome.wasOffloaded).toBe(true);
    expect(outcome.payloadType).toBe("application/store");
    expect(outcome.payload).toBe(storedPath);
    expect(mockUpload).toHaveBeenCalledTimes(1);
  });

  it("retries on transient fetch failure and succeeds on third attempt", async () => {
    // Two rejections followed by a success.
    mockUpload
      .mockRejectedValueOnce(fetchFailure())
      .mockRejectedValueOnce(fetchFailure())
      .mockResolvedValueOnce(storedPath);

    const outcome = await processItem(samplePayload);

    expect(outcome.wasOffloaded).toBe(true);
    expect(mockUpload).toHaveBeenCalledTimes(3);
  });

  it("throws after exhausting all retry attempts", async () => {
    // Every upload call rejects.
    mockUpload.mockRejectedValue(fetchFailure());

    const attempt = processItem(samplePayload);

    await expect(attempt).rejects.toThrow(
      "Failed to upload large payload to object store: fetch failed"
    );
    // 1 initial attempt + 3 retries = 4 total calls
    expect(mockUpload).toHaveBeenCalledTimes(4);
  });

  it("does not offload when there is no payload data", async () => {
    const outcome = await processItem(undefined);

    expect(outcome.wasOffloaded).toBe(false);
    expect(mockUpload).not.toHaveBeenCalled();
  });
});