Skip to content

Commit ef2ec86

Browse files
giovaborgognoclaude
andcommitted
feat(events): consumer-side rate limiting + metrics endpoint
Consumer-side rate limiting: - Add `rateLimit` JSON field to EventSubscription model - Add `consumerRateLimit` option to TaskOptionsWithEvent in SDK - Propagate through manifest pipeline → createBackgroundWorker - PublishEventService checks per-subscriber rate limits during fan-out - Rate-limited subscribers are skipped with warning log Metrics endpoint: - Add GET /api/v1/events/:eventId/metrics for backpressure monitoring - Returns subscriber health (active/disabled/filters/rate limits) - Returns DLQ depth (pending/retried/discarded counts) - Returns event-level rate limit configuration - Add GetEventMetricsResponseBody schema Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent bd74a50 commit ef2ec86

File tree

10 files changed

+189
-2
lines changed

10 files changed

+189
-2
lines changed
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import { json } from "@remix-run/server-runtime";
2+
import { z } from "zod";
3+
import { prisma } from "~/db.server";
4+
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
5+
import { parseEventRateLimitConfig } from "~/v3/services/events/eventRateLimiter.server";
6+
7+
const ParamsSchema = z.object({
8+
eventId: z.string(),
9+
});
10+
11+
export const loader = createLoaderApiRoute(
12+
{
13+
params: ParamsSchema,
14+
corsStrategy: "all",
15+
authorization: {
16+
action: "read",
17+
resource: (_resource, params) => ({ tasks: params.eventId }),
18+
superScopes: ["read:runs", "read:all", "admin"],
19+
},
20+
findResource: async () => 1 as const,
21+
},
22+
async ({ params, authentication }) => {
23+
const environment = authentication.environment;
24+
25+
// Find event definition
26+
const eventDef = await prisma.eventDefinition.findFirst({
27+
where: {
28+
slug: params.eventId,
29+
projectId: environment.projectId,
30+
},
31+
orderBy: { createdAt: "desc" },
32+
});
33+
34+
if (!eventDef) {
35+
return json({ error: `Event "${params.eventId}" not found` }, { status: 404 });
36+
}
37+
38+
// Get subscribers
39+
const subscriptions = await prisma.eventSubscription.findMany({
40+
where: {
41+
eventDefinitionId: eventDef.id,
42+
environmentId: environment.id,
43+
},
44+
select: {
45+
taskSlug: true,
46+
enabled: true,
47+
rateLimit: true,
48+
filter: true,
49+
consumerGroup: true,
50+
},
51+
});
52+
53+
const activeCount = subscriptions.filter((s) => s.enabled).length;
54+
const disabledCount = subscriptions.length - activeCount;
55+
56+
// Get DLQ counts
57+
const [pendingCount, retriedCount, discardedCount] = await Promise.all([
58+
prisma.deadLetterEvent.count({
59+
where: {
60+
eventType: params.eventId,
61+
projectId: environment.projectId,
62+
environmentId: environment.id,
63+
status: "PENDING",
64+
},
65+
}),
66+
prisma.deadLetterEvent.count({
67+
where: {
68+
eventType: params.eventId,
69+
projectId: environment.projectId,
70+
environmentId: environment.id,
71+
status: "RETRIED",
72+
},
73+
}),
74+
prisma.deadLetterEvent.count({
75+
where: {
76+
eventType: params.eventId,
77+
projectId: environment.projectId,
78+
environmentId: environment.id,
79+
status: "DISCARDED",
80+
},
81+
}),
82+
]);
83+
84+
// Parse rate limit config
85+
const rateLimitConfig = parseEventRateLimitConfig(eventDef.rateLimit);
86+
87+
return json({
88+
eventType: params.eventId,
89+
subscribers: {
90+
total: subscriptions.length,
91+
active: activeCount,
92+
disabled: disabledCount,
93+
list: subscriptions.map((s) => ({
94+
taskSlug: s.taskSlug,
95+
enabled: s.enabled,
96+
hasRateLimit: !!s.rateLimit,
97+
hasFilter: !!s.filter,
98+
consumerGroup: s.consumerGroup,
99+
})),
100+
},
101+
dlq: {
102+
pending: pendingCount,
103+
retried: retriedCount,
104+
discarded: discardedCount,
105+
},
106+
rateLimit: rateLimitConfig
107+
? { limit: rateLimitConfig.limit, window: rateLimitConfig.window }
108+
: null,
109+
});
110+
}
111+
);

