Skip to content

Commit c00639d

Browse files
update functions to accept arguments as object
1 parent f000d80 commit c00639d

5 files changed

Lines changed: 91 additions & 75 deletions

File tree

README.md

Lines changed: 45 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ const client = new pg.Client({
129129
});
130130
await client.connect();
131131

132-
const processor = EventProcessor(createProcessorClient(client), {
132+
const processor = EventProcessor(createProcessorClient({ querier: client }), {
133133
UserCreated: {
134134
// Handlers are processed concurrently and independently with retries
135135
// If one handler fails, others continue processing
@@ -191,15 +191,19 @@ For better performance, you can set up wakeup signals to reduce polling frequenc
191191
// PostgreSQL: Use Postgres NOTIFY
192192
import { createWakeupEmitter } from "txob/pg";
193193

194-
const wakeupEmitter = await createWakeupEmitter(clientConfig, {
194+
const wakeupEmitter = await createWakeupEmitter({
195+
listenClientConfig: clientConfig,
195196
createTrigger: true,
196197
querier: client,
197198
});
198199

199200
// MongoDB: Use Change Streams
200201
import { createWakeupEmitter } from "txob/mongodb";
201202

202-
const wakeupEmitter = await createWakeupEmitter(mongoClient, "myapp");
203+
const wakeupEmitter = await createWakeupEmitter({
204+
mongo: mongoClient,
205+
db: "myapp",
206+
});
203207

204208
// Use with EventProcessor
205209
const processor = new EventProcessor({
@@ -492,11 +496,11 @@ const client = new pg.Client({
492496
});
493497
await client.connect();
494498

495-
const processorClient = createProcessorClient(
496-
client,
497-
"events", // Optional: table name (default: "events")
498-
100, // Optional: max events per poll (default: 100)
499-
);
499+
const processorClient = createProcessorClient({
500+
querier: client,
501+
table: "events", // Optional: table name (default: "events")
502+
limit: 100, // Optional: max events per poll (default: 100)
503+
});
500504
```
501505

502506
**4. (Optional) Set up wakeup signals to reduce polling:**
@@ -506,7 +510,8 @@ import { createWakeupEmitter } from "txob/pg";
506510

507511
// Create a wakeup emitter using Postgres NOTIFY
508512
// This will automatically create a trigger that sends NOTIFY on INSERT
509-
const wakeupEmitter = await createWakeupEmitter(clientConfig, {
513+
const wakeupEmitter = await createWakeupEmitter({
514+
listenClientConfig: clientConfig,
510515
createTrigger: true,
511516
querier: client,
512517
table: "events", // Optional: table name (default: "events")
@@ -552,12 +557,12 @@ await eventsCollection.createIndex({ correlation_id: 1 });
552557
```typescript
553558
import { createProcessorClient } from "txob/mongodb";
554559

555-
const processorClient = createProcessorClient(
556-
mongoClient,
557-
"myapp", // Database name
558-
"events", // Optional: collection name (default: "events")
559-
100, // Optional: max events per poll (default: 100)
560-
);
560+
const processorClient = createProcessorClient({
561+
mongo: mongoClient,
562+
db: "myapp", // Database name
563+
collection: "events", // Optional: collection name (default: "events")
564+
limit: 100, // Optional: max events per poll (default: 100)
565+
});
561566
```
562567

563568
**3. (Optional) Set up wakeup signals to reduce polling:**
@@ -567,7 +572,9 @@ import { createWakeupEmitter } from "txob/mongodb";
567572

568573
// Create a wakeup emitter using MongoDB Change Streams
569574
// Note: Requires a replica set or sharded cluster
570-
const wakeupEmitter = await createWakeupEmitter(mongoClient, "myapp", {
575+
const wakeupEmitter = await createWakeupEmitter({
576+
mongo: mongoClient,
577+
db: "myapp",
571578
collection: "events", // Optional: collection name (default: "events")
572579
});
573580

@@ -712,7 +719,7 @@ await client.connect();
712719

713720
// 3. Create and start the processor
714721
const processor = EventProcessor(
715-
createProcessorClient<EventType>(client),
722+
createProcessorClient<EventType>({ querier: client }),
716723
{
717724
UserCreated: {
718725
sendEmail: async (event, { signal }) => {
@@ -833,7 +840,7 @@ gracefulShutdown(server, {
833840
### Multiple Event Types
834841

835842
```typescript
836-
const processor = EventProcessor(createProcessorClient(client), {
843+
const processor = EventProcessor(createProcessorClient({ querier: client }), {
837844
UserCreated: {
838845
sendWelcomeEmail: async (event) => {
839846
/* ... */
@@ -926,7 +933,7 @@ const kafka = new Kafka({ brokers: ["localhost:9092"] });
926933
const producer = kafka.producer();
927934
await producer.connect();
928935

929-
const processor = EventProcessor(createProcessorClient(client), {
936+
const processor = EventProcessor(createProcessorClient({ querier: client }), {
930937
UserCreated: {
931938
// Publish to Kafka with guaranteed consistency
932939
publishToKafka: async (event) => {
@@ -980,7 +987,7 @@ const client = new pg.Client({
980987
});
981988
await client.connect();
982989

983-
const processor = EventProcessor(createProcessorClient(client), {
990+
const processor = EventProcessor(createProcessorClient({ querier: client }), {
984991
// All your handlers...
985992
});
986993

@@ -1050,11 +1057,11 @@ Creates a PostgreSQL processor client.
10501057
```typescript
10511058
import { createProcessorClient } from "txob/pg";
10521059

1053-
createProcessorClient<EventType>(
1054-
client: pg.Client,
1055-
tableName?: string, // Default: "events"
1056-
limit?: number // Default: 100
1057-
): TxOBProcessorClient<EventType>
1060+
createProcessorClient<EventType>(opts: {
1061+
querier: pg.Client;
1062+
table?: string; // Default: "events"
1063+
limit?: number; // Default: 100
1064+
}): TxOBProcessorClient<EventType>
10581065
```
10591066

10601067
### `createProcessorClient` (MongoDB)
@@ -1064,12 +1071,12 @@ Creates a MongoDB processor client.
10641071
```typescript
10651072
import { createProcessorClient } from "txob/mongodb";
10661073

1067-
createProcessorClient<EventType>(
1068-
mongo: mongodb.MongoClient,
1069-
db: string, // Database name
1070-
collectionName?: string, // Default: "events"
1071-
limit?: number // Default: 100
1072-
): TxOBProcessorClient<EventType>
1074+
createProcessorClient<EventType>(opts: {
1075+
mongo: mongodb.MongoClient;
1076+
db: string; // Database name
1077+
collection?: string; // Default: "events"
1078+
limit?: number; // Default: 100
1079+
}): TxOBProcessorClient<EventType>
10731080
```
10741081

10751082
### `TxOBError`
@@ -1110,7 +1117,8 @@ Creates a Postgres NOTIFY-based wakeup emitter to reduce polling frequency.
11101117
```typescript
11111118
import { createWakeupEmitter } from "txob/pg";
11121119

1113-
const wakeupEmitter = await createWakeupEmitter(clientConfig, {
1120+
const wakeupEmitter = await createWakeupEmitter({
1121+
listenClientConfig: clientConfig,
11141122
createTrigger: true, // Automatically create database trigger
11151123
querier: client, // Required if createTrigger is true
11161124
table: "events", // Optional: table name (default: "events")
@@ -1127,7 +1135,9 @@ Creates a MongoDB Change Stream-based wakeup emitter to reduce polling frequency
11271135
```typescript
11281136
import { createWakeupEmitter } from "txob/mongodb";
11291137

1130-
const wakeupEmitter = await createWakeupEmitter(mongoClient, "myapp", {
1138+
const wakeupEmitter = await createWakeupEmitter({
1139+
mongo: mongoClient,
1140+
db: "myapp",
11311141
collection: "events", // Optional: collection name (default: "events")
11321142
});
11331143
```
@@ -1299,7 +1309,7 @@ If using `FOR UPDATE SKIP LOCKED` properly (which txob does), stuck events are n
12991309
- Lower `maxEventConcurrency`
13001310
- Profile handlers for memory leaks
13011311
- Archive old events
1302-
- Reduce `limit` in `createProcessorClient(client, table, limit)`
1312+
- Reduce `limit` in `createProcessorClient({ querier: client, table, limit })`
13031313

13041314
### Duplicate handler executions
13051315

@@ -1578,7 +1588,7 @@ type EventType = keyof typeof eventTypes;
15781588

15791589
// TypeScript will enforce all event types have handlers
15801590
const processor = EventProcessor<EventType>(
1581-
createProcessorClient<EventType>(client),
1591+
createProcessorClient<EventType>({ querier: client }),
15821592
{
15831593
UserCreated: {
15841594
/* handlers */

examples/pg/processor.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,14 @@ let wakeupEmitter: WakeupEmitter | undefined = undefined;
2828
await client.connect();
2929
await migrate(client);
3030

31-
wakeupEmitter = await createWakeupEmitter(clientConfig, {
31+
wakeupEmitter = await createWakeupEmitter({
32+
listenClientConfig: clientConfig,
3233
createTrigger: true,
3334
querier: client,
3435
});
3536

3637
processor = new EventProcessor<EventType>({
37-
client: createProcessorClient<EventType>(client),
38+
client: createProcessorClient<EventType>({ querier: client }),
3839
wakeupEmitter,
3940
handlerMap: {
4041
ResourceSaved: {

src/mongodb/client.ts

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,17 @@ const createReadyToProcessFilter = (maxErrors: number) => ({
2222
errors: { $lt: maxErrors },
2323
});
2424

25+
export type CreateProcessorClientOpts<EventType extends string> = {
26+
mongo: MongoClient;
27+
db: string;
28+
collection?: string;
29+
limit?: number;
30+
};
31+
2532
export const createProcessorClient = <EventType extends string>(
26-
mongo: MongoClient,
27-
db: string,
28-
collection: string = "events",
29-
limit: number = 100,
33+
opts: CreateProcessorClientOpts<EventType>,
3034
): TxOBProcessorClient<EventType> => {
35+
const { mongo, db, collection = "events", limit = 100 } = opts;
3136
const getEventsToProcess = async (
3237
opts: TxOBProcessorClientOpts,
3338
): Promise<Pick<TxOBEvent<EventType>, "id" | "errors">[]> => {
@@ -146,6 +151,8 @@ export const createProcessorClient = <EventType extends string>(
146151
};
147152

148153
type CreateWakeupEmitterOpts = {
154+
mongo: MongoClient;
155+
db: string;
149156
collection?: string;
150157
};
151158

@@ -163,19 +170,15 @@ type CreateWakeupEmitterOpts = {
163170
*
164171
* See: https://www.mongodb.com/docs/manual/changeStreams/
165172
*
166-
* @param mongo - MongoDB client instance
167-
* @param db - Database name
168173
* @param opts - Options for the wakeup emitter
169174
* @returns A WakeupEmitter that emits 'wakeup' events when new events are inserted.
170175
* Errors (including replica set requirement failures) are emitted via the 'error' event.
171176
* @throws Does not throw synchronously. Errors are emitted via the 'error' event.
172177
*/
173178
export const createWakeupEmitter = async (
174-
mongo: MongoClient,
175-
db: string,
176-
opts?: CreateWakeupEmitterOpts,
179+
opts: CreateWakeupEmitterOpts,
177180
): Promise<WakeupEmitter & { close: () => Promise<void> }> => {
178-
const collection = opts?.collection ?? "events";
181+
const { mongo, db, collection = "events" } = opts;
179182
const emitter = new EventEmitter();
180183

181184
// Get the collection to watch

src/pg/client.test.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ describe("createProcessorClient", () => {
66
const pgClient = {
77
query: vi.fn(),
88
} as any;
9-
const client = createProcessorClient(pgClient);
9+
const client = createProcessorClient({ querier: pgClient });
1010
expect(typeof client.getEventsToProcess).toBe("function");
1111
expect(typeof client.transaction).toBe("function");
1212
});
@@ -25,7 +25,7 @@ describe("getEventsToProcess", () => {
2525
const opts = {
2626
maxErrors: 10,
2727
};
28-
const client = createProcessorClient(pgClient);
28+
const client = createProcessorClient({ querier: pgClient });
2929
const result = await client.getEventsToProcess(opts);
3030
expect(pgClient.query).toHaveBeenCalledOnce();
3131
expect(pgClient.query).toHaveBeenCalledWith(
@@ -41,7 +41,7 @@ describe("transaction", () => {
4141
const pgClient = {
4242
query: vi.fn<any>(() => Promise.resolve()),
4343
} as any;
44-
const client = createProcessorClient(pgClient);
44+
const client = createProcessorClient({ querier: pgClient });
4545
await client.transaction(async () => {});
4646
expect(pgClient.query).toHaveBeenCalledTimes(2);
4747
expect(pgClient.query).toHaveBeenNthCalledWith(1, "BEGIN");
@@ -51,7 +51,7 @@ describe("transaction", () => {
5151
const pgClient = {
5252
query: vi.fn<any>(() => Promise.resolve()),
5353
} as any;
54-
const client = createProcessorClient(pgClient);
54+
const client = createProcessorClient({ querier: pgClient });
5555
await client
5656
.transaction(async () => {
5757
throw new Error("error");
@@ -74,7 +74,7 @@ describe("transaction", () => {
7474
),
7575
} as any;
7676
const eventId = "123";
77-
const client = createProcessorClient(pgClient);
77+
const client = createProcessorClient({ querier: pgClient });
7878
let result: any;
7979
await client.transaction(async (txClient) => {
8080
result = await txClient.getEventByIdForUpdateSkipLocked(eventId, {
@@ -101,7 +101,7 @@ describe("transaction", () => {
101101
),
102102
} as any;
103103
const eventId = "123";
104-
const client = createProcessorClient(pgClient);
104+
const client = createProcessorClient({ querier: pgClient });
105105
let result: any;
106106
await client.transaction(async (txClient) => {
107107
result = await txClient.getEventByIdForUpdateSkipLocked(eventId, {
@@ -141,7 +141,7 @@ describe("transaction", () => {
141141
},
142142
correlation_id: "abc123",
143143
};
144-
const client = createProcessorClient(pgClient);
144+
const client = createProcessorClient({ querier: pgClient });
145145
await client.transaction(async (txClient) => {
146146
await txClient.updateEvent(event);
147147
});
@@ -176,7 +176,7 @@ describe("transaction", () => {
176176
handler_results: {},
177177
errors: 0,
178178
};
179-
const client = createProcessorClient(pgClient);
179+
const client = createProcessorClient({ querier: pgClient });
180180
await client.transaction(async (txClient) => {
181181
await txClient.createEvent(event);
182182
});
@@ -219,7 +219,7 @@ describe("transaction", () => {
219219
}),
220220
} as any;
221221

222-
const client = createProcessorClient(pgClient);
222+
const client = createProcessorClient({ querier: pgClient });
223223

224224
try {
225225
await client.transaction(async () => {

0 commit comments

Comments
 (0)