Skip to content

Commit 6bca256

Browse files
pyphiliakim
andauthored
feat(page): gracefully fallback on error (#1970)
* feat: close connection if update is corrupted * refactor: fix errors * refactor: remove commented code --------- Co-authored-by: kim <kim.phanhoang@epfl.ch>
1 parent 9d6eb5c commit 6bca256

3 files changed

Lines changed: 205 additions & 21 deletions

File tree

src/services/item/plugins/page/WSDoc.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,8 @@ export class WSDoc extends Y.Doc {
9797
}
9898
} catch (err) {
9999
console.error(err);
100-
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
101-
// @ts-expect-error
102-
doc.emit('error', [err]);
100+
// close connection because receive message is unsupported
101+
this.closeConn(conn, 1003);
103102
}
104103
}
105104

@@ -127,14 +126,14 @@ export class WSDoc extends Y.Doc {
127126
* Close given connection
128127
* @param conn connection to close
129128
*/
130-
closeConn(conn: WebSocket) {
131-
console.debug('Page: close connection');
129+
closeConn(conn: WebSocket, code = 1000, reason?: string) {
130+
console.debug('Page: close connection', code, reason);
132131
if (this.conns.has(conn)) {
133132
const controlledIds: Set<number> = this.conns.get(conn)!;
134133
this.conns.delete(conn);
135134
awarenessProtocol.removeAwarenessStates(this.awareness, Array.from(controlledIds), null);
136135
this.destroy();
137136
}
138-
conn.close();
137+
conn.close(code, reason);
139138
}
140139
}

src/services/item/plugins/page/page.controller.test.ts

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import { faker } from '@faker-js/faker';
22
import { desc, eq } from 'drizzle-orm';
33
import { StatusCodes } from 'http-status-codes';
4+
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
5+
// @ts-expect-error
6+
import * as encoding from 'lib0/encoding';
47
import { AddressInfo } from 'net';
58
import { v4 } from 'uuid';
69
import waitForExpect from 'wait-for-expect';
@@ -28,6 +31,7 @@ import { db } from '../../../../drizzle/db';
2831
import { itemsRawTable, pageUpdateTable } from '../../../../drizzle/schema';
2932
import { assertIsDefined } from '../../../../utils/assertions';
3033
import { assertIsMemberForTest } from '../../../authentication';
34+
import { MESSAGE_SYNC_CODE } from './constants';
3135
import { PageItemService } from './page.service';
3236

3337
async function getAppPort(app: FastifyInstance) {
@@ -36,6 +40,14 @@ async function getAppPort(app: FastifyInstance) {
3640
return port;
3741
}
3842

43+
async function expectServerToBeResponsive(app: FastifyInstance) {
44+
const result = await app.inject({
45+
method: 'GET',
46+
url: '/version',
47+
});
48+
expect(result.statusCode).toEqual(StatusCodes.OK);
49+
}
50+
3951
async function connectToItemWs(
4052
app: FastifyInstance,
4153
itemId: string,
@@ -340,6 +352,82 @@ describe('Page routes tests', () => {
340352
provider1.destroy();
341353
provider2.destroy();
342354
});
355+
356+
it('Gracefully recover if update is corrupted', async () => {
357+
const {
358+
actor,
359+
items: [item],
360+
} = await seedFromJson({
361+
items: [
362+
{
363+
type: ItemType.PAGE,
364+
memberships: [{ account: 'actor', permission: PermissionLevel.Write }],
365+
},
366+
],
367+
});
368+
assertIsDefined(actor);
369+
mockAuthenticate(actor);
370+
371+
// prefill incorrect update in db
372+
await db
373+
.insert(pageUpdateTable)
374+
.values({ itemId: item.id, clock: 1, update: Buffer.from([1, 2, 3]) });
375+
376+
const { doc, provider: provider1 } = await connectToItemWs(app, item.id);
377+
378+
// connection should close
379+
let hasClosed = false;
380+
provider1.on('connection-close', () => {
381+
hasClosed = true;
382+
});
383+
await waitForExpect(async () => {
384+
expect(hasClosed).toBeTruthy();
385+
await expectServerToBeResponsive(app);
386+
}, 2000);
387+
388+
// cleanup
389+
doc.destroy();
390+
provider1.destroy();
391+
});
392+
393+
it('Gracefully recover if receive corrupted ws message', async () => {
394+
const {
395+
actor,
396+
items: [item],
397+
} = await seedFromJson({
398+
items: [
399+
{
400+
type: ItemType.PAGE,
401+
memberships: [{ account: 'actor', permission: PermissionLevel.Write }],
402+
},
403+
],
404+
});
405+
assertIsDefined(actor);
406+
mockAuthenticate(actor);
407+
const port = await getAppPort(app);
408+
const ws = new WebSocket(`ws://localhost:${port}/items/pages/${item.id}/ws`);
409+
410+
// connection should close
411+
let hasClosed = false;
412+
ws.on('close', () => {
413+
hasClosed = true;
414+
});
415+
416+
// wait for connection to be established before switching user
417+
await waitForExpect(() => {
418+
expect(ws.readyState).toBeTruthy();
419+
}, 2000);
420+
421+
const encoder = encoding.createEncoder();
422+
encoding.writeVarUint(encoder, MESSAGE_SYNC_CODE);
423+
encoding.writeVarUint8Array(encoder, Buffer.from([1, 2, 3]));
424+
ws.send(encoding.toUint8Array(encoder));
425+
426+
await waitForExpect(async () => {
427+
expect(hasClosed).toBeTruthy();
428+
await expectServerToBeResponsive(app);
429+
}, 4000);
430+
});
343431
});
344432

345433
describe('GET /items/pages/ws/read', () => {
@@ -653,6 +741,81 @@ describe('Page routes tests', () => {
653741
provider.destroy();
654742
readerProvider.destroy();
655743
});
744+
745+
it('Gracefully recover if update is corrupted', async () => {
746+
const {
747+
actor,
748+
items: [item],
749+
} = await seedFromJson({
750+
items: [
751+
{
752+
type: ItemType.PAGE,
753+
memberships: [{ account: 'actor', permission: PermissionLevel.Read }],
754+
},
755+
],
756+
});
757+
assertIsDefined(actor);
758+
mockAuthenticate(actor);
759+
760+
// prefill incorrect update in db
761+
await db
762+
.insert(pageUpdateTable)
763+
.values({ itemId: item.id, clock: 1, update: Buffer.from([1, 2, 3]) });
764+
765+
const { doc, provider: provider1 } = await connectToItemWs(app, item.id, { readOnly: true });
766+
767+
// connection should close
768+
let hasClosed = false;
769+
provider1.on('connection-close', () => {
770+
hasClosed = true;
771+
});
772+
await waitForExpect(async () => {
773+
expect(hasClosed).toBeTruthy();
774+
}, 2000);
775+
776+
// cleanup
777+
doc.destroy();
778+
provider1.destroy();
779+
});
780+
781+
it('Gracefully recover if receive corrupted ws message', async () => {
782+
const {
783+
actor,
784+
items: [item],
785+
} = await seedFromJson({
786+
items: [
787+
{
788+
type: ItemType.PAGE,
789+
memberships: [{ account: 'actor', permission: PermissionLevel.Read }],
790+
},
791+
],
792+
});
793+
assertIsDefined(actor);
794+
mockAuthenticate(actor);
795+
const port = await getAppPort(app);
796+
const ws = new WebSocket(`ws://localhost:${port}/items/pages/${item.id}/ws/read`);
797+
798+
// connection should close
799+
let hasClosed = false;
800+
ws.on('close', () => {
801+
hasClosed = true;
802+
});
803+
804+
// wait for connection to be established before switching user
805+
await waitForExpect(() => {
806+
expect(ws.readyState).toBeTruthy();
807+
}, 2000);
808+
809+
const encoder = encoding.createEncoder();
810+
encoding.writeVarUint(encoder, MESSAGE_SYNC_CODE);
811+
encoding.writeVarUint8Array(encoder, Buffer.from([1, 2, 3]));
812+
ws.send(encoding.toUint8Array(encoder));
813+
814+
await waitForExpect(async () => {
815+
expect(hasClosed).toBeTruthy();
816+
await expectServerToBeResponsive(app);
817+
}, 4000);
818+
});
656819
});
657820

658821
describe('copy post hook', () => {

src/services/item/plugins/page/setupWSConnection.ts

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/* source: https://github.com/yjs/y-websocket-server/blob/main/src/utils.js */
2+
import { captureException } from '@sentry/node';
23
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
34
// @ts-expect-error
45
import * as encoding from 'lib0/encoding';
@@ -92,24 +93,35 @@ class WSSharedDoc extends WSDoc {
9293
this.bindState(name);
9394
}
9495

95-
closeConn(conn: WebSocket) {
96-
super.closeConn(conn);
96+
closeConn(conn: WebSocket, code = 1000, reason?: string) {
97+
super.closeConn(conn, code, reason);
9798
if (this.conns.size === 0) {
9899
docs.delete(this.name);
99100
}
100101
}
101102

102103
private async bindState(pageId: string) {
103-
// get updates from database and apply on the yjs doc
104-
const persistedYdoc = await this.pageItemService.getById(db, pageId);
105-
const newUpdates = Y.encodeStateAsUpdate(this);
106-
this.pageItemService.storeUpdate(db, pageId, newUpdates);
107-
Y.applyUpdate(this, Y.encodeStateAsUpdate(persistedYdoc));
104+
try {
105+
// get updates from database and apply on the yjs doc
106+
const persistedYdoc = await this.pageItemService.getById(db, pageId);
107+
const newUpdates = Y.encodeStateAsUpdate(this);
108+
this.pageItemService.storeUpdate(db, pageId, newUpdates);
109+
Y.applyUpdate(this, Y.encodeStateAsUpdate(persistedYdoc));
108110

109-
// on yjs document update, the update is store in the database
110-
this.on('update', (update) => {
111-
this.pageItemService.storeUpdate(db, pageId, update);
112-
});
111+
// on yjs document update, the update is store in the database
112+
this.on('update', (update) => {
113+
this.pageItemService.storeUpdate(db, pageId, update);
114+
});
115+
} catch (e) {
116+
console.error('An error occured while binding the state:', e);
117+
// send error to sentry
118+
captureException(e);
119+
120+
this.conns.forEach((v, conn) => {
121+
// close connections for unexpected error
122+
this.closeConn(conn, 1011);
123+
});
124+
}
113125
}
114126
}
115127

@@ -134,12 +146,22 @@ class WSReadDoc extends WSDoc {
134146
}
135147

136148
private async bindState(pageId: string) {
137-
const persistedYdoc = await this.pageItemService.getById(db, pageId);
138-
Y.applyUpdate(this, Y.encodeStateAsUpdate(persistedYdoc), this.SYNC_ORIGIN);
149+
try {
150+
const persistedYdoc = await this.pageItemService.getById(db, pageId);
151+
Y.applyUpdate(this, Y.encodeStateAsUpdate(persistedYdoc), this.SYNC_ORIGIN);
152+
} catch (e) {
153+
console.error('An error occured while binding the state:', e);
154+
// send error to sentry
155+
captureException(e);
156+
this.conns.forEach((v, conn) => {
157+
// close connections for unexpected error
158+
this.closeConn(conn, 1011);
159+
});
160+
}
139161
}
140162

141-
closeConn(conn: WebSocket) {
142-
super.closeConn(conn);
163+
closeConn(conn: WebSocket, code = 1000, reason?: string) {
164+
super.closeConn(conn, code, reason);
143165
if (this.conns.size === 0) {
144166
readDocs.delete(this.name);
145167
}

0 commit comments

Comments
 (0)