Skip to content

Commit ad83f88

Browse files
giovaborgognoclaude
andcommitted
feat(events): ordering key with per-key serialization + global concurrency limit
Run engine changes: - Add globalConcurrencyLimit (gcl) and globalCurrentConcurrency (gcc) Redis keys - Modify dequeueMessagesFromQueue Lua: check global limit when gcl exists - Modify releaseConcurrency Lua: SREM from gcc set - Modify enqueueMessage/enqueueMessageWithTtl: SREM from gcc on re-enqueue - No impact on existing queues (gcl check only runs when key exists) Event system changes: - PublishEventService overrides queue to `evt-order:{eventSlug}` when orderingKey present - Deploy creates ordering queue with concurrencyLimit:1 (per-key) + global limit - SDK event() accepts `ordering: { concurrencyLimit: N }` config - EventManifest/EventMetadata include ordering field Behavior: orderingKey guarantees strict per-key ordering (1 at a time per key) while concurrencyLimit controls total parallel runs across all keys. Run-engine tests: 236 pass, 2 fail (pre-existing flaky, not caused by this change) Event integration tests: 24/24 pass Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 3476f76 commit ad83f88

File tree

10 files changed

+151
-7
lines changed

10 files changed

+151
-7
lines changed

apps/webapp/app/v3/runQueue.server.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,15 @@ export async function updateQueueConcurrencyLimits(
3232
]);
3333
}
3434

35+
/** Updates the global concurrency limit for a queue (across all concurrency keys) */
36+
export async function updateGlobalQueueConcurrencyLimits(
37+
environment: AuthenticatedEnvironment,
38+
queueName: string,
39+
concurrency: number
40+
) {
41+
await engine.runQueue.updateGlobalQueueConcurrencyLimits(environment, queueName, concurrency);
42+
}
43+
3544
/** Removes MARQS and the RunQueue limits for a queue */
3645
export async function removeQueueConcurrencyLimits(
3746
environment: AuthenticatedEnvironment,

apps/webapp/app/v3/services/createBackgroundWorker.server.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
removeQueueConcurrencyLimits,
1919
updateEnvConcurrencyLimits,
2020
updateQueueConcurrencyLimits,
21+
updateGlobalQueueConcurrencyLimits,
2122
} from "../runQueue.server";
2223
import { calculateNextBuildVersion } from "../utils/calculateNextBuildVersion";
2324
import { clampMaxDuration } from "../utils/maxDuration";
@@ -362,6 +363,26 @@ async function syncWorkerEvents(
362363
});
363364

364365
eventDefinitions.set(event.id, eventDef.id);
366+
367+
// Create ordering queue for events that have ordering config
368+
if (event.ordering) {
369+
const orderingQueueName = `evt-order:${event.id}`;
370+
const globalLimit = event.ordering.concurrencyLimit ?? environment.maximumConcurrencyLimit;
371+
372+
await upsertWorkerQueueRecord(
373+
orderingQueueName,
374+
1, // per-key limit: always 1 for strict ordering
375+
orderingQueueName,
376+
"SHARED",
377+
worker,
378+
prisma
379+
);
380+
381+
// Set per-key limit = 1 in Redis
382+
await updateQueueConcurrencyLimits(environment, orderingQueueName, 1);
383+
// Set global limit in Redis (total concurrent across all keys)
384+
await updateGlobalQueueConcurrencyLimits(environment, orderingQueueName, globalLimit);
385+
}
365386
}
366387
}
367388

