Skip to content

Commit c176496

Browse files
giovaborgognoclaude
andcommitted
fix(events): phase 10 — CRITICAL + HIGH audit fixes
- 10.1: Fix expireTtlRuns Lua global concurrency slot leak (CRITICAL) Add SREM for globalCurrentConcurrency in TTL expiration script - 10.2: Fix clearMessageFromConcurrencySets bare queue name (HIGH) Add queueGlobalCurrentConcurrencyKey(env, queue) to build correct key - 10.3: Add .max(100) to batch publish items array (HIGH) - 10.4: Fix publishAndWait schema — move parentRunId to top-level (HIGH) - 10.5: ClickHouse interval already safe (whitelist map, not interpolation) - 10.6: Add @@index([projectId, environmentId, enabled]) to EventSubscription - 10.7: Fix batch publish partial failure — per-item error handling with 207 Also tightens Zod schemas: z.any() → z.unknown(), idempotencyKey .max(256), metadata → z.record(z.unknown()) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 193e26e commit c176496

File tree

9 files changed

+64
-63
lines changed

9 files changed

+64
-63
lines changed

apps/webapp/app/routes/api.v1.events.$eventId.batchPublish.ts

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,13 @@ const { action, loader } = createActionApiRoute(
3434
eventPublishRateLimitChecker
3535
);
3636

37-
try {
38-
const results: PublishEventResult[] = [];
37+
const results: Array<
38+
| { ok: true; eventId: string; runs: PublishEventResult["runs"] }
39+
| { ok: false; error: string }
40+
> = [];
3941

40-
for (const item of body.items) {
42+
for (const item of body.items) {
43+
try {
4144
const result = await service.call(
4245
params.eventId,
4346
authentication.environment,
@@ -52,31 +55,23 @@ const { action, loader } = createActionApiRoute(
5255
}
5356
);
5457

55-
results.push(result);
58+
results.push({ ok: true, eventId: result.eventId, runs: result.runs });
59+
} catch (error) {
60+
if (error instanceof EventPublishRateLimitError) {
61+
results.push({ ok: false, error: error.message });
62+
} else if (error instanceof ServiceValidationError) {
63+
results.push({ ok: false, error: error.message });
64+
} else {
65+
results.push({
66+
ok: false,
67+
error: error instanceof Error ? error.message : "Unknown error",
68+
});
69+
}
5670
}
57-
58-
return json({ results }, { status: 200 });
59-
} catch (error) {
60-
if (error instanceof EventPublishRateLimitError) {
61-
return json(
62-
{ error: error.message },
63-
{
64-
status: 429,
65-
headers: {
66-
"x-ratelimit-limit": String(error.limit),
67-
"x-ratelimit-remaining": String(error.remaining),
68-
"retry-after": String(Math.ceil(error.retryAfterMs / 1000)),
69-
},
70-
}
71-
);
72-
} else if (error instanceof ServiceValidationError) {
73-
return json({ error: error.message }, { status: error.status ?? 422 });
74-
} else if (error instanceof Error) {
75-
return json({ error: error.message }, { status: 500 });
76-
}
77-
78-
return json({ error: "Something went wrong" }, { status: 500 });
7971
}
72+
73+
const hasErrors = results.some((r) => !r.ok);
74+
return json({ results }, { status: hasErrors ? 207 : 200 });
8075
}
8176
);
8277

