-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathnack.test.ts
More file actions
278 lines (241 loc) · 8.36 KB
/
nack.test.ts
File metadata and controls
278 lines (241 loc) · 8.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
import { redisTest } from "@internal/testcontainers";
import { trace } from "@internal/tracing";
import { Logger } from "@trigger.dev/core/logger";
import { describe } from "node:test";
import { FairQueueSelectionStrategy } from "../fairQueueSelectionStrategy.js";
import { RunQueue } from "../index.js";
import { RunQueueFullKeyProducer } from "../keyProducer.js";
import { InputPayload } from "../types.js";
import { setTimeout } from "node:timers/promises";
/**
 * Baseline `RunQueue` constructor options shared by every test in this file.
 * Each test spreads this object and supplies its own `redis` connection and
 * `queueSelectionStrategy` (and, where needed, overrides `retryOptions`).
 */
const testOptions = {
  name: "rq",
  workers: 1,
  defaultEnvConcurrency: 25,
  // Retry settings passed to the queue: up to 5 attempts, timeouts bounded
  // between 100ms and 1s.
  retryOptions: {
    maxAttempts: 5,
    factor: 1.1,
    minTimeoutInMs: 100,
    maxTimeoutInMs: 1_000,
    randomize: true,
  },
  tracer: trace.getTracer("rq"),
  // "warn" level keeps queue logging quiet during test runs.
  logger: new Logger("RunQueue", "warn"),
  // Also handed to FairQueueSelectionStrategy so both sides share one key producer.
  keys: new RunQueueFullKeyProducer(),
};
/**
 * Minimal development environment fixture. It is passed as `env` when
 * enqueueing and as the lookup argument for the concurrency, queue-length and
 * dead-letter-queue checks in the tests.
 */
const authenticatedEnvDev = {
  id: "e1234",
  organization: { id: "o1234" },
  project: { id: "p1234" },
  maximumConcurrencyLimit: 10,
  // `as const` keeps the literal type "DEVELOPMENT" instead of widening to string.
  type: "DEVELOPMENT" as const,
};
/**
 * Message payload enqueued by every test. Its `orgId` and `projectId` match
 * the ids in `authenticatedEnvDev`.
 *
 * NOTE(review): `environmentId` ("e4321") differs from
 * `authenticatedEnvDev.id` ("e1234") — confirm this mismatch is intentional.
 */
