Skip to content

Commit c7c9119

Browse files
giovaborgognoclaude
andcommitted
feat(events): phase 8.7 — event-system reference project
Demo project showcasing all event system features: - events.ts: event definitions with schemas, rate limits - basic-subscribers.ts: fan-out to multiple tasks - filtered-subscribers.ts: content-based filtering - pattern-subscribers.ts: wildcard patterns (*, #) - publish-and-wait.ts: scatter-gather orchestration - consumer-groups.ts: load-balanced event handling - ordering.ts: sequential processing per entity Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 9bd85f6 commit c7c9119

File tree

12 files changed

+328
-0
lines changed

12 files changed

+328
-0
lines changed

references/event-system/.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
node_modules
2+
.trigger
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"name": "references-event-system",
3+
"private": true,
4+
"type": "module",
5+
"scripts": {
6+
"dev": "trigger dev",
7+
"deploy": "trigger deploy"
8+
},
9+
"devDependencies": {
10+
"trigger.dev": "workspace:*"
11+
},
12+
"dependencies": {
13+
"@trigger.dev/build": "workspace:*",
14+
"@trigger.dev/sdk": "workspace:*",
15+
"zod": "3.25.76"
16+
}
17+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export {};
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import { task, logger } from "@trigger.dev/sdk";
2+
import { orderCreated, orderShipped } from "./events";
3+
4+
// ---- Basic Fan-out: Multiple tasks subscribe to the same event ----
5+
6+
/** Send a confirmation email when an order is created */
7+
export const sendConfirmationEmail = task({
8+
id: "send-confirmation-email",
9+
on: orderCreated,
10+
run: async (payload) => {
11+
logger.info("Sending confirmation email", {
12+
orderId: payload.orderId,
13+
customerId: payload.customerId,
14+
});
15+
16+
// Simulate email sending
17+
return { sent: true, to: payload.customerId };
18+
},
19+
});
20+
21+
/** Update inventory when an order is created */
22+
export const updateInventory = task({
23+
id: "update-inventory",
24+
on: orderCreated,
25+
run: async (payload) => {
26+
logger.info("Updating inventory", {
27+
orderId: payload.orderId,
28+
itemCount: payload.items.length,
29+
});
30+
31+
for (const item of payload.items) {
32+
logger.info(`Adjusting stock: ${item.sku} -${item.qty}`);
33+
}
34+
35+
return { adjusted: payload.items.length };
36+
},
37+
});
38+
39+
/** Notify customer when order is shipped */
40+
export const notifyShipped = task({
41+
id: "notify-shipped",
42+
on: orderShipped,
43+
run: async (payload) => {
44+
logger.info("Order shipped notification", {
45+
orderId: payload.orderId,
46+
tracking: payload.trackingNumber,
47+
});
48+
49+
return { notified: true };
50+
},
51+
});
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import { task, logger } from "@trigger.dev/sdk";
2+
import { userActivity } from "./events";
3+
4+
// ---- Consumer Groups: Load-balanced event handling ----
5+
// Within a consumer group, only ONE task receives each event.
6+
7+
export const activityProcessorA = task({
8+
id: "activity-processor-a",
9+
on: userActivity,
10+
consumerGroup: "activity-processors",
11+
run: async (payload) => {
12+
logger.info("Processor A handling activity", {
13+
userId: payload.userId,
14+
action: payload.action,
15+
});
16+
return { processor: "A", userId: payload.userId };
17+
},
18+
});
19+
20+
export const activityProcessorB = task({
21+
id: "activity-processor-b",
22+
on: userActivity,
23+
consumerGroup: "activity-processors",
24+
run: async (payload) => {
25+
logger.info("Processor B handling activity", {
26+
userId: payload.userId,
27+
action: payload.action,
28+
});
29+
return { processor: "B", userId: payload.userId };
30+
},
31+
});
32+
33+
// This task is NOT in the consumer group — it receives ALL events
34+
export const activityAnalytics = task({
35+
id: "activity-analytics",
36+
on: userActivity,
37+
run: async (payload) => {
38+
logger.info("Analytics: recording activity", {
39+
userId: payload.userId,
40+
action: payload.action,
41+
});
42+
return { recorded: true };
43+
},
44+
});
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { event } from "@trigger.dev/sdk";
2+
import { z } from "zod";
3+
4+
// ---- Event Definitions ----
5+
6+
/** Published when an order is placed */
7+
export const orderCreated = event({
8+
id: "order.created",
9+
schema: z.object({
10+
orderId: z.string(),
11+
amount: z.number(),
12+
customerId: z.string(),
13+
items: z.array(z.object({ sku: z.string(), qty: z.number() })),
14+
}),
15+
});
16+
17+
/** Published when an order is shipped */
18+
export const orderShipped = event({
19+
id: "order.shipped",
20+
schema: z.object({
21+
orderId: z.string(),
22+
trackingNumber: z.string(),
23+
}),
24+
});
25+
26+
/** Published for any user action (rate-limited) */
27+
export const userActivity = event({
28+
id: "user.activity",
29+
schema: z.object({
30+
userId: z.string(),
31+
action: z.string(),
32+
timestamp: z.string(),
33+
}),
34+
rateLimit: {
35+
limit: 100,
36+
window: "1m",
37+
},
38+
});
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import { task, logger } from "@trigger.dev/sdk";
2+
import { orderCreated } from "./events";
3+
4+
// ---- Content-based Filtering: Only receive events that match ----
5+
6+
/** Only handles high-value orders (amount >= 1000) */
7+
export const highValueOrderHandler = task({
8+
id: "high-value-order",
9+
on: orderCreated,
10+
filter: {
11+
amount: [{ $gte: 1000 }],
12+
},
13+
run: async (payload) => {
14+
logger.info("High-value order detected!", {
15+
orderId: payload.orderId,
16+
amount: payload.amount,
17+
});
18+
19+
// Alert VIP team, apply special handling, etc.
20+
return { flagged: true, amount: payload.amount };
21+
},
22+
});
23+
24+
/** Only handles orders from a specific customer */
25+
export const vipCustomerHandler = task({
26+
id: "vip-customer-handler",
27+
on: orderCreated,
28+
filter: {
29+
customerId: ["customer-vip-001", "customer-vip-002"],
30+
},
31+
run: async (payload) => {
32+
logger.info("VIP customer order", { customerId: payload.customerId });
33+
return { vip: true };
34+
},
35+
});
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import { task, logger } from "@trigger.dev/sdk";
2+
import { orderCreated } from "./events";
3+
4+
// ---- Ordering Keys: Sequential processing per entity ----
5+
6+
/**
7+
* This publisher uses ordering keys to ensure events for the same customer
8+
* are processed sequentially (no concurrent runs per customer).
9+
*/
10+
export const placeOrder = task({
11+
id: "place-order",
12+
run: async (payload: {
13+
orderId: string;
14+
amount: number;
15+
customerId: string;
16+
}) => {
17+
logger.info("Publishing order with ordering key", {
18+
orderId: payload.orderId,
19+
customerId: payload.customerId,
20+
});
21+
22+
const result = await orderCreated.publish(
23+
{
24+
orderId: payload.orderId,
25+
amount: payload.amount,
26+
customerId: payload.customerId,
27+
items: [{ sku: "ITEM-001", qty: 1 }],
28+
},
29+
{
30+
// Events for the same customer are processed one at a time
31+
orderingKey: payload.customerId,
32+
// Prevent duplicate publishes
33+
idempotencyKey: `order-${payload.orderId}`,
34+
}
35+
);
36+
37+
return { eventId: result.id, runs: result.runs.length };
38+
},
39+
});
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import { events, task, logger } from "@trigger.dev/sdk";
2+
3+
// ---- Wildcard Pattern Subscriptions ----
4+
5+
/** Catches all order.* events (order.created, order.shipped, etc.) */
6+
export const orderAuditLog = task({
7+
id: "order-audit-log",
8+
on: events.match("order.*"),
9+
run: async (payload) => {
10+
// payload is `unknown` for pattern subscriptions
11+
logger.info("Order event received", { payload });
12+
return { logged: true };
13+
},
14+
});
15+
16+
/** Catches all user.# events (user.activity, user.profile.updated, etc.) */
17+
export const userEventTracker = task({
18+
id: "user-event-tracker",
19+
on: events.match("user.#"),
20+
run: async (payload) => {
21+
logger.info("User event tracked", { payload });
22+
return { tracked: true };
23+
},
24+
});
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import { task, logger } from "@trigger.dev/sdk";
2+
import { orderCreated, orderShipped } from "./events";
3+
4+
// ---- Publish-and-Wait: Fan-out then collect results ----
5+
6+
/**
7+
* Orchestrator task that publishes an event and waits for all subscribers
8+
* to finish before proceeding (scatter-gather pattern).
9+
*/
10+
export const processOrder = task({
11+
id: "process-order",
12+
run: async (payload: { orderId: string; amount: number; customerId: string }) => {
13+
logger.info("Starting order processing", { orderId: payload.orderId });
14+
15+
// Publish and wait for ALL subscribers (sendConfirmationEmail, updateInventory, etc.)
16+
const result = await orderCreated.publishAndWait({
17+
orderId: payload.orderId,
18+
amount: payload.amount,
19+
customerId: payload.customerId,
20+
items: [{ sku: "WIDGET-001", qty: 2 }],
21+
});
22+
23+
logger.info("All subscribers completed", {
24+
eventId: result.id,
25+
subscriberCount: Object.keys(result.results).length,
26+
});
27+
28+
// Check results from each subscriber
29+
for (const [taskSlug, runResult] of Object.entries(result.results)) {
30+
if (runResult.ok) {
31+
logger.info(`${taskSlug}: success`, { output: runResult.output });
32+
} else {
33+
logger.error(`${taskSlug}: failed`, { error: runResult.error });
34+
}
35+
}
36+
37+
// Continue with next step: publish shipped event
38+
await orderShipped.publish({
39+
orderId: payload.orderId,
40+
trackingNumber: `TRK-${Date.now()}`,
41+
});
42+
43+
return { orderId: payload.orderId, status: "completed" };
44+
},
45+
});

0 commit comments

Comments
 (0)