apps/webapp/app/routes/api.v1.events.$eventId.publishAndWait.ts

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,7 @@ const { action, loader } = createActionApiRoute(
2626
},
2727
},
2828
async ({ body, params, authentication }) => {
29-
const parentRunId = body.options?.parentRunId;
30-
if (!parentRunId) {
31-
return json(
32-
{ error: "parentRunId is required for publishAndWait" },
33-
{ status: 400 }
34-
);
35-
}
29+
const parentRunId = body.parentRunId;
3630

3731
const service = new PublishEventService(
3832
undefined,
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- CreateIndex
2+
CREATE INDEX CONCURRENTLY "EventSubscription_projectId_environmentId_enabled_idx" ON "public"."EventSubscription"("projectId", "environmentId", "enabled");

internal-packages/database/prisma/schema.prisma

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,7 @@ model EventSubscription {
644644
645645
@@unique([eventDefinitionId, taskSlug, environmentId])
646646
@@index([eventDefinitionId, environmentId, enabled])
647-
@@index([projectId, environmentId])
647+
@@index([projectId, environmentId, enabled])
648648
}
649649

650650
enum DeadLetterStatus {

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2106,7 +2106,7 @@ export class RunQueue {
21062106
envCurrentConcurrencyKey,
21072107
queueCurrentDequeuedKey,
21082108
envCurrentDequeuedKey,
2109-
this.keys.queueGlobalCurrentConcurrencyKeyFromQueue(queue),
2109+
this.keys.queueGlobalCurrentConcurrencyKey(env, queue),
21102110
messageId
21112111
);
21122112
}
@@ -2698,6 +2698,11 @@ for i, member in ipairs(expiredMembers) do
26982698
redis.call('SREM', concurrencyKey, runId)
26992699
redis.call('SREM', dequeuedKey, runId)
27002700
2701+
-- Remove from global concurrency set (strip :ck:* suffix to get base queue key)
2702+
local globalQueueKey = string.gsub(rawQueueKey, ":ck:.+$", "")
2703+
local globalCurrentConcurrencyKey = keyPrefix .. globalQueueKey .. ":globalCurrentConcurrency"
2704+
redis.call('SREM', globalCurrentConcurrencyKey, runId)
2705+
27012706
-- Env concurrency (derive from rawQueueKey; must match RunQueueKeyProducer: org + proj + env)
27022707
-- rawQueueKey format: {org:X}:proj:Y:env:Z:queue:Q[:ck:C]
27032708
local projMatch = string.match(rawQueueKey, ":proj:([^:]+):env:")

internal-packages/run-engine/src/run-queue/keyProducer.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,10 @@ export class RunQueueFullKeyProducer implements RunQueueKeyProducer {
154154
return [this.queueKey(env, queue), constants.GLOBAL_CONCURRENCY_LIMIT_PART].join(":");
155155
}
156156

157+
queueGlobalCurrentConcurrencyKey(env: RunQueueKeyProducerEnvironment, queue: string) {
158+
return [this.queueKey(env, queue), constants.GLOBAL_CURRENT_CONCURRENCY_PART].join(":");
159+
}
160+
157161
queueCurrentConcurrencyKeyFromQueue(queue: string) {
158162
return `${queue}:${constants.CURRENT_CONCURRENCY_PART}`;
159163
}

internal-packages/run-engine/src/run-queue/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ export interface RunQueueKeyProducer {
7878
queueGlobalConcurrencyLimitKeyFromQueue(queue: string): string;
7979
queueGlobalCurrentConcurrencyKeyFromQueue(queue: string): string;
8080
queueGlobalConcurrencyLimitKey(env: RunQueueKeyProducerEnvironment, queue: string): string;
81+
queueGlobalCurrentConcurrencyKey(env: RunQueueKeyProducerEnvironment, queue: string): string;
8182
queueCurrentConcurrencyKeyFromQueue(queue: string): string;
8283
queueCurrentConcurrencyKey(
8384
env: RunQueueKeyProducerEnvironment,

packages/core/src/v3/schemas/api.ts

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1602,14 +1602,14 @@ export type AppendToStreamResponseBody = z.infer<typeof AppendToStreamResponseBo
16021602
// ---- Event publish schemas ----
16031603

16041604
export const PublishEventRequestBody = z.object({
1605-
payload: z.any(),
1605+
payload: z.unknown(),
16061606
options: z
16071607
.object({
1608-
idempotencyKey: z.string().optional(),
1608+
idempotencyKey: z.string().max(256).optional(),
16091609
delay: z.string().or(z.coerce.date()).optional(),
16101610
tags: RunTags.optional(),
1611-
metadata: z.any().optional(),
1612-
context: z.any().optional(),
1611+
metadata: z.record(z.unknown()).optional(),
1612+
context: z.unknown().optional(),
16131613
orderingKey: z.string().optional(),
16141614
})
16151615
.optional(),
@@ -1630,21 +1630,23 @@ export const PublishEventResponseBody = z.object({
16301630
export type PublishEventResponseBody = z.infer<typeof PublishEventResponseBody>;
16311631

16321632
export const BatchPublishEventRequestBody = z.object({
1633-
items: z.array(
1634-
z.object({
1635-
payload: z.any(),
1636-
options: z
1637-
.object({
1638-
idempotencyKey: z.string().optional(),
1639-
delay: z.string().or(z.coerce.date()).optional(),
1640-
tags: RunTags.optional(),
1641-
metadata: z.any().optional(),
1642-
context: z.any().optional(),
1643-
orderingKey: z.string().optional(),
1644-
})
1645-
.optional(),
1646-
})
1647-
),
1633+
items: z
1634+
.array(
1635+
z.object({
1636+
payload: z.unknown(),
1637+
options: z
1638+
.object({
1639+
idempotencyKey: z.string().max(256).optional(),
1640+
delay: z.string().or(z.coerce.date()).optional(),
1641+
tags: RunTags.optional(),
1642+
metadata: z.record(z.unknown()).optional(),
1643+
context: z.unknown().optional(),
1644+
orderingKey: z.string().optional(),
1645+
})
1646+
.optional(),
1647+
})
1648+
)
1649+
.max(100),
16481650
});
16491651

16501652
export type BatchPublishEventRequestBody = z.infer<typeof BatchPublishEventRequestBody>;
@@ -1656,16 +1658,16 @@ export const BatchPublishEventResponseBody = z.object({
16561658
export type BatchPublishEventResponseBody = z.infer<typeof BatchPublishEventResponseBody>;
16571659

16581660
export const PublishAndWaitEventRequestBody = z.object({
1659-
payload: z.any(),
1661+
payload: z.unknown(),
1662+
parentRunId: z.string(),
16601663
options: z
16611664
.object({
1662-
idempotencyKey: z.string().optional(),
1665+
idempotencyKey: z.string().max(256).optional(),
16631666
delay: z.string().or(z.coerce.date()).optional(),
16641667
tags: RunTags.optional(),
1665-
metadata: z.any().optional(),
1666-
context: z.any().optional(),
1668+
metadata: z.record(z.unknown()).optional(),
1669+
context: z.unknown().optional(),
16671670
orderingKey: z.string().optional(),
1668-
parentRunId: z.string(),
16691671
})
16701672
.optional(),
16711673
});

packages/trigger-sdk/src/v3/events.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -239,18 +239,16 @@ export function createEvent<TId extends string, TSchema extends Schema | undefin
239239

240240
const response = await apiClient.publishAndWaitEvent(id, {
241241
payload: validatedPayload,
242+
parentRunId: ctx.run.id,
242243
options: options
243244
? {
244245
idempotencyKey: options.idempotencyKey,
245246
delay: options.delay instanceof Date ? options.delay.toISOString() : options.delay,
246247
tags: options.tags,
247248
metadata: options.metadata,
248249
orderingKey: options.orderingKey,
249-
parentRunId: ctx.run.id,
250250
}
251-
: {
252-
parentRunId: ctx.run.id,
253-
},
251+
: undefined,
254252
});
255253

256254
if (response.runs.length === 0) {

0 commit comments

Comments
 (0)