Skip to content

Commit de437d8

Browse files
committed
Improve shared database row loading
1 parent de3e340 commit de437d8

9 files changed

Lines changed: 965 additions & 172 deletions

File tree

src/application/database-blob/index.ts

Lines changed: 239 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import { stringify as uuidStringify } from 'uuid';
22

3+
import * as Y from 'yjs';
4+
5+
import { hasRowConditionData, invalidateRowConditionCache } from '@/application/database-yjs/condition-value-cache';
36
import { getRowKey } from '@/application/database-yjs/row_meta';
47
import { getCachedProviderDoc, openCollabDBWithProvider } from '@/application/db';
58
import { getCachedRowDoc } from '@/application/services/js-services/cache';
@@ -25,17 +28,69 @@ type PrefetchOptions = {
2528
onSeedsReady?: () => void;
2629
};
2730

31+
type SharedPrefetchEntry = {
32+
priorityRowIds: Set<string>;
33+
onSeedsReadyCallbacks: Set<() => void>;
34+
seedsReady: boolean;
35+
settled?: boolean;
36+
clearWhenSettled?: boolean;
37+
promise?: Promise<database_blob.DatabaseBlobDiffResponse>;
38+
};
39+
2840
const RID_CACHE_PREFIX = 'af_database_blob_rid:';
2941
const APPLY_CONCURRENCY = 6;
3042
const MAX_ROW_DOC_SEEDS = 2000;
3143
const MAX_ROW_DOC_SEEDS_LOOKUP = 10000;
3244

3345
const readyStatus = database_blob.DiffStatus.READY;
46+
const sharedPrefetchEntries = new Map<string, SharedPrefetchEntry>();
3447

3548
function ridCacheKey(databaseId: string) {
3649
return `${RID_CACHE_PREFIX}${databaseId}`;
3750
}
3851

