Skip to content

Commit 2fd0bcc

Browse files
Merge pull request #60 from dillonstreator/wakeup-signals
Wakeup signals
2 parents 6cd66ee + 2696a98 commit 2fd0bcc

9 files changed

Lines changed: 928 additions & 152 deletions

File tree

README.md

Lines changed: 265 additions & 43 deletions
Large diffs are not rendered by default.

architecture.mmd

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
graph TB
2+
subgraph Application["Your Application"]
3+
AppTx["BEGIN TRANSACTION"]
4+
AppData["1. Insert/Update business data<br/>(users, orders, etc.)"]
5+
AppEvent["2. Insert event record"]
6+
AppCommit["COMMIT TRANSACTION"]
7+
8+
AppTx --> AppData
9+
AppData --> AppEvent
10+
AppEvent --> AppCommit
11+
end
12+
13+
subgraph Database["Database"]
14+
EventsTable[("Events Table<br/>id, type, data,<br/>processed_at, errors,<br/>backoff_until")]
15+
16+
AppCommit -->|"Both saved atomically ✅"| EventsTable
17+
end
18+
19+
subgraph WakeupSignals["Wakeup Signals (Optional)"]
20+
direction TB
21+
PGTrigger["PostgreSQL Trigger<br/>(auto-created)"]
22+
PGNotify["PostgreSQL NOTIFY<br/>(LISTEN connection)"]
23+
MongoStream["MongoDB Change Streams<br/>(watches for inserts)"]
24+
25+
EventsTable -->|"INSERT triggers"| PGTrigger
26+
PGTrigger -->|"sends NOTIFY"| PGNotify
27+
EventsTable -->|"INSERT operations"| MongoStream
28+
end
29+
30+
subgraph Polling["Polling Component<br/>(Decoupled from Processing)"]
31+
direction TB
32+
WakeupListener["Wakeup Signal Listener<br/>(on 'wakeup' event)"]
33+
ThrottledPoll["Throttled Poll Function<br/>(leading + trailing edge)<br/>Prevents excessive polling"]
34+
FallbackLoop["Fallback Polling Loop<br/>(runs periodically)"]
35+
TimeoutCheck["Check: time since last wakeup<br/>> wakeupTimeoutMs?"]
36+
37+
PGNotify -->|"emits 'wakeup'"| WakeupListener
38+
MongoStream -->|"emits 'wakeup'"| WakeupListener
39+
WakeupListener -->|"triggers"| ThrottledPoll
40+
FallbackLoop -->|"waits pollingIntervalMs"| TimeoutCheck
41+
TimeoutCheck -->|"yes: no recent wakeup"| ThrottledPoll
42+
TimeoutCheck -->|"no: recent wakeup"| FallbackLoop
43+
end
44+
45+
subgraph Processing["Processing Queue & Processor"]
46+
direction TB
47+
PollQuery["getEventsToProcess()<br/>SELECT unprocessed events<br/>(FOR UPDATE SKIP LOCKED)"]
48+
EventQueue["Processing Queue<br/>(PQueue with concurrency limit)"]
49+
ProcessEvent["processEvent()"]
50+
LockEvent["Lock event in transaction<br/>(getEventByIdForUpdateSkipLocked)"]
51+
ExecuteHandlers["Execute handlers concurrently<br/>(pLimit for handler concurrency)<br/>send email, webhook, etc."]
52+
UpdateEvent["updateEvent()<br/>UPDATE with results & processed_at"]
53+
ErrorHandling["On failure:<br/>increment errors,<br/>set backoff_until"]
54+
55+
ThrottledPoll -->|"calls"| PollQuery
56+
PollQuery -->|"returns event IDs"| EventQueue
57+
EventQueue -->|"queues for processing"| ProcessEvent
58+
ProcessEvent --> LockEvent
59+
LockEvent -->|"event locked"| ExecuteHandlers
60+
ExecuteHandlers -->|"all handlers complete"| UpdateEvent
61+
ExecuteHandlers -->|"handler fails"| ErrorHandling
62+
ErrorHandling --> UpdateEvent
63+
UpdateEvent -->|"commits transaction"| EventsTable
64+
end
65+
66+
EventsTable -->|"queries for<br/>unprocessed events"| PollQuery
67+
68+
style Application fill:#e1f5ff
69+
style Database fill:#fff4e1
70+
style WakeupSignals fill:#f0f4ff
71+
style Polling fill:#e8f5e9
72+
style Processing fill:#fce4ec

