Skip to content

Commit 63a4bd3

Browse files
giovaborgognoclaude
andcommitted
feat(events): phase 4.4 — SDK DLQ config per event
Add `dlq` option to `event()` allowing per-event DLQ configuration. When `dlq.enabled` is false, failed event-triggered runs are silently discarded instead of being stored in the dead letter queue. - Add EventDLQConfig type to SDK and EventDLQManifest schema to core - Add dlqConfig JSON column to EventDefinition model - Wire config through deploy (createBackgroundWorker) and resource catalog - DeadLetterService checks config before creating DLQ entries Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 86dc70f commit 63a4bd3

File tree

8 files changed

+72
-1
lines changed

8 files changed

+72
-1
lines changed

apps/webapp/app/v3/services/createBackgroundWorker.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,12 +353,14 @@ async function syncWorkerEvents(
353353
description: event.description,
354354
schema: event.schema as any ?? undefined,
355355
rateLimit: event.rateLimit as any ?? undefined,
356+
dlqConfig: event.dlq as any ?? undefined,
356357
projectId: worker.projectId,
357358
},
358359
update: {
359360
description: event.description,
360361
schema: event.schema as any ?? undefined,
361362
rateLimit: event.rateLimit as any ?? undefined,
363+
dlqConfig: event.dlq as any ?? undefined,
362364
},
363365
});
364366

apps/webapp/app/v3/services/events/deadLetterService.server.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,16 @@ export class DeadLetterService extends BaseService {
2121
return; // Not an event-triggered run
2222
}
2323

24+
// Check if DLQ is disabled for this event type
25+
const dlqEnabled = await this.isDLQEnabled(eventContext.eventType, run.projectId);
26+
if (!dlqEnabled) {
27+
logger.debug("DLQ disabled for event type, skipping", {
28+
runId: run.id,
29+
eventType: eventContext.eventType,
30+
});
31+
return;
32+
}
33+
2434
try {
2535
await this._prisma.deadLetterEvent.create({
2636
data: {
@@ -52,6 +62,24 @@ export class DeadLetterService extends BaseService {
5262
}
5363
}
5464

65+
private async isDLQEnabled(eventType: string, projectId: string): Promise<boolean> {
66+
try {
67+
const eventDef = await this._prisma.eventDefinition.findFirst({
68+
where: { slug: eventType, projectId },
69+
select: { dlqConfig: true },
70+
});
71+
72+
if (!eventDef?.dlqConfig) {
73+
return true; // Default: DLQ enabled
74+
}
75+
76+
const config = eventDef.dlqConfig as Record<string, unknown>;
77+
return config.enabled !== false;
78+
} catch {
79+
return true; // On error, default to enabled
80+
}
81+
}
82+
5583
private extractEventContext(run: TaskRun): EventContext | null {
5684
if (!run.metadata) return null;
5785

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "public"."EventDefinition" ADD COLUMN "dlqConfig" JSONB;

internal-packages/database/prisma/schema.prisma

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,9 @@ model EventDefinition {
604604
// Rate limiting (Phase 7) — JSON config e.g. { "limit": 100, "window": "1m" }
605605
rateLimit Json?
606606
607+
// DLQ configuration (Phase 4.4) — JSON config e.g. { "enabled": false }
608+
dlqConfig Json?
609+
607610
project Project @relation(fields: [projectId], references: [id], onDelete: Cascade, onUpdate: Cascade)
608611
projectId String
609612

packages/core/src/v3/resource-catalog/catalog.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ export interface EventMetadata {
1111
rateLimit?: { limit: number; window: string };
1212
/** Ordering configuration — enables per-key serialization with global concurrency limit */
1313
ordering?: { concurrencyLimit?: number };
14+
/** Dead letter queue configuration */
15+
dlq?: { enabled?: boolean };
1416
}
1517

1618
export interface ResourceCatalog {

packages/core/src/v3/resource-catalog/standardResourceCatalog.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ export class StandardResourceCatalog implements ResourceCatalog {
192192
description: event.description,
193193
rateLimit: event.rateLimit,
194194
ordering: event.ordering,
195+
dlq: event.dlq,
195196
}));
196197
}
197198

packages/core/src/v3/schemas/schemas.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,13 @@ export const EventOrderingManifest = z.object({
190190

191191
export type EventOrderingManifest = z.infer<typeof EventOrderingManifest>;
192192

193+
export const EventDLQManifest = z.object({
194+
/** Whether to store failed event-triggered runs in the DLQ (default: true) */
195+
enabled: z.boolean().optional(),
196+
});
197+
198+
export type EventDLQManifest = z.infer<typeof EventDLQManifest>;
199+
193200
export const EventManifest = z.object({
194201
/** Unique event identifier (e.g. "order.created") */
195202
id: z.string(),
@@ -203,6 +210,8 @@ export const EventManifest = z.object({
203210
rateLimit: EventRateLimitManifest.optional(),
204211
/** Ordering configuration — creates a dedicated queue with per-key serialization */
205212
ordering: EventOrderingManifest.optional(),
213+
/** Dead letter queue configuration */
214+
dlq: EventDLQManifest.optional(),
206215
});
207216

208217
export type EventManifest = z.infer<typeof EventManifest>;

packages/trigger-sdk/src/v3/events.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,19 @@ export interface EventOptions<TId extends string, TSchema extends Schema | undef
5151
* ```
5252
*/
5353
ordering?: EventOrdering;
54+
/**
55+
* Dead letter queue configuration. Controls what happens when event-triggered runs fail.
56+
* By default, all failed runs are stored in the DLQ for inspection and retry.
57+
*
58+
* @example
59+
* ```ts
60+
* event({
61+
* id: "order.created",
62+
* dlq: { enabled: false }, // Don't store failures in DLQ
63+
* });
64+
* ```
65+
*/
66+
dlq?: EventDLQConfig;
5467
}
5568

5669
/** Ordering configuration for an event */
@@ -59,6 +72,16 @@ export interface EventOrdering {
5972
concurrencyLimit?: number;
6073
}
6174

75+
/** Dead letter queue configuration for an event */
76+
export interface EventDLQConfig {
77+
/**
78+
* Whether to store failed event-triggered runs in the DLQ.
79+
* When false, failed runs are discarded silently.
80+
* @default true
81+
*/
82+
enabled?: boolean;
83+
}
84+
6285
/** Options for publishing an event */
6386
export interface PublishEventOptions {
6487
/** Idempotency key to prevent duplicate publications */
@@ -189,7 +212,7 @@ export function createEvent<TId extends string>(
189212
export function createEvent<TId extends string, TSchema extends Schema | undefined = undefined>(
190213
options: EventOptions<TId, TSchema>
191214
): EventDefinition<TId, any> {
192-
const { id, schema, description, version = "1.0", rateLimit, ordering } = options;
215+
const { id, schema, description, version = "1.0", rateLimit, ordering, dlq } = options;
193216

194217
// Build the parse function if a schema is provided
195218
let parseFn: SchemaParseFn<any> | undefined;
@@ -318,6 +341,7 @@ export function createEvent<TId extends string, TSchema extends Schema | undefin
318341
rawSchema: schema,
319342
rateLimit,
320343
ordering,
344+
dlq,
321345
});
322346

323347
// Mark as event for runtime detection

0 commit comments

Comments
 (0)