Skip to content

Commit 7ef9526

Browse files
author
Marko Petzold
committed
router/dealer: don't block callee session on slow caller; per-call retry queue (gammazero#324)
The dealer's yield() had a synchronous retry loop that ran on the callee's session-handler goroutine. When the caller's outbound queue filled (slow / unresponsive caller), that retry blocked for up to sendResultDeadline (one minute), freezing all other communication for the otherwise-healthy callee — even unrelated calls, subscriptions, and meta procedures. Fix: move the retry loop onto a fresh goroutine per invocation, with a per-invocation in-order pending-yield queue tracked on the invocation struct itself. Properties of the new design: - Fast path unchanged: a YIELD that delivers on the first attempt finishes synchronously; no goroutine spawn, no queueing. - Slow path: the YIELD is appended to invocation.pendingYields and a single retry goroutine drains it. Subsequent YIELDs for the same invocation that arrive while a retry is in progress are appended (preserving in-call ordering, which progressive results depend on) rather than racing on out-of-band delivery. - Backoff: starts at yieldRetryDelay, doubles up to a 1 s cap, and resets to yieldRetryDelay whenever a queued YIELD is delivered (caller is making progress). - Shutdown: dealer gains a 'closing' channel that is signaled before actionChan is closed, so retry goroutines exit cleanly via select on closing rather than racing to a send-on-closed-channel panic. Test: TestDealerYieldDoesNotBlockCalleeOnSlowCaller exercises the exact scenario from gammazero#324 — a caller with a 1-slot outbound queue that the test never drains, two CALLs whose RESULTs would queue, then a SUBSCRIBE from the callee that must complete promptly. Without the fix the test trips a 1-second deadline (callee deadlocked); with the fix the SUBSCRIBE returns in milliseconds. The existing TestProgressStress (16 progressive calls × variable chunk sizes, caller pauses mid-stream) continues to pass — chunks stay in order and counts match.
1 parent 8b4f740 commit 7ef9526

2 files changed

Lines changed: 232 additions & 29 deletions

File tree

router/dealer.go

Lines changed: 167 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,28 @@ type invocation struct {
6262
inProgress bool
6363
timerCancel context.CancelFunc
6464
options wamp.Dict
65+
66+
// pendingYields buffers YIELD messages that arrived while a retry
67+
// goroutine is draining the queue for this invocation. Drained in
68+
// arrival order so progressive results stay correctly sequenced.
69+
// Accessed only from the dealer actor goroutine.
70+
pendingYields []pendingYield
71+
72+
// retrying is true while a retry goroutine is actively delivering
73+
// queued YIELDs for this invocation. Prevents duplicate retry
74+
// goroutines from racing on the same call.
75+
retrying bool
76+
77+
// retryDeadline is the absolute time after which the retry goroutine
78+
// stops trying and cancels the call instead.
79+
retryDeadline time.Time
80+
}
81+
82+
// pendingYield carries a queued YIELD with the cached progress flag so
83+
// the retry goroutine doesn't have to re-extract it from msg.Options.
84+
type pendingYield struct {
85+
msg *wamp.Yield
86+
progress bool
6587
}
6688

