Skip to content

Commit 4e6e31e

Browse files
set max queued events to prevent memory leaks
1 parent 948ea41 commit 4e6e31e

3 files changed

Lines changed: 19 additions & 13 deletions

File tree

examples/pg/processor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ let processor: EventProcessor<EventType> | undefined = undefined;
4141
return;
4242
},
4343
thing3: async (event) => {
44-
await sleep(20_000);
44+
await sleep(Math.random() * 10_000);
4545
console.log(`${event.id} thing3 ${event.correlation_id}`);
4646
if (Math.random() > 0.75) throw new Error("some issue");
4747

src/pg/client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ export const createProcessorClient = <EventType extends string>(
2424
const events = await querier.query<
2525
Pick<TxOBEvent<EventType>, "id" | "errors">
2626
>(
27-
`SELECT id, errors FROM ${escapeIdentifier(table)} WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1 LIMIT ${limit}`,
27+
`SELECT id, errors FROM ${escapeIdentifier(table)} WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1 ORDER BY timestamp ASC LIMIT ${limit}`,
2828
[opts.maxErrors],
2929
);
3030
return events.rows;

src/processor.ts

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ const defaultPollingIntervalMs = 5000;
7777
const defaultMaxErrors = 5;
7878
const defaultMaxEventConcurrency = 20;
7979
const defaultMaxHandlerConcurrency = 10;
80+
const defaultMaxQueuedEvents = 100;
8081

8182
type TxOBProcessEventsOpts<TxOBEventType extends string> = {
8283
maxErrors: number;
@@ -85,6 +86,7 @@ type TxOBProcessEventsOpts<TxOBEventType extends string> = {
8586
logger?: Logger;
8687
maxEventConcurrency?: number;
8788
maxHandlerConcurrency?: number;
89+
maxQueuedEvents?: number;
8890
onEventMaxErrorsReached?: (opts: {
8991
event: Readonly<TxOBEvent<TxOBEventType>>;
9092
txClient: TxOBTransactionProcessorClient<TxOBEventType>;
@@ -327,17 +329,6 @@ const processEvent = async <TxOBEventType extends string>({
327329
lockedEvent.processed_at = getDate();
328330
}
329331

330-
logger?.debug(
331-
{
332-
eventId: lockedEvent.id,
333-
type: lockedEvent.type,
334-
lockedEvent,
335-
correlationId: lockedEvent.correlation_id,
336-
errored,
337-
},
338-
"updating event",
339-
);
340-
341332
await txClient.updateEvent(lockedEvent);
342333
});
343334
};
@@ -369,6 +360,7 @@ export class EventProcessor<TxOBEventType extends string> {
369360
private handlerMap: TxOBEventHandlerMap<TxOBEventType>;
370361
private opts: Omit<TxOBProcessEventsOpts<TxOBEventType>, "signal"> & {
371362
pollingIntervalMs: number;
363+
maxQueuedEvents: number;
372364
};
373365
private abortController: AbortController;
374366
private queue: PQueue;
@@ -390,6 +382,7 @@ export class EventProcessor<TxOBEventType extends string> {
390382
backoff: defaultBackoff,
391383
maxEventConcurrency: defaultMaxEventConcurrency,
392384
maxHandlerConcurrency: defaultMaxHandlerConcurrency,
385+
maxQueuedEvents: defaultMaxQueuedEvents,
393386
...opts,
394387
};
395388
this.client = client;
@@ -415,6 +408,19 @@ export class EventProcessor<TxOBEventType extends string> {
415408
try {
416409
do {
417410
try {
411+
// Skip polling if we're at capacity to prevent memory leaks
412+
if (queuedEventIds.size >= this.opts.maxQueuedEvents) {
413+
this.opts.logger?.debug(
414+
{
415+
queuedCount: queuedEventIds.size,
416+
maxQueuedEvents: this.opts.maxQueuedEvents,
417+
},
418+
"skipping poll - queue at capacity",
419+
);
420+
await sleep(this.opts.pollingIntervalMs);
421+
continue;
422+
}
423+
418424
const events = await this.client.getEventsToProcess({
419425
...this.opts,
420426
signal: this.abortController.signal,

0 commit comments

Comments
 (0)