Skip to content

Commit 6c7ebc0

Browse files
authored
fix: handle race between LocalTrackSubscribed signal and publishTrack completion (#1872)
1 parent 0752be3 commit 6c7ebc0

2 files changed

Lines changed: 65 additions & 16 deletions

File tree

.changeset/curvy-bugs-live.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"livekit-client": patch
3+
---
4+
5+
fix: handle race between `LocalTrackSubscribed` signal and `publishTrack` completion

src/room/Room.ts

Lines changed: 60 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -588,22 +588,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
588588
this.emit(RoomEvent.DCBufferStatusChanged, status, kind);
589589
})
590590
.on(EngineEvent.LocalTrackSubscribed, (subscribedSid) => {
591-
const trackPublication = this.localParticipant
592-
.getTrackPublications()
593-
.find(({ trackSid }) => trackSid === subscribedSid) as LocalTrackPublication | undefined;
594-
if (!trackPublication) {
595-
this.log.warn(
596-
'could not find local track subscription for subscribed event',
597-
this.logContext,
598-
);
599-
return;
600-
}
601-
this.localParticipant.emit(ParticipantEvent.LocalTrackSubscribed, trackPublication);
602-
this.emitWhenConnected(
603-
RoomEvent.LocalTrackSubscribed,
604-
trackPublication,
605-
this.localParticipant,
606-
);
591+
this.handleLocalTrackSubscribed(subscribedSid);
607592
})
608593
.on(EngineEvent.RoomMoved, (roomMoved) => {
609594
this.log.debug('room moved', roomMoved);
@@ -1626,6 +1611,65 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
16261611
}
16271612
}
16281613

1614+
private handleLocalTrackSubscribed(subscribedSid: string) {
1615+
const findPublication = () =>
1616+
this.localParticipant
1617+
.getTrackPublications()
1618+
.find(({ trackSid }) => trackSid === subscribedSid) as LocalTrackPublication | undefined;
1619+
1620+
const trackPublication = findPublication();
1621+
if (trackPublication) {
1622+
this.emitLocalTrackSubscribed(trackPublication);
1623+
return;
1624+
}
1625+
1626+
// the track publication may not be registered yet if the server signals
1627+
// the subscription before publishTrack has finished adding the publication.
1628+
// defer with a timeout until LocalTrackPublished fires for the matching trackSid
1629+
this.log.debug('deferring LocalTrackSubscribed, publication not yet available', {
1630+
...this.logContext,
1631+
subscribedSid,
1632+
});
1633+
1634+
const TIMEOUT_MS = 10_000;
1635+
let timer: ReturnType<typeof setTimeout>;
1636+
1637+
const onPublished = (pub: LocalTrackPublication) => {
1638+
if (pub.trackSid === subscribedSid) {
1639+
cleanup();
1640+
this.emitLocalTrackSubscribed(pub);
1641+
}
1642+
};
1643+
1644+
const cleanup = () => {
1645+
clearTimeout(timer);
1646+
this.localParticipant.off(ParticipantEvent.LocalTrackPublished, onPublished);
1647+
this.off(RoomEvent.Disconnected, cleanup);
1648+
};
1649+
1650+
this.localParticipant.on(ParticipantEvent.LocalTrackPublished, onPublished);
1651+
this.once(RoomEvent.Disconnected, cleanup);
1652+
1653+
timer = setTimeout(() => {
1654+
cleanup();
1655+
// final attempt in case the publication was added without emitting the event
1656+
const pub = findPublication();
1657+
if (pub) {
1658+
this.emitLocalTrackSubscribed(pub);
1659+
} else {
1660+
this.log.warn(
1661+
'could not find local track publication for LocalTrackSubscribed event after timeout',
1662+
{ ...this.logContext, subscribedSid },
1663+
);
1664+
}
1665+
}, TIMEOUT_MS);
1666+
}
1667+
1668+
private emitLocalTrackSubscribed(trackPublication: LocalTrackPublication) {
1669+
this.localParticipant.emit(ParticipantEvent.LocalTrackSubscribed, trackPublication);
1670+
this.emitWhenConnected(RoomEvent.LocalTrackSubscribed, trackPublication, this.localParticipant);
1671+
}
1672+
16291673
private handleRestarting = () => {
16301674
this.clearConnectionReconcile();
16311675
// in case we went from resuming to full-reconnect, make sure to reflect it on the isResuming flag

0 commit comments

Comments
 (0)