6789
type requestID struct {
@@ -98,6 +120,10 @@ type dealer struct {
98120

99121
actionChan chan func()
100122
stopped chan struct{}
123+
// closing is signaled before actionChan is closed so background
124+
// goroutines (e.g. yield-retry workers) can exit cleanly without
125+
// racing on a send-on-closed-channel panic.
126+
closing chan struct{}
101127

102128
// Generate registration IDs.
103129
idGen *wamp.IDGen
@@ -139,6 +165,7 @@ func newDealer(logger stdlog.StdLog, strictURI, allowDisclose, debug bool) *deal
139165
// channel is appropriate.
140166
actionChan: make(chan func()),
141167
stopped: make(chan struct{}),
168+
closing: make(chan struct{}),
142169

143170
idGen: new(wamp.IDGen),
144171
prng: rand.New(rand.NewSource(time.Now().Unix())), //nolint:gosec // used for call invocation
@@ -318,52 +345,159 @@ func (d *dealer) cancel(caller *wamp.Session, msg *wamp.Cancel) {
318345
// yield handles the result of successfully processing and finishing the
319346
// execution of a call, send from callee to dealer.
320347
//
321-
// If the RESULT could not be sent to the caller because the caller was blocked
322-
// (send queue full), then retry sending until timeout. If timeout while trying
323-
// to send RESULT, then cancel call.
348+
// Fast path: deliver the RESULT on the dealer goroutine via syncYield. If
349+
// the caller's outbound queue is full, the YIELD is enqueued on the
350+
// invocation's per-call retry queue and a retry goroutine is started
351+
// (one per invocation, at most). All subsequent YIELDs for the same
352+
// invocation that arrive while a retry is in progress are appended to
353+
// the queue and drained in arrival order, so progressive results stay
354+
// correctly sequenced.
355+
//
356+
// Critically, this function returns to the callee's session-handler
357+
// goroutine as soon as the dealer has either delivered or queued the
358+
// YIELD. The retry loop runs on a fresh goroutine. Without this split,
359+
// a single unresponsive caller would freeze all further communication
360+
// for an otherwise healthy callee — see gammazero/nexus#324.
324361
func (d *dealer) yield(callee *wamp.Session, msg *wamp.Yield) {
325362
if callee == nil || msg == nil {
326363
panic("dealer.Yield with nil session or message")
327364
}
328365

329-
var again bool
330366
progress, _ := msg.Options[wamp.OptProgress].(bool)
367+
invkReqID := requestID{session: callee.ID, request: msg.Request}
331368

369+
var startRetry bool
332370
done := make(chan struct{})
333-
d.actionChan <- func() {
334-
again = d.syncYield(callee, msg, progress, true)
371+
action := func() {
372+
invk, ok := d.invocations[invkReqID]
373+
if ok && invk.retrying {
374+
// A retry goroutine is already draining this call's queue;
375+
// preserve in-call order by appending instead of trying to
376+
// deliver out-of-band.
377+
invk.pendingYields = append(invk.pendingYields, pendingYield{msg: msg, progress: progress})
378+
done <- struct{}{}
379+
return
380+
}
381+
// No active retry — try direct delivery first.
382+
if !d.syncYield(callee, msg, progress, true) {
383+
done <- struct{}{}
384+
return
385+
}
386+
// Caller's queue is full. Queue this YIELD and start a retry
387+
// goroutine. Re-fetch invk because syncYield may have changed
388+
// state (it shouldn't have deleted the entry on canRetry=true,
389+
// but be defensive).
390+
invk, ok = d.invocations[invkReqID]
391+
if !ok {
392+
// Invocation gone (raced with cancel/timeout) — nothing to retry.
393+
done <- struct{}{}
394+
return
395+
}
396+
invk.pendingYields = append(invk.pendingYields, pendingYield{msg: msg, progress: progress})
397+
invk.retrying = true
398+
invk.retryDeadline = time.Now().Add(sendResultDeadline)
399+
startRetry = true
335400
done <- struct{}{}
336401
}
402+
select {
403+
case d.actionChan <- action:
404+
case <-d.closing:
405+
return
406+
}
337407
<-done
338408

339-
// If blocked, retry
340-
if again {
341-
retry := true
342-
delay := yieldRetryDelay
343-
start := time.Now()
344-
// Retry processing YIELD until caller gone or deadline reached.
345-
for {
346-
if d.debug {
347-
d.log.Println("Retry sending RESULT after", delay)
348-
}
349-
<-time.After(delay)
350-
// Do not retry if the elapsed time exceeds deadline.
351-
if time.Since(start) >= sendResultDeadline {
352-
retry = false
353-
}
354-
d.actionChan <- func() {
355-
again = d.syncYield(callee, msg, progress, retry)
356-
done <- struct{}{}
357-
}
358-
<-done
359-
if !again {
360-
break
361-
}
409+
if startRetry {
410+
go d.drainPendingYields(callee, invkReqID)
411+
}
412+
}
413+
414+
// drainPendingYields runs on a dedicated goroutine per invocation while
415+
// that invocation has YIELDs queued for retry. It serializes through the
416+
// dealer actor (actionChan), so dealer state stays single-threaded.
417+
//
418+
// Backoff: starts at yieldRetryDelay, doubles on each failed attempt
419+
// up to a 1-second ceiling, and resets to yieldRetryDelay whenever a
420+
// queued YIELD is successfully delivered (caller is making progress).
421+
//
422+
// Exits when the queue is drained, the invocation is gone, or the dealer
423+
// is closing.
424+
func (d *dealer) drainPendingYields(callee *wamp.Session, invkReqID requestID) {
425+
delay := yieldRetryDelay
426+
for {
427+
if d.debug {
428+
d.log.Println("Retry sending RESULT after", delay)
429+
}
430+
// Sleep, but watch for dealer shutdown so we never block the
431+
// realm.close path.
432+
timer := time.NewTimer(delay)
433+
select {
434+
case <-timer.C:
435+
case <-d.closing:
436+
timer.Stop()
437+
return
438+
}
439+
440+
var keepGoing, delivered bool
441+
done := make(chan struct{})
442+
select {
443+
case d.actionChan <- func() {
444+
keepGoing, delivered = d.tryDrainOneYield(callee, invkReqID)
445+
done <- struct{}{}
446+
}:
447+
case <-d.closing:
448+
return
449+
}
450+
<-done
451+
452+
if !keepGoing {
453+
return
454+
}
455+
if delivered {
456+
// Caller is making progress; retry the next item quickly.
457+
delay = yieldRetryDelay
458+
} else if delay < time.Second {
362459
delay *= 2
363460
}
364461
}
365462
}
366463

464+
// tryDrainOneYield is invoked on the dealer actor goroutine. It attempts
465+
// to deliver the head of the invocation's pending-yield queue.
466+
// Returns:
467+
//
468+
// keepGoing — true if the retry goroutine should keep looping.
469+
// delivered — true if the head was delivered on this attempt
470+
// (whether or not more remain).
471+
func (d *dealer) tryDrainOneYield(callee *wamp.Session, invkReqID requestID) (keepGoing, delivered bool) {
472+
invk, ok := d.invocations[invkReqID]
473+
if !ok {
474+
// Invocation gone — call canceled, callee disconnected, etc.
475+
return false, false
476+
}
477+
if len(invk.pendingYields) == 0 {
478+
invk.retrying = false
479+
return false, false
480+
}
481+
head := invk.pendingYields[0]
482+
canRetry := time.Now().Before(invk.retryDeadline)
483+
if d.syncYield(callee, head.msg, head.progress, canRetry) {
484+
// Still blocked — keep the head and retry after backoff.
485+
return true, false
486+
}
487+
// Delivered (or canceled at deadline). Pop the head. For a final
488+
// non-progress YIELD, syncYield's deferred cleanup deletes the
489+
// invocation; re-fetch before touching it again.
490+
invk.pendingYields = invk.pendingYields[1:]
491+
invk, ok = d.invocations[invkReqID]
492+
if !ok || len(invk.pendingYields) == 0 {
493+
if ok {
494+
invk.retrying = false
495+
}
496+
return false, true
497+
}
498+
return true, true
499+
}
500+
367501
// error handles an invocation error returned by the callee.
368502
func (d *dealer) error(callee *wamp.Session, msg *wamp.Error) {
369503
if msg == nil {
@@ -402,6 +536,10 @@ func (d *dealer) removeSession(sess *wamp.Session) {
402536

403537
// close stops the dealer, letting already queued actions finish.
404538
func (d *dealer) close() {
539+
// Signal background goroutines (e.g. yield-retry workers) before
540+
// closing actionChan, so they exit via the closing channel rather
541+
// than racing on a send-on-closed-channel panic.
542+
close(d.closing)
405543
close(d.actionChan)
406544
<-d.stopped
407545
if d.debug {

router/router_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,71 @@ func TestPublishNoAcknowledge(t *testing.T) {
255255
})
256256
}
257257

258+
// TestDealerYieldDoesNotBlockCalleeOnSlowCaller pins gammazero/nexus#324:
259+
// when a caller's outbound queue fills (the caller is slow or unresponsive
260+
// reading its Recv channel), the dealer's RESULT-delivery retry loop must
261+
// not stall the callee's session goroutine. Otherwise a single bad caller
262+
// freezes communication for the callee, even though the callee has
263+
// nothing wrong with it.
264+
//
265+
// Without the fix this test trips its 1-second deadline; with the fix the
266+
// callee remains responsive within milliseconds.
267+
func TestDealerYieldDoesNotBlockCalleeOnSlowCaller(t *testing.T) {
268+
r := newTestRouter(t)
269+
270+
// Caller with the smallest practical outbound queue. After WELCOME is
271+
// drained the queue is empty; one stalled RESULT will fill it.
272+
callerClient, callerServer := transport.LinkedPeersQSize(1)
273+
go func() {
274+
callerClient.Send() <- &wamp.Hello{Realm: testRealm, Details: clientRoles}
275+
}()
276+
require.NoError(t, r.Attach(callerServer))
277+
welcome := <-callerClient.Recv()
278+
_, ok := welcome.(*wamp.Welcome)
279+
require.True(t, ok, "expected WELCOME, got %T", welcome)
280+
281+
// Callee with the normal default queue.
282+
callee := testClient(t, r)
283+
284+
const proc = wamp.URI("nexus.test.slowcaller.proc")
285+
callee.Send() <- &wamp.Register{Request: 1, Procedure: proc}
286+
msg, err := wamp.RecvTimeout(callee, time.Second)
287+
require.NoError(t, err)
288+
_, ok = msg.(*wamp.Registered)
289+
require.True(t, ok, "expected REGISTERED, got %T", msg)
290+
291+
deliverYield := func(callRequest wamp.ID) {
292+
t.Helper()
293+
callerClient.Send() <- &wamp.Call{Request: callRequest, Procedure: proc}
294+
msg, err := wamp.RecvTimeout(callee, time.Second)
295+
require.NoErrorf(t, err, "callee never received INVOCATION for call %d", callRequest)
296+
inv, ok := msg.(*wamp.Invocation)
297+
require.True(t, ok, "expected INVOCATION, got %T", msg)
298+
callee.Send() <- &wamp.Yield{Request: inv.Request}
299+
}
300+
301+
// First call: RESULT 1 fills caller's 1-slot queue (test never reads it).
302+
deliverYield(1)
303+
304+
// Second call: RESULT 2 cannot be queued — caller is now blocked.
305+
// dealer.yield enters its retry loop. With the bug, this retry runs on
306+
// the callee's session goroutine and blocks it.
307+
deliverYield(2)
308+
309+
// The callee must still be able to do other work while the dealer
310+
// retries delivering RESULT 2 in the background. SUBSCRIBE → SUBSCRIBED
311+
// must complete promptly; without the fix it waits for the retry
312+
// deadline (~1 minute).
313+
callee.Send() <- &wamp.Subscribe{Request: 100, Topic: testTopic}
314+
select {
315+
case msg := <-callee.Recv():
316+
_, ok := msg.(*wamp.Subscribed)
317+
require.True(t, ok, "expected SUBSCRIBED, got %T", msg)
318+
case <-time.After(time.Second):
319+
t.Fatal("callee blocked for >1s while caller's queue is full — see #324")
320+
}
321+
}
322+
258323
func TestRouterCall(t *testing.T) {
259324
synctest.Test(t, func(t *testing.T) {
260325
r := newTestRouter(t)

0 commit comments

Comments
 (0)