apps/webapp/app/v3/services/events/publishEvent.server.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,11 @@ export class PublishEventService extends BaseService {
303303
: undefined,
304304
metadata: eventMetadata,
305305
delay: options.delay,
306+
// When ordering key is present, route to dedicated ordering queue
307+
// with concurrencyLimit:1 per key + global limit
308+
queue: options.orderingKey
309+
? { name: `evt-order:${eventSlug}` }
310+
: undefined,
306311
concurrencyKey: options.orderingKey
307312
? `evt:${eventSlug}:${options.orderingKey}`
308313
: undefined,

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 64 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,21 @@ export class RunQueue {
341341
return this.redis.del(this.keys.queueConcurrencyLimitKey(env, queue));
342342
}
343343

344+
public async updateGlobalQueueConcurrencyLimits(
345+
env: MinimalAuthenticatedEnvironment,
346+
queue: string,
347+
concurrency: number
348+
) {
349+
return this.redis.set(this.keys.queueGlobalConcurrencyLimitKey(env, queue), concurrency);
350+
}
351+
352+
public async removeGlobalQueueConcurrencyLimits(
353+
env: MinimalAuthenticatedEnvironment,
354+
queue: string
355+
) {
356+
return this.redis.del(this.keys.queueGlobalConcurrencyLimitKey(env, queue));
357+
}
358+
344359
public async getQueueConcurrencyLimit(env: MinimalAuthenticatedEnvironment, queue: string) {
345360
const result = await this.redis.get(this.keys.queueConcurrencyLimitKey(env, queue));
346361

@@ -906,6 +921,7 @@ export class RunQueue {
906921
this.keys.envCurrentConcurrencyKeyFromQueue(message.queue),
907922
this.keys.queueCurrentDequeuedKeyFromQueue(message.queue),
908923
this.keys.envCurrentDequeuedKeyFromQueue(message.queue),
924+
this.keys.queueGlobalCurrentConcurrencyKeyFromQueue(message.queue),
909925
messageId
910926
);
911927
},
@@ -1655,6 +1671,7 @@ export class RunQueue {
16551671
const queueCurrentDequeuedKey = this.keys.queueCurrentDequeuedKeyFromQueue(message.queue);
16561672
const envCurrentDequeuedKey = this.keys.envCurrentDequeuedKeyFromQueue(message.queue);
16571673
const envQueueKey = this.keys.envQueueKeyFromQueue(message.queue);
1674+
const globalCurrentConcurrencyKey = this.keys.queueGlobalCurrentConcurrencyKeyFromQueue(message.queue);
16581675
const masterQueueKey = this.keys.masterQueueKeyForEnvironment(
16591676
message.environmentId,
16601677
this.shardCount
@@ -1694,6 +1711,7 @@ export class RunQueue {
16941711
envCurrentDequeuedKey,
16951712
envQueueKey,
16961713
ttlInfo.ttlQueueKey,
1714+
globalCurrentConcurrencyKey,
16971715
queueName,
16981716
messageId,
16991717
messageData,
@@ -1711,6 +1729,7 @@ export class RunQueue {
17111729
queueCurrentDequeuedKey,
17121730
envCurrentDequeuedKey,
17131731
envQueueKey,
1732+
globalCurrentConcurrencyKey,
17141733
queueName,
17151734
messageId,
17161735
messageData,
@@ -1745,6 +1764,10 @@ export class RunQueue {
17451764
const messageKeyPrefix = this.keys.messageKeyPrefixFromQueue(messageQueue);
17461765
const envQueueKey = this.keys.envQueueKeyFromQueue(messageQueue);
17471766
const masterQueueKey = this.keys.masterQueueKeyForShard(shard);
1767+
const globalConcurrencyLimitKey =
1768+
this.keys.queueGlobalConcurrencyLimitKeyFromQueue(messageQueue);
1769+
const globalCurrentConcurrencyKey =
1770+
this.keys.queueGlobalCurrentConcurrencyKeyFromQueue(messageQueue);
17481771

17491772
// Get TTL queue key if TTL system is enabled
17501773
const ttlShardCount = this.options.ttlSystem?.shardCount ?? this.shardCount;
@@ -1767,6 +1790,8 @@ export class RunQueue {
17671790
envQueueKey,
17681791
masterQueueKey,
17691792
ttlQueueKey,
1793+
globalConcurrencyLimitKey,
1794+
globalCurrentConcurrencyKey,
17701795
shard,
17711796
maxCount,
17721797
});
@@ -1783,6 +1808,8 @@ export class RunQueue {
17831808
envQueueKey,
17841809
masterQueueKey,
17851810
ttlQueueKey,
1811+
globalConcurrencyLimitKey,
1812+
globalCurrentConcurrencyKey,
17861813
//args
17871814
messageQueue,
17881815
String(Date.now()),
@@ -2502,7 +2529,7 @@ end
25022529
});
25032530

25042531
this.redis.defineCommand("enqueueMessage", {
2505-
numberOfKeys: 8,
2532+
numberOfKeys: 9,
25062533
lua: `
25072534
local masterQueueKey = KEYS[1]
25082535
local queueKey = KEYS[2]
@@ -2512,6 +2539,7 @@ local envCurrentConcurrencyKey = KEYS[5]
25122539
local queueCurrentDequeuedKey = KEYS[6]
25132540
local envCurrentDequeuedKey = KEYS[7]
25142541
local envQueueKey = KEYS[8]
2542+
local globalCurrentConcurrencyKey = KEYS[9]
25152543
25162544
local queueName = ARGV[1]
25172545
local messageId = ARGV[2]
@@ -2541,12 +2569,13 @@ redis.call('SREM', queueCurrentConcurrencyKey, messageId)
25412569
redis.call('SREM', envCurrentConcurrencyKey, messageId)
25422570
redis.call('SREM', queueCurrentDequeuedKey, messageId)
25432571
redis.call('SREM', envCurrentDequeuedKey, messageId)
2572+
redis.call('SREM', globalCurrentConcurrencyKey, messageId)
25442573
`,
25452574
});
25462575

25472576
// Enqueue with TTL tracking - atomically adds to both normal queue and TTL sorted set
25482577
this.redis.defineCommand("enqueueMessageWithTtl", {
2549-
numberOfKeys: 9,
2578+
numberOfKeys: 10,
25502579
lua: `
25512580
local masterQueueKey = KEYS[1]
25522581
local queueKey = KEYS[2]
@@ -2557,6 +2586,7 @@ local queueCurrentDequeuedKey = KEYS[6]
25572586
local envCurrentDequeuedKey = KEYS[7]
25582587
local envQueueKey = KEYS[8]
25592588
local ttlQueueKey = KEYS[9]
2589+
local globalCurrentConcurrencyKey = KEYS[10]
25602590
25612591
local queueName = ARGV[1]
25622592
local messageId = ARGV[2]
@@ -2591,6 +2621,7 @@ redis.call('SREM', queueCurrentConcurrencyKey, messageId)
25912621
redis.call('SREM', envCurrentConcurrencyKey, messageId)
25922622
redis.call('SREM', queueCurrentDequeuedKey, messageId)
25932623
redis.call('SREM', envCurrentDequeuedKey, messageId)
2624+
redis.call('SREM', globalCurrentConcurrencyKey, messageId)
25942625
`,
25952626
});
25962627

@@ -2692,7 +2723,7 @@ return results
26922723
});
26932724

26942725
this.redis.defineCommand("dequeueMessagesFromQueue", {
2695-
numberOfKeys: 10,
2726+
numberOfKeys: 12,
26962727
lua: `
26972728
local queueKey = KEYS[1]
26982729
local queueConcurrencyLimitKey = KEYS[2]
@@ -2704,6 +2735,8 @@ local messageKeyPrefix = KEYS[7]
27042735
local envQueueKey = KEYS[8]
27052736
local masterQueueKey = KEYS[9]
27062737
local ttlQueueKey = KEYS[10] -- Optional: TTL sorted set key (empty string if not used)
2738+
local globalConcurrencyLimitKey = KEYS[11] -- Global queue concurrency limit (without :ck:)
2739+
local globalCurrentConcurrencyKey = KEYS[12] -- Global queue concurrency tracking (without :ck:)
27072740
27082741
local queueName = ARGV[1]
27092742
local currentTime = tonumber(ARGV[2])
@@ -2722,7 +2755,7 @@ if envCurrentConcurrency >= envConcurrencyLimitWithBurstFactor then
27222755
return nil
27232756
end
27242757
2725-
-- Check current queue concurrency against the limit
2758+
-- Check current queue concurrency against the limit (per-key when concurrencyKey is used)
27262759
local queueCurrentConcurrency = tonumber(redis.call('SCARD', queueCurrentConcurrencyKey) or '0')
27272760
local queueConcurrencyLimit = math.min(tonumber(redis.call('GET', queueConcurrencyLimitKey) or '1000000'), envConcurrencyLimit)
27282761
local totalQueueConcurrencyLimit = queueConcurrencyLimit
@@ -2732,10 +2765,23 @@ if queueCurrentConcurrency >= totalQueueConcurrencyLimit then
27322765
return nil
27332766
end
27342767
2768+
-- Check global queue concurrency limit (across all concurrency keys)
2769+
-- Only applies when globalConcurrencyLimitKey is set (e.g. for event ordering queues)
2770+
local globalAvailableCapacity = 1000000
2771+
local globalConcurrencyLimitRaw = redis.call('GET', globalConcurrencyLimitKey)
2772+
if globalConcurrencyLimitRaw then
2773+
local globalConcurrencyLimit = tonumber(globalConcurrencyLimitRaw)
2774+
local globalCurrentConcurrency = tonumber(redis.call('SCARD', globalCurrentConcurrencyKey) or '0')
2775+
if globalCurrentConcurrency >= globalConcurrencyLimit then
2776+
return nil
2777+
end
2778+
globalAvailableCapacity = globalConcurrencyLimit - globalCurrentConcurrency
2779+
end
2780+
27352781
-- Calculate how many messages we can actually dequeue based on concurrency limits
27362782
local envAvailableCapacity = envConcurrencyLimitWithBurstFactor - envCurrentConcurrency
27372783
local queueAvailableCapacity = totalQueueConcurrencyLimit - queueCurrentConcurrency
2738-
local actualMaxCount = math.min(maxCount, envAvailableCapacity, queueAvailableCapacity)
2784+
local actualMaxCount = math.min(maxCount, envAvailableCapacity, queueAvailableCapacity, globalAvailableCapacity)
27392785
27402786
if actualMaxCount <= 0 then
27412787
return nil
@@ -2779,6 +2825,11 @@ for i = 1, #messages, 2 do
27792825
redis.call('SADD', queueCurrentConcurrencyKey, messageId)
27802826
redis.call('SADD', envCurrentConcurrencyKey, messageId)
27812827
2828+
-- Track global queue concurrency (only if global limit is configured)
2829+
if globalConcurrencyLimitRaw then
2830+
redis.call('SADD', globalCurrentConcurrencyKey, messageId)
2831+
end
2832+
27822833
-- Remove from TTL set if provided (run is being executed, not expired)
27832834
if ttlQueueKey and ttlQueueKey ~= '' and ttlExpiresAt then
27842835
local ttlMember = queueName .. '|' .. messageId .. '|' .. (messageData.orgId or '')
@@ -2999,13 +3050,14 @@ redis.call('SREM', envCurrentDequeuedKey, messageId)
29993050
});
30003051

30013052
this.redis.defineCommand("releaseConcurrency", {
3002-
numberOfKeys: 4,
3053+
numberOfKeys: 5,
30033054
lua: `
30043055
-- Keys:
30053056
local queueCurrentConcurrencyKey = KEYS[1]
30063057
local envCurrentConcurrencyKey = KEYS[2]
30073058
local queueCurrentDequeuedKey = KEYS[3]
30083059
local envCurrentDequeuedKey = KEYS[4]
3060+
local globalCurrentConcurrencyKey = KEYS[5]
30093061
30103062
-- Args:
30113063
local messageId = ARGV[1]
@@ -3015,6 +3067,7 @@ redis.call('SREM', queueCurrentConcurrencyKey, messageId)
30153067
redis.call('SREM', envCurrentConcurrencyKey, messageId)
30163068
redis.call('SREM', queueCurrentDequeuedKey, messageId)
30173069
redis.call('SREM', envCurrentDequeuedKey, messageId)
3070+
redis.call('SREM', globalCurrentConcurrencyKey, messageId)
30183071
`,
30193072
});
30203073

@@ -3139,6 +3192,7 @@ declare module "@internal/redis" {
31393192
queueCurrentDequeuedKey: string,
31403193
envCurrentDequeuedKey: string,
31413194
envQueueKey: string,
3195+
globalCurrentConcurrencyKey: string,
31423196
//args
31433197
queueName: string,
31443198
messageId: string,
@@ -3158,6 +3212,7 @@ declare module "@internal/redis" {
31583212
envCurrentDequeuedKey: string,
31593213
envQueueKey: string,
31603214
ttlQueueKey: string,
3215+
globalCurrentConcurrencyKey: string,
31613216
//args
31623217
queueName: string,
31633218
messageId: string,
@@ -3194,6 +3249,8 @@ declare module "@internal/redis" {
31943249
envQueueKey: string,
31953250
masterQueueKey: string,
31963251
ttlQueueKey: string,
3252+
globalConcurrencyLimitKey: string,
3253+
globalCurrentConcurrencyKey: string,
31973254
//args
31983255
childQueueName: string,
31993256
currentTime: string,
@@ -3288,6 +3345,7 @@ declare module "@internal/redis" {
32883345
envCurrentConcurrencyKey: string,
32893346
queueCurrentDequeuedKey: string,
32903347
envCurrentDequeuedKey: string,
3348+
globalCurrentConcurrencyKey: string,
32913349
// args
32923350
messageId: string,
32933351
callback?: Callback<void>

internal-packages/run-engine/src/run-queue/keyProducer.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ const constants = {
1717
DEAD_LETTER_QUEUE_PART: "deadLetter",
1818
MASTER_QUEUE_PART: "masterQueue",
1919
WORKER_QUEUE_PART: "workerQueue",
20+
GLOBAL_CONCURRENCY_LIMIT_PART: "globalConcurrency",
21+
GLOBAL_CURRENT_CONCURRENCY_PART: "globalCurrentConcurrency",
2022
} as const;
2123

2224
export class RunQueueFullKeyProducer implements RunQueueKeyProducer {
@@ -138,6 +140,20 @@ export class RunQueueFullKeyProducer implements RunQueueKeyProducer {
138140
return `${concurrencyQueueName}:${constants.CONCURRENCY_LIMIT_PART}`;
139141
}
140142

143+
queueGlobalConcurrencyLimitKeyFromQueue(queue: string) {
144+
const globalQueueName = queue.replace(/:ck:.+$/, "");
145+
return `${globalQueueName}:${constants.GLOBAL_CONCURRENCY_LIMIT_PART}`;
146+
}
147+
148+
queueGlobalCurrentConcurrencyKeyFromQueue(queue: string) {
149+
const globalQueueName = queue.replace(/:ck:.+$/, "");
150+
return `${globalQueueName}:${constants.GLOBAL_CURRENT_CONCURRENCY_PART}`;
151+
}
152+
153+
queueGlobalConcurrencyLimitKey(env: RunQueueKeyProducerEnvironment, queue: string) {
154+
return [this.queueKey(env, queue), constants.GLOBAL_CONCURRENCY_LIMIT_PART].join(":");
155+
}
156+
141157
queueCurrentConcurrencyKeyFromQueue(queue: string) {
142158
return `${queue}:${constants.CURRENT_CONCURRENCY_PART}`;
143159
}

internal-packages/run-engine/src/run-queue/types.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ export interface RunQueueKeyProducer {
7575
envQueueKeyFromQueue(queue: string): string;
7676
queueConcurrencyLimitKey(env: RunQueueKeyProducerEnvironment, queue: string): string;
7777
queueConcurrencyLimitKeyFromQueue(queue: string): string;
78+
queueGlobalConcurrencyLimitKeyFromQueue(queue: string): string;
79+
queueGlobalCurrentConcurrencyKeyFromQueue(queue: string): string;
80+
queueGlobalConcurrencyLimitKey(env: RunQueueKeyProducerEnvironment, queue: string): string;
7881
queueCurrentConcurrencyKeyFromQueue(queue: string): string;
7982
queueCurrentConcurrencyKey(
8083
env: RunQueueKeyProducerEnvironment,

packages/core/src/v3/resource-catalog/catalog.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ export interface EventMetadata {
99
rawSchema?: unknown;
1010
/** Rate limit configuration */
1111
rateLimit?: { limit: number; window: string };
12+
/** Ordering configuration — enables per-key serialization with global concurrency limit */
13+
ordering?: { concurrencyLimit?: number };
1214
}
1315

1416
export interface ResourceCatalog {

packages/core/src/v3/resource-catalog/standardResourceCatalog.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ export class StandardResourceCatalog implements ResourceCatalog {
191191
version: event.version,
192192
description: event.description,
193193
rateLimit: event.rateLimit,
194+
ordering: event.ordering,
194195
}));
195196
}
196197

packages/core/src/v3/schemas/schemas.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,13 @@ export const EventRateLimitManifest = z.object({
183183

184184
export type EventRateLimitManifest = z.infer<typeof EventRateLimitManifest>;
185185

186+
export const EventOrderingManifest = z.object({
187+
/** Maximum number of ordering keys processed in parallel */
188+
concurrencyLimit: z.number().int().positive().optional(),
189+
});
190+
191+
export type EventOrderingManifest = z.infer<typeof EventOrderingManifest>;
192+
186193
export const EventManifest = z.object({
187194
/** Unique event identifier (e.g. "order.created") */
188195
id: z.string(),
@@ -194,6 +201,8 @@ export const EventManifest = z.object({
194201
schema: z.unknown().optional(),
195202
/** Rate limit configuration */
196203
rateLimit: EventRateLimitManifest.optional(),
204+
/** Ordering configuration — creates a dedicated queue with per-key serialization */
205+
ordering: EventOrderingManifest.optional(),
197206
});
198207

199208
export type EventManifest = z.infer<typeof EventManifest>;

0 commit comments

Comments
 (0)