Skip to content

Commit 9f34803

Browse files
pyphiliakim
andauthored
feat: add read ws (#1967)
* feat: add read ws * test: add test for read doc * refactor: read doc only accepts messages from corresponding doc * fix: delete docs only if no one is connected --------- Co-authored-by: kim <kim.phanhoang@epfl.ch>
1 parent 76a284d commit 9f34803

7 files changed

Lines changed: 729 additions & 134 deletions

File tree

src/app.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ export default async function (instance: FastifyInstance): Promise<void> {
2525

2626
await instance.register(fws, {
2727
errorHandler: (error, conn, _req, _reply) => {
28-
log.error(`graasp-websockets: an error occured: ${error}\n\tDestroying connection`);
28+
log.error(
29+
`graasp-websockets: an error occured: ${error.toString()}\n\tDestroying connection`,
30+
);
2931
conn.terminate();
3032
},
3133
});

src/services/item/plugins/page/README.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,40 @@ The yjs's updates are saved in `page_update` and merged (given an update count t
1010

1111
`page_update` is agnostic of the data `update` it contains.
1212

13+
### Permission Levels
14+
15+
There are 2 websockets connection endpoints:
16+
17+
- `items/pages/:id/ws/read`: For read access, the websocket allows to receive updates for this page. It can be connected to if signed out and the item is public. It does not handle awareness.
18+
- `items/pages/:id/ws`: For write access, the websocket allows to receive updates and to send updates.
19+
20+
```mermaid
21+
flowchart TD
22+
A[Writer 1] -->|/ws| E{WSSharedDoc}
23+
B[Writer 2] -->|/ws| E{WSSharedDoc}
24+
B -->|send update, awareness| E
25+
A -->|send update, awareness| E
26+
C[Reader] -->|/ws/read| D{WSReadDoc}
27+
E -->|on: update| D
28+
E -->|on: update| A
29+
E -->|on: update| B
30+
D -->|on: update| C
31+
32+
```
33+
34+
```mermaid
35+
sequenceDiagram
36+
Writer->>WSSharedDoc: /ws
37+
WSSharedDoc-->>Writer: first state
38+
participant WSReadDoc
39+
Reader->>WSReadDoc: /ws/read
40+
WSReadDoc-->>Reader: first state
41+
Writer->>WSSharedDoc: send update
42+
WSSharedDoc->>WSReadDoc: send update
43+
WSReadDoc-->>Reader: broadcast update
44+
45+
```
46+
1347
## Tests
1448

1549
Controller tests use [`y-websocket`](https://github.com/yjs/y-websocket) to connect to the websocket endpoint. This allows to simulate a change in a yjs document to be reflected in the server.
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
2+
// @ts-expect-error
3+
import * as decoding from 'lib0/decoding';
4+
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
5+
// @ts-expect-error
6+
import * as encoding from 'lib0/encoding';
7+
import { WebSocket } from 'ws';
8+
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
9+
// @ts-expect-error
10+
import * as awarenessProtocol from 'y-protocols/awareness';
11+
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
12+
// @ts-expect-error
13+
import * as syncProtocol from 'y-protocols/sync';
14+
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
15+
// @ts-expect-error
16+
import * as Y from 'yjs';
17+
18+
import { MESSAGE_AWARENESS_CODE, MESSAGE_SYNC_CODE } from './constants';
19+
20+
const wsReadyStateConnecting = 0;
21+
const wsReadyStateOpen = 1;
22+
23+
/**
24+
* General wrapper of a yjs document
25+
* Broadcast updates to attached connections
26+
*/
27+
export class WSDoc extends Y.Doc {
28+
name: string;
29+
conns: Map<WebSocket, Set<number>>;
30+
enableAwareness: boolean;
31+
awareness: awarenessProtocol.Awareness;
32+
33+
constructor(name: string, enableAwareness: boolean) {
34+
super();
35+
this.name = name;
36+
this.conns = new Map();
37+
this.enableAwareness = enableAwareness;
38+
this.awareness = new awarenessProtocol.Awareness(this);
39+
this.awareness.setLocalState(null);
40+
}
41+
42+
protected broadcastUpdate(update: Uint8Array) {
43+
const encoder = encoding.createEncoder();
44+
encoding.writeVarUint(encoder, MESSAGE_SYNC_CODE);
45+
syncProtocol.writeUpdate(encoder, update);
46+
const message = encoding.toUint8Array(encoder);
47+
48+
this.conns.forEach((_, conn) => this.send(conn, message));
49+
}
50+
51+
/**
52+
* Add a connection to the pool
53+
* @param conn
54+
*/
55+
addConnection(conn: WebSocket) {
56+
this.conns.set(conn, new Set());
57+
// listen and reply to events
58+
conn.on('message', (message: ArrayBuffer) => {
59+
this.messageListener(conn, new Uint8Array(message));
60+
});
61+
}
62+
63+
/*
64+
* Get message from other connections at WS connection level
65+
*/
66+
messageListener(conn: WebSocket, message: Uint8Array) {
67+
try {
68+
const encoder = encoding.createEncoder();
69+
const decoder = decoding.createDecoder(message);
70+
const messageType = decoding.readVarUint(decoder);
71+
switch (messageType) {
72+
case MESSAGE_SYNC_CODE:
73+
encoding.writeVarUint(encoder, MESSAGE_SYNC_CODE);
74+
syncProtocol.readSyncMessage(decoder, encoder, this, conn);
75+
76+
// If the `encoder` only contains the type of reply message and no
77+
// message, there is no need to send the message. When `encoder` only
78+
// contains the type of reply, its length is 1.
79+
if (encoding.length(encoder) > 1) {
80+
this.send(conn, encoding.toUint8Array(encoder));
81+
}
82+
break;
83+
case MESSAGE_AWARENESS_CODE: {
84+
if (this.enableAwareness) {
85+
awarenessProtocol.applyAwarenessUpdate(
86+
this.awareness,
87+
decoding.readVarUint8Array(decoder),
88+
conn,
89+
);
90+
}
91+
break;
92+
}
93+
}
94+
} catch (err) {
95+
console.error(err);
96+
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
97+
// @ts-expect-error
98+
doc.emit('error', [err]);
99+
}
100+
}
101+
102+
/**
103+
* Send message to given connection
104+
* @param conn connection to receive message
105+
* @param m message
106+
*/
107+
send(conn: WebSocket, m: Uint8Array) {
108+
if (conn.readyState !== wsReadyStateConnecting && conn.readyState !== wsReadyStateOpen) {
109+
this.closeConn(conn);
110+
}
111+
try {
112+
conn.send(m, {}, (err) => {
113+
// eslint-disable-next-line @typescript-eslint/no-unused-expressions
114+
err != null && this.closeConn(conn);
115+
});
116+
} catch (e) {
117+
console.error(e);
118+
this.closeConn(conn);
119+
}
120+
}
121+
122+
/**
123+
* Close given connection
124+
* @param conn connection to close
125+
*/
126+
closeConn(conn: WebSocket) {
127+
console.debug('Page: close connection');
128+
if (this.conns.has(conn)) {
129+
const controlledIds: Set<number> = this.conns.get(conn)!;
130+
this.conns.delete(conn);
131+
awarenessProtocol.removeAwarenessStates(this.awareness, Array.from(controlledIds), null);
132+
this.destroy();
133+
}
134+
conn.close();
135+
}
136+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
export const MESSAGE_SYNC_CODE = 0;
2+
export const MESSAGE_AWARENESS_CODE = 1;
3+
4+
export const PING_TIMEOUT = 30000;

0 commit comments

Comments
 (0)