Skip to content

Commit 193e26e

Browse files
giovaborgognoclaude
andcommitted
fix(events): fix ordering — update all Lua scripts for global concurrency release
Previous commit only updated dequeue/release/enqueue Lua scripts but missed: - acknowledgeMessage: runs completing weren't releasing global concurrency - nackMessage: nacked runs weren't releasing global concurrency - moveToDeadLetterQueue: DLQ'd runs weren't releasing - clearMessageFromConcurrencySets: cleanup wasn't releasing Also: set globalConcurrencyLimit on subscriber task's queue (not dedicated queue) since the dev worker only monitors task queues, not custom queues. Removed queue override from PublishEventService — runs stay in the task's own queue and ordering is enforced by concurrencyKey + concurrencyLimit:1 + globalConcurrencyLimit:N. E2E verified: globalConcurrencyLimit=2 with 3 keys correctly limits to max 2 concurrent runs while maintaining per-key ordering. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent a8a5ce3 commit 193e26e

File tree

4 files changed

+64
-10
lines changed

4 files changed

+64
-10
lines changed

apps/webapp/app/v3/services/createBackgroundWorker.server.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ async function syncWorkerEvents(
373373
orderingQueueName,
374374
1, // per-key limit: always 1 for strict ordering
375375
orderingQueueName,
376-
"SHARED",
376+
"NAMED",
377377
worker,
378378
prisma
379379
);
@@ -426,6 +426,20 @@ async function syncWorkerEvents(
426426
}
427427
}
428428

