diff --git a/README.md b/README.md index 69a4b38..8a2c7b4 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,7 @@ await db.query("COMMIT"); - ✅ **Graceful shutdown** - Finish processing in-flight events before shutting down - ✅ **Horizontal scalability** - Run multiple processors without conflicts using row-level locking - ✅ **Database agnostic** - Built-in support for PostgreSQL and MongoDB, or implement your own +- ✅ **Reduced polling frequency** - Optional wakeup signals (Postgres NOTIFY, MongoDB Change Streams) to reduce database load and reduce latency - ✅ **Configurable error handling** - Exponential backoff, max retries, and custom error hooks - ✅ **TypeScript-first** - Full type safety and autocompletion - ✅ **Handler result tracking** - Track the execution status of each handler independently @@ -128,7 +129,7 @@ const client = new pg.Client({ }); await client.connect(); -const processor = EventProcessor(createProcessorClient(client), { +const processor = EventProcessor(createProcessorClient({ querier: client }), { UserCreated: { // Handlers are processed concurrently and independently with retries // If one handler fails, others continue processing @@ -182,6 +183,44 @@ await client.query("COMMIT"); That's it! The processor will automatically poll for new events and execute your handlers. +**Optional: Reduce polling with wakeup signals** + +For better performance, you can set up wakeup signals to reduce polling frequency: + +```typescript +// PostgreSQL: Use Postgres NOTIFY +import { createWakeupEmitter } from "txob/pg"; + +const wakeupEmitter = await createWakeupEmitter({ + listenClientConfig: clientConfig, + createTrigger: true, + querier: client, +}); + +// MongoDB: Use Change Streams +import { createWakeupEmitter } from "txob/mongodb"; + +const wakeupEmitter = await createWakeupEmitter({ + mongo: mongoClient, + db: "myapp", +}); + +// Use with EventProcessor +const processor = new EventProcessor({ + client: processorClient, + wakeupEmitter, // Polls immediately when new events arrive + handlerMap: { + /* ... */ + }, +}); +``` + +When a wakeup emitter is provided, the processor will: + +- Poll immediately when new events are inserted (via wakeup signal) +- Still poll periodically as a fallback if wakeup signals are missed +- Throttle wakeup signals to prevent excessive polling during bursts + ## How It Works ``` @@ -201,13 +240,38 @@ That's it! The processor will automatically poll for new events and execute your │ [id] [type] [data] [processed_at] [errors] [backoff_until] │ └─────────────────────────────────────────────────────────────────┘ │ - │ Polls every few seconds + ┌──────────────────┴──────────────────┐ + │ │ + │ (Optional) Wakeup Signal │ + │ (Postgres NOTIFY / MongoDB Stream) │ + │ │ + ▼ ▼ +┌──────────────────────────────┐ ┌──────────────────────────────┐ +│ Polling Component │ │ Fallback Polling Loop │ +│ (Decoupled from Processing) │ │ (If wakeup signals missed)│ +├──────────────────────────────┤ ├──────────────────────────────┤ +│ • Listens for wakeup signals│ │ • Polls periodically │ +│ • Throttles rapid signals │ │ • Only if no recent wakeup │ +│ • Triggers immediate poll │ │ • Uses same throttled poll │ +└──────────────────────────────┘ └──────────────────────────────┘ + │ │ + └──────────────────┬──────────────────┘ + │ + │ SELECT unprocessed events + │ (FOR UPDATE SKIP LOCKED) + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ Processing Queue │ +│ (Concurrency-controlled event queue) │ +└─────────────────────────────────────────────────────────────────┘ + │ + │ Process events concurrently ▼ ┌─────────────────────────────────────────────────────────────────┐ -│ Event Processor (txob) │ +│ Event Processor │ ├─────────────────────────────────────────────────────────────────┤ -│ 1. SELECT unprocessed events (FOR UPDATE SKIP LOCKED) │ -│ 2. Execute handlers (send email, webhook, etc.) │ +│ 1. Lock event in transaction │ +│ 2. Execute handlers concurrently (send email, webhook, etc.) │ │ 3. UPDATE event with results and processed_at │ │ 4. On failure: increment errors, set backoff_until │ └─────────────────────────────────────────────────────────────────┘ @@ -220,6 +284,12 @@ That's it! The processor will automatically poll for new events and execute your - The processor runs independently and guarantees **at-least-once delivery** - Multiple processors can run concurrently using database row locking - Failed events are retried with exponential backoff +- Polling and processing are **decoupled** - polling finds events, processing queue handles execution +- **Wakeup signals** (Postgres NOTIFY or MongoDB Change Streams) reduce polling latency +- **Throttled polling** prevents excessive database queries during event bursts +- **Fallback polling** ensures events are processed even if wakeup signals are missed + +For a detailed architecture diagram, see [architecture.mmd](./architecture.mmd). ## Core Concepts @@ -333,7 +403,34 @@ EventProcessor(client, handlers, { }); ``` -**2. Unprocessable Events** +**2. Custom Backoff with TxOBError** + +You can throw `TxOBError` to specify a custom backoff time for retries: + +```typescript +import { TxOBError } from "txob"; + +UserCreated: { + sendEmail: async (event) => { + try { + await emailService.send(event.data.email); + } catch (err) { + if (err.code === "RATE_LIMIT_EXCEEDED") { + // Retry after 1 minute instead of using default backoff + throw new TxOBError("Rate limit exceeded", { + cause: err, + backoffUntil: new Date(Date.now() + 60000), + }); + } + throw err; // Use default backoff for other errors + } + }; +} +``` + +**Note:** If multiple handlers throw `TxOBError` with different `backoffUntil` dates, the processor will use the latest (maximum) backoff time. + +**3. Unprocessable Events** Sometimes an event cannot be processed (e.g., invalid data). Mark it as unprocessable to stop retrying: @@ -352,7 +449,7 @@ UserCreated: { } ``` -**3. Max Errors Hook** +**4. Max Errors Hook** When an event reaches max errors, you can create a "dead letter" event: @@ -433,11 +530,39 @@ const client = new pg.Client({ }); await client.connect(); -const processorClient = createProcessorClient( - client, - "events", // Optional: table name (default: "events") - 100, // Optional: max events per poll (default: 100) -); +const processorClient = createProcessorClient({ + querier: client, + table: "events", // Optional: table name (default: "events") + limit: 100, // Optional: max events per poll (default: 100) +}); +``` + +**4. (Optional) Set up wakeup signals to reduce polling:** + +```typescript +import { createWakeupEmitter } from "txob/pg"; + +// Create a wakeup emitter using Postgres NOTIFY +// This will automatically create a trigger that sends NOTIFY on INSERT +const wakeupEmitter = await createWakeupEmitter({ + listenClientConfig: clientConfig, + createTrigger: true, + querier: client, + table: "events", // Optional: table name (default: "events") + channel: "txob_events", // Optional: NOTIFY channel (default: "txob_events") +}); + +// Use with EventProcessor +const processor = new EventProcessor({ + client: processorClient, + wakeupEmitter, // Reduces polling frequency when new events arrive + handlerMap: { + /* ... */ + }, + pollingIntervalMs: 5000, // Still used as fallback if wakeup signals are missed + wakeupTimeoutMs: 60000, // Fallback poll if no wakeup signal received in 60s + wakeupThrottleMs: 1000, // Throttle wakeup signals to prevent excessive polling +}); ``` ### MongoDB @@ -468,14 +593,50 @@ await eventsCollection.createIndex({ correlation_id: 1 }); ```typescript import { createProcessorClient } from "txob/mongodb"; -const processorClient = createProcessorClient( - db, - "events", // Optional: collection name (default: "events") - 100, // Optional: max events per poll (default: 100) -); +const processorClient = createProcessorClient({ + mongo: mongoClient, + db: "myapp", // Database name + collection: "events", // Optional: collection name (default: "events") + limit: 100, // Optional: max events per poll (default: 100) +}); ``` -**Note:** MongoDB transactions require a replica set or sharded cluster. [See MongoDB docs](https://www.mongodb.com/docs/manual/core/transactions/). +**3. (Optional) Set up wakeup signals to reduce polling:** + +```typescript +import { createWakeupEmitter } from "txob/mongodb"; + +// Create a wakeup emitter using MongoDB Change Streams +// Note: Requires a replica set or sharded cluster +const wakeupEmitter = await createWakeupEmitter({ + mongo: mongoClient, + db: "myapp", + collection: "events", // Optional: collection name (default: "events") +}); + +// Use with EventProcessor +const processor = new EventProcessor({ + client: processorClient, + wakeupEmitter, // Reduces polling frequency when new events arrive + handlerMap: { + /* ... */ + }, + pollingIntervalMs: 5000, // Still used as fallback if wakeup signals are missed + wakeupTimeoutMs: 60000, // Fallback poll if no wakeup signal received in 60s + wakeupThrottleMs: 100, // Throttle wakeup signals to prevent excessive polling +}); + +// Handle wakeup emitter errors (e.g., if not configured for replica set) +wakeupEmitter.on("error", (err) => { + console.error("Wakeup emitter error:", err); + // Processor will automatically fall back to polling +}); +``` + +**Note:** + +- MongoDB transactions require a replica set or sharded cluster. [See MongoDB docs](https://www.mongodb.com/docs/manual/core/transactions/). +- MongoDB Change Streams (used for wakeup signals) also require a replica set or sharded cluster. If your MongoDB instance is a standalone server, you must convert it to a single-node replica set by running `rs.initiate()` in the mongo shell. ### Custom Database @@ -552,15 +713,18 @@ EventProcessor(client, handlerMap, { ### Configuration Reference -| Option | Type | Default | Description | -| ------------------------- | ------------------------- | ----------- | ------------------------------------------- | -| `sleepTimeMs` | `number` | `5000` | Milliseconds between polling cycles | -| `maxErrors` | `number` | `5` | Max retry attempts before marking as failed | -| `backoff` | `(count: number) => Date` | Exponential | Calculate next retry time | -| `maxEventConcurrency` | `number` | `5` | Max events processed simultaneously | -| `maxHandlerConcurrency` | `number` | `10` | Max handlers per event running concurrently | -| `logger` | `Logger` | `undefined` | Custom logger interface | -| `onEventMaxErrorsReached` | `function` | `undefined` | Hook for max errors | +| Option | Type | Default | Description | +| ------------------------- | ------------------------- | ----------- | ----------------------------------------------------------------------------------- | +| `pollingIntervalMs` | `number` | `5000` | Milliseconds between polling cycles (used when no wakeup emitter or as fallback) | +| `maxErrors` | `number` | `5` | Max retry attempts before marking as failed | +| `backoff` | `(count: number) => Date` | Exponential | Calculate next retry time | +| `maxEventConcurrency` | `number` | `5` | Max events processed simultaneously | +| `maxHandlerConcurrency` | `number` | `10` | Max handlers per event running concurrently | +| `wakeupEmitter` | `WakeupEmitter` | `undefined` | Optional wakeup signal emitter (Postgres NOTIFY or MongoDB Change Streams) | +| `wakeupTimeoutMs` | `number` | `30000` | Fallback poll if no wakeup signal received (only used with wakeupEmitter) | +| `wakeupThrottleMs` | `number` | `1000` | Throttle wakeup signals to prevent excessive polling (only used with wakeupEmitter) | +| `logger` | `Logger` | `undefined` | Custom logger interface | +| `onEventMaxErrorsReached` | `function` | `undefined` | Hook for max errors | ## Usage Examples @@ -594,7 +758,7 @@ await client.connect(); // 3. Create and start the processor const processor = EventProcessor( - createProcessorClient(client), + createProcessorClient({ querier: client }), { UserCreated: { sendEmail: async (event, { signal }) => { @@ -715,7 +879,7 @@ gracefulShutdown(server, { ### Multiple Event Types ```typescript -const processor = EventProcessor(createProcessorClient(client), { +const processor = EventProcessor(createProcessorClient({ querier: client }), { UserCreated: { sendWelcomeEmail: async (event) => { /* ... */ @@ -808,7 +972,7 @@ const kafka = new Kafka({ brokers: ["localhost:9092"] }); const producer = kafka.producer(); await producer.connect(); -const processor = EventProcessor(createProcessorClient(client), { +const processor = EventProcessor(createProcessorClient({ querier: client }), { UserCreated: { // Publish to Kafka with guaranteed consistency publishToKafka: async (event) => { @@ -862,7 +1026,7 @@ const client = new pg.Client({ }); await client.connect(); -const processor = EventProcessor(createProcessorClient(client), { +const processor = EventProcessor(createProcessorClient({ querier: client }), { // All your handlers... }); @@ -932,11 +1096,11 @@ Creates a PostgreSQL processor client. ```typescript import { createProcessorClient } from "txob/pg"; -createProcessorClient( - client: pg.Client, - tableName?: string, // Default: "events" - limit?: number // Default: 100 -): TxOBProcessorClient +createProcessorClient(opts: { + querier: pg.Client; + table?: string; // Default: "events" + limit?: number; // Default: 100 +}): TxOBProcessorClient ``` ### `createProcessorClient` (MongoDB) @@ -946,13 +1110,35 @@ Creates a MongoDB processor client. ```typescript import { createProcessorClient } from "txob/mongodb"; -createProcessorClient( - db: mongodb.Db, - collectionName?: string, // Default: "events" - limit?: number // Default: 100 -): TxOBProcessorClient +createProcessorClient(opts: { + mongo: mongodb.MongoClient; + db: string; // Database name + collection?: string; // Default: "events" + limit?: number; // Default: 100 +}): TxOBProcessorClient +``` + +### `TxOBError` + +Error class to specify custom backoff times for retries. + +```typescript +import { TxOBError } from "txob"; + +// Throw with custom backoff time +throw new TxOBError("Rate limit exceeded", { + backoffUntil: new Date(Date.now() + 60000), // Retry after 1 minute +}); + +// Throw with cause and custom backoff +throw new TxOBError("Processing failed", { + cause: originalError, + backoffUntil: new Date(Date.now() + 30000), // Retry after 30 seconds +}); ``` +**Note:** If multiple handlers throw `TxOBError` with different `backoffUntil` dates, the processor will use the latest (maximum) backoff time. + ### `ErrorUnprocessableEventHandler` Error class to mark a handler as unprocessable (stops retrying). @@ -963,6 +1149,42 @@ import { ErrorUnprocessableEventHandler } from "txob"; throw new ErrorUnprocessableEventHandler(new Error("Invalid data")); ``` +### `createWakeupEmitter` (PostgreSQL) + +Creates a Postgres NOTIFY-based wakeup emitter to reduce polling frequency. + +```typescript +import { createWakeupEmitter } from "txob/pg"; + +const wakeupEmitter = await createWakeupEmitter({ + listenClientConfig: clientConfig, + createTrigger: true, // Automatically create database trigger + querier: client, // Required if createTrigger is true + table: "events", // Optional: table name (default: "events") + channel: "txob_events", // Optional: NOTIFY channel (default: "txob_events") +}); +``` + +The trigger automatically sends NOTIFY when new events are inserted. The wakeup emitter emits `wakeup` events that trigger immediate polling, reducing the need for constant polling. + +### `createWakeupEmitter` (MongoDB) + +Creates a MongoDB Change Stream-based wakeup emitter to reduce polling frequency. + +```typescript +import { createWakeupEmitter } from "txob/mongodb"; + +const wakeupEmitter = await createWakeupEmitter({ + mongo: mongoClient, + db: "myapp", + collection: "events", // Optional: collection name (default: "events") +}); +``` + +**Important:** MongoDB Change Streams require a replica set or sharded cluster. If your MongoDB instance is a standalone server, you must convert it to a single-node replica set by running `rs.initiate()` in the mongo shell. + +If the database is not configured for Change Streams, an error will be emitted via the `error` event on the returned `WakeupEmitter`. The processor will automatically fall back to polling. + ### Types ```typescript @@ -1126,7 +1348,7 @@ If using `FOR UPDATE SKIP LOCKED` properly (which txob does), stuck events are n - Lower `maxEventConcurrency` - Profile handlers for memory leaks - Archive old events -- Reduce `limit` in `createProcessorClient(client, table, limit)` +- Reduce `limit` in `createProcessorClient({ querier: client, table, limit })` ### Duplicate handler executions @@ -1405,7 +1627,7 @@ type EventType = keyof typeof eventTypes; // TypeScript will enforce all event types have handlers const processor = EventProcessor( - createProcessorClient(client), + createProcessorClient({ querier: client }), { UserCreated: { /* handlers */ diff --git a/architecture.mmd b/architecture.mmd new file mode 100644 index 0000000..c23f323 --- /dev/null +++ b/architecture.mmd @@ -0,0 +1,72 @@ +graph TB + subgraph Application["Your Application"] + AppTx["BEGIN TRANSACTION"] + AppData["1. Insert/Update business data
(users, orders, etc.)"] + AppEvent["2. Insert event record"] + AppCommit["COMMIT TRANSACTION"] + + AppTx --> AppData + AppData --> AppEvent + AppEvent --> AppCommit + end + + subgraph Database["Database"] + EventsTable[("Events Table
id, type, data,
processed_at, errors,
backoff_until")] + + AppCommit -->|"Both saved atomically ✅"| EventsTable + end + + subgraph WakeupSignals["Wakeup Signals (Optional)"] + direction TB + PGTrigger["PostgreSQL Trigger
(auto-created)"] + PGNotify["PostgreSQL NOTIFY
(LISTEN connection)"] + MongoStream["MongoDB Change Streams
(watches for inserts)"] + + EventsTable -->|"INSERT triggers"| PGTrigger + PGTrigger -->|"sends NOTIFY"| PGNotify + EventsTable -->|"INSERT operations"| MongoStream + end + + subgraph Polling["Polling Component
(Decoupled from Processing)"] + direction TB + WakeupListener["Wakeup Signal Listener
(on 'wakeup' event)"] + ThrottledPoll["Throttled Poll Function
(leading + trailing edge)
Prevents excessive polling"] + FallbackLoop["Fallback Polling Loop
(runs periodically)"] + TimeoutCheck["Check: time since last wakeup
> wakeupTimeoutMs?"] + + PGNotify -->|"emits 'wakeup'"| WakeupListener + MongoStream -->|"emits 'wakeup'"| WakeupListener + WakeupListener -->|"triggers"| ThrottledPoll + FallbackLoop -->|"waits pollingIntervalMs"| TimeoutCheck + TimeoutCheck -->|"yes: no recent wakeup"| ThrottledPoll + TimeoutCheck -->|"no: recent wakeup"| FallbackLoop + end + + subgraph Processing["Processing Queue & Processor"] + direction TB + PollQuery["getEventsToProcess()
SELECT unprocessed events
(FOR UPDATE SKIP LOCKED)"] + EventQueue["Processing Queue
(PQueue with concurrency limit)"] + ProcessEvent["processEvent()"] + LockEvent["Lock event in transaction
(getEventByIdForUpdateSkipLocked)"] + ExecuteHandlers["Execute handlers concurrently
(pLimit for handler concurrency)
send email, webhook, etc."] + UpdateEvent["updateEvent()
UPDATE with results & processed_at"] + ErrorHandling["On failure:
increment errors,
set backoff_until"] + + ThrottledPoll -->|"calls"| PollQuery + PollQuery -->|"returns event IDs"| EventQueue + EventQueue -->|"queues for processing"| ProcessEvent + ProcessEvent --> LockEvent + LockEvent -->|"event locked"| ExecuteHandlers + ExecuteHandlers -->|"all handlers complete"| UpdateEvent + ExecuteHandlers -->|"handler fails"| ErrorHandling + ErrorHandling --> UpdateEvent + UpdateEvent -->|"commits transaction"| EventsTable + end + + EventsTable -->|"queries for
unprocessed events"| PollQuery + + style Application fill:#e1f5ff + style Database fill:#fff4e1 + style WakeupSignals fill:#f0f4ff + style Polling fill:#e8f5e9 + style Processing fill:#fce4ec diff --git a/examples/pg/processor.ts b/examples/pg/processor.ts index 577b6f9..0143268 100644 --- a/examples/pg/processor.ts +++ b/examples/pg/processor.ts @@ -3,27 +3,40 @@ import { randomUUID } from "node:crypto"; import { ErrorUnprocessableEventHandler, EventProcessor, + WakeupEmitter, } from "../../src/index.js"; -import { createProcessorClient } from "../../src/pg/client.js"; +import { + createProcessorClient, + createWakeupEmitter, +} from "../../src/pg/client.js"; import { migrate, type EventType, eventTypes } from "./server.js"; import dotenv from "dotenv"; import { sleep } from "../../src/sleep.js"; dotenv.config(); let processor: EventProcessor | undefined = undefined; +let wakeupEmitter: WakeupEmitter | undefined = undefined; (async () => { - const client = new pg.Client({ + const clientConfig: pg.ClientConfig = { user: process.env.POSTGRES_USER, password: process.env.POSTGRES_PASSWORD, database: process.env.POSTGRES_DB, port: parseInt(process.env.POSTGRES_PORT || "5434"), - }); + }; + const client = new pg.Client(clientConfig); await client.connect(); await migrate(client); + wakeupEmitter = await createWakeupEmitter({ + listenClientConfig: clientConfig, + createTrigger: true, + querier: client, + }); + processor = new EventProcessor({ - client: createProcessorClient(client), + client: createProcessorClient({ querier: client }), + wakeupEmitter, handlerMap: { ResourceSaved: { thing1: async (event) => { @@ -91,20 +104,18 @@ let processor: EventProcessor | undefined = undefined; const shutdown = (() => { let shutdownStarted = false; - return () => { + return async () => { if (shutdownStarted) return; shutdownStarted = true; - processor - ?.stop() - .then(() => { - process.exit(0); - }) - .catch((err) => { - console.error(err); - process.exit(1); - }); + try { + await wakeupEmitter?.close(); + } catch (err) { + console.error(err); + process.exit(1); + } + process.exit(0); }; })(); process.once("SIGTERM", shutdown); diff --git a/package.json b/package.json index 1a73f0e..898e94d 100644 --- a/package.json +++ b/package.json @@ -59,7 +59,8 @@ }, "dependencies": { "p-limit": "^7.2.0", - "p-queue": "^9.0.1" + "p-queue": "^9.0.1", + "throttle-debounce": "^5.0.2" }, "peerDependencies": { "mongodb": "^7.0.0", @@ -69,6 +70,7 @@ "@types/mongodb": "^4.0.7", "@types/node": "^25.0.3", "@types/pg": "^8.10.9", + "@types/throttle-debounce": "^5.0.2", "@vitest/coverage-v8": "^4.0.15", "copyfiles": "^2.4.1", "mongodb": "^7.0.0", diff --git a/src/mongodb/client.ts b/src/mongodb/client.ts index 0bbb2c2..bc3ea20 100644 --- a/src/mongodb/client.ts +++ b/src/mongodb/client.ts @@ -1,9 +1,11 @@ -import { MongoClient, ObjectId } from "mongodb"; +import { EventEmitter } from "node:events"; +import { MongoClient, ObjectId, type ChangeStream } from "mongodb"; import type { TxOBEvent, TxOBProcessorClient, TxOBProcessorClientOpts, TxOBTransactionProcessorClient, + WakeupEmitter, } from "../processor.js"; import { getDate } from "../date.js"; @@ -20,12 +22,17 @@ const createReadyToProcessFilter = (maxErrors: number) => ({ errors: { $lt: maxErrors }, }); +export type CreateProcessorClientOpts = { + mongo: MongoClient; + db: string; + collection?: string; + limit?: number; +}; + export const createProcessorClient = ( - mongo: MongoClient, - db: string, - collection: string = "events", - limit: number = 100, + opts: CreateProcessorClientOpts, ): TxOBProcessorClient => { + const { mongo, db, collection = "events", limit = 100 } = opts; const getEventsToProcess = async ( opts: TxOBProcessorClientOpts, ): Promise, "id" | "errors">[]> => { @@ -142,3 +149,87 @@ export const createProcessorClient = ( transaction, }; }; + +type CreateWakeupEmitterOpts = { + mongo: MongoClient; + db: string; + collection?: string; +}; + +/** + * Creates a MongoDB Change Stream-based wakeup emitter for reducing polling frequency. + * This watches for INSERT operations on the events collection and emits wakeup signals. + * + * **Important**: MongoDB Change Streams require a replica set or sharded cluster. + * If your MongoDB instance is a standalone server, you must convert it to a single-node + * replica set by running `rs.initiate()` in the mongo shell. + * + * If the database is not configured for Change Streams, an error will be emitted via + * the 'error' event on the returned WakeupEmitter. The error typically occurs when + * the change stream attempts to connect. + * + * See: https://www.mongodb.com/docs/manual/changeStreams/ + * + * @param opts - Options for the wakeup emitter + * @returns A WakeupEmitter that emits 'wakeup' events when new events are inserted. + * Errors (including replica set requirement failures) are emitted via the 'error' event. + * @throws Does not throw synchronously. Errors are emitted via the 'error' event. + */ +export const createWakeupEmitter = async ( + opts: CreateWakeupEmitterOpts, +): Promise Promise }> => { + const { mongo, db, collection = "events" } = opts; + const emitter = new EventEmitter(); + + // Get the collection to watch + const eventsCollection = mongo.db(db).collection(collection); + + // Create a change stream that watches for insert operations + // We only care about inserts - retries after backoff are handled by fallback polling + // Note: watch() may not error immediately - errors typically occur when the change + // stream attempts to connect, which happens asynchronously + const changeStream: ChangeStream = eventsCollection.watch( + [ + { + $match: { + operationType: "insert", + }, + }, + ], + { + fullDocument: "default", + }, + ); + + // Handle change stream events + changeStream.on("change", () => { + emitter.emit("wakeup"); + }); + + // Handle change stream errors + // Common errors include: + // - "Change streams are only supported on replica sets" (standalone MongoDB) + // - Connection errors + // - Permission errors + changeStream.on("error", (err) => { + emitter.emit("error", err); + }); + + // Handle change stream close + changeStream.on("close", () => { + emitter.emit("error", new Error("MongoDB Change Stream closed")); + }); + + // Return a WakeupEmitter that wraps the EventEmitter + return { + on: (event: "wakeup", listener: () => void) => { + emitter.on(event, listener); + }, + off: (event: "wakeup", listener: () => void) => { + emitter.off(event, listener); + }, + close: async () => { + await changeStream.close(); + }, + }; +}; diff --git a/src/pg/client.test.ts b/src/pg/client.test.ts index f0ff1c7..1a8fda8 100644 --- a/src/pg/client.test.ts +++ b/src/pg/client.test.ts @@ -6,7 +6,7 @@ describe("createProcessorClient", () => { const pgClient = { query: vi.fn(), } as any; - const client = createProcessorClient(pgClient); + const client = createProcessorClient({ querier: pgClient }); expect(typeof client.getEventsToProcess).toBe("function"); expect(typeof client.transaction).toBe("function"); }); @@ -25,7 +25,7 @@ describe("getEventsToProcess", () => { const opts = { maxErrors: 10, }; - const client = createProcessorClient(pgClient); + const client = createProcessorClient({ querier: pgClient }); const result = await client.getEventsToProcess(opts); expect(pgClient.query).toHaveBeenCalledOnce(); expect(pgClient.query).toHaveBeenCalledWith( @@ -41,7 +41,7 @@ describe("transaction", () => { const pgClient = { query: vi.fn(() => Promise.resolve()), } as any; - const client = createProcessorClient(pgClient); + const client = createProcessorClient({ querier: pgClient }); await client.transaction(async () => {}); expect(pgClient.query).toHaveBeenCalledTimes(2); expect(pgClient.query).toHaveBeenNthCalledWith(1, "BEGIN"); @@ -51,7 +51,7 @@ describe("transaction", () => { const pgClient = { query: vi.fn(() => Promise.resolve()), } as any; - const client = createProcessorClient(pgClient); + const client = createProcessorClient({ querier: pgClient }); await client .transaction(async () => { throw new Error("error"); @@ -74,7 +74,7 @@ describe("transaction", () => { ), } as any; const eventId = "123"; - const client = createProcessorClient(pgClient); + const client = createProcessorClient({ querier: pgClient }); let result: any; await client.transaction(async (txClient) => { result = await txClient.getEventByIdForUpdateSkipLocked(eventId, { @@ -101,7 +101,7 @@ describe("transaction", () => { ), } as any; const eventId = "123"; - const client = createProcessorClient(pgClient); + const client = createProcessorClient({ querier: pgClient }); let result: any; await client.transaction(async (txClient) => { result = await txClient.getEventByIdForUpdateSkipLocked(eventId, { @@ -141,7 +141,7 @@ describe("transaction", () => { }, correlation_id: "abc123", }; - const client = createProcessorClient(pgClient); + const client = createProcessorClient({ querier: pgClient }); await client.transaction(async (txClient) => { await txClient.updateEvent(event); }); @@ -176,7 +176,7 @@ describe("transaction", () => { handler_results: {}, errors: 0, }; - const client = createProcessorClient(pgClient); + const client = createProcessorClient({ querier: pgClient }); await client.transaction(async (txClient) => { await txClient.createEvent(event); }); @@ -219,7 +219,7 @@ describe("transaction", () => { }), } as any; - const client = createProcessorClient(pgClient); + const client = createProcessorClient({ querier: pgClient }); try { await client.transaction(async () => { diff --git a/src/pg/client.ts b/src/pg/client.ts index db7fef0..2e33567 100644 --- a/src/pg/client.ts +++ b/src/pg/client.ts @@ -1,10 +1,18 @@ -import { Client, escapeIdentifier } from "pg"; +import { + Client, + escapeIdentifier, + escapeLiteral, + type ClientConfig, + DatabaseError, +} from "pg"; import type { TxOBEvent, TxOBProcessorClient, TxOBProcessorClientOpts, TxOBTransactionProcessorClient, + WakeupEmitter, } from "../processor.js"; +import { EventEmitter } from "node:events"; interface Querier { query: Client["query"]; @@ -13,18 +21,25 @@ interface Querier { // TODO: leverage the signal option that comes in on options for `getEventsToProcess` and `getEventByIdForUpdateSkipLocked` // to cancel queries if/when supported by `pg` https://github.com/brianc/node-postgres/issues/2774 +export type CreateProcessorClientOpts = { + querier: Querier; + table?: string; + limit?: number; +}; + export const createProcessorClient = ( - querier: Querier, - table: string = "events", - limit: number = 100, + opts: CreateProcessorClientOpts, ): TxOBProcessorClient => { + const { querier, table = "events", limit = 100 } = opts; + const _table = table; + const _limit = limit; const getEventsToProcess = async ( opts: TxOBProcessorClientOpts, ): Promise, "id" | "errors">[]> => { const events = await querier.query< Pick, "id" | "errors"> >( - `SELECT id, errors FROM ${escapeIdentifier(table)} WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1 ORDER BY timestamp ASC LIMIT ${limit}`, + `SELECT id, errors FROM ${escapeIdentifier(_table)} WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1 ORDER BY timestamp ASC LIMIT ${_limit}`, [opts.maxErrors], ); return events.rows; @@ -43,7 +58,7 @@ export const createProcessorClient = ( opts: TxOBProcessorClientOpts, ): Promise | null> => { const event = await querier.query>( - `SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM ${escapeIdentifier(table)} WHERE id = $1 AND processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $2 FOR UPDATE SKIP LOCKED`, + `SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM ${escapeIdentifier(_table)} WHERE id = $1 AND processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $2 FOR UPDATE SKIP LOCKED`, [eventId, opts.maxErrors], ); if (event.rowCount === 0) { @@ -54,7 +69,7 @@ export const createProcessorClient = ( }, updateEvent: async (event: TxOBEvent): Promise => { await querier.query( - `UPDATE ${escapeIdentifier(table)} SET handler_results = $1, errors = $2, processed_at = $3, backoff_until = $4 WHERE id = $5`, + `UPDATE ${escapeIdentifier(_table)} SET handler_results = $1, errors = $2, processed_at = $3, backoff_until = $4 WHERE id = $5`, [ event.handler_results, event.errors, @@ -68,7 +83,7 @@ export const createProcessorClient = ( event: Omit, "processed_at" | "backoff_until">, ): Promise => { await querier.query( - `INSERT INTO ${escapeIdentifier(table)} (id, timestamp, type, data, correlation_id, handler_results, errors) VALUES ($1, $2, $3, $4, $5, $6, $7)`, + `INSERT INTO ${escapeIdentifier(_table)} (id, timestamp, type, data, correlation_id, handler_results, errors) VALUES ($1, $2, $3, $4, $5, $6, $7)`, [ event.id, event.timestamp, @@ -107,3 +122,184 @@ export const createProcessorClient = ( transaction, }; }; + +type CreateWakeupEmitterOpts = + | { + listenClientConfig: ClientConfig; + channel?: string; + createTrigger: true; + table?: string; + querier: Querier; + } + | { + listenClientConfig: ClientConfig; + channel?: string; + createTrigger?: false; + table?: string; + querier?: Querier; + }; + +/** + * Creates a Postgres NOTIFY-based wakeup emitter for reducing polling frequency. + * This uses a separate connection for LISTEN, as you cannot LISTEN on a connection + * that's used for queries. + * + * @param opts - Options for the wakeup emitter. If `createTrigger` is `true`, `querier` is required. + * @returns A WakeupEmitter that emits 'wakeup' events when Postgres NOTIFY is received + */ +export const createWakeupEmitter = async ( + opts: CreateWakeupEmitterOpts, +): Promise => { + const { + listenClientConfig, + channel = "txob_events", + table = "events", + } = opts; + const emitter = new EventEmitter(); + + // Create a separate client for LISTEN + const listenClient = new Client(listenClientConfig); + await listenClient.connect(); + + // Set up LISTEN - channel names are identifiers + // Note: Postgres channel names are case-insensitive and converted to lowercase + const listenChannel = channel.toLowerCase(); + await listenClient.query(`LISTEN ${escapeIdentifier(listenChannel)}`); + + // Handle notifications + listenClient.on("notification", (msg) => { + // msg.channel is already lowercase from Postgres + if (msg.channel === listenChannel) { + emitter.emit("wakeup"); + } + }); + + // Handle connection errors + listenClient.on("error", (err) => { + emitter.emit("error", err); + }); + + // Handle disconnection + listenClient.on("end", () => { + emitter.emit("error", new Error("Postgres LISTEN connection ended")); + }); + + // Create trigger if requested + if (opts.createTrigger && opts.querier) { + await createWakeupTrigger({ + querier: opts.querier, + table, + channel: listenChannel, + }); + } + + // Return a WakeupEmitter that wraps the EventEmitter + return { + on: (event: "wakeup", listener: () => void) => { + emitter.on(event, listener); + }, + off: (event: "wakeup", listener: () => void) => { + emitter.off(event, listener); + // If no more listeners, we could optionally close the connection + // But we'll leave it open for potential re-use + }, + // Expose cleanup method (not part of interface but useful) + close: async () => { + await listenClient.query(`UNLISTEN ${escapeIdentifier(listenChannel)}`); + await listenClient.end(); + }, + } as WakeupEmitter & { close: () => Promise }; +}; + +type CreateWakeupTriggerOpts = { + querier: Querier; + table?: string; + channel?: string; +}; + +/** + * Creates a Postgres trigger that sends NOTIFY when events are inserted. + * Wakeup signals are primarily for new events - retries after backoff are handled + * by fallback polling. + * + * This function is safe for concurrent execution - multiple processes can call it + * simultaneously without errors. It gracefully handles cases where the trigger + * already exists by catching and ignoring duplicate object errors. + * + * @param opts - Options for the wakeup trigger + * @returns Promise that resolves when the trigger is created + */ +export const createWakeupTrigger = async ( + opts: CreateWakeupTriggerOpts, +): Promise => { + const { querier, table = "events", channel = "txob_events" } = opts; + const triggerName = `txob_wakeup_trigger_${table}`; + const functionName = `txob_wakeup_notify_${table}`; + + try { + // Create the function that sends NOTIFY + // CREATE OR REPLACE is safe for concurrent execution + // Note: channel name is embedded in the function body using escapeLiteral for safety + // pg_notify expects a text parameter, so we use escapeLiteral to safely embed the channel name + const channelLiteral = escapeLiteral(channel); + await querier.query(` + CREATE OR REPLACE FUNCTION ${escapeIdentifier(functionName)}() + RETURNS TRIGGER AS $$ + BEGIN + -- Send NOTIFY when a new event is inserted + -- Only trigger on INSERT - retries after backoff are handled by fallback polling + IF TG_OP = 'INSERT' THEN + PERFORM pg_notify(${channelLiteral}, ''); + END IF; + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + `); + + // Try to create the trigger + // If it already exists, PostgreSQL will throw a duplicate_object error (42710) + // which we'll catch and ignore below + await querier.query(` + CREATE TRIGGER ${escapeIdentifier(triggerName)} + AFTER INSERT ON ${escapeIdentifier(table)} + FOR EACH ROW + EXECUTE FUNCTION ${escapeIdentifier(functionName)}(); + `); + } catch (error: unknown) { + // Handle errors that can occur during concurrent trigger creation + // Note: pg errors may not always be DatabaseError instances, but they have a 'code' field + const errorCode = + typeof error === "object" && + error !== null && + "code" in error && + typeof error.code === "string" + ? error.code + : undefined; + const errorMessage = error instanceof Error ? error.message : String(error); + + // 42710: duplicate_object - trigger already exists + // This is safe to ignore - another process already created it + if (errorCode === "42710") { + return; + } + // XX000: internal_error - can include "tuple concurrently updated" + // This happens when multiple processes try to modify system catalogs simultaneously + // It's safe to ignore - one process will succeed, others will get this error + if ( + errorCode === "XX000" && + errorMessage.includes("tuple concurrently updated") + ) { + return; + } + // Also check error message as fallback for duplicate trigger errors + // Sometimes the error code might not be set correctly + if ( + errorMessage.includes("already exists") && + (errorMessage.includes("trigger") || errorMessage.includes("relation")) + ) { + return; + } + // Re-throw other errors + throw error; + } +}; diff --git a/src/processor.ts b/src/processor.ts index 64e19a2..2f09f67 100644 --- a/src/processor.ts +++ b/src/processor.ts @@ -4,6 +4,46 @@ import pLimit from "p-limit"; import { deepClone } from "./clone.js"; import PQueue from "p-queue"; import { ErrorUnprocessableEventHandler, TxOBError } from "./error.js"; +import { throttle } from "throttle-debounce"; + +// Helper function to sleep with abort signal support for faster cleanup +const sleepWithAbort = (ms: number, signal?: AbortSignal): Promise => { + if (signal?.aborted) { + return Promise.resolve(); + } + if (!signal) { + return sleep(ms); + } + + let abortHandler: (() => void) | null = null; + + const cleanup = () => { + if (abortHandler) { + signal.removeEventListener("abort", abortHandler); + abortHandler = null; + } + }; + + return Promise.race([ + sleep(ms).finally(() => { + // Always clean up listener when sleep completes (normally or if aborted) + cleanup(); + }), + new Promise((resolve) => { + abortHandler = () => { + cleanup(); + resolve(); + }; + // Check again after setting up listener in case abort happened between checks + if (signal.aborted) { + cleanup(); + resolve(); + return; + } + signal.addEventListener("abort", abortHandler); + }), + ]); +}; type TxOBEventHandlerResult = { processed_at?: Date; @@ -44,6 +84,12 @@ export type TxOBProcessorClientOpts = { maxErrors: number; }; +export interface WakeupEmitter { + on(event: "wakeup", listener: () => void): void; + off(event: "wakeup", listener: () => void): void; + close(): Promise; +} + export interface TxOBProcessorClient { getEventsToProcess( opts: TxOBProcessorClientOpts, @@ -74,11 +120,13 @@ export const defaultBackoff = (errorCount: number): Date => { return retryTimestamp; }; -const defaultPollingIntervalMs = 5000; +const defaultPollingIntervalMs = 5_000; const defaultMaxErrors = 5; const defaultMaxEventConcurrency = 20; const defaultMaxHandlerConcurrency = 10; -const defaultMaxQueuedEvents = 100; +const defaultMaxQueuedEvents = 500; +const defaultWakeupTimeoutMs = 30_000; // 30 second timeout before fallback polling +const defaultWakeupThrottleMs = 1_000; // 1s throttle for wakeup signals to prevent excessive polling type TxOBProcessEventsOpts = { maxErrors: number; @@ -357,17 +405,28 @@ export class EventProcessor { private opts: Omit, "signal"> & { pollingIntervalMs: number; maxQueuedEvents: number; + wakeupTimeoutMs: number; + wakeupThrottleMs: number; }; private abortController: AbortController; private queue: PQueue; private state: "stopped" | "started" | "stopping" = "stopped"; + private wakeupEmitter?: WakeupEmitter; + private wakeupListener?: () => void; + private throttledPoll?: ReturnType; + private lastWakeupTime: number = Date.now(); + private isPolling: boolean = false; constructor({ client, handlerMap, + wakeupEmitter, ...opts }: Omit>, "signal"> & { pollingIntervalMs?: number; + wakeupTimeoutMs?: number; + wakeupThrottleMs?: number; + wakeupEmitter?: WakeupEmitter; } & { client: TxOBProcessorClient; handlerMap: TxOBEventHandlerMap; @@ -379,10 +438,13 @@ export class EventProcessor { maxEventConcurrency: defaultMaxEventConcurrency, maxHandlerConcurrency: defaultMaxHandlerConcurrency, maxQueuedEvents: defaultMaxQueuedEvents, + wakeupTimeoutMs: defaultWakeupTimeoutMs, + wakeupThrottleMs: defaultWakeupThrottleMs, ...opts, }; this.client = client; this.handlerMap = handlerMap; + this.wakeupEmitter = wakeupEmitter; this.opts = _opts; this.abortController = new AbortController(); this.queue = new PQueue({ @@ -400,84 +462,182 @@ export class EventProcessor { const queuedEventIds: Set = new Set(); - (async () => { + // Poll function that can be called from wakeup signals or polling loop + const poll = async (): Promise => { + if (this.abortController.signal.aborted) { + return; + } + + // Prevent concurrent polls + if (this.isPolling) { + this.opts.logger?.debug("skipping poll - already polling"); + return; + } + + this.isPolling = true; try { - do { - try { - // Skip polling if we're at capacity to prevent memory leaks - if (queuedEventIds.size >= this.opts.maxQueuedEvents) { - this.opts.logger?.debug( - { - queuedCount: queuedEventIds.size, - maxQueuedEvents: this.opts.maxQueuedEvents, - }, - "skipping poll - queue at capacity", - ); - await sleep(this.opts.pollingIntervalMs); - continue; - } + // Skip polling if we're at capacity to prevent memory leaks + if (queuedEventIds.size >= this.opts.maxQueuedEvents) { + this.opts.logger?.debug( + { + queuedCount: queuedEventIds.size, + maxQueuedEvents: this.opts.maxQueuedEvents, + }, + "skipping poll - queue at capacity", + ); + return; + } - const events = await this.client.getEventsToProcess({ - ...this.opts, - signal: this.abortController.signal, + const events = await this.client.getEventsToProcess({ + ...this.opts, + signal: this.abortController.signal, + }); + + const unqueuedEvents = events.filter( + (event) => !queuedEventIds.has(event.id), + ); + this.opts.logger?.debug( + `found ${unqueuedEvents.length} events to process`, + ); + + for (const event of unqueuedEvents) { + queuedEventIds.add(event.id); + this.queue + .add( + async () => { + try { + await processEvent({ + client: this.client, + handlerMap: this.handlerMap, + unlockedEvent: event, + opts: { + ...this.opts, + signal: this.abortController.signal, + }, + }); + } catch (error) { + this.opts.logger?.error( + { + eventId: event.id, + error, + }, + "error processing event", + ); + } finally { + queuedEventIds.delete(event.id); + } + }, + { signal: this.abortController.signal }, + ) + .catch((error) => { + // Handle queue.add() rejections (e.g., when aborted) + // The event processing error is already logged in the task's catch block + queuedEventIds.delete(event.id); }); + } + } catch (error) { + this.opts.logger?.error( + { error }, + "error polling for events, will retry", + ); + // Continue polling even on error + } finally { + this.isPolling = false; + } + }; - const unqueuedEvents = events.filter( - (event) => !queuedEventIds.has(event.id), - ); - this.opts.logger?.debug( - `found ${unqueuedEvents.length} events to process`, + if (this.wakeupEmitter) { + // Setup wakeup signal listener with combined leading and trailing edge throttle + // Using throttle-debounce library for robust throttling behavior + // Leading edge: Poll immediately on the first signal + // Trailing edge: If signals keep coming, also poll after a delay from the last signal + // This provides both low latency (leading edge) and ensures we catch events + // even if signals arrive in a continuous burst (trailing edge) + const throttleMs = this.opts.wakeupThrottleMs; + + // Create throttled poll function with both leading and trailing edges + // noLeading: false (default) = execute immediately on first signal (leading edge) + // noTrailing: false (default) = also execute after delay from last signal (trailing edge) + // This throttled poll is used by both wakeup signals and fallback polling to prevent conflicts + this.throttledPoll = throttle(throttleMs, () => { + if (this.abortController.signal.aborted) { + return; + } + this.lastWakeupTime = Date.now(); + this.opts.logger?.debug("triggering throttled poll"); + void poll(); + }); + + this.wakeupListener = () => { + if (this.abortController.signal.aborted) { + return; + } + this.opts.logger?.debug("received wakeup signal"); + this.throttledPoll?.(); + }; + + this.wakeupEmitter.on("wakeup", this.wakeupListener); + this.opts.logger?.debug("wakeup signal listener registered"); + + // Initial poll + void poll(); + + // Start fallback polling loop + // This runs at a lower frequency and only polls if we haven't received a wakeup signal recently + // Uses the same throttled poll function to prevent conflicts with wakeup-triggered polls + (async () => { + try { + do { + await sleepWithAbort( + this.opts.pollingIntervalMs, + this.abortController.signal, ); - for (const event of unqueuedEvents) { - queuedEventIds.add(event.id); - this.queue - .add( - async () => { - try { - await processEvent({ - client: this.client, - handlerMap: this.handlerMap, - unlockedEvent: event, - opts: { - ...this.opts, - signal: this.abortController.signal, - }, - }); - } catch (error) { - this.opts.logger?.error( - { - eventId: event.id, - error, - }, - "error processing event", - ); - } finally { - queuedEventIds.delete(event.id); - } - }, - { signal: this.abortController.signal }, - ) - .catch((error) => { - // Handle queue.add() rejections (e.g., when aborted) - // The event processing error is already logged in the task's catch block - queuedEventIds.delete(event.id); - }); + if (this.abortController.signal.aborted) { + break; } - } catch (error) { - this.opts.logger?.error( - { error }, - "error polling for events, will retry", - ); - // Continue polling even on error - } - await sleep(this.opts.pollingIntervalMs); - } while (!this.abortController.signal.aborted); - } catch (error) { - this.opts.logger?.error({ error }, "polling loop error"); - } - })(); + // Check if we've received a wakeup signal recently + const timeSinceLastWakeup = Date.now() - this.lastWakeupTime; + if (timeSinceLastWakeup >= this.opts.wakeupTimeoutMs) { + this.opts.logger?.debug( + { + timeSinceLastWakeup, + wakeupTimeoutMs: this.opts.wakeupTimeoutMs, + }, + "fallback poll triggered - no wakeup signal received", + ); + this.throttledPoll?.(); + } else { + this.opts.logger?.debug( + { + timeSinceLastWakeup, + wakeupTimeoutMs: this.opts.wakeupTimeoutMs, + }, + "skipping fallback poll - wakeup signal received recently", + ); + } + } while (!this.abortController.signal.aborted); + } catch (error) { + this.opts.logger?.error({ error }, "fallback polling loop error"); + } + })(); + } else { + // Standard polling loop + (async () => { + try { + do { + await poll(); + await sleepWithAbort( + this.opts.pollingIntervalMs, + this.abortController.signal, + ); + } while (!this.abortController.signal.aborted); + } catch (error) { + this.opts.logger?.error({ error }, "polling loop error"); + } + })(); + } } async stop(opts?: { timeoutMs?: number }): Promise { @@ -493,6 +653,18 @@ export class EventProcessor { ...opts, }; + // Clean up wakeup listener + if (this.wakeupListener && this.wakeupEmitter) { + this.wakeupEmitter.off("wakeup", this.wakeupListener); + this.wakeupListener = undefined; + } + + // Cancel throttled poll (cleans up any pending timers) + if (this.throttledPoll) { + (this.throttledPoll as { cancel?: () => void }).cancel?.(); + this.throttledPoll = undefined; + } + this.abortController.abort(); this.queue.pause(); diff --git a/yarn.lock b/yarn.lock index e261d77..043f9ea 100644 --- a/yarn.lock +++ b/yarn.lock @@ -397,6 +397,11 @@ pg-protocol "*" pg-types "^2.2.0" +"@types/throttle-debounce@^5.0.2": + version "5.0.2" + resolved "https://registry.yarnpkg.com/@types/throttle-debounce/-/throttle-debounce-5.0.2.tgz#3489d91673a4be830c2c9e2acf1f6cdab724102c" + integrity sha512-pDzSNulqooSKvSNcksnV72nk8p7gRqN8As71Sp28nov1IgmPKWbOEIwAWvBME5pPTtaXJAvG3O4oc76HlQ4kqQ== + "@types/webidl-conversions@*": version "7.0.3" resolved "https://registry.yarnpkg.com/@types/webidl-conversions/-/webidl-conversions-7.0.3.tgz#1306dbfa53768bcbcfc95a1c8cde367975581859" @@ -2012,6 +2017,11 @@ supports-preserve-symlinks-flag@^1.0.0: resolved "https://registry.yarnpkg.com/supports-preserve-symlinks-flag/-/supports-preserve-symlinks-flag-1.0.0.tgz#6eda4bd344a3c94aea376d4cc31bc77311039e09" integrity sha512-ot0WnXS9fgdkgIcePe6RHNk1WA8+muPa6cSjeR3V8K27q9BB1rTE3R1p7Hv0z1ZyAc8s6Vvv8DIyWf681MAt0w== +throttle-debounce@^5.0.2: + version "5.0.2" + resolved "https://registry.yarnpkg.com/throttle-debounce/-/throttle-debounce-5.0.2.tgz#ec5549d84e053f043c9fd0f2a6dd892ff84456b1" + integrity sha512-B71/4oyj61iNH0KeCamLuE2rmKuTO5byTOSVwECM5FA7TiAiAW+UqTKZ9ERueC4qvgSttUhdmq1mXC3kJqGX7A== + through2@^2.0.1: version "2.0.5" resolved "https://registry.yarnpkg.com/through2/-/through2-2.0.5.tgz#01c1e39eb31d07cb7d03a96a70823260b23132cd"