Skip to content

Commit 7b6dfb2

Browse files
authored
Fix dynacast on re-publish. (#941)
On a reconnect, the tracks are re-published. When that happens if dynacast is active and a layer is disabled, nothing is sent on a re-publish and so the SFU never sees the track re-published. Use a `disabled` flag in `LocalTrack` to keep a separate state from mute and ensure that track is enabled on a (re)-publish. Added an integration test (thanks to Claude for the test).
1 parent 736700e commit 7b6dfb2

4 files changed

Lines changed: 145 additions & 3 deletions

File tree

integration_test.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1068,3 +1068,132 @@ func TestE2EE_H264RoundTrip(t *testing.T) {
10681068
require.NoError(t, err)
10691069
require.NotNil(t, dec)
10701070
}
1071+
1072+
// TestDynacastRepublish exercises two behaviours around simulcast dynacast:
1073+
//
1074+
// 1. Dynacast: a SubscribedQualityUpdate from the server disables the simulcast
1075+
// layers no subscriber needs. A disabled layer's LocalTrack stops writing
1076+
// samples (LocalTrack.disabled gates the write worker).
1077+
// 2. Re-publish reset: when tracks are re-published (e.g. on a full reconnect,
1078+
// which reuses the same *LocalTrack objects via republishTracks), the
1079+
// per-layer disabled flag is reset so every layer resumes publishing until
1080+
// the new SFU issues its own dynacast update. Without the reset, a layer
1081+
// disabled before the reconnect would stay dark forever.
1082+
//
1083+
// The dynacast update is injected through handleSubscribedQualityUpdate, which
1084+
// is exactly the path Room.OnSubscribedQualityUpdate drives for a real server
1085+
// message, so the layer-disabling wiring is covered end-to-end. The re-publish
1086+
// is driven by a real full reconnect.
1087+
func TestDynacastRepublish(t *testing.T) {
1088+
if apiKey == "" || apiSecret == "" {
1089+
t.Skip("no LIVEKIT_KEYS; requires a running livekit-server")
1090+
}
1091+
1092+
videoCodec := webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8, ClockRate: 90000}
1093+
const videoName = "dynacast_video"
1094+
1095+
var subVideoRTP atomic.Int32
1096+
subCB := &RoomCallback{
1097+
ParticipantCallback: ParticipantCallback{
1098+
OnTrackSubscribed: func(track *webrtc.TrackRemote, publication *RemoteTrackPublication, _ *RemoteParticipant) {
1099+
if publication.Name() != videoName {
1100+
return
1101+
}
1102+
go func() {
1103+
for {
1104+
if _, _, err := track.ReadRTP(); err != nil {
1105+
return
1106+
}
1107+
subVideoRTP.Add(1)
1108+
}
1109+
}()
1110+
},
1111+
},
1112+
}
1113+
sub, err := createAgent(t.Name(), subCB, "subscriber-dynacast")
1114+
require.NoError(t, err)
1115+
defer sub.Disconnect()
1116+
1117+
// simTracks is ordered [LOW, MEDIUM, HIGH]; republishTracks reuses these
1118+
// same objects, so they remain the canonical layer handles across reconnect.
1119+
simTracks := newSimulcastSampleTracks(t, videoCodec, "SC_"+videoName)
1120+
require.Len(t, simTracks, 3)
1121+
layerOf := map[livekit.VideoQuality]*LocalTrack{
1122+
livekit.VideoQuality_LOW: simTracks[0],
1123+
livekit.VideoQuality_MEDIUM: simTracks[1],
1124+
livekit.VideoQuality_HIGH: simTracks[2],
1125+
}
1126+
1127+
// OnReconnected fires right after OnRestarted -> republishTracks (which resets
1128+
// the disabled flags) and before the new SFU issues its own dynacast update.
1129+
// Snapshot the flags here so the "re-enabled" assertion is race-free against a
1130+
// later server update that could re-disable unwatched layers.
1131+
var reconnected, allEnabledAfterRepublish atomic.Bool
1132+
pubCB := &RoomCallback{
1133+
OnReconnected: func() {
1134+
allEnabled := true
1135+
for _, tr := range simTracks {
1136+
if tr.disabled.Load() {
1137+
allEnabled = false
1138+
}
1139+
}
1140+
allEnabledAfterRepublish.Store(allEnabled)
1141+
reconnected.Store(true)
1142+
},
1143+
}
1144+
pub, err := createAgent(t.Name(), pubCB, "publisher-dynacast")
1145+
require.NoError(t, err)
1146+
defer pub.Disconnect()
1147+
1148+
videoPub, err := pub.LocalParticipant.PublishSimulcastTrack(simTracks, &TrackPublicationOptions{Name: videoName})
1149+
require.NoError(t, err)
1150+
require.NotNil(t, videoPub)
1151+
require.NotEmpty(t, videoPub.SID())
1152+
1153+
// end-to-end: the subscriber receives video RTP
1154+
require.Eventually(t, func() bool {
1155+
return subVideoRTP.Load() > 0
1156+
}, 15*time.Second, 100*time.Millisecond, "subscriber should receive simulcast video RTP")
1157+
1158+
// all layers start enabled (not disabled)
1159+
for q, tr := range layerOf {
1160+
require.False(t, tr.disabled.Load(), "layer %s should start enabled", q)
1161+
}
1162+
1163+
// (1) dynacast: server reports only HIGH is needed -> LOW and MEDIUM disabled.
1164+
// handleSubscribedQualityUpdate applies the layer flags synchronously.
1165+
pub.LocalParticipant.handleSubscribedQualityUpdate(&livekit.SubscribedQualityUpdate{
1166+
TrackSid: videoPub.SID(),
1167+
SubscribedCodecs: []*livekit.SubscribedCodec{
1168+
{
1169+
Codec: "vp8",
1170+
Qualities: []*livekit.SubscribedQuality{
1171+
{Quality: livekit.VideoQuality_LOW, Enabled: false},
1172+
{Quality: livekit.VideoQuality_MEDIUM, Enabled: false},
1173+
{Quality: livekit.VideoQuality_HIGH, Enabled: true},
1174+
},
1175+
},
1176+
},
1177+
})
1178+
require.True(t, layerOf[livekit.VideoQuality_LOW].disabled.Load(), "LOW must be disabled by dynacast")
1179+
require.True(t, layerOf[livekit.VideoQuality_MEDIUM].disabled.Load(), "MEDIUM must be disabled by dynacast")
1180+
require.False(t, layerOf[livekit.VideoQuality_HIGH].disabled.Load(), "HIGH must stay enabled")
1181+
1182+
// (2) re-publish reset via a full reconnect: OnRestarted -> republishTracks
1183+
// re-publishes these same tracks, which must clear the disabled flags.
1184+
reconnected.Store(false)
1185+
pub.Simulate(SimulateNodeFailure)
1186+
require.Eventually(t, func() bool {
1187+
return reconnected.Load()
1188+
}, 20*time.Second, 100*time.Millisecond, "publisher should complete a full reconnect")
1189+
1190+
// the fix under test: re-publishing the (previously dynacast-disabled) tracks
1191+
// reset every layer's disabled flag to false, captured at reconnect time.
1192+
require.True(t, allEnabledAfterRepublish.Load(), "re-published layers must reset disabled to false")
1193+
1194+
// end-to-end: video RTP resumes after the re-publish
1195+
baseline := subVideoRTP.Load()
1196+
require.Eventually(t, func() bool {
1197+
return subVideoRTP.Load() > baseline
1198+
}, 20*time.Second, 100*time.Millisecond, "subscriber should receive video RTP again after re-publish")
1199+
}