apps/webapp/app/v3/services/createBackgroundWorker.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,13 +460,15 @@ async function syncWorkerEvents(
460460
filter: (task.onEventFilter as any) ?? undefined,
461461
pattern: task.onEventPattern ?? undefined,
462462
consumerGroup: task.onEventConsumerGroup ?? undefined,
463+
rateLimit: (task.onEventConsumerRateLimit as any) ?? undefined,
463464
},
464465
update: {
465466
workerId: worker.id,
466467
enabled: true,
467468
filter: (task.onEventFilter as any) ?? undefined,
468469
pattern: task.onEventPattern ?? undefined,
469470
consumerGroup: task.onEventConsumerGroup ?? undefined,
471+
rateLimit: (task.onEventConsumerRateLimit as any) ?? undefined,
470472
},
471473
});
472474

apps/webapp/app/v3/services/events/publishEvent.server.ts

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,11 +278,34 @@ export class PublishEventService extends BaseService {
278278
);
279279
}
280280

281-
// 7. Fan out: trigger each matching subscribed task
281+
// 7. Check per-subscriber rate limits and fan out
282282
const runs: PublishEventResult["runs"] = [];
283283

284284
for (const subscription of subscriptionsToTrigger) {
285285
try {
286+
// Check per-subscriber rate limit (if configured)
287+
if (this._rateLimitChecker && subscription.rateLimit) {
288+
const subRateLimitConfig = parseEventRateLimitConfig(subscription.rateLimit);
289+
if (subRateLimitConfig) {
290+
const subRateLimitKey = `consumer:${subscription.taskSlug}:${eventSlug}`;
291+
const subRateLimitResult = await this._rateLimitChecker.check(
292+
subRateLimitKey,
293+
subRateLimitConfig
294+
);
295+
if (!subRateLimitResult.allowed) {
296+
logger.warn("Subscriber rate limit exceeded, skipping", {
297+
eventSlug,
298+
eventId,
299+
taskSlug: subscription.taskSlug,
300+
limit: subRateLimitResult.limit,
301+
retryAfter: subRateLimitResult.retryAfter,
302+
});
303+
span.setAttribute("consumerRateLimited", true);
304+
continue;
305+
}
306+
}
307+
}
308+
286309
// Derive per-consumer idempotency key if a global one was provided
287310
const consumerIdempotencyKey = options.idempotencyKey
288311
? `${options.idempotencyKey}:${subscription.taskSlug}`
@@ -396,7 +419,7 @@ export class PublishEventService extends BaseService {
396419
* Different eventIds distribute evenly across group members.
397420
*/
398421
private applyConsumerGroups(
399-
subscriptions: Array<{ id: string; consumerGroup: string | null; taskSlug: string }>,
422+
subscriptions: Array<{ id: string; consumerGroup: string | null; taskSlug: string; rateLimit: unknown }>,
400423
eventId?: string
401424
): typeof subscriptions {
402425
const ungrouped: typeof subscriptions = [];
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "public"."EventSubscription" ADD COLUMN "rateLimit" JSONB;

internal-packages/database/prisma/schema.prisma

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,7 @@ model EventSubscription {
639639
filter Json? // EventFilter (Phase 2)
640640
pattern String? // Wildcard pattern (Phase 2)
641641
consumerGroup String? // Consumer group name (Phase 5)
642+
rateLimit Json? // Per-subscriber rate limit { limit, window }
642643
enabled Boolean @default(true)
643644
priority Int @default(0)
644645

packages/core/src/v3/schemas/api.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1859,3 +1859,38 @@ export const GetEventStatsResponseBody = z.object({
18591859
});
18601860

18611861
export type GetEventStatsResponseBody = z.infer<typeof GetEventStatsResponseBody>;
1862+
1863+
// ---- Event Metrics schemas ----
1864+
1865+
export const EventMetricsSubscriber = z.object({
1866+
taskSlug: z.string(),
1867+
enabled: z.boolean(),
1868+
hasRateLimit: z.boolean(),
1869+
hasFilter: z.boolean(),
1870+
consumerGroup: z.string().nullable(),
1871+
});
1872+
1873+
export type EventMetricsSubscriber = z.infer<typeof EventMetricsSubscriber>;
1874+
1875+
export const GetEventMetricsResponseBody = z.object({
1876+
eventType: z.string(),
1877+
subscribers: z.object({
1878+
total: z.number().int(),
1879+
active: z.number().int(),
1880+
disabled: z.number().int(),
1881+
list: z.array(EventMetricsSubscriber),
1882+
}),
1883+
dlq: z.object({
1884+
pending: z.number().int(),
1885+
retried: z.number().int(),
1886+
discarded: z.number().int(),
1887+
}),
1888+
rateLimit: z
1889+
.object({
1890+
limit: z.number().int(),
1891+
window: z.string(),
1892+
})
1893+
.nullable(),
1894+
});
1895+
1896+
export type GetEventMetricsResponseBody = z.infer<typeof GetEventMetricsResponseBody>;

packages/core/src/v3/schemas/resources.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ export const TaskResource = z.object({
1919
onEventFilter: z.unknown().optional(),
2020
onEventPattern: z.string().optional(),
2121
onEventConsumerGroup: z.string().optional(),
22+
onEventConsumerRateLimit: z.object({
23+
limit: z.number().int().positive(),
24+
window: z.string(),
25+
}).optional(),
2226
});
2327

2428
export type TaskResource = z.infer<typeof TaskResource>;

packages/core/src/v3/schemas/schemas.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,11 @@ const taskMetadata = {
240240
onEventPattern: z.string().optional(),
241241
/** Consumer group name — within a group, only one task receives each event */
242242
onEventConsumerGroup: z.string().optional(),
243+
/** Per-subscriber rate limit — controls how fast this task receives events */
244+
onEventConsumerRateLimit: z.object({
245+
limit: z.number().int().positive(),
246+
window: z.string(),
247+
}).optional(),
243248
};
244249

245250
export const TaskMetadata = z.object(taskMetadata);

packages/core/src/v3/types/tasks.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,8 @@ export type TaskOptionsWithEvent<
424424
filter?: import("../schemas/eventFilter.js").EventFilter;
425425
/** Consumer group — within a group, only one task receives each event */
426426
consumerGroup?: string;
427+
/** Per-subscriber rate limit — controls how fast this task receives events */
428+
consumerRateLimit?: { limit: number; window: string };
427429
};
428430

429431
declare const __output: unique symbol;

packages/trigger-sdk/src/v3/shared.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ export function createTask<
250250
const onEventFilter = "filter" in params && params.filter ? params.filter : undefined;
251251
const onEventPattern = eventSource && "pattern" in eventSource ? eventSource.pattern : undefined;
252252
const onEventConsumerGroup = "consumerGroup" in params && params.consumerGroup ? params.consumerGroup as string : undefined;
253+
const onEventConsumerRateLimit = "consumerRateLimit" in params && params.consumerRateLimit ? params.consumerRateLimit as { limit: number; window: string } : undefined;
253254

254255
resourceCatalog.registerTaskMetadata({
255256
id: params.id,
@@ -263,6 +264,7 @@ export function createTask<
263264
onEventFilter,
264265
onEventPattern,
265266
onEventConsumerGroup,
267+
onEventConsumerRateLimit,
266268
fns: {
267269
run: params.run,
268270
},

0 commit comments

Comments
 (0)