Skip to content

Commit 374802f

Browse files
JaragonCRclaude
andcommitted
fix: route Mercury AP playlist events to enable unlimited DJ Switch it up
Root cause of the 7-8 Switch it up limit in both zeroconf and interactive mode: the Spotify server delivers vibe-section playlist pushes through TWO separate channels simultaneously — 1. The Dealer WebSocket (hm://playlist/v2/playlist/…) 2. The Mercury AP event channel (PacketTypeMercuryEvent, same URI) go-librespot already subscribed to the Dealer path, so a handful of pushes were processed. But the overwhelming majority — the initial burst of ~68 sections at DJ startup plus 1-4 new sections pushed every ~30 s as switch-ups consume the queue — arrived via PacketTypeMercuryEvent. The Mercury client's receive loop had a hard `continue` after logging each event, so every AP-channel playlist push was silently discarded. Only the small subset that happened to duplicate onto the Dealer channel ever reached the section buffer. With ~6 sections instead of ~68, the buffer ran dry after roughly 7 switch-ups and djPoll fell back to repeating the same 35 lexicon tracks, causing the looping the user observed. Why Mercury is 100% needed -------------------------- TLS capture of the official Spotify desktop client confirmed that during a single DJ session with 15 switch-ups it received 100 playlist-push events across 37 unique vibe-section playlists: • +11 s — initial burst: 68 pushes (34 unique playlists) • +97 s onward — continuous trickle: 1-4 new sections every ~30 s, perfectly correlated with active switch-ups Zero of these appeared as HTTP/2 calls — the desktop makes no lexicon requests at switch-up time. It purely navigates a local queue that the server keeps topped up via Mercury AP events. go-librespot must receive and buffer these events or the queue will always exhaust after a handful of jumps. Changes ------- mercury/client.go - Add eventSubscriber / EventMessage types and eventSubs list to Client. - In the PacketTypeMercuryEvent branch of recvLoop(), route events to any matching subscriber channel (non-blocking, buffered 64) instead of discarding with `continue`. - Add SubscribeEvent(uriPrefixes…) method so callers can tap the stream. cmd/daemon/player.go - Subscribe to "hm://playlist/v2/playlist/" on the Mercury client at startup alongside the existing Dealer subscription. - Add a select case that converts the eventMessage into a dealer.Message and hands it to the existing handleDealerMessage path — no duplicate handling code. - Fix section-buffer gating: remove the ContextUri == djCachedContextUri guard from the playlist-update handler so sections received during a context transition (Switch it up briefly loads a regular playlist) are not silently dropped. cmd/daemon/controls.go - Mirror the proactive low-queue djPoll trigger into skipNext's targeted- skip path. Previously the check only ran inside advanceNext, which is never called when skip_next carries an explicit target track (the DJ Switch it up case), so the lexicon refresh was never scheduled and the queue silently drained to zero before djPoll fired. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent bf5acbb commit 374802f

3 files changed

Lines changed: 95 additions & 17 deletions

File tree

daemon/controls.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,10 @@ func (p *AppPlayer) loadContext(ctx context.Context, spotCtx *connectpb.Context,
273273
p.state.player.ContextUrl = spotCtx.Url
274274
p.state.player.ContextRestrictions = spotCtx.Restrictions
275275
p.app.djCachedContextUri = spotCtx.Uri
276-
p.app.djSectionBuffer = nil // clear on new DJ context so stale sections aren't reused
276+
// Keep djSectionBuffer — sections buffered from a previous handover are still
277+
// valid for a fresh start of the same DJ context. Clearing them would leave
278+
// fresh starts with an empty buffer in interactive mode (where the server only
279+
// pushes a handful of playlists per session, not the full 50).
277280

278281
if p.state.player.ContextMetadata == nil {
279282
p.state.player.ContextMetadata = map[string]string{}
@@ -301,6 +304,15 @@ func (p *AppPlayer) loadContext(ctx context.Context, spotCtx *connectpb.Context,
301304
for k, v := range lexCtx.Metadata {
302305
p.state.player.ContextMetadata[k] = v
303306
}
307+
// Log track metadata to find vibe section playlist URIs.
308+
for i, t := range staticTracks {
309+
su := t.Metadata["station_uri"]
310+
sc := t.Metadata["source.components"]
311+
jid := t.Metadata["narration.jump.commentary_id"]
312+
if su != "" || jid != "" {
313+
p.app.log.Debugf("lexicon track[%d] %s station_uri=%q source=%q jump=%q", i, t.Uri, su, sc, jid)
314+
}
315+
}
304316
p.app.djCachedNextTracks = staticTracks
305317
p.app.djCacheIsOurs = true
306318
}
@@ -836,6 +848,22 @@ func (p *AppPlayer) skipNext(ctx context.Context, track *connectpb.ContextTrack)
836848
p.state.player.NextTracks = p.state.tracks.NextTracks(ctx, nil)
837849
p.state.player.Index = p.state.tracks.Index()
838850

851+
// Proactive DJ queue refresh on targeted skip (Switch it up). The
852+
// normal low-queue check lives in advanceNext which is NOT called when
853+
// skip_next carries a target track, so we mirror it here.
854+
isDJSkip := p.state.player.PlayOrigin != nil && p.state.player.PlayOrigin.FeatureIdentifier == "dynamic-sessions"
855+
if isDJSkip && !p.djAwaitingLoad && len(p.state.player.NextTracks) < 8 {
856+
p.app.log.Infof("skipNext: DJ queue low (%d tracks), scheduling lexicon refresh", len(p.state.player.NextTracks))
857+
p.djPollAttempts = 0
858+
if !p.djPollTimer.Stop() {
859+
select {
860+
case <-p.djPollTimer.C:
861+
default:
862+
}
863+
}
864+
p.djPollTimer.Reset(3 * time.Second)
865+
}
866+
839867
if err := p.loadCurrentTrack(ctx, p.state.player.IsPaused, true); err != nil {
840868
// In DJ mode, narration/media clips appear in the queue as spotify:track: but
841869
// return 404 when fetched — skip past them automatically.

daemon/player.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -222,11 +222,11 @@ func (p *AppPlayer) handleDealerMessage(ctx context.Context, msg dealer.Message)
222222
p.app.log.WithError(err).Warn("failed loading DJ track from playlist update, reverting to djAwaitingLoad")
223223
p.djAwaitingLoad = true
224224
}
225-
} else if p.state.active && p.state.player.ContextUri == p.app.djCachedContextUri {
226-
// Already playing DJ — buffer this section for later use when the queue runs low.
227-
// We don't rebuild immediately because the lexicon tracks (with jump markers) are
228-
// already in the queue. When the queue drops below 8, djPoll will pop from this
229-
// buffer to extend with fresh variety instead of looping the same 15 lexicon tracks.
225+
} else {
226+
// Buffer this section for later use when the queue runs low. We buffer
227+
// unconditionally here (not gated on ContextUri) because the push can
228+
// arrive while a temporary regular-playlist context is active (e.g.
229+
// during a Switch-it-up transition), and we must not silently drop it.
230230
p.app.djSectionBuffer = append(p.app.djSectionBuffer, newTracks)
231231
p.app.log.Debugf("buffered DJ section %d (%d tracks) from playlist update", len(p.app.djSectionBuffer), len(newTracks))
232232
}
@@ -1118,6 +1118,10 @@ func (p *AppPlayer) Run(ctx context.Context, apiRecv <-chan ApiRequest, mprisRec
11181118
apRecv := p.sess.Accesspoint().Receive(ap.PacketTypeProductInfo, ap.PacketTypeCountryCode)
11191119
msgRecv := p.sess.Dealer().ReceiveMessage("hm://pusher/v1/connections/", "hm://connect-state/v1/", "hm://playlist/v2/playlist/")
11201120
reqRecv := p.sess.Dealer().ReceiveRequest("hm://connect-state/v1/player/command")
1121+
// Also receive playlist pushes that arrive via the Mercury AP event channel
1122+
// (PacketTypeMercuryEvent). These are the vibe-section playlists the server
1123+
// sends during an active DJ session — they outnumber the dealer pushes ~10:1.
1124+
mercuryPlaylistRecv := p.sess.Mercury().SubscribeEvent("hm://playlist/v2/playlist/")
11211125
playerRecv := p.player.Receive()
11221126

11231127
volumeTimer := time.NewTimer(time.Minute)
@@ -1143,6 +1147,11 @@ func (p *AppPlayer) Run(ctx context.Context, apiRecv <-chan ApiRequest, mprisRec
11431147
if err := p.handleDealerMessage(ctx, msg); err != nil {
11441148
p.app.log.WithError(err).Warn("failed handling dealer message")
11451149
}
1150+
case evMsg := <-mercuryPlaylistRecv:
1151+
// Playlist push via the Mercury AP event channel — same handling as dealer.
1152+
if err := p.handleDealerMessage(ctx, dealer.Message{Uri: evMsg.Uri, Payload: evMsg.Payload}); err != nil {
1153+
p.app.log.WithError(err).Warn("failed handling mercury playlist event")
1154+
}
11461155
case req, ok := <-reqRecv:
11471156
if !ok {
11481157
continue

mercury/client.go

Lines changed: 52 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/devgianlu/go-librespot/ap"
1313
spotifypb "github.com/devgianlu/go-librespot/proto/spotify"
1414
"google.golang.org/protobuf/proto"
15+
"strings"
1516
)
1617

1718
type hermesRequest struct {
@@ -28,13 +29,29 @@ type hermesResponse struct {
2829
err error
2930
}
3031

32+
// eventSubscriber receives Mercury AP push events (PacketTypeMercuryEvent) whose
33+
// URI matches one of the registered prefixes. Used for playlist section pushes.
34+
type eventSubscriber struct {
35+
uriPrefixes []string
36+
c chan eventMessage
37+
}
38+
39+
// EventMessage carries the URI and raw payload of a Mercury AP push event.
40+
type eventMessage struct {
41+
Uri string
42+
Payload []byte
43+
}
44+
3145
type Client struct {
3246
log librespot.Logger
3347
ap *ap.Accesspoint
3448

3549
recvLoopOnce sync.Once
3650

3751
reqChan chan hermesRequest
52+
53+
eventSubsLock sync.RWMutex
54+
eventSubs []eventSubscriber
3855
}
3956

4057
func NewClient(log librespot.Logger, accesspoint *ap.Accesspoint) *Client {
@@ -100,19 +117,32 @@ func (c *Client) recvLoop() {
100117
if len(evParts) > 0 {
101118
var evHeader spotifypb.MercuryHeader
102119
if err := proto.Unmarshal(evParts[0], &evHeader); err == nil {
120+
uri := evHeader.GetUri()
121+
var payload []byte
122+
if len(evParts) > 1 {
123+
payload = evParts[1]
124+
}
125+
payloadLen := 0
126+
for _, p := range evParts[1:] {
127+
payloadLen += len(p)
128+
}
103129
c.log.Debugf("mercury event: seq=%d flags=%d uri=%s statusCode=%v parts=%d payloadLen=%d",
104-
evSeq, evFlags,
105-
evHeader.GetUri(),
106-
evHeader.StatusCode,
107-
len(evParts),
108-
func() int {
109-
n := 0
110-
for _, p := range evParts[1:] {
111-
n += len(p)
130+
evSeq, evFlags, uri, evHeader.StatusCode, len(evParts), payloadLen)
131+
// Route to any registered event subscribers.
132+
c.eventSubsLock.RLock()
133+
for _, sub := range c.eventSubs {
134+
for _, prefix := range sub.uriPrefixes {
135+
if strings.HasPrefix(uri, prefix) {
136+
select {
137+
case sub.c <- eventMessage{Uri: uri, Payload: payload}:
138+
default:
139+
c.log.Debugf("mercury event subscriber full, dropping %s", uri)
140+
}
141+
break
112142
}
113-
return n
114-
}(),
115-
)
143+
}
144+
}
145+
c.eventSubsLock.RUnlock()
116146
} else {
117147
c.log.Debugf("mercury event: seq=%d flags=%d totalPayload=%d (header parse err: %v)", evSeq, evFlags, len(pkt.Payload), err)
118148
}
@@ -280,3 +310,14 @@ func (c *Client) Request(ctx context.Context, method, uri string, fields map[str
280310

281311
return respPayload, nil
282312
}
313+
314+
// SubscribeEvent returns a channel that receives Mercury AP push events (PacketTypeMercuryEvent)
315+
// whose URI starts with one of the given prefixes. The channel is buffered to avoid blocking
316+
// the receive loop when the caller is temporarily busy.
317+
func (c *Client) SubscribeEvent(uriPrefixes ...string) <-chan eventMessage {
318+
ch := make(chan eventMessage, 64)
319+
c.eventSubsLock.Lock()
320+
c.eventSubs = append(c.eventSubs, eventSubscriber{uriPrefixes: uriPrefixes, c: ch})
321+
c.eventSubsLock.Unlock()
322+
return ch
323+
}

0 commit comments

Comments
 (0)