Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
308 changes: 265 additions & 43 deletions README.md

Large diffs are not rendered by default.

72 changes: 72 additions & 0 deletions architecture.mmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
graph TB
subgraph Application["Your Application"]
AppTx["BEGIN TRANSACTION"]
AppData["1. Insert/Update business data<br/>(users, orders, etc.)"]
AppEvent["2. Insert event record"]
AppCommit["COMMIT TRANSACTION"]

AppTx --> AppData
AppData --> AppEvent
AppEvent --> AppCommit
end

subgraph Database["Database"]
EventsTable[("Events Table<br/>id, type, data,<br/>processed_at, errors,<br/>backoff_until")]

AppCommit -->|"Both saved atomically ✅"| EventsTable
end

subgraph WakeupSignals["Wakeup Signals (Optional)"]
direction TB
PGTrigger["PostgreSQL Trigger<br/>(auto-created)"]
PGNotify["PostgreSQL NOTIFY<br/>(LISTEN connection)"]
MongoStream["MongoDB Change Streams<br/>(watches for inserts)"]

EventsTable -->|"INSERT triggers"| PGTrigger
PGTrigger -->|"sends NOTIFY"| PGNotify
EventsTable -->|"INSERT operations"| MongoStream
end

subgraph Polling["Polling Component<br/>(Decoupled from Processing)"]
direction TB
WakeupListener["Wakeup Signal Listener<br/>(on 'wakeup' event)"]
ThrottledPoll["Throttled Poll Function<br/>(leading + trailing edge)<br/>Prevents excessive polling"]
FallbackLoop["Fallback Polling Loop<br/>(runs periodically)"]
TimeoutCheck["Check: time since last wakeup<br/>> wakeupTimeoutMs?"]

PGNotify -->|"emits 'wakeup'"| WakeupListener
MongoStream -->|"emits 'wakeup'"| WakeupListener
WakeupListener -->|"triggers"| ThrottledPoll
FallbackLoop -->|"waits pollingIntervalMs"| TimeoutCheck
TimeoutCheck -->|"yes: no recent wakeup"| ThrottledPoll
TimeoutCheck -->|"no: recent wakeup"| FallbackLoop
end

subgraph Processing["Processing Queue & Processor"]
direction TB
PollQuery["getEventsToProcess()<br/>SELECT unprocessed events<br/>(FOR UPDATE SKIP LOCKED)"]
EventQueue["Processing Queue<br/>(PQueue with concurrency limit)"]
ProcessEvent["processEvent()"]
LockEvent["Lock event in transaction<br/>(getEventByIdForUpdateSkipLocked)"]
ExecuteHandlers["Execute handlers concurrently<br/>(pLimit for handler concurrency)<br/>send email, webhook, etc."]
UpdateEvent["updateEvent()<br/>UPDATE with results & processed_at"]
ErrorHandling["On failure:<br/>increment errors,<br/>set backoff_until"]

ThrottledPoll -->|"calls"| PollQuery
PollQuery -->|"returns event IDs"| EventQueue
EventQueue -->|"queues for processing"| ProcessEvent
ProcessEvent --> LockEvent
LockEvent -->|"event locked"| ExecuteHandlers
ExecuteHandlers -->|"all handlers complete"| UpdateEvent
ExecuteHandlers -->|"handler fails"| ErrorHandling
ErrorHandling --> UpdateEvent
UpdateEvent -->|"commits transaction"| EventsTable
end

EventsTable -->|"queries for<br/>unprocessed events"| PollQuery

style Application fill:#e1f5ff
style Database fill:#fff4e1
style WakeupSignals fill:#f0f4ff
style Polling fill:#e8f5e9
style Processing fill:#fce4ec
39 changes: 25 additions & 14 deletions examples/pg/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,40 @@ import { randomUUID } from "node:crypto";
import {
ErrorUnprocessableEventHandler,
EventProcessor,
WakeupEmitter,
} from "../../src/index.js";
import { createProcessorClient } from "../../src/pg/client.js";
import {
createProcessorClient,
createWakeupEmitter,
} from "../../src/pg/client.js";
import { migrate, type EventType, eventTypes } from "./server.js";
import dotenv from "dotenv";
import { sleep } from "../../src/sleep.js";
dotenv.config();

let processor: EventProcessor<EventType> | undefined = undefined;
let wakeupEmitter: WakeupEmitter | undefined = undefined;