examples/pg/processor.ts

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,40 @@ import { randomUUID } from "node:crypto";
33
import {
44
ErrorUnprocessableEventHandler,
55
EventProcessor,
6+
WakeupEmitter,
67
} from "../../src/index.js";
7-
import { createProcessorClient } from "../../src/pg/client.js";
8+
import {
9+
createProcessorClient,
10+
createWakeupEmitter,
11+
} from "../../src/pg/client.js";
812
import { migrate, type EventType, eventTypes } from "./server.js";
913
import dotenv from "dotenv";
1014
import { sleep } from "../../src/sleep.js";
1115
dotenv.config();
1216

1317
let processor: EventProcessor<EventType> | undefined = undefined;
18+
let wakeupEmitter: WakeupEmitter | undefined = undefined;
1419

1520
(async () => {
16-
const client = new pg.Client({
21+
const clientConfig: pg.ClientConfig = {
1722
user: process.env.POSTGRES_USER,
1823
password: process.env.POSTGRES_PASSWORD,
1924
database: process.env.POSTGRES_DB,
2025
port: parseInt(process.env.POSTGRES_PORT || "5434"),
21-
});
26+
};
27+
const client = new pg.Client(clientConfig);
2228
await client.connect();
2329
await migrate(client);
2430

31+
wakeupEmitter = await createWakeupEmitter({
32+
listenClientConfig: clientConfig,
33+
createTrigger: true,
34+
querier: client,
35+
});
36+
2537
processor = new EventProcessor<EventType>({
26-
client: createProcessorClient<EventType>(client),
38+
client: createProcessorClient<EventType>({ querier: client }),
39+
wakeupEmitter,
2740
handlerMap: {
2841
ResourceSaved: {
2942
thing1: async (event) => {
@@ -91,20 +104,18 @@ let processor: EventProcessor<EventType> | undefined = undefined;
91104

92105
const shutdown = (() => {
93106
let shutdownStarted = false;
94-
return () => {
107+
return async () => {
95108
if (shutdownStarted) return;
96109

97110
shutdownStarted = true;
98111

99-
processor
100-
?.stop()
101-
.then(() => {
102-
process.exit(0);
103-
})
104-
.catch((err) => {
105-
console.error(err);
106-
process.exit(1);
107-
});
112+
try {
113+
await wakeupEmitter?.close();
114+
} catch (err) {
115+
console.error(err);
116+
process.exit(1);
117+
}
118+
process.exit(0);
108119
};
109120
})();
110121
process.once("SIGTERM", shutdown);

package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@
5959
},
6060
"dependencies": {
6161
"p-limit": "^7.2.0",
62-
"p-queue": "^9.0.1"
62+
"p-queue": "^9.0.1",
63+
"throttle-debounce": "^5.0.2"
6364
},
6465
"peerDependencies": {
6566
"mongodb": "^7.0.0",
@@ -69,6 +70,7 @@
6970
"@types/mongodb": "^4.0.7",
7071
"@types/node": "^25.0.3",
7172
"@types/pg": "^8.10.9",
73+
"@types/throttle-debounce": "^5.0.2",
7274
"@vitest/coverage-v8": "^4.0.15",
7375
"copyfiles": "^2.4.1",
7476
"mongodb": "^7.0.0",

src/mongodb/client.ts

Lines changed: 96 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1-
import { MongoClient, ObjectId } from "mongodb";
1+
import { EventEmitter } from "node:events";
2+
import { MongoClient, ObjectId, type ChangeStream } from "mongodb";
23
import type {
34
TxOBEvent,
45
TxOBProcessorClient,
56
TxOBProcessorClientOpts,
67
TxOBTransactionProcessorClient,
8+
WakeupEmitter,
79
} from "../processor.js";
810
import { getDate } from "../date.js";
911

@@ -20,12 +22,17 @@ const createReadyToProcessFilter = (maxErrors: number) => ({
2022
errors: { $lt: maxErrors },
2123
});
2224

25+
export type CreateProcessorClientOpts<EventType extends string> = {
26+
mongo: MongoClient;
27+
db: string;
28+
collection?: string;
29+
limit?: number;
30+
};
31+
2332
export const createProcessorClient = <EventType extends string>(
24-
mongo: MongoClient,
25-
db: string,
26-
collection: string = "events",
27-
limit: number = 100,
33+
opts: CreateProcessorClientOpts<EventType>,
2834
): TxOBProcessorClient<EventType> => {
35+
const { mongo, db, collection = "events", limit = 100 } = opts;
2936
const getEventsToProcess = async (
3037
opts: TxOBProcessorClientOpts,
3138
): Promise<Pick<TxOBEvent<EventType>, "id" | "errors">[]> => {
@@ -142,3 +149,87 @@ export const createProcessorClient = <EventType extends string>(
142149
transaction,
143150
};
144151
};
152+
153+
type CreateWakeupEmitterOpts = {
154+
mongo: MongoClient;
155+
db: string;
156+
collection?: string;
157+
};
158+
159+
/**
160+
* Creates a MongoDB Change Stream-based wakeup emitter for reducing polling frequency.
161+
* This watches for INSERT operations on the events collection and emits wakeup signals.
162+
*
163+
* **Important**: MongoDB Change Streams require a replica set or sharded cluster.
164+
* If your MongoDB instance is a standalone server, you must convert it to a single-node
165+
* replica set by running `rs.initiate()` in the mongo shell.
166+
*
167+
* If the database is not configured for Change Streams, an error will be emitted via
168+
* the 'error' event on the returned WakeupEmitter. The error typically occurs when
169+
* the change stream attempts to connect.
170+
*
171+
* See: https://www.mongodb.com/docs/manual/changeStreams/
172+
*
173+
* @param opts - Options for the wakeup emitter
174+
* @returns A WakeupEmitter that emits 'wakeup' events when new events are inserted.
175+
* Errors (including replica set requirement failures) are emitted via the 'error' event.
176+
* @throws Does not throw synchronously. Errors are emitted via the 'error' event.
177+
*/
178+
export const createWakeupEmitter = async (
179+
opts: CreateWakeupEmitterOpts,
180+
): Promise<WakeupEmitter & { close: () => Promise<void> }> => {
181+
const { mongo, db, collection = "events" } = opts;
182+
const emitter = new EventEmitter();
183+
184+
// Get the collection to watch
185+
const eventsCollection = mongo.db(db).collection(collection);
186+
187+
// Create a change stream that watches for insert operations
188+
// We only care about inserts - retries after backoff are handled by fallback polling
189+
// Note: watch() may not error immediately - errors typically occur when the change
190+
// stream attempts to connect, which happens asynchronously
191+
const changeStream: ChangeStream = eventsCollection.watch(
192+
[
193+
{
194+
$match: {
195+
operationType: "insert",
196+
},
197+
},
198+
],
199+
{
200+
fullDocument: "default",
201+
},
202+
);
203+
204+
// Handle change stream events
205+
changeStream.on("change", () => {
206+
emitter.emit("wakeup");
207+
});
208+
209+
// Handle change stream errors
210+
// Common errors include:
211+
// - "Change streams are only supported on replica sets" (standalone MongoDB)
212+
// - Connection errors
213+
// - Permission errors
214+
changeStream.on("error", (err) => {
215+
emitter.emit("error", err);
216+
});
217+
218+
// Handle change stream close
219+
changeStream.on("close", () => {
220+
emitter.emit("error", new Error("MongoDB Change Stream closed"));
221+
});
222+
223+
// Return a WakeupEmitter that wraps the EventEmitter
224+
return {
225+
on: (event: "wakeup", listener: () => void) => {
226+
emitter.on(event, listener);
227+
},
228+
off: (event: "wakeup", listener: () => void) => {
229+
emitter.off(event, listener);
230+
},
231+
close: async () => {
232+
await changeStream.close();
233+
},
234+
};
235+
};

src/pg/client.test.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ describe("createProcessorClient", () => {
66
const pgClient = {
77
query: vi.fn(),
88
} as any;
9-
const client = createProcessorClient(pgClient);
9+
const client = createProcessorClient({ querier: pgClient });
1010
expect(typeof client.getEventsToProcess).toBe("function");
1111
expect(typeof client.transaction).toBe("function");
1212
});
@@ -25,7 +25,7 @@ describe("getEventsToProcess", () => {
2525
const opts = {
2626
maxErrors: 10,
2727
};
28-
const client = createProcessorClient(pgClient);
28+
const client = createProcessorClient({ querier: pgClient });
2929
const result = await client.getEventsToProcess(opts);
3030
expect(pgClient.query).toHaveBeenCalledOnce();
3131
expect(pgClient.query).toHaveBeenCalledWith(
@@ -41,7 +41,7 @@ describe("transaction", () => {
4141
const pgClient = {
4242
query: vi.fn<any>(() => Promise.resolve()),
4343
} as any;
44-
const client = createProcessorClient(pgClient);
44+
const client = createProcessorClient({ querier: pgClient });
4545
await client.transaction(async () => {});
4646
expect(pgClient.query).toHaveBeenCalledTimes(2);
4747
expect(pgClient.query).toHaveBeenNthCalledWith(1, "BEGIN");
@@ -51,7 +51,7 @@ describe("transaction", () => {
5151
const pgClient = {
5252
query: vi.fn<any>(() => Promise.resolve()),
5353
} as any;
54-
const client = createProcessorClient(pgClient);
54+
const client = createProcessorClient({ querier: pgClient });
5555
await client
5656
.transaction(async () => {
5757
throw new Error("error");
@@ -74,7 +74,7 @@ describe("transaction", () => {
7474
),
7575
} as any;
7676
const eventId = "123";
77-
const client = createProcessorClient(pgClient);
77+
const client = createProcessorClient({ querier: pgClient });
7878
let result: any;
7979
await client.transaction(async (txClient) => {
8080
result = await txClient.getEventByIdForUpdateSkipLocked(eventId, {
@@ -101,7 +101,7 @@ describe("transaction", () => {
101101
),
102102
} as any;
103103
const eventId = "123";
104-
const client = createProcessorClient(pgClient);
104+
const client = createProcessorClient({ querier: pgClient });
105105
let result: any;
106106
await client.transaction(async (txClient) => {
107107
result = await txClient.getEventByIdForUpdateSkipLocked(eventId, {
@@ -141,7 +141,7 @@ describe("transaction", () => {
141141
},
142142
correlation_id: "abc123",
143143
};
144-
const client = createProcessorClient(pgClient);
144+
const client = createProcessorClient({ querier: pgClient });
145145
await client.transaction(async (txClient) => {
146146
await txClient.updateEvent(event);
147147
});
@@ -176,7 +176,7 @@ describe("transaction", () => {
176176
handler_results: {},
177177
errors: 0,
178178
};
179-
const client = createProcessorClient(pgClient);
179+
const client = createProcessorClient({ querier: pgClient });
180180
await client.transaction(async (txClient) => {
181181
await txClient.createEvent(event);
182182
});
@@ -219,7 +219,7 @@ describe("transaction", () => {
219219
}),
220220
} as any;
221221

222-
const client = createProcessorClient(pgClient);
222+
const client = createProcessorClient({ querier: pgClient });
223223

224224
try {
225225
await client.transaction(async () => {

0 commit comments

Comments
 (0)