localparticipant.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,9 @@ func (p *LocalParticipant) prepareSimulcastTrackPublication(tracks []*LocalTrack
227227
if track.videoLayer == nil || track.RID() == "" {
228228
return nil, nil, nil, ErrInvalidSimulcastTrack
229229
}
230+
231+
// disable dynacast on a re-publication
232+
track.setDisabled(false)
230233
}
231234

232235
pubOptions := &LocalTrackPublishOptions{}
@@ -318,6 +321,11 @@ func (p *LocalParticipant) prepareSimulcastTrackPublication(tracks []*LocalTrack
318321
},
319322
}
320323
if len(pubOptions.backupCodecTracks) > 0 {
324+
for _, track := range pubOptions.backupCodecTracks {
325+
// disable dynacast on a re-publication
326+
track.setDisabled(false)
327+
}
328+
321329
backupTracksCopy := make([]*LocalTrack, len(pubOptions.backupCodecTracks))
322330
copy(backupTracksCopy, pubOptions.backupCodecTracks)
323331

localtrack.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ type LocalTrack struct {
8383
onRTCP func(rtcp.Packet)
8484

8585
muted atomic.Bool
86+
disabled atomic.Bool
8687
disconnected atomic.Bool
8788
cancelWrite func()
8889
writeClosed chan struct{}
@@ -584,6 +585,10 @@ func (s *LocalTrack) setMuted(muted bool) {
584585
s.muted.Store(muted)
585586
}
586587

588+
func (s *LocalTrack) setDisabled(disabled bool) {
589+
s.disabled.Store(disabled)
590+
}
591+
587592
func (s *LocalTrack) setDisconnected(disconnected bool) {
588593
s.disconnected.Store(disconnected)
589594
}
@@ -671,7 +676,7 @@ func (s *LocalTrack) writeWorker(provider SampleProvider, onComplete func()) {
671676
return
672677
}
673678

674-
if !s.muted.Load() {
679+
if !s.muted.Load() && !s.disabled.Load() {
675680
var opts *SampleWriteOptions
676681
if isAudioProvider {
677682
level := audioProvider.CurrentAudioLevel()

publication.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,7 @@ func (p *LocalTrackPublication) setPublishingCodecsQuality(subscribedCodecs []*l
490490
for _, subscribedQuality := range subscribedCodec.Qualities {
491491
track := p.GetSimulcastTrack(subscribedQuality.Quality)
492492
if track != nil {
493-
track.setMuted(!subscribedQuality.Enabled)
493+
track.setDisabled(!subscribedQuality.Enabled)
494494
p.log.Infow(
495495
"updating layer enable",
496496
"trackID", p.SID(),
@@ -544,7 +544,7 @@ func (p *LocalTrackPublication) setPublishingCodecsQuality(subscribedCodecs []*l
544544
for _, subscribedQuality := range subscribedCodec.Qualities {
545545
track := backupCodecTracksForSimulcast[subscribedQuality.Quality]
546546
if track != nil {
547-
track.setMuted(!subscribedQuality.Enabled)
547+
track.setDisabled(!subscribedQuality.Enabled)
548548
p.log.Infow(
549549
"updating layer enable",
550550
"trackID", p.SID(),

0 commit comments

Comments
 (0)