Skip to content

Commit 4a15d4b

Browse files
authored
Fix web sync fallback during reconnect (#350)
* Fix web sync fallback during reconnect * Fix lint in reconnect sync fallback
1 parent 429807e commit 4a15d4b

9 files changed

Lines changed: 1267 additions & 36 deletions

File tree

src/application/services/js-services/__tests__/sync.test.ts

Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,113 @@ const mockSync = (clientCount: number, tracer = defaultTracer): SyncContext[] =>
9696
return clients;
9797
};
9898

99+
interface QueuedSyncBus {
100+
clients: SyncContext[];
101+
online: boolean[];
102+
texts: Y.Text[];
103+
disconnect: (index: number) => void;
104+
reconnect: (index: number) => void;
105+
publishManifest: (index: number) => void;
106+
publishManifests: () => void;
107+
drain: () => void;
108+
}
109+
110+
const createQueuedSyncBus = (clientCount: number): QueuedSyncBus => {
111+
outboxMock.__clearTestClients();
112+
113+
const guid = random.uuidv4();
114+
const clients: SyncContext[] = [];
115+
const online = Array.from({ length: clientCount }, () => true);
116+
const queue: Array<{ sender: number; message: messages.IMessage }> = [];
117+
118+
const enqueue = (sender: number, message: messages.IMessage) => {
119+
if (!online[sender]) return;
120+
queue.push({ sender, message });
121+
};
122+
123+
for (let i = 0; i < clientCount; i += 1) {
124+
const doc = new Y.Doc({ guid });
125+
126+
clients.push({
127+
doc,
128+
awareness: new awarenessProtocol.Awareness(doc),
129+
emit: (message) => enqueue(i, message),
130+
collabType: Types.Document,
131+
});
132+
}
133+
134+
clients.forEach((client) => initSync(client));
135+
queue.length = 0;
136+
137+
const publishManifest = (index: number) => {
138+
const client = clients[index];
139+
140+
client.emit({
141+
collabMessage: {
142+
objectId: client.doc.guid,
143+
collabType: client.collabType,
144+
syncRequest: {
145+
stateVector: Y.encodeStateVector(client.doc),
146+
lastMessageId: { timestamp: 0, counter: 0 },
147+
version: client.doc.version,
148+
},
149+
},
150+
});
151+
};
152+
153+
const publishManifests = () => {
154+
clients.forEach((_, index) => publishManifest(index));
155+
};
156+
157+
const drain = () => {
158+
let guard = 0;
159+
160+
while (queue.length > 0) {
161+
if (guard > 1_000) {
162+
throw new Error('sync queue did not settle');
163+
}
164+
165+
guard += 1;
166+
const { sender, message } = queue.shift()!;
167+
168+
if (!online[sender] || !message.collabMessage) continue;
169+
170+
clients.forEach((client, index) => {
171+
if (index !== sender && online[index]) {
172+
handleMessage(client, message.collabMessage!);
173+
}
174+
});
175+
}
176+
};
177+
178+
const disconnect = (index: number) => {
179+
online[index] = false;
180+
};
181+
182+
const reconnect = (index: number) => {
183+
online[index] = true;
184+
publishManifests();
185+
};
186+
187+
return {
188+
clients,
189+
online,
190+
texts: clients.map((client) => client.doc.getText('test')),
191+
disconnect,
192+
reconnect,
193+
publishManifest,
194+
publishManifests,
195+
drain,
196+
};
197+
};
198+
199+
const expectTextConvergence = (texts: Y.Text[], expectedChars: string) => {
200+
const values = texts.map((text) => text.toString());
201+
202+
expect(new Set(values).size).toBe(1);
203+
expect([...values[0]].sort().join('')).toBe(expectedChars);
204+
};
205+
99206
describe('sync protocol', () => {
100207
it('should exchange updates between client and server', () => {
101208
const [local, remote] = mockSync(2);
@@ -114,4 +221,146 @@ describe('sync protocol', () => {
114221
txt2.insert(5, ' World');
115222
expect(txt1.toString()).toEqual('Hello World');
116223
});
224+
225+
it('converges three online clients after concurrent same-position edits', () => {
226+
const { texts, publishManifests, drain } = createQueuedSyncBus(3);
227+
228+
texts[0].insert(0, 'A');
229+
texts[1].insert(0, 'B');
230+
texts[2].insert(0, 'C');
231+
232+
publishManifests();
233+
drain();
234+
235+
expectTextConvergence(texts, 'ABC');
236+
});
237+
238+
it('converges when one client edits offline and then reconnects', () => {
239+
const { texts, disconnect, reconnect, publishManifest, drain } = createQueuedSyncBus(3);
240+
241+
disconnect(2);
242+
243+
texts[0].insert(0, 'A');
244+
texts[1].insert(0, 'B');
245+
texts[2].insert(0, 'C');
246+
247+
publishManifest(0);
248+
publishManifest(1);
249+
drain();
250+
251+
expect(texts[0].toString()).toEqual(texts[1].toString());
252+
expect([...texts[0].toString()].sort().join('')).toBe('AB');
253+
expect(texts[2].toString()).toBe('C');
254+
255+
reconnect(2);
256+
drain();
257+
258+
expectTextConvergence(texts, 'ABC');
259+
});
260+
261+
it('converges when two offline clients reconnect in ascending order', () => {
262+
const { texts, disconnect, reconnect, publishManifest, drain } = createQueuedSyncBus(3);
263+
264+
disconnect(0);
265+
disconnect(1);
266+
267+
texts[0].insert(0, 'A');
268+
texts[1].insert(0, 'B');
269+
texts[2].insert(0, 'C');
270+
271+
publishManifest(2);
272+
drain();
273+
expect(texts.map((text) => text.toString())).toEqual(['A', 'B', 'C']);
274+
275+
reconnect(0);
276+
drain();
277+
expect(texts[0].toString()).toEqual(texts[2].toString());
278+
expect([...texts[0].toString()].sort().join('')).toBe('AC');
279+
expect(texts[1].toString()).toBe('B');
280+
281+
reconnect(1);
282+
drain();
283+
284+
expectTextConvergence(texts, 'ABC');
285+
});
286+
287+
it('converges when two offline clients reconnect in reverse order', () => {
288+
const { texts, disconnect, reconnect, publishManifest, drain } = createQueuedSyncBus(3);
289+
290+
disconnect(0);
291+
disconnect(1);
292+
293+
texts[0].insert(0, 'A');
294+
texts[1].insert(0, 'B');
295+
texts[2].insert(0, 'C');
296+
297+
publishManifest(2);
298+
drain();
299+
expect(texts.map((text) => text.toString())).toEqual(['A', 'B', 'C']);
300+
301+
reconnect(1);
302+
drain();
303+
expect(texts[1].toString()).toEqual(texts[2].toString());
304+
expect([...texts[1].toString()].sort().join('')).toBe('BC');
305+
expect(texts[0].toString()).toBe('A');
306+
307+
reconnect(0);
308+
drain();
309+
310+
expectTextConvergence(texts, 'ABC');
311+
});
312+
313+
it('converges when all clients edit offline and reconnect left to right', () => {
314+
const { texts, disconnect, reconnect, drain } = createQueuedSyncBus(3);
315+
316+
disconnect(0);
317+
disconnect(1);
318+
disconnect(2);
319+
320+
texts[0].insert(0, 'A');
321+
texts[1].insert(0, 'B');
322+
texts[2].insert(0, 'C');
323+
324+
reconnect(0);
325+
drain();
326+
expect(texts.map((text) => text.toString())).toEqual(['A', 'B', 'C']);
327+
328+
reconnect(1);
329+
drain();
330+
expect(texts[0].toString()).toEqual(texts[1].toString());
331+
expect([...texts[0].toString()].sort().join('')).toBe('AB');
332+
expect(texts[2].toString()).toBe('C');
333+
334+
reconnect(2);
335+
drain();
336+
337+
expectTextConvergence(texts, 'ABC');
338+
});
339+
340+
it('converges when all clients edit offline and reconnect right to left', () => {
341+
const { texts, disconnect, reconnect, drain } = createQueuedSyncBus(3);
342+
343+
disconnect(0);
344+
disconnect(1);
345+
disconnect(2);
346+
347+
texts[0].insert(0, 'A');
348+
texts[1].insert(0, 'B');
349+
texts[2].insert(0, 'C');
350+
351+
reconnect(2);
352+
drain();
353+
expect(texts.map((text) => text.toString())).toEqual(['A', 'B', 'C']);
354+
355+
reconnect(1);
356+
drain();
357+
expect(texts[1].toString()).toEqual(texts[2].toString());
358+
expect([...texts[1].toString()].sort().join('')).toBe('BC');
359+
expect(texts[0].toString()).toBe('A');
360+
361+
reconnect(0);
362+
drain();
363+
364+
expectTextConvergence(texts, 'ABC');
365+
});
117366
});

src/application/services/js-services/http/collab-api.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,14 @@ import { Log } from '@/utils/log';
1313

1414
import { APIResponse, APIError, executeAPIRequest, executeAPIVoidRequest, getAxios, parseRetryAfterSecs } from './core';
1515

16+
export interface CollabFullSyncBatchResult {
17+
objectId: string;
18+
collabType: Types;
19+
missingUpdate: Uint8Array;
20+
serverStateVector: Uint8Array;
21+
error?: string;
22+
}
23+
1624
export async function updateCollab(
1725
workspaceId: string,
1826
objectId: string,
@@ -61,7 +69,7 @@ export async function collabFullSyncBatch(
6169
stateVector: Uint8Array;
6270
docState: Uint8Array;
6371
}>
64-
): Promise<void> {
72+
): Promise<CollabFullSyncBatchResult[]> {
6573
const url = `/api/workspace/v1/${workspaceId}/collab/full-sync/batch`;
6674

6775
// Build the protobuf request
@@ -113,6 +121,7 @@ export async function collabFullSyncBatch(
113121
// Decode and check the response for errors
114122
const responseData = new Uint8Array(response.data);
115123
const batchResponse = collab.CollabBatchSyncResponse.decode(responseData);
124+
const results: CollabFullSyncBatchResult[] = [];
116125

117126
// Check for any errors in the results
118127
for (const result of batchResponse.results) {
@@ -123,7 +132,17 @@ export async function collabFullSyncBatch(
123132
error: result.error,
124133
});
125134
}
135+
136+
results.push({
137+
objectId: result.objectId ?? '',
138+
collabType: result.collabType as Types,
139+
missingUpdate: result.missingUpdate ?? new Uint8Array(),
140+
serverStateVector: result.serverStateVector ?? new Uint8Array(),
141+
error: result.error || undefined,
142+
});
126143
}
144+
145+
return results;
127146
}
128147

129148
export async function getCollab(workspaceId: string, objectId: string, collabType: Types) {

src/application/services/js-services/sync-protocol.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,19 @@ export interface SyncContext {
4040
* Used by version reset flows where pending updates are stale and must be discarded.
4141
*/
4242
discardPendingUpdates?: () => Promise<void>;
43+
/**
44+
* Called after a local Yjs update is observed. The WebSocket path still owns
45+
* immediate delivery; this hook lets the app schedule a debounced HTTP full
46+
* sync when the WebSocket is reconnecting.
47+
*/
48+
onLocalUpdate?: (objectId: string) => void;
49+
/**
50+
* Called after this context participates in a WebSocket state-vector sync
51+
* exchange. A completed manifest-style exchange reconciles the full Yjs
52+
* state, so HTTP fallback dirty markers for the object can be cleared when
53+
* the socket is open.
54+
*/
55+
onManifestSync?: (objectId: string) => void;
4356
/**
4457
* Cleanup function to remove update/awareness observers and cancel debounced sends.
4558
* Set by initSync, called during deferred sync context cleanup.
@@ -83,6 +96,7 @@ const handleSyncRequest = (ctx: SyncContext, message: collab.ISyncRequest): void
8396
},
8497
},
8598
});
99+
ctx.onManifestSync?.(doc.guid);
86100
};
87101

88102
const handleAccessChanged = (ctx: SyncContext, message: collab.IAccessChanged): void => {
@@ -190,6 +204,7 @@ export const initSync = (ctx: SyncContext) => {
190204
version: doc.version ?? null,
191205
payload: update,
192206
});
207+
ctx.onLocalUpdate?.(doc.guid);
193208
};
194209

195210
const onDestroy = () => {

0 commit comments

Comments
 (0)