Skip to content

Commit 7ee0668

Browse files
appflowyclaude
andauthored
fix: send row document source when creating row pages (#418)
* fix: send row document source when creating row pages * Fix favorite view sync * feat(sync): send before/after Yjs state vectors on collab updates Attach before_state_vector/after_state_vector (lib0 v1) to outbound collab Update messages so the server can detect missing updates. Adds proto fields 5/6 to Update (regenerated messages.js/.d.ts), captures the vectors from the Yjs transaction in sync-protocol onUpdate, and threads them through the persistent sync outbox (a merged drain spans the first record's before-vector to the last record's after-vector). Covered by new outbox tests. Backward compatible: empty vectors are treated as absent by the server, so this is inert when the server feature is off. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01Lmq1LHFqqDTchv9K2bGEKR * perf(sync): drop after_state_vector from web collab updates The server never trusts a client-provided after vector (it derives the post-update state from the update bytes itself), so sending it was wasted work. Compute and send only before_state_vector: one Y.encodeStateVector call per update instead of two, and fewer bytes on the wire. Removes the field from the outbox record and all build sites (immediate/broadcast, IDB-fallback, merge-drain) and from handleSyncRequest. The Update proto still declares field 6 to stay aligned with the server wire contract; it is simply left unset (serializes as absent -> None on the server). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01Lmq1LHFqqDTchv9K2bGEKR --------- Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
1 parent b615fa5 commit 7ee0668

20 files changed

Lines changed: 353 additions & 58 deletions

File tree

src/application/database-yjs/context.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
LoadView,
1717
LoadViewMeta,
1818
RowId,
19+
RowDocumentSourcePayload,
1920
Subscription,
2021
TestDatabasePromptConfig,
2122
TimeFormat,
@@ -84,7 +85,7 @@ export interface DatabaseContextState {
8485
* Only available in app mode - not provided in publish mode.
8586
* Returns the doc_state (Y.js update) to initialize the local document.
8687
*/
87-
createRowDocument?: (documentId: string) => Promise<Uint8Array | null>;
88+
createRowDocument?: (documentId: string, source?: RowDocumentSourcePayload) => Promise<Uint8Array | null>;
8889
/** Fire-and-forget: ask the server to duplicate the row document with inline DB deep copy. */
8990
duplicateRowDocument?: (databaseId: string, sourceRowId: string, newRowId: string, clientDocStateB64?: string) => Promise<void>;
9091
navigateToView?: (viewId: string, blockId?: string) => Promise<void>;

src/application/db/tables/sync_outbox.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ export interface SyncOutboxRecord {
99
version?: string | null;
1010
payload: Uint8Array;
1111
createdAt: number;
12+
// Yjs state vector (lib0 v1) of the doc state before this local edit, so the
13+
// server can detect missing updates. Optional: older rows written before this
14+
// field existed drain without it (legacy server path). Not indexed, so no
15+
// schema version bump is needed. We intentionally don't store an after vector —
16+
// the server derives the post-update state itself and never trusts a client one.
17+
beforeStateVector?: Uint8Array;
1218
}
1319

1420
export type SyncOutboxTable = {

src/application/services/js-services/http/page-api.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ export async function favoritePageView(
4848
workspaceId: string,
4949
viewId: string,
5050
isFavorite: boolean,
51-
isPinned: boolean = false
51+
isPinned: boolean = true
5252
): Promise<void> {
5353
const url = `/api/workspace/${workspaceId}/page-view/${viewId}/favorite`;
5454

src/application/services/js-services/http/view-api.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { AppOutlineResponse } from '@/application/services/services.type';
2-
import { View } from '@/application/types';
2+
import { CreateOrphanedViewPayload, View } from '@/application/types';
33

44
import { APIResponse, executeAPIRequest, getAxios } from './core';
55

@@ -149,7 +149,7 @@ export async function getAppTrash(workspaceId: string) {
149149
);
150150
}
151151

152-
export async function createOrphanedView(workspaceId: string, payload: { document_id: string }): Promise<Uint8Array> {
152+
export async function createOrphanedView(workspaceId: string, payload: CreateOrphanedViewPayload): Promise<Uint8Array> {
153153
const url = `/api/workspace/${workspaceId}/orphaned-view`;
154154

155155
// Server returns doc_state as Vec<u8> which is JSON encoded as number[]

src/application/services/js-services/sync-protocol.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,15 +84,19 @@ const handleSyncRequest = (ctx: SyncContext, message: collab.ISyncRequest): void
8484
version: doc.version,
8585
bytes: update.byteLength,
8686
});
87-
// send the update containing new data back to the server
87+
// send the update containing new data back to the server. This is a
88+
// manifest-style diff, so the causal `before` vector is the server's own
89+
// advertised state (what it told us it has). No after vector — the server
90+
// derives its own post-update state and never trusts a client-provided one.
8891
emit({
8992
collabMessage: {
9093
objectId: doc.guid,
9194
collabType: ctx.collabType,
9295
update: {
9396
flags: UpdateFlags.Lib0v1,
9497
payload: update,
95-
version: doc.version
98+
version: doc.version,
99+
beforeStateVector: stateVector,
96100
},
97101
},
98102
});
@@ -193,16 +197,24 @@ export const initSync = (ctx: SyncContext) => {
193197

194198
ctx.discardPendingUpdates = () => deleteOutboxByObjectId(doc.guid);
195199

196-
const onUpdate = (update: Uint8Array, origin: string) => {
200+
const onUpdate = (update: Uint8Array, origin: string, _doc: Y.Doc, transaction: Y.Transaction) => {
197201
if (origin === 'remote') {
198202
return; // Ignore remote updates
199203
}
200204

205+
// Causal metadata for server-side missing-update detection: the state vector
206+
// before this edit. Yjs already computed `transaction.beforeState`, so we just
207+
// encode it (lib0 v1, to match the server). We deliberately do NOT send an
208+
// after vector — the server never trusts a client-provided one (it derives the
209+
// post-update state from the update bytes itself), so it would be wasted work.
210+
const beforeStateVector = Y.encodeStateVector(transaction.beforeState);
211+
201212
enqueueOutboxUpdate({
202213
objectId: doc.guid,
203214
collabType,
204215
version: doc.version ?? null,
205216
payload: update,
217+
beforeStateVector,
206218
});
207219
ctx.onLocalUpdate?.(doc.guid);
208220
};

src/application/sync-outbox/__tests__/sync-outbox.test.ts

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ interface MockSyncOutboxRecord {
1111
version?: string | null;
1212
payload: Uint8Array;
1313
createdAt: number;
14+
beforeStateVector?: Uint8Array;
1415
}
1516

1617
let mockRecords: MockSyncOutboxRecord[] = [];
@@ -199,6 +200,79 @@ describe('sync outbox live send', () => {
199200
expect(mockRecords).toHaveLength(0);
200201
});
201202

203+
it('attaches the before state vector to an immediately sent update (and no after)', async () => {
204+
const send = jest.fn();
205+
206+
configureDrain({
207+
userId,
208+
workspaceId,
209+
send,
210+
isReady: () => true,
211+
});
212+
213+
const beforeStateVector = new Uint8Array([1, 2, 3]);
214+
215+
enqueueOutboxUpdate({
216+
objectId,
217+
collabType: Types.Document,
218+
version: null,
219+
payload: makeUpdate('draft'),
220+
beforeStateVector,
221+
});
222+
223+
expect(send).toHaveBeenCalledTimes(1);
224+
const update = send.mock.calls[0][0].collabMessage.update;
225+
226+
expect(update.beforeStateVector).toBe(beforeStateVector);
227+
expect(update.afterStateVector).toBeUndefined();
228+
});
229+
230+
it('takes the merged drain before vector from the first record (and sends no after)', async () => {
231+
let ready = false;
232+
const send = jest.fn();
233+
234+
configureDrain({
235+
userId,
236+
workspaceId,
237+
send,
238+
isReady: () => ready,
239+
});
240+
241+
const firstBefore = new Uint8Array([10]);
242+
const lastBefore = new Uint8Array([20]);
243+
244+
enqueueOutboxUpdate({
245+
objectId,
246+
collabType: Types.Document,
247+
version: null,
248+
payload: makeUpdate('old'),
249+
beforeStateVector: firstBefore,
250+
});
251+
enqueueOutboxUpdate({
252+
objectId,
253+
collabType: Types.Document,
254+
version: null,
255+
payload: makeUpdate('new'),
256+
beforeStateVector: lastBefore,
257+
});
258+
await flushPromises();
259+
260+
expect(send).not.toHaveBeenCalled();
261+
expect(mockRecords).toHaveLength(2);
262+
263+
ready = true;
264+
startDrainAll();
265+
await flushPromises();
266+
267+
expect(send).toHaveBeenCalledTimes(1);
268+
const update = send.mock.calls[0][0].collabMessage.update;
269+
270+
// Merged payload covers first->last, so before = the first record's before.
271+
expect(update.beforeStateVector).toBe(firstBefore);
272+
expect(update.afterStateVector).toBeUndefined();
273+
expect(mockRecords).toHaveLength(0);
274+
});
275+
202276
it('live sends the current update and drains older queued records', async () => {
203277
let ready = false;
204278
const send = jest.fn();

src/application/sync-outbox/index.ts

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ export function enqueueOutboxUpdate(record: Omit<SyncOutboxRecord, 'id' | 'creat
225225
flags: FLAGS_LIB0V1,
226226
payload: record.payload,
227227
version: record.version ?? undefined,
228+
beforeStateVector: record.beforeStateVector,
228229
} as collab.IUpdate,
229230
},
230231
};
@@ -490,7 +491,9 @@ async function distinctObjectIdsForSession(userId: string, workspaceId: string):
490491
return Array.from(ids);
491492
}
492493

493-
function buildUpdateMessage(record: Pick<SyncOutboxRecord, 'objectId' | 'collabType' | 'payload' | 'version'>): messages.IMessage {
494+
function buildUpdateMessage(
495+
record: Pick<SyncOutboxRecord, 'objectId' | 'collabType' | 'payload' | 'version' | 'beforeStateVector'>,
496+
): messages.IMessage {
494497
return {
495498
collabMessage: {
496499
objectId: record.objectId,
@@ -499,6 +502,7 @@ function buildUpdateMessage(record: Pick<SyncOutboxRecord, 'objectId' | 'collabT
499502
flags: FLAGS_LIB0V1,
500503
payload: record.payload,
501504
version: record.version ?? undefined,
505+
beforeStateVector: record.beforeStateVector,
502506
} as collab.IUpdate,
503507
},
504508
};
@@ -580,11 +584,18 @@ async function drainObjectWhileReady(objectId: string): Promise<void> {
580584

581585
if (records.length === 0) return;
582586

587+
// Records are sorted by id (insertion order), so first/last bound the batch.
588+
const firstRecord = records[0];
589+
const lastRecord = records[records.length - 1];
590+
583591
const merged = records.length === 1
584-
? records[0].payload
592+
? firstRecord.payload
585593
: Y.mergeUpdates(records.map((r) => r.payload));
586-
const collabType = records[records.length - 1].collabType as Types;
587-
const version = records[records.length - 1].version;
594+
const collabType = lastRecord.collabType as Types;
595+
const version = lastRecord.version;
596+
// The merged payload spans from the first queued edit to the last, so its
597+
// causal `before` is the first record's before-vector.
598+
const beforeStateVector = firstRecord.beforeStateVector;
588599

589600
const message: messages.IMessage = {
590601
collabMessage: {
@@ -594,6 +605,7 @@ async function drainObjectWhileReady(objectId: string): Promise<void> {
594605
flags: FLAGS_LIB0V1,
595606
payload: merged,
596607
version: version ?? undefined,
608+
beforeStateVector,
597609
} as collab.IUpdate,
598610
},
599611
};

src/application/types.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1203,6 +1203,10 @@ export interface View {
12031203
has_children?: boolean;
12041204
is_published: boolean;
12051205
is_private: boolean;
1206+
/** Whether this view is currently in the user's favorites. Synced via the folder. */
1207+
is_favorite?: boolean;
1208+
/** Favorite-section pin state returned by the favorites endpoint. */
1209+
is_pinned?: boolean;
12061210
/** Whether the page is locked (read-only) for everyone until unlocked. Synced via the folder. */
12071211
is_locked?: boolean;
12081212
last_edited_time?: string;
@@ -1348,6 +1352,17 @@ export interface UpdatePagePayload {
13481352
is_locked?: boolean;
13491353
}
13501354

1355+
export interface RowDocumentSourcePayload {
1356+
database_id: string;
1357+
database_view_id: string;
1358+
row_id: string;
1359+
}
1360+
1361+
export interface CreateOrphanedViewPayload {
1362+
document_id: string;
1363+
row_document_source?: RowDocumentSourcePayload;
1364+
}
1365+
13511366
export type ViewMetaCover = ViewCover;
13521367

13531368
export interface ViewMetaProps {
@@ -1396,7 +1411,7 @@ export interface ViewComponentProps {
13961411
* Create a row document on the server (orphaned view).
13971412
* Only available in app mode - not provided in publish mode.
13981413
*/
1399-
createRowDocument?: (documentId: string) => Promise<Uint8Array | null>;
1414+
createRowDocument?: (documentId: string, source?: RowDocumentSourcePayload) => Promise<Uint8Array | null>;
14001415
duplicateRowDocument?: (
14011416
databaseId: string,
14021417
sourceRowId: string,

src/components/app/contexts/AppOperationsContext.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { SyncContext } from '@/application/services/js-services/sync-protocol';
55
import {
66
CreateDatabaseViewPayload,
77
CreateDatabaseViewResponse,
8+
CreateOrphanedViewPayload,
89
DuplicatePageOperationOptions,
910
CreatePagePayload,
1011
CreatePageResponse,
@@ -15,6 +16,7 @@ import {
1516
LoadDatabasePrompts,
1617
LoadView,
1718
LoadViewMeta,
19+
RowDocumentSourcePayload,
1820
Subscription,
1921
TestDatabasePromptConfig,
2022
TextCount,
@@ -113,7 +115,7 @@ export interface AppOperationsContextType {
113115

114116
// ── Database operations ────────────────────────────────────────────
115117
/** Create an orphaned view (e.g. for inline database within a document). */
116-
createOrphanedView?: (payload: { document_id: string }) => Promise<Uint8Array>;
118+
createOrphanedView?: (payload: CreateOrphanedViewPayload) => Promise<Uint8Array>;
117119
/** Load AI prompt templates for a database. */
118120
loadDatabasePrompts?: LoadDatabasePrompts;
119121
/** Test an AI prompt config against a database. */
@@ -123,7 +125,7 @@ export interface AppOperationsContextType {
123125
/** Load an existing row document. */
124126
loadRowDocument?: (documentId: string) => Promise<YDoc | null>;
125127
/** Create a new row document (returns encoded initial state). */
126-
createRowDocument?: (documentId: string) => Promise<Uint8Array | null>;
128+
createRowDocument?: (documentId: string, source?: RowDocumentSourcePayload) => Promise<Uint8Array | null>;
127129
/** Fire-and-forget: ask the server to duplicate the row document with inline DB deep copy. */
128130
duplicateRowDocument?: (databaseId: string, sourceRowId: string, newRowId: string, clientDocStateB64?: string) => Promise<void>;
129131
/** Resolve a database ID to its primary view ID. */

0 commit comments

Comments
 (0)