Skip to content

Commit fb32606

Browse files
authored
Implement negotiation tracking based on offerId (#1927)
1 parent 95025ff commit fb32606

4 files changed

Lines changed: 357 additions & 34 deletions

File tree

.changeset/eight-eggs-obey.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"livekit-client": patch
3+
---
4+
5+
Implement negotiation tracking based on offerId

src/room/PCTransport.ts

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import { Mutex } from '@livekit/mutex';
22
import { EventEmitter } from 'events';
33
import { parse, write } from 'sdp-transform';
4-
import type { MediaDescription, SessionDescription } from 'sdp-transform';
4+
import type { MediaAttributes, MediaDescription, SessionDescription } from 'sdp-transform';
5+
import type TypedEmitter from 'typed-emitter';
56
import log, { LoggerNames, getLogger } from '../logger';
67
import { debounce } from './debounce';
78
import { NegotiationError, UnexpectedConnectionState } from './errors';
@@ -29,11 +30,16 @@ const debounceInterval = 20;
2930
export const PCEvents = {
3031
NegotiationStarted: 'negotiationStarted',
3132
NegotiationComplete: 'negotiationComplete',
33+
// Fired with the offerId for every successful publisher answer application,
34+
// including answers that immediately recurse into another offer via
35+
// `renegotiate`. Use this rather than NegotiationComplete to know that a
36+
// specific offer has been negotiated end-to-end.
37+
OfferAnswered: 'offerAnswered',
3238
RTPVideoPayloadTypes: 'rtpVideoPayloadTypes',
3339
} as const;
3440

3541
/** @internal */
36-
export default class PCTransport extends EventEmitter {
42+
export default class PCTransport extends (EventEmitter as new () => TypedEmitter<PCTransportEventCallbacks>) {
3743
private _pc: RTCPeerConnection | null;
3844

3945
private get pc() {
@@ -51,7 +57,9 @@ export default class PCTransport extends EventEmitter {
5157

5258
private ddExtID = 0;
5359

54-
private latestOfferId: number = 0;
60+
latestOfferId: number = 0;
61+
62+
latestAcknowledgedOfferId: number = 0;
5563

5664
private offerLock: Mutex;
5765

@@ -236,6 +244,14 @@ export default class PCTransport extends EventEmitter {
236244
this.pendingCandidates = [];
237245
this.restartingIce = false;
238246

247+
// Fire OfferAnswered for every successfully applied answer, including the
248+
// ones that recurse into another offer via `renegotiate`. Callers waiting
249+
// on a specific offerId can resolve as soon as their offer's answer is in.
250+
if (sd.type === 'answer') {
251+
this.latestAcknowledgedOfferId = offerId;
252+
this.emit(PCEvents.OfferAnswered, offerId);
253+
}
254+
239255
if (this.renegotiate) {
240256
this.renegotiate = false;
241257
await this.createAndSendOffer();
@@ -737,3 +753,10 @@ function ensureIPAddrMatchVersion(media: MediaDescription) {
737753
function getMidString(mid: string | number) {
738754
return typeof mid === 'number' ? mid.toFixed(0) : mid;
739755
}
756+
757+
type PCTransportEventCallbacks = {
758+
negotiationStarted: () => void;
759+
negotiationComplete: () => void;
760+
offerAnswered: (offerId: number) => void;
761+
rtpVideoPayloadTypes: (attributes: MediaAttributes['rtp']) => void;
762+
};
Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
import { EventEmitter } from 'events';
2+
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
3+
import { PCEvents } from './PCTransport';
4+
import { PCTransportManager } from './PCTransportManager';
5+
6+
/**
7+
* Yield to the microtask queue so any then/catch handlers chained to a promise
8+
* have a chance to run. Sufficient for "is this promise still pending right
9+
* now?" assertions; nothing in these tests depends on real elapsed time.
10+
*/
11+
const flushMicrotasks = () => Promise.resolve();
12+
13+
class StubPC {
14+
iceConnectionState: RTCIceConnectionState = 'new';
15+
16+
signalingState: RTCSignalingState = 'stable';
17+
18+
connectionState: RTCPeerConnectionState = 'new';
19+
20+
onicecandidate: ((ev: RTCPeerConnectionIceEvent) => void) | null = null;
21+
22+
onicecandidateerror: ((ev: Event) => void) | null = null;
23+
24+
oniceconnectionstatechange: (() => void) | null = null;
25+
26+
onsignalingstatechange: (() => void) | null = null;
27+
28+
onconnectionstatechange: (() => void) | null = null;
29+
30+
ondatachannel: ((ev: RTCDataChannelEvent) => void) | null = null;
31+
32+
ontrack: ((ev: RTCTrackEvent) => void) | null = null;
33+
34+
getTransceivers() {
35+
return [];
36+
}
37+
38+
getSenders() {
39+
return [];
40+
}
41+
42+
close() {}
43+
44+
setConfiguration() {}
45+
}
46+
47+
class FakePublisher extends EventEmitter {
48+
latestOfferId = 0;
49+
50+
latestAcknowledgedOfferId = 0;
51+
52+
negotiate = vi.fn(async (_onError?: (e: Error) => void) => {});
53+
54+
/** Simulate a publisher offer cycle: bump latestOfferId. */
55+
startOffer() {
56+
this.latestOfferId += 1;
57+
return this.latestOfferId;
58+
}
59+
60+
/** Simulate a successful answer for the given offerId. */
61+
answer(offerId: number) {
62+
this.latestAcknowledgedOfferId = offerId;
63+
this.emit(PCEvents.OfferAnswered, offerId);
64+
}
65+
}
66+
67+
describe('PCTransportManager.negotiate', () => {
68+
let originalRTCPeerConnection: unknown;
69+
70+
beforeEach(() => {
71+
originalRTCPeerConnection = (globalThis as unknown as { RTCPeerConnection?: unknown })
72+
.RTCPeerConnection;
73+
(globalThis as unknown as { RTCPeerConnection: unknown }).RTCPeerConnection = StubPC;
74+
});
75+
76+
afterEach(() => {
77+
(globalThis as unknown as { RTCPeerConnection: unknown }).RTCPeerConnection =
78+
originalRTCPeerConnection;
79+
});
80+
81+
function makeManager() {
82+
const manager = new PCTransportManager('publisher-only', {});
83+
const fake = new FakePublisher();
84+
(manager as unknown as { publisher: FakePublisher }).publisher = fake;
85+
manager.peerConnectionTimeout = 200;
86+
return { manager, pub: fake };
87+
}
88+
89+
it('resolves when an offer past the checkpoint is answered', async () => {
90+
const { manager, pub } = makeManager();
91+
const p = manager.negotiate(new AbortController());
92+
93+
const id = pub.startOffer();
94+
pub.answer(id);
95+
96+
await expect(p).resolves.toBeUndefined();
97+
});
98+
99+
it('does not resolve on answers for offers at or before the checkpoint', async () => {
100+
const { manager, pub } = makeManager();
101+
// Some prior cycle is in flight with id=5 at the moment we capture our
102+
// checkpoint. Its answer must NOT satisfy our request — our changes
103+
// weren't in offer 5.
104+
pub.latestOfferId = 5;
105+
const ac = new AbortController();
106+
const p = manager.negotiate(ac);
107+
108+
let settled = false;
109+
p.then(
110+
() => {
111+
settled = true;
112+
},
113+
() => {
114+
settled = true;
115+
},
116+
);
117+
118+
pub.answer(5);
119+
await flushMicrotasks();
120+
expect(settled).toBe(false);
121+
122+
ac.abort();
123+
await expect(p).rejects.toThrow(/aborted/);
124+
});
125+
126+
it('resolves through the renegotiate-recursion path', async () => {
127+
// Reproduces the field shape: we capture checkpoint=N while an offer N is
128+
// in flight. The answer for N arrives (renegotiate=true on the publisher,
129+
// so it doesn't satisfy us), then a follow-up offer N+1 is created and
130+
// answered. We resolve on the second answer.
131+
const { manager, pub } = makeManager();
132+
pub.latestOfferId = 1;
133+
const p = manager.negotiate(new AbortController());
134+
135+
pub.answer(1); // does not satisfy checkpoint=1
136+
137+
const id = pub.startOffer(); // 2
138+
pub.answer(id);
139+
140+
await expect(p).resolves.toBeUndefined();
141+
});
142+
143+
it('resolves immediately when an answer past the checkpoint already arrived', async () => {
144+
const { manager, pub } = makeManager();
145+
pub.latestOfferId = 3;
146+
pub.latestAcknowledgedOfferId = 4;
147+
await expect(manager.negotiate(new AbortController())).resolves.toBeUndefined();
148+
});
149+
150+
it('resolves concurrent callers independently at their own checkpoints', async () => {
151+
const { manager, pub } = makeManager();
152+
153+
// A captures checkpoint=0
154+
const a = manager.negotiate(new AbortController());
155+
156+
// First cycle starts; B captures checkpoint=1 (offer now in flight)
157+
const id1 = pub.startOffer();
158+
const b = manager.negotiate(new AbortController());
159+
160+
let bResolved = false;
161+
b.then(() => {
162+
bResolved = true;
163+
});
164+
165+
// The first answer satisfies A (1 > 0) but not B (1 > 1 is false).
166+
pub.answer(id1);
167+
await a;
168+
expect(bResolved).toBe(false);
169+
170+
// The next cycle's answer satisfies B.
171+
const id2 = pub.startOffer();
172+
pub.answer(id2);
173+
await b;
174+
expect(bResolved).toBe(true);
175+
});
176+
177+
it('rejects when the deadline elapses', async () => {
178+
const { manager } = makeManager();
179+
await expect(manager.negotiate(new AbortController())).rejects.toThrow(/timed out/);
180+
});
181+
182+
it('rejects when the abort signal fires', async () => {
183+
const { manager } = makeManager();
184+
const ac = new AbortController();
185+
const p = manager.negotiate(ac);
186+
ac.abort();
187+
await expect(p).rejects.toThrow(/aborted/);
188+
});
189+
190+
it('rejects when publisher.negotiate invokes its error callback', async () => {
191+
const { manager, pub } = makeManager();
192+
pub.negotiate.mockImplementationOnce(async (onError?: (e: Error) => void) => {
193+
onError?.(new Error('publisher boom'));
194+
});
195+
await expect(manager.negotiate(new AbortController())).rejects.toThrow(/publisher boom/);
196+
});
197+
198+
describe('listener cleanup', () => {
199+
it('after success', async () => {
200+
const { manager, pub } = makeManager();
201+
const p = manager.negotiate(new AbortController());
202+
const id = pub.startOffer();
203+
pub.answer(id);
204+
await p;
205+
expect(pub.listenerCount(PCEvents.OfferAnswered)).toBe(0);
206+
});
207+
208+
it('after non-matching answer (still pending), then abort', async () => {
209+
const { manager, pub } = makeManager();
210+
pub.latestOfferId = 5;
211+
const ac = new AbortController();
212+
const p = manager.negotiate(ac);
213+
pub.answer(5); // does not satisfy checkpoint=5
214+
expect(pub.listenerCount(PCEvents.OfferAnswered)).toBe(1);
215+
ac.abort();
216+
await expect(p).rejects.toThrow(/aborted/);
217+
expect(pub.listenerCount(PCEvents.OfferAnswered)).toBe(0);
218+
});
219+
220+
it('after deadline', async () => {
221+
const { manager, pub } = makeManager();
222+
await expect(manager.negotiate(new AbortController())).rejects.toThrow(/timed out/);
223+
expect(pub.listenerCount(PCEvents.OfferAnswered)).toBe(0);
224+
});
225+
226+
it('after abort', async () => {
227+
const { manager, pub } = makeManager();
228+
const ac = new AbortController();
229+
const p = manager.negotiate(ac);
230+
ac.abort();
231+
await expect(p).rejects.toThrow(/aborted/);
232+
expect(pub.listenerCount(PCEvents.OfferAnswered)).toBe(0);
233+
});
234+
235+
it('after publisher.negotiate errors', async () => {
236+
const { manager, pub } = makeManager();
237+
pub.negotiate.mockImplementationOnce(async (onError?: (e: Error) => void) => {
238+
onError?.(new Error('publisher boom'));
239+
});
240+
await expect(manager.negotiate(new AbortController())).rejects.toThrow(/publisher boom/);
241+
expect(pub.listenerCount(PCEvents.OfferAnswered)).toBe(0);
242+
});
243+
244+
it('does not leak across many sequential negotiate() calls', async () => {
245+
const { manager, pub } = makeManager();
246+
for (let i = 0; i < 12; i += 1) {
247+
const p = manager.negotiate(new AbortController());
248+
const id = pub.startOffer();
249+
pub.answer(id);
250+
await p;
251+
}
252+
expect(pub.listenerCount(PCEvents.OfferAnswered)).toBe(0);
253+
});
254+
});
255+
256+
// Regression test for publishing call getting stuck
257+
// With the old design, NegotiationStarted firing faster than
258+
// peerConnectionTimeout kept resetting the timer indefinitely while
259+
// NegotiationComplete was suppressed by an unconverging `renegotiate` cycle,
260+
// wedging the publishTrack Promise. The offerId-checkpoint design resolves
261+
// on the first answer past the checkpoint, regardless of how many cycles
262+
// start in between.
263+
it('does not hang when many spurious cycles start without converging on the checkpoint', async () => {
264+
const { manager, pub } = makeManager();
265+
pub.latestOfferId = 1; // an unrelated cycle is in flight
266+
const p = manager.negotiate(new AbortController());
267+
268+
// NegotiationStarted noise (not listened to anymore) interleaved with an
269+
// answer for the in-flight offer that doesn't satisfy our checkpoint.
270+
pub.emit(PCEvents.NegotiationStarted);
271+
pub.emit(PCEvents.NegotiationStarted);
272+
pub.answer(1); // doesn't satisfy checkpoint=1
273+
pub.emit(PCEvents.NegotiationStarted);
274+
275+
// Eventually a fresh offer is created and answered.
276+
const id = pub.startOffer(); // 2
277+
pub.answer(id);
278+
279+
await expect(p).resolves.toBeUndefined();
280+
});
281+
});

0 commit comments

Comments
 (0)