429+
// Look up event ordering config for this task's event
430+
const eventManifest = metadata.events?.find((e) => e.id === task.onEvent);
431+
const eventOrdering = eventManifest?.ordering;
432+
433+
// If the event has ordering config, set globalConcurrencyLimit on this task's queue
434+
if (eventOrdering?.concurrencyLimit) {
435+
const taskQueueName = `task/${task.id}`;
436+
await updateGlobalQueueConcurrencyLimits(
437+
environment,
438+
taskQueueName,
439+
eventOrdering.concurrencyLimit
440+
);
441+
}
442+
429443
const subscription = await prisma.eventSubscription.upsert({
430444
where: {
431445
eventDefinitionId_taskSlug_environmentId: {

apps/webapp/app/v3/services/events/publishEvent.server.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -303,11 +303,6 @@ export class PublishEventService extends BaseService {
303303
: undefined,
304304
metadata: eventMetadata,
305305
delay: options.delay,
306-
// When ordering key is present, route to dedicated ordering queue
307-
// with concurrencyLimit:1 per key + global limit
308-
queue: options.orderingKey
309-
? { name: `evt-order:${eventSlug}` }
310-
: undefined,
311306
concurrencyKey: options.orderingKey
312307
? `evt:${eventSlug}:${options.orderingKey}`
313308
: undefined,

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2063,6 +2063,7 @@ export class RunQueue {
20632063
envCurrentDequeuedKey,
20642064
envQueueKey,
20652065
workerQueueKey,
2066+
this.keys.queueGlobalCurrentConcurrencyKeyFromQueue(message.queue),
20662067
messageId,
20672068
messageQueue,
20682069
messageKeyValue,
@@ -2105,6 +2106,7 @@ export class RunQueue {
21052106
envCurrentConcurrencyKey,
21062107
queueCurrentDequeuedKey,
21072108
envCurrentDequeuedKey,
2109+
this.keys.queueGlobalCurrentConcurrencyKeyFromQueue(queue),
21082110
messageId
21092111
);
21102112
}
@@ -2151,6 +2153,7 @@ export class RunQueue {
21512153
queueCurrentDequeuedKey,
21522154
envCurrentDequeuedKey,
21532155
envQueueKey,
2156+
this.keys.queueGlobalCurrentConcurrencyKeyFromQueue(message.queue),
21542157
//args
21552158
messageId,
21562159
messageQueue,
@@ -2184,6 +2187,7 @@ export class RunQueue {
21842187
envCurrentDequeuedKey,
21852188
envQueueKey,
21862189
deadLetterQueueKey,
2190+
this.keys.queueGlobalCurrentConcurrencyKeyFromQueue(message.queue),
21872191
messageId,
21882192
messageQueue
21892193
);
@@ -2919,7 +2923,7 @@ return message
29192923
});
29202924

29212925
this.redis.defineCommand("acknowledgeMessage", {
2922-
numberOfKeys: 9,
2926+
numberOfKeys: 10,
29232927
lua: `
29242928
-- Keys:
29252929
local masterQueueKey = KEYS[1]
@@ -2931,6 +2935,7 @@ local queueCurrentDequeuedKey = KEYS[6]
29312935
local envCurrentDequeuedKey = KEYS[7]
29322936
local envQueueKey = KEYS[8]
29332937
local workerQueueKey = KEYS[9]
2938+
local globalCurrentConcurrencyKey = KEYS[10]
29342939
29352940
-- Args:
29362941
local messageId = ARGV[1]
@@ -2955,6 +2960,7 @@ end
29552960
29562961
-- Update the concurrency keys
29572962
redis.call('SREM', queueCurrentConcurrencyKey, messageId)
2963+
redis.call('SREM', globalCurrentConcurrencyKey, messageId)
29582964
redis.call('SREM', envCurrentConcurrencyKey, messageId)
29592965
redis.call('SREM', queueCurrentDequeuedKey, messageId)
29602966
redis.call('SREM', envCurrentDequeuedKey, messageId)
@@ -2967,7 +2973,7 @@ end
29672973
});
29682974

29692975
this.redis.defineCommand("nackMessage", {
2970-
numberOfKeys: 8,
2976+
numberOfKeys: 9,
29712977
lua: `
29722978
-- Keys:
29732979
local masterQueueKey = KEYS[1]
@@ -2978,6 +2984,7 @@ local envCurrentConcurrencyKey = KEYS[5]
29782984
local queueCurrentDequeuedKey = KEYS[6]
29792985
local envCurrentDequeuedKey = KEYS[7]
29802986
local envQueueKey = KEYS[8]
2987+
local globalCurrentConcurrencyKey = KEYS[9]
29812988
29822989
-- Args:
29832990
local messageId = ARGV[1]
@@ -2990,6 +2997,7 @@ redis.call('SET', messageKey, messageData)
29902997
29912998
-- Update the concurrency keys
29922999
redis.call('SREM', queueCurrentConcurrencyKey, messageId)
3000+
redis.call('SREM', globalCurrentConcurrencyKey, messageId)
29933001
redis.call('SREM', envCurrentConcurrencyKey, messageId)
29943002
redis.call('SREM', queueCurrentDequeuedKey, messageId)
29953003
redis.call('SREM', envCurrentDequeuedKey, messageId)
@@ -3009,7 +3017,7 @@ end
30093017
});
30103018

30113019
this.redis.defineCommand("moveToDeadLetterQueue", {
3012-
numberOfKeys: 9,
3020+
numberOfKeys: 10,
30133021
lua: `
30143022
-- Keys:
30153023
local masterQueueKey = KEYS[1]
@@ -3021,6 +3029,7 @@ local queueCurrentDequeuedKey = KEYS[6]
30213029
local envCurrentDequeuedKey = KEYS[7]
30223030
local envQueueKey = KEYS[8]
30233031
local deadLetterQueueKey = KEYS[9]
3032+
local globalCurrentConcurrencyKey = KEYS[10]
30243033
30253034
-- Args:
30263035
local messageId = ARGV[1]
@@ -3043,6 +3052,7 @@ redis.call('ZADD', deadLetterQueueKey, tonumber(redis.call('TIME')[1]), messageI
30433052
30443053
-- Update the concurrency keys
30453054
redis.call('SREM', queueCurrentConcurrencyKey, messageId)
3055+
redis.call('SREM', globalCurrentConcurrencyKey, messageId)
30463056
redis.call('SREM', envCurrentConcurrencyKey, messageId)
30473057
redis.call('SREM', queueCurrentDequeuedKey, messageId)
30483058
redis.call('SREM', envCurrentDequeuedKey, messageId)
@@ -3151,19 +3161,21 @@ return results
31513161
});
31523162

31533163
this.redis.defineCommand("clearMessageFromConcurrencySets", {
3154-
numberOfKeys: 4,
3164+
numberOfKeys: 5,
31553165
lua: `
31563166
-- Keys:
31573167
local queueCurrentConcurrencyKey = KEYS[1]
31583168
local envCurrentConcurrencyKey = KEYS[2]
31593169
local queueCurrentDequeuedKey = KEYS[3]
31603170
local envCurrentDequeuedKey = KEYS[4]
3171+
local globalCurrentConcurrencyKey = KEYS[5]
31613172
31623173
-- Args:
31633174
local messageId = ARGV[1]
31643175
31653176
-- Update the concurrency keys
31663177
redis.call('SREM', queueCurrentConcurrencyKey, messageId)
3178+
redis.call('SREM', globalCurrentConcurrencyKey, messageId)
31673179
redis.call('SREM', envCurrentConcurrencyKey, messageId)
31683180
redis.call('SREM', queueCurrentDequeuedKey, messageId)
31693181
redis.call('SREM', envCurrentDequeuedKey, messageId)
@@ -3285,6 +3297,7 @@ declare module "@internal/redis" {
32853297
envCurrentDequeuedKey: string,
32863298
envQueueKey: string,
32873299
workerQueueKey: string,
3300+
globalCurrentConcurrencyKey: string,
32883301
// args
32893302
messageId: string,
32903303
messageQueueName: string,
@@ -3299,6 +3312,7 @@ declare module "@internal/redis" {
32993312
envCurrentConcurrencyKey: string,
33003313
queueCurrentDequeuedKey: string,
33013314
envCurrentDequeuedKey: string,
3315+
globalCurrentConcurrencyKey: string,
33023316
// args
33033317
messageId: string,
33043318
callback?: Callback<void>
@@ -3314,6 +3328,7 @@ declare module "@internal/redis" {
33143328
queueCurrentDequeuedKey: string,
33153329
envCurrentDequeuedKey: string,
33163330
envQueueKey: string,
3331+
globalCurrentConcurrencyKey: string,
33173332
// args
33183333
messageId: string,
33193334
messageQueueName: string,
@@ -3333,6 +3348,7 @@ declare module "@internal/redis" {
33333348
envCurrentDequeuedKey: string,
33343349
envQueueKey: string,
33353350
deadLetterQueueKey: string,
3351+
globalCurrentConcurrencyKey: string,
33363352
// args
33373353
messageId: string,
33383354
messageQueueName: string,
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import { event, task, logger } from "@trigger.dev/sdk";
2+
import { z } from "zod";
3+
4+
// Define event with ordering config
5+
export const testEvent = event({
6+
id: "test.greeting",
7+
schema: z.object({
8+
name: z.string(),
9+
message: z.string(),
10+
}),
11+
ordering: {
12+
concurrencyLimit: 2,
13+
},
14+
});
15+
16+
// Slow subscriber with concurrencyLimit: 1 for ordering
17+
// The concurrencyLimit:1 ensures per-key ordering when used with orderingKey
18+
export const slowWorker = task({
19+
id: "slow-greeting-worker",
20+
on: testEvent,
21+
queue: { concurrencyLimit: 1 },
22+
run: async (payload) => {
23+
const start = Date.now();
24+
logger.info(`[slow-worker] START "${payload.name}" at ${new Date().toISOString()}`);
25+
await new Promise((r) => setTimeout(r, 2000));
26+
logger.info(`[slow-worker] END "${payload.name}" after ${Date.now() - start}ms`);
27+
return { name: payload.name, duration: Date.now() - start };
28+
},
29+
});

0 commit comments

Comments
 (0)