Skip to content

Commit e44f18c

Browse files
Merge pull request #58 from dillonstreator/decouple-polling-and-processing
Decouple polling and processing
2 parents f874c8a + 67aa496 commit e44f18c

9 files changed

Lines changed: 718 additions & 496 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ await db.query("COMMIT");
8080
-**Configurable error handling** - Exponential backoff, max retries, and custom error hooks
8181
-**TypeScript-first** - Full type safety and autocompletion
8282
-**Handler result tracking** - Track the execution status of each handler independently
83-
-**Minimal dependencies** - Only `p-limit` (plus your database driver)
83+
-**Minimal dependencies** - Only `p-limit` and `p-queue` (plus your database driver)
8484

8585
## Quick Start
8686

examples/pg/processor.ts

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ import {
77
import { createProcessorClient } from "../../src/pg/client.js";
88
import { migrate, type EventType, eventTypes } from "./server.js";
99
import dotenv from "dotenv";
10+
import { sleep } from "../../src/sleep.js";
1011
dotenv.config();
1112

12-
let processor: ReturnType<typeof EventProcessor> | undefined = undefined;
13+
let processor: EventProcessor<EventType> | undefined = undefined;
1314

1415
(async () => {
1516
const client = new pg.Client({
@@ -21,9 +22,9 @@ let processor: ReturnType<typeof EventProcessor> | undefined = undefined;
2122
await client.connect();
2223
await migrate(client);
2324

24-
processor = EventProcessor(
25-
createProcessorClient<EventType>(client),
26-
{
25+
processor = new EventProcessor<EventType>({
26+
client: createProcessorClient<EventType>(client),
27+
handlerMap: {
2728
ResourceSaved: {
2829
thing1: async (event) => {
2930
console.log(`${event.id} thing1 ${event.correlation_id}`);
@@ -40,6 +41,7 @@ let processor: ReturnType<typeof EventProcessor> | undefined = undefined;
4041
return;
4142
},
4243
thing3: async (event) => {
44+
await sleep(Math.random() * 10_000);
4345
console.log(`${event.id} thing3 ${event.correlation_id}`);
4446
if (Math.random() > 0.75) throw new Error("some issue");
4547

@@ -51,41 +53,39 @@ let processor: ReturnType<typeof EventProcessor> | undefined = undefined;
5153
// For example, you might want to send alerts or log to external systems
5254
},
5355
},
54-
{
55-
sleepTimeMs: 5000,
56-
logger: console,
57-
onEventMaxErrorsReached: async ({ event, txClient, signal }) => {
58-
// Transactionally persist an 'event max errors reached' event
59-
// This hook is called when:
60-
// - Maximum allowed errors are reached
61-
// - An unprocessable error is encountered
62-
// - Event handler map is missing for the event type
56+
pollingIntervalMs: 5000,
57+
logger: console,
58+
onEventMaxErrorsReached: async ({ event, txClient, signal }) => {
59+
// Transactionally persist an 'event max errors reached' event
60+
// This hook is called when:
61+
// - Maximum allowed errors are reached
62+
// - An unprocessable error is encountered
63+
// - Event handler map is missing for the event type
6364

64-
// Use the abort signal for cleanup during graceful shutdown
65-
if (signal?.aborted) {
66-
return;
67-
}
68-
69-
await txClient.createEvent({
70-
id: randomUUID(),
71-
timestamp: new Date(),
72-
type: eventTypes.EventMaxErrorsReached,
73-
data: {
74-
failedEventId: event.id,
75-
failedEventType: event.type,
76-
failedEventCorrelationId: event.correlation_id,
77-
},
78-
correlation_id: event.correlation_id,
79-
handler_results: {},
80-
errors: 0,
81-
});
65+
// Use the abort signal for cleanup during graceful shutdown
66+
if (signal?.aborted) {
67+
return;
68+
}
8269

83-
console.log("Event max errors reached event created", {
70+
await txClient.createEvent({
71+
id: randomUUID(),
72+
timestamp: new Date(),
73+
type: eventTypes.EventMaxErrorsReached,
74+
data: {
8475
failedEventId: event.id,
85-
});
86-
},
76+
failedEventType: event.type,
77+
failedEventCorrelationId: event.correlation_id,
78+
},
79+
correlation_id: event.correlation_id,
80+
handler_results: {},
81+
errors: 0,
82+
});
83+
84+
console.log("Event max errors reached event created", {
85+
failedEventId: event.id,
86+
});
8787
},
88-
);
88+
});
8989
processor.start();
9090
})();
9191

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@
5858
"format": "prettier -w ."
5959
},
6060
"dependencies": {
61-
"p-limit": "^7.2.0"
61+
"p-limit": "^7.2.0",
62+
"p-queue": "^9.0.1"
6263
},
6364
"peerDependencies": {
6465
"mongodb": "^7.0.0",

src/mongodb/client.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ export const createProcessorClient = <EventType extends string>(
3737
.find(filter)
3838
.project({ id: 1, errors: 1 })
3939
.limit(limit)
40+
.sort('timestamp', 'asc')
4041
.toArray()) as Pick<TxOBEvent<EventType>, "id" | "errors">[];
4142

4243
return events;

src/pg/client.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ describe("getEventsToProcess", () => {
2929
const result = await client.getEventsToProcess(opts);
3030
expect(pgClient.query).toHaveBeenCalledOnce();
3131
expect(pgClient.query).toHaveBeenCalledWith(
32-
'SELECT id, errors FROM "events" WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1 LIMIT 100',
32+
'SELECT id, errors FROM "events" WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1 ORDER BY timestamp ASC LIMIT 100',
3333
[opts.maxErrors],
3434
);
3535
expect(result).toBe(rows);

src/pg/client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ export const createProcessorClient = <EventType extends string>(
2424
const events = await querier.query<
2525
Pick<TxOBEvent<EventType>, "id" | "errors">
2626
>(
27-
`SELECT id, errors FROM ${escapeIdentifier(table)} WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1 LIMIT ${limit}`,
27+
`SELECT id, errors FROM ${escapeIdentifier(table)} WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1 ORDER BY timestamp ASC LIMIT ${limit}`,
2828
[opts.maxErrors],
2929
);
3030
return events.rows;

0 commit comments

Comments
 (0)