52+
function sharedPrefetchKey(workspaceId: string, databaseId: string) {
53+
return `${workspaceId}:${databaseId}`;
54+
}
55+
56+
function applyPrefetchOptions(entry: SharedPrefetchEntry, options?: PrefetchOptions) {
57+
options?.priorityRowIds?.forEach((rowId) => entry.priorityRowIds.add(rowId));
58+
59+
if (!options?.onSeedsReady) return;
60+
61+
if (entry.seedsReady) {
62+
void Promise.resolve().then(options.onSeedsReady);
63+
return;
64+
}
65+
66+
entry.onSeedsReadyCallbacks.add(options.onSeedsReady);
67+
}
68+
69+
function notifySeedsReady(entry: SharedPrefetchEntry) {
70+
entry.seedsReady = true;
71+
const callbacks = Array.from(entry.onSeedsReadyCallbacks);
72+
73+
entry.onSeedsReadyCallbacks.clear();
74+
callbacks.forEach((callback) => callback());
75+
}
76+
77+
function clearSharedPrefetchEntryAfterSettle(
78+
databaseId: string,
79+
sharedKey: string,
80+
entry: SharedPrefetchEntry
81+
) {
82+
if (entry.clearWhenSettled) return;
83+
entry.clearWhenSettled = true;
84+
85+
entry.promise?.finally(() => {
86+
entry.clearWhenSettled = false;
87+
88+
if ((rowDocSeedCacheRetainCounts.get(databaseId) ?? 0) === 0 && sharedPrefetchEntries.get(sharedKey) === entry) {
89+
clearDatabaseRowDocSeedCache(databaseId);
90+
}
91+
}).catch(() => undefined);
92+
}
93+
3994
function parseRid(rid?: database_blob.IDatabaseBlobRowRid | null): DatabaseBlobRowRid | null {
4095
if (!rid) return null;
4196

@@ -81,6 +136,22 @@ function compareRid(a: DatabaseBlobRowRid, b: DatabaseBlobRowRid) {
81136

82137
const rowDocSeedCache = new Map<string, RowDocSeed>();
83138
const rowDocSeedLookup = new Map<string, RowDocSeed>();
139+
const rowDocSeedDocCache = new Map<string, YDoc>();
140+
const rowDocSeedCacheRetainCounts = new Map<string, number>();
141+
142+
function applySeedToSharedRowDoc(rowKey: string, seed: RowDocSeed) {
143+
const doc = rowDocSeedDocCache.get(rowKey);
144+
145+
if (!doc) return;
146+
147+
try {
148+
applyYDoc(doc, seed.bytes, seed.encoderVersion);
149+
invalidateRowConditionCache(doc);
150+
} catch {
151+
doc.destroy();
152+
rowDocSeedDocCache.delete(rowKey);
153+
}
154+
}
84155

85156
function trimRowDocSeedLookup() {
86157
while (rowDocSeedLookup.size > MAX_ROW_DOC_SEEDS_LOOKUP) {
@@ -92,12 +163,15 @@ function trimRowDocSeedLookup() {
92163
}
93164

94165
function cacheRowDocSeed(rowKey: string, docState?: database_blob.ICollabDocState | null) {
95-
if (getCachedRowDoc(rowKey)) return;
166+
const cachedDoc = getCachedRowDoc(rowKey);
167+
168+
if (hasRowConditionData(cachedDoc)) return;
96169

97170
const seed = getDocState(docState);
98171

99172
if (!seed) return;
100173

174+
applySeedToSharedRowDoc(rowKey, seed);
101175
rowDocSeedCache.set(rowKey, seed);
102176

103177
while (rowDocSeedCache.size > MAX_ROW_DOC_SEEDS) {
@@ -144,8 +218,74 @@ export function takeDatabaseRowDocSeed(rowKey: string): RowDocSeed | null {
144218
return seed;
145219
}
146220

221+
/**
222+
* Non-destructive read of a row doc seed. Multiple database views may need
223+
* the same row seed at the same time: one to build an in-memory doc for
224+
* filter/sort, another to open the IndexedDB-backed row doc for rendering.
225+
*/
226+
export function peekDatabaseRowDocSeed(rowKey: string): RowDocSeed | null {
227+
return rowDocSeedCache.get(rowKey) ?? rowDocSeedLookup.get(rowKey) ?? null;
228+
}
229+
230+
/**
231+
* Shared read-only row doc for filter/sort. Reuses an existing live row doc
232+
* when one is already cached; otherwise builds one shared in-memory doc from
233+
* seed bytes so multiple views of the same database can reuse cell values.
234+
*/
235+
export function getDatabaseRowDocFromSeed(rowKey: string): YDoc | null {
236+
const liveDoc = getCachedRowDoc(rowKey);
237+
238+
if (hasRowConditionData(liveDoc)) return liveDoc;
239+
240+
const cachedDoc = rowDocSeedDocCache.get(rowKey);
241+
242+
if (cachedDoc) {
243+
if (hasRowConditionData(cachedDoc)) return cachedDoc;
244+
cachedDoc.destroy();
245+
rowDocSeedDocCache.delete(rowKey);
246+
}
247+
248+
const seed = peekDatabaseRowDocSeed(rowKey);
249+
250+
if (!seed) return null;
251+
252+
const doc = new Y.Doc({ guid: rowKey }) as YDoc;
253+
254+
try {
255+
applyYDoc(doc, seed.bytes, seed.encoderVersion);
256+
} catch {
257+
doc.destroy();
258+
return null;
259+
}
260+
261+
const dataSection = doc.getMap(YjsEditorKey.data_section);
262+
263+
if (!dataSection.has(YjsEditorKey.database_row)) {
264+
doc.destroy();
265+
return null;
266+
}
267+
268+
rowDocSeedDocCache.set(rowKey, doc);
269+
return doc;
270+
}
271+
147272
export function clearDatabaseRowDocSeedCache(databaseId: string) {
148273
const prefix = `${databaseId}_rows_`;
274+
const sharedPrefetchSuffix = `:${databaseId}`;
275+
let hasUnsettledPrefetch = false;
276+
277+
for (const [key, entry] of sharedPrefetchEntries.entries()) {
278+
if (key.endsWith(sharedPrefetchSuffix)) {
279+
entry.onSeedsReadyCallbacks.clear();
280+
281+
if (entry.promise && !entry.settled) {
282+
clearSharedPrefetchEntryAfterSettle(databaseId, key, entry);
283+
hasUnsettledPrefetch = true;
284+
}
285+
}
286+
}
287+
288+
if (hasUnsettledPrefetch) return;
149289

150290
for (const key of rowDocSeedCache.keys()) {
151291
if (key.startsWith(prefix)) {
@@ -158,6 +298,36 @@ export function clearDatabaseRowDocSeedCache(databaseId: string) {
158298
rowDocSeedLookup.delete(key);
159299
}
160300
}
301+
302+
for (const [key, doc] of rowDocSeedDocCache.entries()) {
303+
if (key.startsWith(prefix)) {
304+
doc.destroy();
305+
rowDocSeedDocCache.delete(key);
306+
}
307+
}
308+
309+
for (const [key, entry] of sharedPrefetchEntries.entries()) {
310+
if (key.endsWith(sharedPrefetchSuffix)) {
311+
entry.onSeedsReadyCallbacks.clear();
312+
sharedPrefetchEntries.delete(key);
313+
}
314+
}
315+
}
316+
317+
export function retainDatabaseRowDocSeedCache(databaseId: string) {
318+
rowDocSeedCacheRetainCounts.set(databaseId, (rowDocSeedCacheRetainCounts.get(databaseId) ?? 0) + 1);
319+
}
320+
321+
export function releaseDatabaseRowDocSeedCache(databaseId: string) {
322+
const count = rowDocSeedCacheRetainCounts.get(databaseId) ?? 0;
323+
324+
if (count > 1) {
325+
rowDocSeedCacheRetainCounts.set(databaseId, count - 1);
326+
return;
327+
}
328+
329+
rowDocSeedCacheRetainCounts.delete(databaseId);
330+
clearDatabaseRowDocSeedCache(databaseId);
161331
}
162332

163333
function maxRidFromDiff(diff: database_blob.DatabaseBlobDiffResponse): DatabaseBlobRowRid | null {
@@ -231,6 +401,7 @@ function applySeedToCachedDoc(rowKey: string, seed: RowDocSeed) {
231401
if (!cachedDoc) return false;
232402

233403
applyYDoc(cachedDoc, seed.bytes, seed.encoderVersion);
404+
invalidateRowConditionCache(cachedDoc);
234405
return true;
235406
}
236407

@@ -259,6 +430,7 @@ function seedRowDocCacheFromDiff(databaseId: string, diff: database_blob.Databas
259430

260431
if (!seed) return;
261432

433+
applySeedToSharedRowDoc(rowKey, seed);
262434
rowDocSeedLookup.set(rowKey, seed);
263435
if (rowDocSeedLookup.size > MAX_ROW_DOC_SEEDS_LOOKUP) {
264436
trimRowDocSeedLookup();
@@ -299,6 +471,7 @@ function seedRowDocCacheFromDiff(databaseId: string, diff: database_blob.Databas
299471

300472
if (!seed) return;
301473

474+
applySeedToSharedRowDoc(rowKey, seed);
302475
rowDocSeedLookup.set(rowKey, seed);
303476
if (rowDocSeedLookup.size > MAX_ROW_DOC_SEEDS_LOOKUP) {
304477
trimRowDocSeedLookup();
@@ -381,6 +554,7 @@ async function applyCollabUpdate(objectId: string, docState: database_blob.IColl
381554
...beforeState,
382555
});
383556
applyYDoc(cachedDoc, state.bytes, state.encoderVersion);
557+
invalidateRowConditionCache(cachedDoc);
384558

385559
const afterState = inspectDocRowData(cachedDoc, objectId);
386560

@@ -609,41 +783,79 @@ export async function prefetchDatabaseBlobDiff(
609783
databaseId: string,
610784
options?: PrefetchOptions
611785
) {
612-
const diff = await fetchReadyDiff(workspaceId, databaseId);
613-
const seedSummary = seedRowDocCacheFromDiff(databaseId, diff, options);
786+
const sharedKey = sharedPrefetchKey(workspaceId, databaseId);
787+
const existingEntry = sharedPrefetchEntries.get(sharedKey);
614788

615-
Log.debug('[Database] blob seed cache prepared', {
616-
databaseId,
617-
...seedSummary,
618-
seedCount: rowDocSeedCache.size,
619-
lookupCount: rowDocSeedLookup.size,
620-
});
789+
if (existingEntry?.promise) {
790+
applyPrefetchOptions(existingEntry, options);
791+
return existingEntry.promise;
792+
}
621793

622-
// Signal that seeds are available before the slow IndexedDB persist
623-
options?.onSeedsReady?.();
794+
const entry: SharedPrefetchEntry = {
795+
priorityRowIds: new Set(),
796+
onSeedsReadyCallbacks: new Set(),
797+
seedsReady: false,
798+
};
624799

625-
const applyStartedAt = Date.now();
800+
applyPrefetchOptions(entry, options);
626801

627-
try {
628-
await applyDiff(databaseId, diff, { seedCache: false });
629-
Log.debug('[Database] blob diff persisted to IndexedDB', {
802+
const promise = (async () => {
803+
const diff = await fetchReadyDiff(workspaceId, databaseId);
804+
const seedSummary = seedRowDocCacheFromDiff(databaseId, diff, {
805+
priorityRowIds: Array.from(entry.priorityRowIds),
806+
});
807+
808+
Log.debug('[Database] blob seed cache prepared', {
630809
databaseId,
631-
durationMs: Date.now() - applyStartedAt,
632-
...summarizeDiff(diff),
810+
...seedSummary,
811+
seedCount: rowDocSeedCache.size,
812+
lookupCount: rowDocSeedLookup.size,
633813
});
634814

635-
const maxRid = maxRidFromDiff(diff);
815+
// Signal that seeds are available before the slow IndexedDB persist.
816+
notifySeedsReady(entry);
817+
818+
const applyStartedAt = Date.now();
819+
820+
try {
821+
await applyDiff(databaseId, diff, { seedCache: false });
822+
Log.debug('[Database] blob diff persisted to IndexedDB', {
823+
databaseId,
824+
durationMs: Date.now() - applyStartedAt,
825+
...summarizeDiff(diff),
826+
});
827+
828+
const maxRid = maxRidFromDiff(diff);
636829

637-
if (maxRid) {
638-
writeCachedRid(databaseId, maxRid);
639-
Log.debug('[Database] blob updated rid cache', { databaseId, maxRid });
830+
if (maxRid) {
831+
writeCachedRid(databaseId, maxRid);
832+
Log.debug('[Database] blob updated rid cache', { databaseId, maxRid });
833+
}
834+
} catch (error) {
835+
Log.warn('[Database] blob diff persist failed', {
836+
databaseId,
837+
error,
838+
});
640839
}
840+
841+
return diff;
842+
})().finally(() => {
843+
entry.settled = true;
844+
});
845+
846+
entry.promise = promise;
847+
sharedPrefetchEntries.set(sharedKey, entry);
848+
849+
try {
850+
return await promise;
641851
} catch (error) {
642-
Log.warn('[Database] blob diff persist failed', {
643-
databaseId,
644-
error,
645-
});
646-
}
852+
const currentEntry = sharedPrefetchEntries.get(sharedKey);
647853

648-
return diff;
854+
if (currentEntry === entry) {
855+
sharedPrefetchEntries.delete(sharedKey);
856+
}
857+
858+
entry.onSeedsReadyCallbacks.clear();
859+
throw error;
860+
}
649861
}

0 commit comments

Comments
 (0)