diff --git a/package-lock.json b/package-lock.json index e5dce64f3..4273c7480 100644 --- a/package-lock.json +++ b/package-lock.json @@ -68,7 +68,6 @@ "integrity": "sha512-/JXIUuKsvkaneaiA9ckk3ksFTqvu0mDNlChASrTe2BnDsvMbhQdPWyqQjJ9WRJWVhhs5TWn1/0Pp1G6Rv8Syrw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "escape-string-regexp": "^5.0.0", "execa": "^5.1.1" @@ -985,7 +984,6 @@ "integrity": "sha512-cQbWBpxcbbs/IUredIPkHiAGULLV8iwgNRMFzvbhEXISp4f3rUUXE5+TIw6KwUWUR3DwyI6gmBRnmAtYaWehwQ==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@ampproject/remapping": "^2.1.0", "@babel/code-frame": "^7.18.6", @@ -2647,23 +2645,6 @@ "tslib": "^2.4.0" } }, - "node_modules/@floating-ui/core": { - "version": "1.7.2", - "resolved": "https://registry.npmjs.org/@floating-ui/core/-/core-1.7.2.tgz", - "integrity": "sha512-wNB5ooIKHQc+Kui96jE/n69rHFWAVoxn5CAzL1Xdd8FG03cgY3MLO+GF9U3W737fYDSgPWA6MReKhBQBop6Pcw==", - "license": "MIT", - "optional": true, - "dependencies": { - "@floating-ui/utils": "^0.2.10" - } - }, - "node_modules/@floating-ui/utils": { - "version": "0.2.10", - "resolved": "https://registry.npmjs.org/@floating-ui/utils/-/utils-0.2.10.tgz", - "integrity": "sha512-aGTxbpbg8/b5JfU1HXSrbH3wXZuLPJcNEcZQFMxLs3oSzgtVu6nFPkbbGGUvBcUjKV2YyB9Wxxabo+HEH9tcRQ==", - "license": "MIT", - "optional": true - }, "node_modules/@gar/promisify": { "version": "1.1.3", "resolved": "https://registry.npmjs.org/@gar/promisify/-/promisify-1.1.3.tgz", @@ -2691,6 +2672,10 @@ "resolved": "packages/extension-redis", "link": true }, + "node_modules/@hocuspocus/extension-redis-affinity": { + "resolved": "packages/extension-redis-affinity", + "link": true + }, "node_modules/@hocuspocus/extension-s3": { "resolved": "packages/extension-s3", "link": true @@ -3726,7 +3711,6 @@ "dev": true, "hasInstallScript": true, "license": "MIT", - "peer": true, "dependencies": { "@napi-rs/wasm-runtime": "0.2.4", "@yarnpkg/lockfile": "^1.1.0", @@ -5107,7 +5091,6 @@ "integrity": "sha512-/g2d4sW9nUDJOMz3mabVQvOGhVa4e/BN/Um7yca9Bb2XTzPPnfTWHWQg+IsEYO7M3Vx+EXvaM/I2pJWIMun1bg==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@octokit/auth-token": "^4.0.0", "@octokit/graphql": "^7.1.0", @@ -5291,7 +5274,6 @@ "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.6.0.tgz", "integrity": "sha512-aR0uffYI700OEEH4gYnitAnv3vzVGXCFvYfdpu/CJKvk4pHfLPEy/JSZyrpQ+15WhXe1yJRXLtfQ84s4mEXnPg==", "license": "MIT", - "peer": true, "dependencies": { "cluster-key-slot": "1.1.2", "generic-pool": "3.9.0", @@ -6954,7 +6936,6 @@ "resolved": "https://registry.npmjs.org/@tiptap/core/-/core-3.0.1.tgz", "integrity": "sha512-H0xOnDE5TF3bsCLq2FiFg69TWTzyHxyJdQ9D5m/P++QgLN8t2olGGznk4s1I+lxI3FB1YtIKMwBggRQuSQsclg==", "license": "MIT", - "peer": true, "funding": { "type": "github", "url": "https://github.com/sponsors/ueberdosis" @@ -6968,7 +6949,6 @@ "resolved": "https://registry.npmjs.org/@tiptap/pm/-/pm-3.0.1.tgz", "integrity": "sha512-G6eusuS7BMFVNQvA1irkJtSeJCoj6GczalJifRnukklfd2ZD18ZDx+xmzu25oLISQH9cPKmKIREmTTuMt+s2og==", "license": "MIT", - "peer": true, "dependencies": { "prosemirror-changeset": "^2.3.0", "prosemirror-collab": "^1.3.1", @@ -7210,7 +7190,6 @@ "resolved": "https://registry.npmjs.org/@tiptap/extension-list/-/extension-list-3.0.1.tgz", "integrity": "sha512-dLla05A9yp2owQYGKsE0ZMDdgieZXQANOHt4zHzqG97Ttnt7PD4reNNqyvbKQsgHqzmZ1w7HwBgP12D4NDACmw==", "license": "MIT", - "peer": true, "funding": { "type": "github", "url": "https://github.com/sponsors/ueberdosis" @@ -7316,7 +7295,6 @@ "resolved": "https://registry.npmjs.org/@tiptap/extensions/-/extensions-3.0.1.tgz", "integrity": "sha512-A5SrGDFDn230ucTWh1eByimHHc4THPP5No0+ptqLkc2LzWgxlNT1dUbyILoGqjsVjZdkgJravPPDXH6u/h/o2w==", "license": "MIT", - "peer": true, "funding": { "type": "github", "url": "https://github.com/sponsors/ueberdosis" @@ -7654,7 +7632,6 @@ "integrity": "sha512-oxLPMytKchWGbnQM9O7D67uPa9paTNxO7jVoNMXgkkErULBPhPARCfkKL9ytcIJJRGjbsVwW4ugJzyFFvm/Tiw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "csstype": "^3.0.2" } @@ -8559,7 +8536,6 @@ } ], "license": "MIT", - "peer": true, "dependencies": { "caniuse-lite": "^1.0.30001688", "electron-to-chromium": "^1.5.73", @@ -12195,7 +12171,6 @@ "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.6.1.tgz", "integrity": "sha512-UxC0Yv1Y4WRJiGQxQkP0hfdL0/5/6YvdfOOClRgJ0qppSarkhneSa6UvkMkms0AkdGimSH3Ikqm+6mkMmX7vGA==", "license": "MIT", - "peer": true, "dependencies": { "@ioredis/commands": "^1.1.1", "cluster-key-slot": "^1.1.0", @@ -13467,7 +13442,6 @@ "dev": true, "hasInstallScript": true, "license": "MIT", - "peer": true, "dependencies": { "@napi-rs/wasm-runtime": "0.2.4", "@yarnpkg/lockfile": "^1.1.0", @@ -13716,9 +13690,9 @@ } }, "node_modules/lib0": { - "version": "0.2.104", - "resolved": "https://registry.npmjs.org/lib0/-/lib0-0.2.104.tgz", - "integrity": "sha512-1tqKRANSPTcjs/yjPoKh52oRM2u5AYdd8jie8sDiN8/5kpWWiQSHUGgtB4VEXLw1chVL3QPSPp8q9RWqzSn2FA==", + "version": "0.2.114", + "resolved": "https://registry.npmjs.org/lib0/-/lib0-0.2.114.tgz", + "integrity": "sha512-gcxmNFzA4hv8UYi8j43uPlQ7CGcyMJ2KQb5kZASw6SnAKAf10hK12i2fjrS3Cl/ugZa5Ui6WwIu1/6MIXiHttQ==", "license": "MIT", "dependencies": { "isomorphic.js": "^0.2.4" @@ -16576,7 +16550,6 @@ "integrity": "sha512-M7BAV6Rlcy5u+m6oPhAPFgJTzAioX/6B0DxyvDlo9l8+T3nLKbrczg2WLUyzd45L8RqfUMyGPzekbMvX2Ldkwg==", "dev": true, "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -17028,7 +17001,6 @@ "resolved": "https://registry.npmjs.org/prosemirror-model/-/prosemirror-model-1.25.1.tgz", "integrity": "sha512-AUvbm7qqmpZa5d9fPKMvH1Q5bqYQvAZWOGRvxsB6iFLyycvC9MwNemNVjHVrWgjaoxAfY8XVg7DbvQ/qxvI9Eg==", "license": "MIT", - "peer": true, "dependencies": { "orderedmap": "^2.0.0" } @@ -17058,7 +17030,6 @@ "resolved": "https://registry.npmjs.org/prosemirror-state/-/prosemirror-state-1.4.3.tgz", "integrity": "sha512-goFKORVbvPuAQaXhpbemJFRKJ2aixr+AZMGiquiqKxaucC6hlpHNZHWgz5R7dS4roHiwq9vDctE//CZ++o0W1Q==", "license": "MIT", - "peer": true, "dependencies": { "prosemirror-model": "^1.0.0", "prosemirror-transform": "^1.0.0", @@ -17119,7 +17090,6 @@ "resolved": "https://registry.npmjs.org/prosemirror-view/-/prosemirror-view-1.39.2.tgz", "integrity": "sha512-BmOkml0QWNob165gyUxXi5K5CVUgVPpqMEAAml/qzgKn9boLUWVPzQ6LtzXw8Cn1GtRQX4ELumPxqtLTDaAKtg==", "license": "MIT", - "peer": true, "dependencies": { "prosemirror-model": "^1.20.0", "prosemirror-state": "^1.0.0", @@ -17268,7 +17238,6 @@ "resolved": "https://registry.npmjs.org/react/-/react-19.1.0.tgz", "integrity": "sha512-FS+XFBNvn3GTAWq26joslQgWNoFu08F4kl0J4CgdNKADkdSGXQyTCnKteIAJy96Br6YbpEU1LSzV5dYtjMkMDg==", "license": "MIT", - "peer": true, "engines": { "node": ">=0.10.0" } @@ -17278,7 +17247,6 @@ "resolved": "https://registry.npmjs.org/react-dom/-/react-dom-19.1.0.tgz", "integrity": "sha512-Xs1hdnE+DyKgeHJeJznQmYMIBG3TKIHJJT95Q58nHLSrElKlGQqDTR2HQ9fx5CN/Gk6Vh/kupBTDLU11/nDk/g==", "license": "MIT", - "peer": true, "dependencies": { "scheduler": "^0.26.0" }, @@ -17893,7 +17861,6 @@ "integrity": "sha512-Noe455xmA96nnqH5piFtLobsGbCij7Tu+tb3c1vYjNbTkfzGqXqQXG3wJaYXkRZuQ0vEYN4bhwg7QnIrqB5B+w==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@types/estree": "1.0.7" }, @@ -19758,6 +19725,12 @@ "node": ">=4" } }, + "node_modules/tseep": { + "version": "1.3.1", + "resolved": "https://registry.npmjs.org/tseep/-/tseep-1.3.1.tgz", + "integrity": "sha512-ZPtfk1tQnZVyr7BPtbJ93qaAh2lZuIOpTMjhrYa4XctT8xe7t4SAW9LIxrySDuYMsfNNayE51E/WNGrNVgVicQ==", + "license": "MIT" + }, "node_modules/tslib": { "version": "2.8.1", "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz", @@ -19858,7 +19831,6 @@ "integrity": "sha512-aJn6wq13/afZp/jT9QZmwEjDqqvSGp1VT5GVg+f/t6/oVyrgXM6BY1h9BRh/O5p3PlUPAe+WuiEZOmb/49RqoQ==", "dev": true, "license": "Apache-2.0", - "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -20738,7 +20710,6 @@ "resolved": "https://registry.npmjs.org/yjs/-/yjs-13.6.26.tgz", "integrity": "sha512-wiARO3wixu7mtoRP5f7LqpUtsURP9SmNgXUt3RlnZg4qDuF7dUjthwIvwxIDmK55dPw4Wl4QdW5A3ag0atwu7g==", "license": "MIT", - "peer": true, "dependencies": { "lib0": "^0.2.99" }, @@ -20839,6 +20810,21 @@ "yjs": "^13.6.8" } }, + "packages/extension-redis-affinity": { + "name": "@hocuspocus/extension-redis-affinity", + "version": "3.2.5", + "license": "MIT", + "dependencies": { + "@hocuspocus/server": "^3.2.5", + "ioredis": "^5.6.1", + "lib0": "^0.2.114", + "tseep": "^1.3.1" + }, + "peerDependencies": { + "y-protocols": "^1.0.6", + "yjs": "^13.6.8" + } + }, "packages/extension-s3": { "name": "@hocuspocus/extension-s3", "version": "3.2.5", diff --git a/packages/extension-redis-affinity/CHANGELOG.md b/packages/extension-redis-affinity/CHANGELOG.md new file mode 100644 index 000000000..e4d87c4d4 --- /dev/null +++ b/packages/extension-redis-affinity/CHANGELOG.md @@ -0,0 +1,4 @@ +# Change Log + +All notable changes to this project will be documented in this file. +See [Conventional Commits](https://conventionalcommits.org) for commit guidelines. diff --git a/packages/extension-redis-affinity/package.json b/packages/extension-redis-affinity/package.json new file mode 100644 index 000000000..6fbc318a2 --- /dev/null +++ b/packages/extension-redis-affinity/package.json @@ -0,0 +1,40 @@ +{ + "name": "@hocuspocus/extension-redis-affinity", + "version": "3.2.5", + "description": "Guarantee server affinity with Redis", + "homepage": "https://hocuspocus.dev", + "keywords": [ + "hocuspocus", + "redis", + "yjs" + ], + "license": "MIT", + "type": "module", + "main": "dist/hocuspocus-redis-affinity.cjs", + "module": "dist/hocuspocus-redis-affinity.esm.js", + "types": "dist/packages/extension-redis-affinity/src/index.d.ts", + "exports": { + "source": { + "import": "./src/index.ts" + }, + "default": { + "import": "./dist/hocuspocus-redis-affinity.esm.js", + "require": "./dist/hocuspocus-redis-affinity.cjs", + "types": "./dist/packages/extension-redis-affinity/src/index.d.ts" + } + }, + "files": [ + "src", + "dist" + ], + "dependencies": { + "@hocuspocus/server": "^3.2.5", + "ioredis": "^5.6.1", + "lib0": "^0.2.114", + "tseep": "^1.3.1" + }, + "peerDependencies": { + "ws": "^8.5.0" + }, + "gitHead": "b3454a4ca289a84ddfb7fa5607a2d4b8d5c37e9d" +} diff --git a/packages/extension-redis-affinity/src/HocusPocusProxySocket.ts b/packages/extension-redis-affinity/src/HocusPocusProxySocket.ts new file mode 100644 index 000000000..eb56f8778 --- /dev/null +++ b/packages/extension-redis-affinity/src/HocusPocusProxySocket.ts @@ -0,0 +1,41 @@ +import type RedisClient from 'ioredis' +import {EventEmitter} from 'tseep' +import type { + Pack, + RSAMessageClose, + RSAMessagePing, + RSAMessageSend +} from './RedisServerAffinity' + +export class HocusPocusProxySocket extends EventEmitter { + private replyTo: string + private socketId: string + private pub: RedisClient + private pack: Pack + readyState = 1 + constructor(pub: RedisClient, pack: Pack, replyTo: string, socketId: string) { + super() + this.replyTo = replyTo + this.socketId = socketId + this.pub = pub + this.pack = pack + this.on('close', () => { + this.readyState = 3 + }) + } + private publish(msg: RSAMessageClose | RSAMessagePing | RSAMessageSend) { + this.pub.publish(this.replyTo, this.pack(msg)) + } + close(code?: number, reason?: string) { + const msg: RSAMessageClose = {type: 'close', code, reason, socketId: this.socketId} + this.publish(msg) + } + ping() { + const msg: RSAMessagePing = {type: 'ping', socketId: this.socketId} + this.publish(msg) + } + send(message: Uint8Array) { + const msg: RSAMessageSend = {type: 'send', socketId: this.socketId, message} + this.publish(msg) + } +} diff --git a/packages/extension-redis-affinity/src/README.md b/packages/extension-redis-affinity/src/README.md new file mode 100644 index 000000000..dc2f4ee03 --- /dev/null +++ b/packages/extension-redis-affinity/src/README.md @@ -0,0 +1,179 @@ +# RedisServerAffinity for Hocuspocus + +`RedisServerAffinity` is a Hocuspocus extension that enforces **server/page affinity** using Redis. It ensures that a TipTap page is open on **only a single server instance** at a time. This is particularly useful in horizontally scaled environments where multiple Hocuspocus servers are running. + +_While path-based routing at the load balancer is generally preferred_, it is acknowledged that infrastructure changes are not always possible. For example, one can use this extension before calling `hocuspocus.openDirectConnection` in order to guarantee that server-based connections do not open the same document. This extension guaurantees server affinity by having each server act as a proxy to the server that currently owns the document. + +--- + +## Installation + +```bash +npm install ioredis @hocuspocus/server @hocuspocus/extension-redis-affinity +``` + +--- + +## Features + +* Guarantees that a document is locked to a single server. +* Maintains Redis locks to prevent multiple servers from opening the same document. +* Handles proxying of WebSocket messages between servers. +* Supports custom event handling across servers. +* Automatic lock maintenance and cleanup when documents unload or sockets disconnect. + +--- + +## Constructor + +```ts +new RedisServerAffinity(configuration: Configuration) +``` + +### Configuration Options + +| Option | Type | Description | | +| ---------------- | ------------------------------------------------------------------------------ | ----------------------------------------------------------------------------- | -------------------------------------------- | +| `redis` | `RedisClient` | ioredis instance. Used for pub/sub and locks. | | +| `pack` | `(msg: RSAMessage) => string \| Buffer` | Function to serialize messages for Redis. | +| `unpack` | `(packedMessage: Uint8Array \| Buffer) => RSAMessage` | Function to deserialize messages from Redis. | +| `serverId` | `string` | Unique identifier for this server instance. | | +| `lockTTL` | `number` | (Optional) Duration in ms to maintain document locks. Default: `10_000`. | | +| `proxySocketTTL` | `number` | (Optional) Duration in ms for keeping proxy sockets alive. Default: `30_000`. | | +| `customEventTTL` | `number` | (Optional) Timeout for custom event replies. Default: `30_000`. | | +| `prefix` | `string` | (Optional) Prefix for Redis keys. Default: `'rsa'`. | | +| `customEvents` | `Record Promise>` | (Optional) Map of custom event handlers. | | + +--- + +## Public Methods + +### `lockDocument(documentName: string)` + +Locks a document to the current server. Useful for freshly created documents. +Below is an example of locking the document before establishing a direct connection & streaming LLM output to it. + +```ts +const release = await redisAffinity.lockDocument(documentName); +const conn = await hocuspocus.openDirectConnection(documentName, {}) + for await (const content of contentGenerator) { + await conn.transact((doc) => { + for (const block of blocks) { + frag.insert(frag.length - 1, content) + } + }) + } +await conn.disconnect() +await release(); +``` + +* Throws an error if another server owns the document. +* Returns a function that releases the lock when called. + +--- + +--- + +### `releaseLock(documentName: string)` + +Releases a document lock and stops the interval that maintains it. + +```ts +await redisAffinity.releaseLock('my-doc'); +``` + +--- + +### `handleEvent(eventName: TName, documentName: string, payload: unknown): Promise>` + +Emits a **custom event** to the server that owns the document. +Example usage: if documents link to each other, updating the title of one may trigger updating text in backlinked documents. + +```ts +const updateLinkedTitles = (documentName: string, payload: {docId: string, title: string}) => { + const conn = await hocuspocus.openDirectConnection(documentName, {}) + await conn.transact((document) => { + const node = getNode(docId) + node.setAttribute('title', title) + }) + await conn.disconnect() +} +const redisHocusPocus = new RedisServerAffinity({customEvents: {updateLinkedTitles}}) + +const reuslt = await redisHocusPocus.handleEvent('updateLinkedTitles', documentName, {docId,title}) + +``` + +* If the document is loaded locally, the event is handled immediately. +* If another server owns the document, the event is proxied via Redis. + +--- + +## WebSocket Server Hooks + +These hooks integrate with your WebSocket server to maintain affinity and proxy messages. +`onSocketMessage` is required until the `beforeHandleMessage` can drop messages without throwing an error + +### `onSocketOpen(ws: BaseWebSocket, serializedHTTPRequest: SerializedHTTPRequest, context = {})` + +Registers a new client WebSocket and routes the connection to Hocuspocus. + +--- + +### `onSocketMessage(ws: BaseWebSocket, serializedHTTPRequest: SerializedHTTPRequest, detachableMsg: ArrayBuffer)` + +Handles incoming messages. + +* Sends messages directly if the document is loaded locally. +* Proxies to the owning server otherwise. + +--- + +### `onSocketClose(socketId: string, code?: number, reason?: ArrayBuffer)` + +Closes a client WebSocket connection and cleans up proxy sockets if needed. + +--- + +## Hocuspocus Hooks + +These are standard Hocuspocus lifecycle hooks implemented by the extension. + +* **`onConfigure({instance})`** – Sets the Hocuspocus instance for this extension. +* **`onLoadDocument({documentName})`** – Starts maintaining the lock for a loaded document. +* **`afterUnloadDocument({documentName})`** – Releases the lock and broadcasts an unload message to the cluster. +* **`onDisconnect({requestHeaders})`** – Cleans up a disconnected client and closes proxy sockets. +* **`onDestroy()`** – Disconnects Redis clients and cleans up all resources. + +--- + +## Example Usage + +```ts +import {Hocuspocus} from '@hocuspocus/server' +import Redis from 'ioredis' +import {RedisServerAffinity} from './RedisServerAffinity' +import {pack, unpack} from 'msgpackr' +const redis = new Redis() + +const redisAffinity = new RedisServerAffinity({ + redis, + serverId: 'server-1', + pack, + unpack, + customEvents: { + async myCustomEvent(docName, payload) { + return {handled: true} + } + } +}) + +const server = new Hocuspocus({port: 1234, extensions: [redisAffinity]}) +``` + +--- + +## Notes + +* Uses Redis `SETNX` instead of Redlock. This guarantees a lock for a single redis instance. +* Ensure that all servers in your cluster use the same Redis instance for locks and pub/sub (or PR to support `RedisCluster`) diff --git a/packages/extension-redis-affinity/src/RedisServerAffinity.ts b/packages/extension-redis-affinity/src/RedisServerAffinity.ts new file mode 100644 index 000000000..2c84c0c6e --- /dev/null +++ b/packages/extension-redis-affinity/src/RedisServerAffinity.ts @@ -0,0 +1,416 @@ +import type EventEmitter from 'node:events' +import type {IncomingMessage} from 'node:http' +import type {IncomingHttpHeaders} from 'node:http2' +import { + type Extension, + type Hocuspocus, + IncomingMessage as SocketIncomingMessage, + type afterUnloadDocumentPayload, + type onConfigurePayload, + type onDisconnectPayload, + type onLoadDocumentPayload +} from '@hocuspocus/server' +import type RedisClient from 'ioredis' +import {readVarString} from 'lib0/decoding.js' +import type {WebSocket} from 'ws' +import {HocusPocusProxySocket} from './HocusPocusProxySocket' + +export type SecondParam = T extends (arg1: unknown, arg2: infer A, ...args: unknown[]) => unknown ? A : never +export type RSAMessageProxy = { + type: 'proxy' + replyTo: string + message: Uint8Array + serializedHTTPRequest: SerializedHTTPRequest +} + +export type RSAMessageCloseProxy = { + type: 'closeProxy' + socketId: string +} + +export type RSAMessageUnload = { + type: 'unload' + documentName: string +} + +export type RSAMessageClose = { + type: 'close' + code?: number + reason?: string + socketId: string +} + +export type RSAMessagePing = { + type: 'ping' + socketId: string +} + +export type RSAMessageSend = { + type: 'send' + message: Uint8Array + socketId: string +} + +export type RSAMessageCustomEventStart = { + type: 'customEventStart' + documentName: string + eventName: TName + payload: TPayload + replyTo: string + replyId: number +} + +export type RSAMessageCustomEventComplete = { + type: 'customEventComplete' + replyId: number + payload: unknown +} + +export type RSAMessage = + | RSAMessageProxy + | RSAMessageCloseProxy + | RSAMessageUnload + | RSAMessageClose + | RSAMessagePing + | RSAMessageSend + | RSAMessageCustomEventStart + | RSAMessageCustomEventComplete + +export type SerializedHTTPRequest = { + method: string + url: string + headers: IncomingHttpHeaders & {'sec-websocket-key': string} + socket: {remoteAddress: string} +} +export type Pack = (msg: RSAMessage) => string | Buffer +type Unpack = (packedMessage: Uint8Array | Buffer) => RSAMessage +type ServerId = string +type DocumentName = string +type SocketId = string +type CustomEventName = string +type CustomEvents = Record Promise> + +interface Configuration { + redis: RedisClient + pack: Pack + unpack: Unpack + serverId: ServerId + lockTTL?: number + customEventTTL?: number + proxySocketTTL?: number + prefix?: string + customEvents?: TCE +} + +interface BaseWebSocket extends EventEmitter { + readyState: number + close(code?: number, reason?: string): void + ping(): void + send(message: Uint8Array): void +} + +export class RedisServerAffinity implements Extension { + priority = 1000 + private pub: RedisClient + private sub: RedisClient + private pack: Pack + private unpack: Unpack + private originSockets: Record = {} + private locks: Record = {} + private lockPromises: Record> = {} + private proxySockets: Record = + {} + private prefix: string + private lockPrefix: string + private msgChannel: string + private serverId: ServerId + private customEventTTL: number + private lockTTL: number + private proxySocketTTL: number + private instance!: Hocuspocus + private customEvents: TCE + private replyIdCounter = 0 + private pendingReplies: Record['resolve']> = {} + constructor(configuration: Configuration) { + const { + redis, + pack, + unpack, + serverId, + lockTTL, + prefix, + proxySocketTTL, + customEvents, + customEventTTL + } = configuration + this.pub = redis.duplicate() + this.sub = redis.duplicate() + this.pack = pack + this.unpack = unpack + this.serverId = serverId + this.lockTTL = lockTTL ?? 10_000 + this.proxySocketTTL = proxySocketTTL ?? 30_000 + this.customEventTTL = customEventTTL ?? 30_000 + this.prefix = prefix ?? 'rsa' + this.lockPrefix = `${this.prefix}Lock` + this.msgChannel = `${this.prefix}Msg` + this.customEvents = (customEvents ?? {}) as unknown as TCE + this.sub.subscribe(this.msgChannel, `${this.msgChannel}:${this.serverId}`) + this.sub.on('messageBuffer', this.handleRedisMessage) + } + private getKey(documentName: string) { + return `${this.lockPrefix}:${documentName}` + } + + private closeProxy(socketId: string) { + const socketRecord = this.proxySockets[socketId] + if (!socketRecord) return + clearTimeout(socketRecord.cleanup) + delete this.proxySockets[socketId] + } + + private handleProxyMessage( + msg: Pick + ) { + const {replyTo, message, serializedHTTPRequest} = msg + const {headers} = serializedHTTPRequest + const socketId = headers['sec-websocket-key'] + let socketRecord = this.proxySockets[socketId] + const cleanup = setTimeout(() => { + delete this.proxySockets[socketId] + }, this.proxySocketTTL) + if (!socketRecord) { + const socket = new HocusPocusProxySocket(this.pub, this.pack, replyTo, socketId) + socketRecord = {socket, cleanup} + this.proxySockets[socketId] = socketRecord + this.instance.handleConnection(socket as unknown as WebSocket, serializedHTTPRequest as unknown as IncomingMessage, {}) + } else { + clearTimeout(socketRecord.cleanup) + socketRecord.cleanup = cleanup + } + socketRecord.socket.emit('message', message) + } + + private getOrClaimLock(documentName: string) { + const lockPromise = this.pub.set( + this.getKey(documentName), + this.serverId, + 'PX', + this.lockTTL, + 'NX', + 'GET' + ) + this.lockPromises[documentName] = lockPromise + // Briefly cache the serverId that claimed the doc to reduce load on redis + // When the claimant unloads the doc, it will send an unload message to immediately clear this + // a lockTTL / 2 guarantees stale reads < lockTTL upon server crash + setTimeout(() => { + delete this.lockPromises[documentName] + }, this.lockTTL / 2) + return lockPromise + } + + private getOrClaimLockThrottled(documentName: string) { + const existingWorkerIdPromise = this.lockPromises[documentName] + if (existingWorkerIdPromise) return existingWorkerIdPromise + return this.getOrClaimLock(documentName) + } + + private handleRedisMessage = async (_channel: Buffer, packedMessage: Buffer) => { + const msg = this.unpack(packedMessage) as RSAMessage + const {type} = msg + if (type === 'proxy') { + this.handleProxyMessage(msg) + return + } + if (type === 'closeProxy') { + this.closeProxy(msg.socketId) + return + } + if (type === 'unload') { + delete this.lockPromises[msg.documentName] + return + } + if (type === 'customEventStart') { + const {documentName, eventName, payload, replyTo, replyId} = msg + const res = await this.handleEventLocally( + eventName as Extract, + documentName, + payload + ) + const reply: RSAMessageCustomEventComplete = { + type: 'customEventComplete', + replyId, + payload: res + } + this.pub.publish(`${replyTo}`, this.pack(reply)) + return + } + if (type === 'customEventComplete') { + const {replyId, payload} = msg + const resolveFn = this.pendingReplies[replyId] + if (!resolveFn) return + resolveFn(payload) + return + } + const {socketId} = msg + const socket = this.originSockets[socketId] + if (!socket) { + // origin socket already cleaned up + return + } + if (type === 'close') { + socket.close(msg.code, msg.reason) + } else if (type === 'ping') { + socket.ping() + } else if (type === 'send') { + socket.send(msg.message) + } + } + + async maintainLock(documentName: string) { + this.locks[documentName] = setInterval(() => { + this.pub.set(this.getKey(documentName), this.serverId, 'PX', this.lockTTL) + }, this.lockTTL / 2) + } + + async releaseLock(documentName: string) { + clearInterval(this.locks[documentName]) + delete this.locks[documentName] + return this.pub.del(this.getKey(documentName)) + } + + private async handleEventLocally>( + eventName: TName, + documentName: string, + payload: unknown + ) { + const handler = this.customEvents[eventName] + if (!handler) throw new Error(`Invalid eventName: ${eventName}`) + const result = await handler(documentName, payload) + return result as Promise> + } + + async handleEvent>( + eventName: TName, + documentName: string, + payload: unknown + ) { + const isDocLoadedOnInstance = this.instance.documents.has(documentName) + + if (isDocLoadedOnInstance) { + return this.handleEventLocally(eventName, documentName, payload) + } + + const proxyTo = await this.getOrClaimLockThrottled(documentName) + if (proxyTo && proxyTo !== this.serverId) { + ++this.replyIdCounter // bug in biome thinks this.replyIdCounter is not used if written on the line below + const replyId = this.replyIdCounter + // another server owns the doc + const proxyMessage: RSAMessageCustomEventStart = { + eventName, + documentName, + payload, + replyTo: `${this.msgChannel}:${this.serverId}`, + replyId, + type: 'customEventStart' + } + const msg = this.pack(proxyMessage) + this.pub.publish(`${this.msgChannel}:${proxyTo}`, msg) + const {promise, resolve, reject} = Promise.withResolvers() + this.pendingReplies[replyId] = resolve + setTimeout(() => { + reject('TIMEOUT') + }, this.customEventTTL) + return promise as Promise> + } + // This server owns the document, but hocuspocus hasn't loaded it yet + return this.handleEventLocally(eventName, documentName, payload) + } + + async lockDocument(documentName: string) { + const proxyTo = await this.getOrClaimLockThrottled(documentName) + if (proxyTo && proxyTo !== this.serverId) { + throw new Error(`Could not lock document: ${documentName}`) + } + this.maintainLock(documentName) + return () => this.releaseLock(documentName) + } + + /* WebSocket Server Hooks */ + onSocketOpen(ws: BaseWebSocket, serializedHTTPRequest: SerializedHTTPRequest, context = {}) { + const socketId = serializedHTTPRequest.headers['sec-websocket-key'] + this.originSockets[socketId] = ws + this.instance.handleConnection(ws as unknown as WebSocket, serializedHTTPRequest as unknown as IncomingMessage, context) + } + + async onSocketMessage( + ws: BaseWebSocket, + serializedHTTPRequest: SerializedHTTPRequest, + detachableMsg: ArrayBuffer + ) { + const message = new Uint8Array(detachableMsg.slice()) + const tmpMsg = new SocketIncomingMessage(detachableMsg) + const documentName = readVarString(tmpMsg.decoder) + const isDocLoadedOnInstance = this.instance.documents.has(documentName) + + if (isDocLoadedOnInstance) { + ws.emit('message', message) + return + } + + const proxyTo = await this.getOrClaimLockThrottled(documentName) + if (proxyTo && proxyTo !== this.serverId) { + // another server owns the doc + const proxyMessage: RSAMessageProxy = { + serializedHTTPRequest: serializedHTTPRequest, + replyTo: `${this.msgChannel}:${this.serverId}`, + message, + type: 'proxy' + } + const msg = this.pack(proxyMessage) + this.pub.publish(`${this.msgChannel}:${proxyTo}`, msg) + return + } + // This server owns the document, but hocuspocus hasn't loaded it yet + ws.emit('message', message) + } + + onSocketClose(socketId: string, code?: number, reason?: ArrayBuffer) { + const socket = this.originSockets[socketId] + socket?.emit('close', code, reason) + } + + /* Hocuspocus hooks */ + async onConfigure({instance}: onConfigurePayload) { + this.instance = instance + } + + async onLoadDocument(data: onLoadDocumentPayload) { + const {documentName} = data + // Refresh the lock TTL + this.maintainLock(documentName) + } + + async afterUnloadDocument(data: afterUnloadDocumentPayload) { + const {documentName} = data + this.releaseLock(documentName) + // Broadcast to cluster to immediately remove the cached redis value + const msg: RSAMessageUnload = {type: 'unload', documentName} + this.pub.publish(this.msgChannel, this.pack(msg)) + } + + async onDisconnect(data: onDisconnectPayload) { + const {requestHeaders} = data + const socketId = requestHeaders['sec-websocket-key'] + if (!socketId) return + delete this.originSockets[socketId] + const msg: RSAMessageCloseProxy = {type: 'closeProxy', socketId} + this.pub.publish(this.msgChannel, this.pack(msg)) + } + + async onDestroy() { + this.pub.disconnect(false) + this.sub.disconnect(false) + } +} diff --git a/packages/extension-redis-affinity/src/index.ts b/packages/extension-redis-affinity/src/index.ts new file mode 100644 index 000000000..1692d69af --- /dev/null +++ b/packages/extension-redis-affinity/src/index.ts @@ -0,0 +1 @@ +export * from "./RedisServerAffinity.ts";