(async () => {
const client = new pg.Client({
const clientConfig: pg.ClientConfig = {
user: process.env.POSTGRES_USER,
password: process.env.POSTGRES_PASSWORD,
database: process.env.POSTGRES_DB,
port: parseInt(process.env.POSTGRES_PORT || "5434"),
});
};
const client = new pg.Client(clientConfig);
await client.connect();
await migrate(client);

wakeupEmitter = await createWakeupEmitter({
listenClientConfig: clientConfig,
createTrigger: true,
querier: client,
});

processor = new EventProcessor<EventType>({
client: createProcessorClient<EventType>(client),
client: createProcessorClient<EventType>({ querier: client }),
wakeupEmitter,
handlerMap: {
ResourceSaved: {
thing1: async (event) => {
Expand Down Expand Up @@ -91,20 +104,18 @@ let processor: EventProcessor<EventType> | undefined = undefined;

const shutdown = (() => {
let shutdownStarted = false;
return () => {
return async () => {
if (shutdownStarted) return;

shutdownStarted = true;

processor
?.stop()
.then(() => {
process.exit(0);
})
.catch((err) => {
console.error(err);
process.exit(1);
});
try {
await wakeupEmitter?.close();
} catch (err) {
console.error(err);
process.exit(1);
}
process.exit(0);
};
})();
process.once("SIGTERM", shutdown);
Expand Down
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@
},
"dependencies": {
"p-limit": "^7.2.0",
"p-queue": "^9.0.1"
"p-queue": "^9.0.1",
"throttle-debounce": "^5.0.2"
},
"peerDependencies": {
"mongodb": "^7.0.0",
Expand All @@ -69,6 +70,7 @@
"@types/mongodb": "^4.0.7",
"@types/node": "^25.0.3",
"@types/pg": "^8.10.9",
"@types/throttle-debounce": "^5.0.2",
"@vitest/coverage-v8": "^4.0.15",
"copyfiles": "^2.4.1",
"mongodb": "^7.0.0",
Expand Down
101 changes: 96 additions & 5 deletions src/mongodb/client.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { MongoClient, ObjectId } from "mongodb";
import { EventEmitter } from "node:events";
import { MongoClient, ObjectId, type ChangeStream } from "mongodb";
import type {
TxOBEvent,
TxOBProcessorClient,
TxOBProcessorClientOpts,
TxOBTransactionProcessorClient,
WakeupEmitter,
} from "../processor.js";
import { getDate } from "../date.js";

Expand All @@ -20,12 +22,17 @@ const createReadyToProcessFilter = (maxErrors: number) => ({
errors: { $lt: maxErrors },
});

export type CreateProcessorClientOpts<EventType extends string> = {
mongo: MongoClient;
db: string;
collection?: string;
limit?: number;
};

export const createProcessorClient = <EventType extends string>(
mongo: MongoClient,
db: string,
collection: string = "events",
limit: number = 100,
opts: CreateProcessorClientOpts<EventType>,
): TxOBProcessorClient<EventType> => {
const { mongo, db, collection = "events", limit = 100 } = opts;
const getEventsToProcess = async (
opts: TxOBProcessorClientOpts,
): Promise<Pick<TxOBEvent<EventType>, "id" | "errors">[]> => {
Expand Down Expand Up @@ -142,3 +149,87 @@ export const createProcessorClient = <EventType extends string>(
transaction,
};
};

type CreateWakeupEmitterOpts = {
mongo: MongoClient;
db: string;
collection?: string;
};

/**
* Creates a MongoDB Change Stream-based wakeup emitter for reducing polling frequency.
* This watches for INSERT operations on the events collection and emits wakeup signals.
*
* **Important**: MongoDB Change Streams require a replica set or sharded cluster.
* If your MongoDB instance is a standalone server, you must convert it to a single-node
* replica set by running `rs.initiate()` in the mongo shell.
*
* If the database is not configured for Change Streams, an error will be emitted via
* the 'error' event on the returned WakeupEmitter. The error typically occurs when
* the change stream attempts to connect.
*
* See: https://www.mongodb.com/docs/manual/changeStreams/
*
* @param opts - Options for the wakeup emitter
* @returns A WakeupEmitter that emits 'wakeup' events when new events are inserted.
* Errors (including replica set requirement failures) are emitted via the 'error' event.
* @throws Does not throw synchronously. Errors are emitted via the 'error' event.
*/
export const createWakeupEmitter = async (
opts: CreateWakeupEmitterOpts,
): Promise<WakeupEmitter & { close: () => Promise<void> }> => {
const { mongo, db, collection = "events" } = opts;
const emitter = new EventEmitter();

// Get the collection to watch
const eventsCollection = mongo.db(db).collection(collection);

// Create a change stream that watches for insert operations
// We only care about inserts - retries after backoff are handled by fallback polling
// Note: watch() may not error immediately - errors typically occur when the change
// stream attempts to connect, which happens asynchronously
const changeStream: ChangeStream = eventsCollection.watch(
[
{
$match: {
operationType: "insert",
},
},
],
{
fullDocument: "default",
},
);

// Handle change stream events
changeStream.on("change", () => {
emitter.emit("wakeup");
});

// Handle change stream errors
// Common errors include:
// - "Change streams are only supported on replica sets" (standalone MongoDB)
// - Connection errors
// - Permission errors
changeStream.on("error", (err) => {
emitter.emit("error", err);
});

// Handle change stream close
changeStream.on("close", () => {
emitter.emit("error", new Error("MongoDB Change Stream closed"));
});

// Return a WakeupEmitter that wraps the EventEmitter
return {
on: (event: "wakeup", listener: () => void) => {
emitter.on(event, listener);
},
off: (event: "wakeup", listener: () => void) => {
emitter.off(event, listener);
},
close: async () => {
await changeStream.close();
},
};
};
18 changes: 9 additions & 9 deletions src/pg/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ describe("createProcessorClient", () => {
const pgClient = {
query: vi.fn(),
} as any;
const client = createProcessorClient(pgClient);
const client = createProcessorClient({ querier: pgClient });
expect(typeof client.getEventsToProcess).toBe("function");
expect(typeof client.transaction).toBe("function");
});
Expand All @@ -25,7 +25,7 @@ describe("getEventsToProcess", () => {
const opts = {
maxErrors: 10,
};
const client = createProcessorClient(pgClient);
const client = createProcessorClient({ querier: pgClient });
const result = await client.getEventsToProcess(opts);
expect(pgClient.query).toHaveBeenCalledOnce();
expect(pgClient.query).toHaveBeenCalledWith(
Expand All @@ -41,7 +41,7 @@ describe("transaction", () => {
const pgClient = {
query: vi.fn<any>(() => Promise.resolve()),
} as any;
const client = createProcessorClient(pgClient);
const client = createProcessorClient({ querier: pgClient });
await client.transaction(async () => {});
expect(pgClient.query).toHaveBeenCalledTimes(2);
expect(pgClient.query).toHaveBeenNthCalledWith(1, "BEGIN");
Expand All @@ -51,7 +51,7 @@ describe("transaction", () => {
const pgClient = {
query: vi.fn<any>(() => Promise.resolve()),
} as any;
const client = createProcessorClient(pgClient);
const client = createProcessorClient({ querier: pgClient });
await client
.transaction(async () => {
throw new Error("error");
Expand All @@ -74,7 +74,7 @@ describe("transaction", () => {
),
} as any;
const eventId = "123";
const client = createProcessorClient(pgClient);
const client = createProcessorClient({ querier: pgClient });
let result: any;
await client.transaction(async (txClient) => {
result = await txClient.getEventByIdForUpdateSkipLocked(eventId, {
Expand All @@ -101,7 +101,7 @@ describe("transaction", () => {
),
} as any;
const eventId = "123";
const client = createProcessorClient(pgClient);
const client = createProcessorClient({ querier: pgClient });
let result: any;
await client.transaction(async (txClient) => {
result = await txClient.getEventByIdForUpdateSkipLocked(eventId, {
Expand Down Expand Up @@ -141,7 +141,7 @@ describe("transaction", () => {
},
correlation_id: "abc123",
};
const client = createProcessorClient(pgClient);
const client = createProcessorClient({ querier: pgClient });
await client.transaction(async (txClient) => {
await txClient.updateEvent(event);
});
Expand Down Expand Up @@ -176,7 +176,7 @@ describe("transaction", () => {
handler_results: {},
errors: 0,
};
const client = createProcessorClient(pgClient);
const client = createProcessorClient({ querier: pgClient });
await client.transaction(async (txClient) => {
await txClient.createEvent(event);
});
Expand Down Expand Up @@ -219,7 +219,7 @@ describe("transaction", () => {
}),
} as any;

const client = createProcessorClient(pgClient);
const client = createProcessorClient({ querier: pgClient });

try {
await client.transaction(async () => {
Expand Down
Loading