Skip to content

Commit 6d4434e

Browse files
giovaborgognoclaude
andcommitted
fix(events): phase 11 — MEDIUM audit fixes
- 11.1: Fix N+1 in DLQ retryAll — inline retry logic, share TriggerTaskService - 11.2: Add 512KB payload size check before fan-out (returns 413) - 11.3: Add try/catch with ServiceValidationError handling to events routes - 11.4: Add --delay, --tags, --idempotency-key, --ordering-key to CLI publish Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent c176496 commit 6d4434e

File tree

6 files changed

+117
-28
lines changed

6 files changed

+117
-28
lines changed

apps/webapp/app/routes/api.v1.events.dlq.retry-all.ts

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { json } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
4+
import { ServiceValidationError } from "~/v3/services/baseService.server";
45
import { DeadLetterManagementService } from "~/v3/services/events/deadLetterManagement.server";
56

67
const BodySchema = z
@@ -22,14 +23,24 @@ const { action, loader } = createActionApiRoute(
2223
async ({ body, authentication }) => {
2324
const service = new DeadLetterManagementService();
2425

25-
const result = await service.retryAll({
26-
projectId: authentication.environment.projectId,
27-
environmentId: authentication.environment.id,
28-
eventType: body?.eventType,
29-
environment: authentication.environment,
30-
});
26+
try {
27+
const result = await service.retryAll({
28+
projectId: authentication.environment.projectId,
29+
environmentId: authentication.environment.id,
30+
eventType: body?.eventType,
31+
environment: authentication.environment,
32+
});
3133

32-
return json(result, { status: 200 });
34+
return json(result, { status: 200 });
35+
} catch (error) {
36+
if (error instanceof ServiceValidationError) {
37+
return json({ error: error.message }, { status: error.status ?? 422 });
38+
}
39+
return json(
40+
{ error: error instanceof Error ? error.message : "Something went wrong" },
41+
{ status: 500 }
42+
);
43+
}
3344
}
3445
);
3546

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { json } from "@remix-run/server-runtime";
22
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
3+
import { ServiceValidationError } from "~/v3/services/baseService.server";
34
import { SchemaRegistryService } from "~/v3/services/events/schemaRegistry.server";
45