const messageDev: InputPayload = {
  runId: "r4321",
  attempt: 0,
  // Evaluated once at module load, so all tests share the same timestamp.
  timestamp: Date.now(),
  queue: "task/my-task",
  taskIdentifier: "task/my-task",
  orgId: "o1234",
  projectId: "p1234",
  environmentId: "e4321",
  environmentType: "DEVELOPMENT",
};
// Allow each test up to 60 seconds; the tests talk to a Redis container
// supplied by the `redisTest` fixture.
const TEST_TIMEOUT_MS = 60_000;
vi.setConfig({ testTimeout: TEST_TIMEOUT_MS });
describe("RunQueue.nackMessage", () => {
// Nacking a dequeued message must release its queue and environment
// concurrency, put it back on the environment queue with an incremented
// attempt count, and allow it to be dequeued again.
redisTest("nacking a message clears all concurrency", async ({ redisContainer }) => {
  // Connection settings for the test container; spread into a fresh object for
  // each consumer so the strategy and the queue never share one options object.
  const redisConnection = {
    keyPrefix: "runqueue:test:",
    host: redisContainer.getHost(),
    port: redisContainer.getPort(),
  };

  const queue = new RunQueue({
    ...testOptions,
    queueSelectionStrategy: new FairQueueSelectionStrategy({
      redis: { ...redisConnection },
      keys: testOptions.keys,
    }),
    redis: { ...redisConnection },
  });

  try {
    const envMasterQueue = `env:${authenticatedEnvDev.id}`;
    const { orgId, runId, queue: queueName } = messageDev;

    // Enqueue onto the "main" and the per-environment master queues.
    await queue.enqueueMessage({
      env: authenticatedEnvDev,
      message: messageDev,
      masterQueues: ["main", envMasterQueue],
    });

    // First delivery of the message.
    const firstDelivery = await queue.dequeueMessageFromMasterQueue(
      "test_12345",
      envMasterQueue,
      10
    );
    expect(firstDelivery.length).toBe(1);

    // The dequeued message counts against both queue and environment concurrency.
    expect(await queue.currentConcurrencyOfQueue(authenticatedEnvDev, queueName)).toBe(1);
    expect(await queue.currentConcurrencyOfEnvironment(authenticatedEnvDev)).toBe(1);

    await queue.nackMessage({ orgId, messageId: runId });

    // After the nack both concurrency counters are back to zero.
    expect(await queue.currentConcurrencyOfQueue(authenticatedEnvDev, queueName)).toBe(0);
    expect(await queue.currentConcurrencyOfEnvironment(authenticatedEnvDev)).toBe(0);

    // The message is back on the environment queue.
    expect(await queue.lengthOfEnvQueue(authenticatedEnvDev)).toBe(1);

    // The stored message now records one attempt.
    const requeued = await queue.readMessage(orgId, runId);
    expect(requeued?.attempt).toBe(1);

    // Wait before dequeuing again so the requeue delay can elapse
    // (presumably derived from testOptions.retryOptions — confirm).
    await setTimeout(300);

    // Second delivery: the nacked message is available again.
    const secondDelivery = await queue.dequeueMessageFromMasterQueue(
      "test_12345",
      envMasterQueue,
      10
    );
    expect(secondDelivery.length).toBe(1);
  } finally {
    // Always close the queue, even when an expectation fails.
    await queue.quit();
  }
});
// With maxAttempts lowered to 2, the first nack requeues the message and the
// second nack moves it to the dead letter queue instead.
redisTest(
  "nacking a message with maxAttempts reached should be moved to dead letter queue",
  async ({ redisContainer }) => {
    // Connection settings for the test container; spread into a fresh object
    // for each consumer so the strategy and the queue never share one object.
    const redisConnection = {
      keyPrefix: "runqueue:test:",
      host: redisContainer.getHost(),
      port: redisContainer.getPort(),
    };

    const queue = new RunQueue({
      ...testOptions,
      // Same retry settings as the shared options, but only two attempts.
      retryOptions: { ...testOptions.retryOptions, maxAttempts: 2 },
      queueSelectionStrategy: new FairQueueSelectionStrategy({
        redis: { ...redisConnection },
        keys: testOptions.keys,
      }),
      redis: { ...redisConnection },
    });

    try {
      const envMasterQueue = `env:${authenticatedEnvDev.id}`;
      const { orgId, runId } = messageDev;

      await queue.enqueueMessage({
        env: authenticatedEnvDev,
        message: messageDev,
        masterQueues: ["main", envMasterQueue],
      });

      // First delivery, then first nack.
      const firstDelivery = await queue.dequeueMessageFromMasterQueue(
        "test_12345",
        envMasterQueue,
        10
      );
      expect(firstDelivery.length).toBe(1);

      await queue.nackMessage({ orgId, messageId: runId });

      // Let the requeue delay elapse before inspecting the queue.
      await setTimeout(300);

      // The first nack requeues the message and records one attempt.
      expect(await queue.lengthOfEnvQueue(authenticatedEnvDev)).toBe(1);
      const afterFirstNack = await queue.readMessage(orgId, runId);
      expect(afterFirstNack?.attempt).toBe(1);

      // Second delivery: the queue is drained and the dead letter queue is still empty.
      const secondDelivery = await queue.dequeueMessageFromMasterQueue(
        "test_12345",
        envMasterQueue,
        10
      );
      expect(secondDelivery.length).toBe(1);
      expect(await queue.lengthOfEnvQueue(authenticatedEnvDev)).toBe(0);
      expect(await queue.lengthOfDeadLetterQueue(authenticatedEnvDev)).toBe(0);

      // Second nack: the message goes to the dead letter queue rather than
      // back onto the environment queue.
      await queue.nackMessage({ orgId, messageId: runId });

      expect(await queue.lengthOfEnvQueue(authenticatedEnvDev)).toBe(0);
      expect(await queue.lengthOfDeadLetterQueue(authenticatedEnvDev)).toBe(1);
    } finally {
      // Always close the queue, even when an expectation fails.
      await queue.quit();
    }
  }
);
// Nacking with an explicit `retryAt` must requeue the message with a score
// (as reported by `oldestMessageInQueue`) close to that timestamp.
redisTest(
  "nacking a message with retryAt sets the correct requeue time",
  async ({ redisContainer }) => {
    const queue = new RunQueue({
      ...testOptions,
      queueSelectionStrategy: new FairQueueSelectionStrategy({
        redis: {
          keyPrefix: "runqueue:test:",
          host: redisContainer.getHost(),
          port: redisContainer.getPort(),
        },
        keys: testOptions.keys,
      }),
      redis: {
        keyPrefix: "runqueue:test:",
        host: redisContainer.getHost(),
        port: redisContainer.getPort(),
      },
    });

    try {
      const envMasterQueue = `env:${authenticatedEnvDev.id}`;

      // Enqueue message
      await queue.enqueueMessage({
        env: authenticatedEnvDev,
        message: messageDev,
        masterQueues: ["main", envMasterQueue],
      });

      // Dequeue message
      const dequeued = await queue.dequeueMessageFromMasterQueue(
        "test_12345",
        envMasterQueue,
        10
      );
      expect(dequeued.length).toBe(1);

      // Set retryAt to 5 seconds in the future
      const retryAt = Date.now() + 5000;
      await queue.nackMessage({
        orgId: messageDev.orgId,
        messageId: messageDev.runId,
        retryAt,
      });

      // Check the score of the message in the queue
      const score = await queue.oldestMessageInQueue(authenticatedEnvDev, messageDev.queue);
      expect(typeof score).toBe("number");

      // The expectation above already fails the test for a non-number; this
      // guard exists so TypeScript narrows `score` to `number` below.
      if (typeof score !== "number") {
        throw new Error(`Expected score to be a number, but got ${typeof score}`);
      }

      // Should be within 100ms of retryAt
      expect(Math.abs(score - retryAt)).toBeLessThanOrEqual(100);
    } finally {
      // Always close the queue, even when an expectation fails.
      await queue.quit();
    }
  }
);
});