-
Notifications
You must be signed in to change notification settings - Fork 270
Expand file tree
/
Copy pathSignalAPI.ts
More file actions
148 lines (117 loc) · 4.09 KB
/
Copy pathSignalAPI.ts
File metadata and controls
148 lines (117 loc) · 4.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import { SignalRequest, SignalResponse, JoinResponse } from '@livekit/protocol';
import type { ITransport } from './SignalTransport';
import { Future, getClientInfo } from '../room/utils';
import { atomic } from '../decorators';
/**
 * Thin request/response layer on top of a signalling transport.
 *
 * `join` opens the transport, consumes the first server message (which must be
 * the join response) and then starts a background read loop. The read loop
 * hands responses to futures registered in `promiseMap`; messages that nobody
 * is waiting for are dispatched by message case.
 */
export class SignalAPI {
  /** Writer for outgoing requests; only assigned once `join` has succeeded. */
  private writer?: WritableStreamDefaultWriter<SignalRequest>;

  /** Futures awaiting a server response, keyed by `getResponseKey(case, id)`. */
  private promiseMap = new Map<string, Future<SignalResponse>>();

  private offerId = 0;

  private transport: ITransport;

  /** Id handed to the next outgoing sequencer (see `getNextSequencer`). */
  private sequenceNumber = 0;

  private latestRemoteSequenceNumber = 0;

  constructor(transport: ITransport) {
    this.transport = transport;
  }

  /**
   * Connects the transport and waits for the server's join response.
   *
   * @param url - Passed through to `transport.connect`.
   * @param token - Passed through to `transport.connect`.
   * @param connectOpts - Passed through to `transport.connect`.
   * @returns The payload of the first message, which must be a `join` message.
   * @throws Error if the stream ends before any message arrives, or if the
   *   first message is not a `join` message.
   */
  @atomic
  async join(url: string, token: string, connectOpts: ConnectOpts): Promise<JoinResponse> {
    const clientInfo = getClientInfo();
    const { readableStream, writableStream } = await this.transport.connect({
      url,
      token,
      clientInfo,
      connectOpts,
    });

    // Read exactly one message, then hand the stream back so readLoop can lock it.
    const reader = readableStream.getReader();
    const { done, value } = await reader.read();
    reader.releaseLock();

    // Check for a closed stream first: otherwise a missing value would be
    // reported as a wrong message type and this branch could never trigger.
    if (done || !value) {
      throw new Error('Connection closed without join response');
    }
    if (value.message?.case !== 'join') {
      throw new Error('Expected join response');
    }

    // Fire and forget: readLoop handles its own errors internally.
    void this.readLoop(readableStream);
    this.writer = writableStream.getWriter();
    return value.message.value;
  }

  /**
   * Reads server messages until the stream ends or fails.
   *
   * A message whose resolver id matches a registered future resolves that
   * future and is not processed further. When the loop terminates, every
   * future that is still pending is rejected and the reader lock is released.
   *
   * @param readableStream - Stream of server responses; must not be locked.
   */
  async readLoop(readableStream: ReadableStream<SignalResponse>) {
    const reader = readableStream.getReader();
    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done || !value) {
          break;
        }

        const { message } = value;
        const resolverId = getResolverId(message);
        if (resolverId) {
          const responseKey = getResponseKey(message.case, resolverId);
          const future = this.promiseMap.get(responseKey);
          if (future) {
            // A future is single-use; drop it so the map does not grow unbounded.
            this.promiseMap.delete(responseKey);
            future.resolve?.(value);
            continue;
          }
        }

        switch (message.case) {
          case 'join':
          case 'answer':
          case 'requestResponse':
            console.warn(`received ${message.case} these should all be handled by the promise map`);
            break;
          case 'leave':
            this.close();
            break;
          default:
            console.debug(`received unsupported message ${message.case} `);
            break;
        }
      }
      // The stream ended without an error: nothing can answer pending requests anymore.
      this.rejectPending(new Error('Signal connection closed'));
    } catch (e) {
      this.rejectPending(e);
    } finally {
      reader.releaseLock();
    }
  }

  /**
   * Intended to send an offer and wait for the matching answer. The
   * implementation is currently disabled, so every call rejects.
   *
   * @throws Error always ('Answer not found').
   */
  @atomic
  async sendOfferAndAwaitAnswer(offer: RTCSessionDescriptionInit): Promise<SessionDescription> {
    // const offerId = this.offerId++;
    // if(!this.writer) {
    // throw new Error('Writable stream not initialized');
    // }
    // const request = new SessionDescription({
    // type: 'offer',
    // sdp: offer.sdp,
    // // id: offer.id,
    // });
    // await this.writer.write([this.createClientRequest({ case: 'offer', value: request })]);
    // const future = new Future<Signalv2ServerMessage>();
    // // we want an answer for this offer so we queue up a future for it
    // this.promiseMap.set(getResponseKey('answer', offerId), future);
    // const answerResponse = await future.promise;
    // if(answerResponse.message.case === 'answer') {
    // return answerResponse.message.value;
    // }
    throw new Error('Answer not found');
  }

  /**
   * Creates a sequencer carrying the next outgoing message id (post-incremented)
   * and the latest remote sequence number recorded on this instance.
   */
  private getNextSequencer(): Sequencer {
    return new Sequencer({
      messageId: this.sequenceNumber++,
      lastProcessedRemoteMessageId: this.latestRemoteSequenceNumber,
    });
  }

  /** Rejects every pending future with `reason` and empties the map. */
  private rejectPending(reason: unknown): void {
    this.promiseMap.forEach((future) => future.reject?.(reason));
    this.promiseMap.clear();
  }

  /** Placeholder: reconnecting is not implemented yet and resolves immediately. */
  // @loggedMethod
  async reconnect(): Promise<void> {
    //return this.transport.reconnect();
  }

  /** Disconnects the underlying transport and returns whatever `disconnect` returns. */
  // @loggedMethod
  close() {
    return this.transport.disconnect();
  }
}
/**
 * Builds the lookup key under which a pending response is stored.
 *
 * @param requestType - The message case the response is expected to have.
 * @param messageId - Id correlating the response with its request.
 * @returns The two parts joined as `<requestType>-<messageId>`.
 */
function getResponseKey(requestType: SignalResponse['message']['case'], messageId: number) {
  // String() mirrors template-literal conversion, so an undefined case yields "undefined".
  const prefix = String(requestType);
  const suffix = String(messageId);
  return prefix.concat('-', suffix);
}
/**
 * Extracts the id used to match a server message with a pending request.
 *
 * @param message - The `message` oneof of a signal response.
 * @returns `value.requestId` if the payload has that property, otherwise
 *   `value.id` if present, otherwise `null` (also for non-object payloads).
 */
function getResolverId(message: SignalResponse['message']) {
  const payload = message.value;
  // `typeof null` is 'object', so null needs its own guard: the `in`
  // operator below throws a TypeError on a null right-hand side.
  if (typeof payload !== 'object' || payload === null) {
    return null;
  }
  if ('requestId' in payload) {
    return payload.requestId;
  }
  if ('id' in payload) {
    return payload.id;
  }
  return null;
}