56
export const loader = createLoaderApiRoute(
@@ -15,23 +16,33 @@ export const loader = createLoaderApiRoute(
1516
async ({ authentication }) => {
1617
const service = new SchemaRegistryService();
1718

18-
const events = await service.listSchemas({
19-
projectId: authentication.environment.projectId,
20-
environmentId: authentication.environment.id,
21-
});
19+
try {
20+
const events = await service.listSchemas({
21+
projectId: authentication.environment.projectId,
22+
environmentId: authentication.environment.id,
23+
});
2224

23-
return json({
24-
data: events.map((e) => ({
25-
id: e.id,
26-
slug: e.slug,
27-
version: e.version,
28-
description: e.description,
29-
hasSchema: e.schema !== null,
30-
deprecatedAt: e.deprecatedAt,
31-
subscriberCount: e.subscriberCount,
32-
createdAt: e.createdAt,
33-
updatedAt: e.updatedAt,
34-
})),
35-
});
25+
return json({
26+
data: events.map((e) => ({
27+
id: e.id,
28+
slug: e.slug,
29+
version: e.version,
30+
description: e.description,
31+
hasSchema: e.schema !== null,
32+
deprecatedAt: e.deprecatedAt,
33+
subscriberCount: e.subscriberCount,
34+
createdAt: e.createdAt,
35+
updatedAt: e.updatedAt,
36+
})),
37+
});
38+
} catch (error) {
39+
if (error instanceof ServiceValidationError) {
40+
return json({ error: error.message }, { status: error.status ?? 422 });
41+
}
42+
return json(
43+
{ error: error instanceof Error ? error.message : "Something went wrong" },
44+
{ status: 500 }
45+
);
46+
}
3647
}
3748
);

apps/webapp/app/v3/services/events/deadLetterManagement.server.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,26 @@ export class DeadLetterManagementService extends BaseService {
136136
let retriedCount = 0;
137137
let failedCount = 0;
138138

139+
const triggerService = new TriggerTaskService();
140+
139141
for (const dle of pendingItems) {
140142
try {
141-
await this.retry(dle.id, params.environment);
143+
const body: TriggerTaskRequestBody = {
144+
payload: dle.payload,
145+
options: {
146+
idempotencyKey: `dlq-retry:${dle.id}`,
147+
},
148+
};
149+
150+
await triggerService.call(dle.taskSlug, params.environment, body, {
151+
idempotencyKey: `dlq-retry:${dle.id}`,
152+
});
153+
154+
await this._prisma.deadLetterEvent.update({
155+
where: { id: dle.id },
156+
data: { status: "RETRIED", processedAt: new Date() },
157+
});
158+
142159
retriedCount++;
143160
} catch {
144161
failedCount++;

apps/webapp/app/v3/services/events/publishEvent.server.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,16 @@ export class PublishEventService extends BaseService {
171171
}
172172
}
173173

174+
// 2b. Check payload size (512KB limit — larger payloads require object store)
175+
const payloadBytes = Buffer.byteLength(JSON.stringify(payload), "utf-8");
176+
const MAX_PAYLOAD_BYTES = 512 * 1024; // 512KB
177+
if (payloadBytes > MAX_PAYLOAD_BYTES) {
178+
throw new ServiceValidationError(
179+
`Payload size ${payloadBytes} bytes exceeds the 512KB limit. Use smaller payloads or configure the object store for large payloads.`,
180+
413
181+
);
182+
}
183+
174184
// 3. Find all active subscriptions: exact match + pattern-based
175185
const [exactSubscriptions, patternSubscriptions] = await Promise.all([
176186
// Exact subscriptions: tied to this specific EventDefinition

packages/cli-v3/src/apiClient.ts

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,17 @@ export class CliApiClient {
563563
});
564564
}
565565

566-
async publishEvent(projectRef: string, eventId: string, payload: unknown) {
566+
async publishEvent(
567+
projectRef: string,
568+
eventId: string,
569+
payload: unknown,
570+
options?: {
571+
idempotencyKey?: string;
572+
delay?: string;
573+
tags?: string[];
574+
orderingKey?: string;
575+
}
576+
) {
567577
if (!this.accessToken) {
568578
throw new Error("publishEvent: No access token");
569579
}
@@ -579,7 +589,17 @@ export class CliApiClient {
579589
...this.getHeaders(),
580590
"x-trigger-project-ref": projectRef,
581591
},
582-
body: JSON.stringify({ payload }),
592+
body: JSON.stringify({
593+
payload,
594+
options: options
595+
? {
596+
idempotencyKey: options.idempotencyKey,
597+
delay: options.delay,
598+
tags: options.tags,
599+
orderingKey: options.orderingKey,
600+
}
601+
: undefined,
602+
}),
583603
}
584604
);
585605
}

packages/cli-v3/src/commands/events/publish.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ const EventsPublishOptions = CommonCommandOptions.extend({
2121
projectRef: z.string().optional(),
2222
envFile: z.string().optional(),
2323
payload: z.string(),
24+
delay: z.string().optional(),
25+
tags: z.string().optional(),
26+
idempotencyKey: z.string().optional(),
27+
orderingKey: z.string().optional(),
2428
});
2529

2630
type EventsPublishOptions = z.infer<typeof EventsPublishOptions>;
@@ -34,6 +38,10 @@ export function configureEventsPublishCommand(program: Command) {
3438
.option("-c, --config <config file>", "The name of the config file")
3539
.option("-p, --project-ref <project ref>", "The project ref")
3640
.option("--env-file <env file>", "Path to the .env file")
41+
.option("--delay <delay>", "Delay before execution (e.g. '30s', '5m', ISO date)")
42+
.option("--tags <tags>", "Comma-separated tags to attach")
43+
.option("--idempotency-key <key>", "Idempotency key for deduplication")
44+
.option("--ordering-key <key>", "Ordering key for sequential processing")
3745
).action(async (eventId: string, options) => {
3846
await handleTelemetry(async () => {
3947
await printInitialBanner(false, options.profile);
@@ -96,7 +104,19 @@ async function _eventsPublishCommand(options: EventsPublishCommandInput) {
96104
loadingSpinner.start("Publishing event...");
97105

98106
const apiClient = new CliApiClient(authentication.auth.apiUrl, authentication.auth.accessToken);
99-
const result = await apiClient.publishEvent(resolvedConfig.project, options.eventId, payload);
107+
const publishOptions = {
108+
idempotencyKey: options.idempotencyKey,
109+
delay: options.delay,
110+
tags: options.tags ? options.tags.split(",").map((t: string) => t.trim()) : undefined,
111+
orderingKey: options.orderingKey,
112+
};
113+
const hasOptions = Object.values(publishOptions).some((v) => v !== undefined);
114+
const result = await apiClient.publishEvent(
115+
resolvedConfig.project,
116+
options.eventId,
117+
payload,
118+
hasOptions ? publishOptions : undefined
119+
);
100120

101121
if (!result.success) {
102122
loadingSpinner.stop("Failed to publish event");

0 commit comments

Comments
 (0)