|
1 | 1 | /// <reference no-default-lib="true"/> |
2 | 2 | /// <reference types="@cloudflare/workers-types" /> |
3 | 3 |
|
4 | | -import { RefUpdateError } from '@pierre/storage' |
| 4 | +import { ApiError, RefUpdateError, type Repo } from '@pierre/storage' |
5 | 5 | import { SupabaseClient } from '@supabase/supabase-js' |
6 | 6 | import { |
7 | 7 | DB, |
@@ -53,6 +53,7 @@ import { EventData, writeDataPoint } from './utils/analytics' |
53 | 53 | import { createPierreClient } from './utils/createPierreClient' |
54 | 54 | import { createSupabaseClient } from './utils/createSupabaseClient' |
55 | 55 | import { getRoomDurableObject } from './utils/durableObjects' |
| 56 | +import { reconstructSnapshotFromPierre } from './utils/pierreSnapshot' |
56 | 57 | import { isRateLimited } from './utils/rateLimit' |
57 | 58 | import { getSlug } from './utils/roomOpenMode' |
58 | 59 | import { throttle } from './utils/throttle' |
@@ -211,6 +212,7 @@ export class TLFileDurableObject extends DurableObject { |
211 | 212 | // For persistence |
212 | 213 | supabaseClient: SupabaseClient | void |
213 | 214 | pierreClient: ReturnType<typeof createPierreClient> |
| 215 | + pierreState: PierreState | null = null |
214 | 216 |
|
215 | 217 | // For analytics |
216 | 218 | measure: Analytics | undefined |
@@ -371,16 +373,9 @@ export class TLFileDurableObject extends DurableObject { |
371 | 373 | if (!repo) { |
372 | 374 | return new Response('Pierre not available', { status: 503 }) |
373 | 375 | } |
374 | | - const fileStream = await repo.getFileStream({ |
375 | | - path: SNAPSHOT_FILE_NAME, |
376 | | - ref: commitHash, |
377 | | - }) |
378 | | - dataText = await fileStream.text() |
379 | | - await repo.restoreCommit({ |
380 | | - targetCommitSha: commitHash, |
381 | | - targetBranch: 'main', |
382 | | - author: PIERRE_AUTHOR, |
383 | | - }) |
| 376 | + const snapshot = await reconstructSnapshotFromPierre(repo, commitHash) |
| 377 | + dataText = JSON.stringify(snapshot) |
| 378 | + this.pierreState = null |
384 | 379 | } else { |
385 | 380 | const timestamp = ((await req.json()) as any).timestamp |
386 | 381 | if (!timestamp) { |
@@ -973,6 +968,7 @@ export class TLFileDurableObject extends DurableObject { |
973 | 968 |
|
974 | 969 | const key = getR2KeyForRoom({ slug: slug, isApp: this.documentInfo.isApp }) |
975 | 970 | await this._uploadSnapshotToR2(snapshot, key) |
| 971 | + await this.persistToPierre(storage, snapshot) |
976 | 972 |
|
977 | 973 | this.logEvent({ type: 'persist_success', attempts: attempt }) |
978 | 974 | this._lastPersistedClock = snapshot.documentClock |
@@ -1023,7 +1019,6 @@ export class TLFileDurableObject extends DurableObject { |
1023 | 1019 | // Then upload to version cache |
1024 | 1020 | const versionKey = `${key}/${new Date().toISOString()}` |
1025 | 1021 | await this._uploadSnapshotToBucket(this.r2.versionCache, snapshot, versionKey) |
1026 | | - await this.persistToPierre(versionKey) |
1027 | 1022 | } |
1028 | 1023 |
|
1029 | 1024 | private async _uploadSnapshotToBucket(bucket: R2Bucket, snapshot: RoomSnapshot, key: string) { |
@@ -1120,44 +1115,139 @@ export class TLFileDurableObject extends DurableObject { |
1120 | 1115 | ) { |
1121 | 1116 | return null |
1122 | 1117 | } |
1123 | | - const repoId = `${this.env.TLDRAW_ENV}/snapshots/${this.documentInfo.slug}` |
| 1118 | + const repoId = `${this.env.TLDRAW_ENV}/files/${this.documentInfo.slug}` |
1124 | 1119 | return ( |
1125 | 1120 | (await this.pierreClient.findOne({ id: repoId })) ?? |
1126 | 1121 | (await this.pierreClient.createRepo({ id: repoId })) |
1127 | 1122 | ) |
1128 | 1123 | } |
1129 | 1124 |
|
1130 | | - private async persistToPierre(key: string) { |
| 1125 | + /** |
| 1126 | + * Sync local Pierre tracking state from the remote repo. Fetches HEAD sha |
| 1127 | + * and meta.json (for documentClock). For empty repos, sets documentClock |
| 1128 | + * to -1 so getChangesSince returns all records. |
| 1129 | + */ |
| 1130 | + private async syncPierreState(repo: Repo) { |
| 1131 | + let headCommit: { sha: string } | undefined |
| 1132 | + try { |
| 1133 | + const { commits } = await repo.listCommits({ limit: 1 }) |
| 1134 | + headCommit = commits[0] |
| 1135 | + } catch (error) { |
| 1136 | + if (error instanceof ApiError && error.status === 404) { |
| 1137 | + this.pierreState = { headSha: undefined, documentClock: -1 } |
| 1138 | + return |
| 1139 | + } |
| 1140 | + throw error |
| 1141 | + } |
| 1142 | + |
| 1143 | + if (!headCommit) { |
| 1144 | + this.pierreState = { headSha: undefined, documentClock: -1 } |
| 1145 | + return |
| 1146 | + } |
| 1147 | + |
| 1148 | + const metaResp = await repo.getFileStream({ path: 'meta.json', ref: headCommit.sha }) |
| 1149 | + const meta = (await metaResp.json()) as PierreMeta |
| 1150 | + |
| 1151 | + this.pierreState = { |
| 1152 | + headSha: headCommit.sha, |
| 1153 | + documentClock: meta.documentClock ?? 0, |
| 1154 | + } |
| 1155 | + } |
| 1156 | + |
| 1157 | + private async persistToPierre(storage: TLSyncStorage<TLRecord>, snapshot: RoomSnapshot) { |
1131 | 1158 | try { |
1132 | 1159 | const repo = await this.getPierreRepo() |
1133 | 1160 | if (!repo) return |
1134 | 1161 |
|
1135 | | - // Get the snapshot from R2 to create a readable stream |
1136 | | - const r2Object = await this.r2.versionCache.get(key) |
1137 | | - if (!r2Object) { |
1138 | | - console.warn('Failed to get R2 object for Pierre upload:', key) |
1139 | | - return |
1140 | | - } |
| 1162 | + const MAX_CAS_RETRIES = 3 |
| 1163 | + for (let attempt = 0; attempt < MAX_CAS_RETRIES; attempt++) { |
| 1164 | + if (!this.pierreState) { |
| 1165 | + await this.syncPierreState(repo) |
| 1166 | + } |
| 1167 | + |
| 1168 | + const { headSha, documentClock: pierreDocClock } = this.pierreState! |
| 1169 | + |
| 1170 | + const { result: changes, documentClock } = storage.transaction((txn) => |
| 1171 | + txn.getChangesSince(pierreDocClock) |
| 1172 | + ) |
| 1173 | + |
| 1174 | + if (!changes) return |
| 1175 | + |
| 1176 | + const { diff } = changes |
| 1177 | + const hasPuts = Object.keys(diff.puts).length > 0 |
| 1178 | + const hasDeletes = diff.deletes.length > 0 |
1141 | 1179 |
|
1142 | | - // Create commit with the snapshot |
1143 | | - const timestamp = new Date().toISOString() |
1144 | | - await repo |
1145 | | - .createCommit({ |
| 1180 | + if (!hasPuts && !hasDeletes && pierreDocClock === documentClock) { |
| 1181 | + return |
| 1182 | + } |
| 1183 | + |
| 1184 | + const timestamp = new Date().toISOString() |
| 1185 | + const commitBuilder = repo.createCommit({ |
1146 | 1186 | targetBranch: 'main', |
1147 | 1187 | commitMessage: `Snapshot at ${timestamp}`, |
1148 | 1188 | author: PIERRE_AUTHOR, |
| 1189 | + expectedHeadSha: headSha, |
1149 | 1190 | }) |
1150 | | - .addFile(SNAPSHOT_FILE_NAME, r2Object.body) |
1151 | | - .send() |
1152 | | - .catch((e) => { |
1153 | | - // ignore no changes to commit errors |
1154 | | - if (e instanceof RefUpdateError && e.message.match('no changes to commit')) { |
1155 | | - return |
| 1191 | + |
| 1192 | + const meta: PierreMeta = { |
| 1193 | + documentClock, |
| 1194 | + schema: snapshot.schema, |
| 1195 | + } |
| 1196 | + commitBuilder.addFileFromString('meta.json', JSON.stringify(meta)) |
| 1197 | + |
| 1198 | + for (const [id, put] of Object.entries(diff.puts)) { |
| 1199 | + const state = Array.isArray(put) ? put[1] : put |
| 1200 | + commitBuilder.addFileFromString(`records/${id}.json`, JSON.stringify(state)) |
| 1201 | + } |
| 1202 | + |
| 1203 | + // Only apply diff.deletes when we have a parent commit and we're not in wipeAll. |
| 1204 | + // - Empty repo (no headSha): those paths don't exist in Pierre; deletePath would fail. |
| 1205 | + // - wipeAll with existing repo: the cleanup loop below already deletes any file not in |
| 1206 | + // putIds, so applying diff.deletes here would duplicate deletePath for the same file. |
| 1207 | + if (headSha && !changes.wipeAll) { |
| 1208 | + for (const id of diff.deletes) { |
| 1209 | + commitBuilder.deletePath(`records/${id}.json`) |
1156 | 1210 | } |
1157 | | - throw e |
1158 | | - }) |
| 1211 | + } |
| 1212 | + |
| 1213 | + // On wipeAll with an existing repo, pruned tombstones may not appear in diff.deletes, |
| 1214 | + // so scan Pierre for stale record files and remove them. |
| 1215 | + if (changes.wipeAll && headSha) { |
| 1216 | + const putIds = new Set(Object.keys(diff.puts)) |
| 1217 | + const { paths } = await repo.listFiles({ ref: headSha }) |
| 1218 | + for (const path of paths) { |
| 1219 | + if (!path.startsWith('records/')) continue |
| 1220 | + const id = path.slice('records/'.length, -'.json'.length) |
| 1221 | + if (!putIds.has(id)) { |
| 1222 | + commitBuilder.deletePath(path) |
| 1223 | + } |
| 1224 | + } |
| 1225 | + } |
| 1226 | + |
| 1227 | + try { |
| 1228 | + const result = await commitBuilder.send().catch((e) => { |
| 1229 | + if (e instanceof RefUpdateError && e.message.match('no changes to commit')) { |
| 1230 | + return null |
| 1231 | + } |
| 1232 | + throw e |
| 1233 | + }) |
| 1234 | + |
| 1235 | + this.pierreState = { |
| 1236 | + headSha: result ? result.refUpdate.newSha : headSha, |
| 1237 | + documentClock, |
| 1238 | + } |
| 1239 | + return |
| 1240 | + } catch (error) { |
| 1241 | + if (error instanceof RefUpdateError) { |
| 1242 | + console.warn('Pierre CAS conflict, retrying:', error.message) |
| 1243 | + this.pierreState = null |
| 1244 | + continue |
| 1245 | + } |
| 1246 | + throw error |
| 1247 | + } |
| 1248 | + } |
| 1249 | + console.error('Pierre: exhausted CAS retries') |
1159 | 1250 | } catch (error) { |
1160 | | - // Log but don't fail the main persist operation |
1161 | 1251 | console.error('Failed to persist to Pierre:', error) |
1162 | 1252 | this.reportError(error) |
1163 | 1253 | } |
@@ -1368,5 +1458,15 @@ async function listAllObjectKeys(bucket: R2Bucket, prefix: string): Promise<stri |
1368 | 1458 | const PERSIST_RETRIES_NOTIFY_THRESHOLD = 10 |
1369 | 1459 | const PERSIST_RETRIES_MAX = 100 |
1370 | 1460 |
|
1371 | | -const SNAPSHOT_FILE_NAME = 'snapshot.json' |
1372 | 1461 | const PIERRE_AUTHOR = { email: 'huppy@tldraw.com', name: 'huppy [bot]' } |
| 1462 | + |
| 1463 | +interface PierreState { |
| 1464 | + headSha: string | undefined |
| 1465 | + documentClock: number |
| 1466 | +} |
| 1467 | + |
| 1468 | +/** Shape of meta.json stored in Pierre archives. */ |
| 1469 | +export interface PierreMeta { |
| 1470 | + documentClock: number |
| 1471 | + schema: RoomSnapshot['schema'] |
| 1472 | +} |
0 commit comments