Skip to content

Commit f80038e

Browse files
authored
ENG-1493 - Split 2 sync worker (#838)
* fix(roam): harden setHintingShapes against atom reaction cycle errors * feat(sync-worker): add Cloudflare tldraw sync worker foundation * add prettierignore
1 parent bb34542 commit f80038e

9 files changed

Lines changed: 1322 additions & 11 deletions

File tree

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,6 @@ yarn-error.log*
3737

3838
# Local development files
3939
local/*
40-
.cursor/debug.log
40+
.cursor/debug.log
41+
apps/tldraw-sync-worker/tsconfig.tsbuildinfo
42+
apps/tldraw-sync-worker/.wrangler/*

.prettierignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
packages/database/src/dbTypes.ts
2+
pnpm-lock.yaml
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{
2+
"name": "@repo/tldraw-sync-worker",
3+
"private": true,
4+
"version": "0.1.0",
5+
"license": "Apache-2.0",
6+
"type": "module",
7+
"scripts": {
8+
"dev": "wrangler dev",
9+
"deploy": "wrangler deploy",
10+
"check-types": "tsc --noEmit -p tsconfig.json"
11+
},
12+
"dependencies": {
13+
"@tldraw/sync-core": "2.4.6",
14+
"@tldraw/tlschema": "2.4.6",
15+
"cloudflare-workers-unfurl": "^0.0.7",
16+
"itty-router": "^5.0.17",
17+
"lodash.throttle": "^4.1.1"
18+
},
19+
"devDependencies": {
20+
"@cloudflare/workers-types": "^4.20240208.0",
21+
"@types/lodash.throttle": "^4",
22+
"typescript": "^5.0.2",
23+
"wrangler": "^3.64.0"
24+
}
25+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
{
2+
"compilerOptions": {
3+
"composite": true,
4+
"skipLibCheck": true,
5+
"lib": ["ES2020"],
6+
"module": "ESNext",
7+
"moduleResolution": "bundler",
8+
"allowSyntheticDefaultImports": true,
9+
"types": ["@cloudflare/workers-types"],
10+
"strict": true,
11+
"noUnusedLocals": true,
12+
"noUnusedParameters": true,
13+
"noFallthroughCasesInSwitch": true
14+
},
15+
"include": ["worker"]
16+
}
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
import { RoomSnapshot, TLSocketRoom } from "@tldraw/sync-core";
2+
import {
3+
TLRecord,
4+
createTLSchema,
5+
defaultBindingSchemas,
6+
defaultShapeSchemas,
7+
} from "@tldraw/tlschema";
8+
import { AutoRouter, IRequest, error } from "itty-router";
9+
import throttle from "lodash.throttle";
10+
import { Environment } from "./types";
11+
12+
type RoomSchemaConfig = {
13+
shapeTypes: string[];
14+
bindingTypes: string[];
15+
};
16+
17+
const STORAGE_SCHEMA_CONFIG_KEY = "schemaConfig";
18+
19+
const createRoomSchema = ({ shapeTypes, bindingTypes }: RoomSchemaConfig) => {
20+
const customShapeSchemas = Object.fromEntries(
21+
shapeTypes.map((type) => [type, {}]),
22+
);
23+
const customBindingSchemas = Object.fromEntries(
24+
bindingTypes.map((type) => [type, {}]),
25+
);
26+
27+
return createTLSchema({
28+
shapes: {
29+
...defaultShapeSchemas,
30+
...customShapeSchemas,
31+
},
32+
bindings: {
33+
...defaultBindingSchemas,
34+
...customBindingSchemas,
35+
},
36+
});
37+
};
38+
39+
const dedupeAndSort = (values: string[]): string[] => {
40+
return Array.from(new Set(values.filter(Boolean))).sort((a, b) =>
41+
a.localeCompare(b),
42+
);
43+
};
44+
45+
const mergeSchemaConfig = (
46+
baseConfig: RoomSchemaConfig,
47+
incomingConfig: RoomSchemaConfig,
48+
): RoomSchemaConfig => ({
49+
shapeTypes: dedupeAndSort(
50+
baseConfig.shapeTypes.concat(incomingConfig.shapeTypes),
51+
),
52+
bindingTypes: dedupeAndSort(
53+
baseConfig.bindingTypes.concat(incomingConfig.bindingTypes),
54+
),
55+
});
56+
57+
const isSameSchemaConfig = (
58+
a: RoomSchemaConfig,
59+
b: RoomSchemaConfig,
60+
): boolean => {
61+
return (
62+
a.shapeTypes.length === b.shapeTypes.length &&
63+
a.bindingTypes.length === b.bindingTypes.length &&
64+
a.shapeTypes.every((value, index) => value === b.shapeTypes[index]) &&
65+
a.bindingTypes.every((value, index) => value === b.bindingTypes[index])
66+
);
67+
};
68+
69+
// each whiteboard room is hosted in a DurableObject:
70+
// https://developers.cloudflare.com/durable-objects/
71+
72+
// there's only ever one durable object instance per room. it keeps all the room state in memory and
73+
// handles websocket connections. periodically, it persists the room state to the R2 bucket.
74+
export class TldrawDurableObject {
75+
private r2: R2Bucket;
76+
// the room ID will be missing whilst the room is being initialized
77+
private roomId: string | null = null;
78+
private roomSchemaConfig: RoomSchemaConfig = {
79+
shapeTypes: [],
80+
bindingTypes: [],
81+
};
82+
// when we load the room from the R2 bucket, we keep it here. it's a promise so we only ever
83+
// load it once.
84+
private roomPromise: Promise<TLSocketRoom<TLRecord, void>> | null = null;
85+
86+
constructor(
87+
private readonly ctx: DurableObjectState,
88+
env: Environment,
89+
) {
90+
this.r2 = env.TLDRAW_BUCKET;
91+
92+
ctx.blockConcurrencyWhile(async () => {
93+
this.roomId = ((await this.ctx.storage.get("roomId")) ?? null) as
94+
| string
95+
| null;
96+
const schemaConfig =
97+
((await this.ctx.storage.get(
98+
STORAGE_SCHEMA_CONFIG_KEY,
99+
)) as RoomSchemaConfig) ?? null;
100+
if (schemaConfig) {
101+
this.roomSchemaConfig = {
102+
shapeTypes: dedupeAndSort(schemaConfig.shapeTypes ?? []),
103+
bindingTypes: dedupeAndSort(schemaConfig.bindingTypes ?? []),
104+
};
105+
}
106+
});
107+
}
108+
109+
private readonly router = AutoRouter({
110+
catch: (e) => {
111+
console.log(e);
112+
return error(e);
113+
},
114+
})
115+
// when we get a connection request, we stash the room id if needed and handle the connection
116+
.get("/connect/:roomId", async (request) => {
117+
if (!this.roomId) {
118+
await this.ctx.blockConcurrencyWhile(async () => {
119+
await this.ctx.storage.put("roomId", request.params.roomId);
120+
this.roomId = request.params.roomId;
121+
});
122+
}
123+
return this.handleConnect(request);
124+
});
125+
126+
// `fetch` is the entry point for all requests to the Durable Object
127+
fetch(request: Request): Response | Promise<Response> {
128+
return this.router.fetch(request);
129+
}
130+
131+
// what happens when someone tries to connect to this room?
132+
async handleConnect(request: IRequest): Promise<Response> {
133+
// extract query params from request
134+
const sessionId = request.query.sessionId as string;
135+
if (!sessionId) return error(400, "Missing sessionId");
136+
const incomingSchemaConfig = this.getIncomingSchemaConfig(request);
137+
await this.ensureSchemaConfig(incomingSchemaConfig);
138+
139+
// Create the websocket pair for the client
140+
const { 0: clientWebSocket, 1: serverWebSocket } = new WebSocketPair();
141+
serverWebSocket.accept();
142+
143+
// load the room, or retrieve it if it's already loaded
144+
const room = await this.getRoom();
145+
146+
// connect the client to the room
147+
room.handleSocketConnect({ sessionId, socket: serverWebSocket });
148+
149+
// return the websocket connection to the client
150+
return new Response(null, { status: 101, webSocket: clientWebSocket });
151+
}
152+
153+
private getIncomingSchemaConfig(request: IRequest): RoomSchemaConfig {
154+
const url = new URL(request.url);
155+
return {
156+
shapeTypes: dedupeAndSort(url.searchParams.getAll("shapeType")),
157+
bindingTypes: dedupeAndSort(url.searchParams.getAll("bindingType")),
158+
};
159+
}
160+
161+
private async ensureSchemaConfig(
162+
incomingSchemaConfig: RoomSchemaConfig,
163+
): Promise<void> {
164+
const nextConfig = mergeSchemaConfig(
165+
this.roomSchemaConfig,
166+
incomingSchemaConfig,
167+
);
168+
if (isSameSchemaConfig(this.roomSchemaConfig, nextConfig)) return;
169+
170+
this.roomSchemaConfig = nextConfig;
171+
await this.ctx.storage.put(
172+
STORAGE_SCHEMA_CONFIG_KEY,
173+
this.roomSchemaConfig,
174+
);
175+
176+
if (this.roomPromise) {
177+
const previousRoom = await this.roomPromise;
178+
const snapshot = previousRoom.getCurrentSnapshot();
179+
previousRoom.close();
180+
this.roomPromise = Promise.resolve(this.createRoom(snapshot));
181+
}
182+
}
183+
184+
private createRoom(
185+
initialSnapshot?: RoomSnapshot,
186+
): TLSocketRoom<TLRecord, void> {
187+
return new TLSocketRoom<TLRecord, void>({
188+
schema: createRoomSchema(this.roomSchemaConfig),
189+
initialSnapshot,
190+
onDataChange: () => {
191+
// and persist whenever the data in the room changes
192+
this.schedulePersistToR2();
193+
},
194+
});
195+
}
196+
197+
getRoom() {
198+
const roomId = this.roomId;
199+
if (!roomId) throw new Error("Missing roomId");
200+
201+
if (!this.roomPromise) {
202+
this.roomPromise = (async () => {
203+
// fetch the room from R2
204+
const roomFromBucket = await this.r2.get(`rooms/${roomId}`);
205+
206+
// if it doesn't exist, we'll just create a new empty room
207+
const initialSnapshot = roomFromBucket
208+
? ((await roomFromBucket.json()) as RoomSnapshot)
209+
: undefined;
210+
211+
// create a new TLSocketRoom. This handles all the sync protocol & websocket connections.
212+
// it's up to us to persist the room state to R2 when needed though.
213+
return this.createRoom(initialSnapshot);
214+
})();
215+
}
216+
217+
return this.roomPromise;
218+
}
219+
220+
// we throttle persistance so it only happens every 10 seconds
221+
schedulePersistToR2: () => void = throttle(async () => {
222+
if (!this.roomPromise || !this.roomId) return;
223+
try {
224+
const room = await this.getRoom();
225+
const snapshot = JSON.stringify(room.getCurrentSnapshot());
226+
await this.r2.put(`rooms/${this.roomId}`, snapshot);
227+
} catch (e) {
228+
console.error("Failed to persist room to R2", {
229+
roomId: this.roomId,
230+
error: e,
231+
});
232+
}
233+
}, 10_000);
234+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
// the contents of the environment should mostly be determined by wrangler.toml. These entries match
2+
// the bindings defined there.
3+
export interface Environment {
4+
TLDRAW_BUCKET: R2Bucket;
5+
TLDRAW_DURABLE_OBJECT: DurableObjectNamespace;
6+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import { handleUnfurlRequest } from "cloudflare-workers-unfurl";
2+
import { AutoRouter, error, IRequest } from "itty-router";
3+
import { Environment } from "./types";
4+
5+
// make sure our sync durable object is made available to cloudflare
6+
export { TldrawDurableObject } from "./TldrawDurableObject";
7+
8+
const ALLOWED_ORIGINS = [
9+
"https://roamresearch.com",
10+
"http://localhost:3000",
11+
"app://obsidian.md",
12+
];
13+
14+
const isVercelPreviewUrl = (origin: string): boolean =>
15+
/^https:\/\/.*-discourse-graph-[a-z0-9]+\.vercel\.app$/.test(origin);
16+
17+
const isAllowedOrigin = (origin: string): boolean =>
18+
ALLOWED_ORIGINS.some((allowedOrigin) => origin === allowedOrigin) ||
19+
isVercelPreviewUrl(origin);
20+
21+
const setCorsHeaders = ({
22+
request,
23+
response,
24+
}: {
25+
request: IRequest;
26+
response: Response;
27+
}): Response => {
28+
if (response.status === 101) return response; // WebSocket upgrade response; headers are immutable here, so skip CORS header mutation.
29+
30+
const origin = request.headers.get("origin");
31+
if (origin && isAllowedOrigin(origin)) {
32+
response.headers.set("Access-Control-Allow-Origin", origin);
33+
response.headers.set("Access-Control-Allow-Methods", "GET, POST, OPTIONS");
34+
response.headers.set(
35+
"Access-Control-Allow-Headers",
36+
"Content-Type, Authorization, x-vercel-protection-bypass",
37+
);
38+
}
39+
return response;
40+
};
41+
42+
const handlePreflight = (request: IRequest): Response => {
43+
const origin = request.headers.get("origin");
44+
if (!origin || !isAllowedOrigin(origin)) {
45+
return error(403, "Origin not allowed");
46+
}
47+
48+
return new Response(null, {
49+
status: 204,
50+
headers: {
51+
"Access-Control-Allow-Origin": origin,
52+
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
53+
"Access-Control-Allow-Headers":
54+
"Content-Type, Authorization, x-vercel-protection-bypass",
55+
"Access-Control-Max-Age": "86400",
56+
},
57+
});
58+
};
59+
60+
const enforceAllowedOrigin = (request: IRequest): Response | void => {
61+
if (request.method === "OPTIONS") return;
62+
const origin = request.headers.get("origin");
63+
if (origin && !isAllowedOrigin(origin)) {
64+
return error(403, "Origin not allowed");
65+
}
66+
};
67+
68+
const router = AutoRouter<IRequest, [env: Environment, ctx: ExecutionContext]>({
69+
before: [enforceAllowedOrigin],
70+
catch: (e) => {
71+
console.error(e);
72+
return error(e);
73+
},
74+
})
75+
.options("*", handlePreflight)
76+
// requests to /connect are routed to the Durable Object, and handle realtime websocket syncing
77+
.get("/connect/:roomId", async (request, env) => {
78+
const id = env.TLDRAW_DURABLE_OBJECT.idFromName(request.params.roomId);
79+
const room = env.TLDRAW_DURABLE_OBJECT.get(id);
80+
const response = await room.fetch(request.url, {
81+
headers: request.headers,
82+
body: request.body,
83+
});
84+
return setCorsHeaders({ request, response });
85+
})
86+
87+
// bookmarks need to extract metadata from pasted URLs:
88+
.get("/unfurl", async (request) => {
89+
const response = await handleUnfurlRequest(request);
90+
return setCorsHeaders({ request, response });
91+
});
92+
93+
// export our router for cloudflare
94+
export default router;

0 commit comments

Comments
 (0)