@@ -1212,3 +1212,250 @@ func TestWrongYielder(t *testing.T) {
12121212 }
12131213 })
12141214}
1215+
1216+ // TestDealerYieldRetriesOnSlowCaller pins the per-invocation
1217+ // retry-queue path added in gammazero/nexus#324: when the caller's
1218+ // outbound channel is full at the moment the dealer tries to deliver
1219+ // a YIELD, the dealer queues the YIELD onto the invocation's
1220+ // pendingYields slice and spawns a drainPendingYields goroutine to
1221+ // retry delivery on a backoff. Once the caller drains, the queued
1222+ // yield is delivered and the drain goroutine exits.
1223+ //
1224+ // Coverage: dealer.yield's blocked-direct-delivery branch,
1225+ // drainPendingYields outer loop, tryDrainOneYield's
1226+ // deliver-and-pop branch.
1227+ func TestDealerYieldRetriesOnSlowCaller (t * testing.T ) {
1228+ dealer , _ := newTestDealer (t )
1229+
1230+ // Register a procedure on the callee.
1231+ callee := newTestPeer ()
1232+ calleeSess := wamp .NewSession (callee , 0 , nil , nil )
1233+ dealer .register (calleeSess , & wamp.Register {Request : 1 , Procedure : testProcedure })
1234+ registered := <- callee .Recv ()
1235+ _ , ok := registered .(* wamp.Registered )
1236+ require .True (t , ok , "expected REGISTERED" )
1237+
1238+ // Caller is also a 1-slot testPeer — slow side.
1239+ caller := newTestPeer ()
1240+ callerSess := wamp .NewSession (caller , 0 , nil , nil )
1241+
1242+ // Caller issues CALL.
1243+ dealer .call (callerSess , & wamp.Call {Request : 100 , Procedure : testProcedure })
1244+ inv := (<- callee .Recv ()).(* wamp.Invocation )
1245+
1246+ // First progressive YIELD: dealer's syncYield delivers it
1247+ // directly into caller's outbound channel (slot fills).
1248+ dealer .yield (calleeSess , & wamp.Yield {
1249+ Request : inv .Request ,
1250+ Options : wamp.Dict {wamp .OptProgress : true },
1251+ Arguments : wamp.List {1 },
1252+ })
1253+
1254+ // Second progressive YIELD: caller queue is full; syncYield
1255+ // returns canRetry=true; dealer.yield queues into
1256+ // pendingYields and spawns drainPendingYields.
1257+ dealer .yield (calleeSess , & wamp.Yield {
1258+ Request : inv .Request ,
1259+ Options : wamp.Dict {wamp .OptProgress : true },
1260+ Arguments : wamp.List {2 },
1261+ })
1262+
1263+ // Test reads first RESULT — caller queue empties.
1264+ res1 := (<- caller .Recv ()).(* wamp.Result )
1265+ require .Equal (t , 1 , res1 .Arguments [0 ], "expected first progressive RESULT arg=1" )
1266+
1267+ // drainPendingYields goroutine eventually delivers the second
1268+ // queued RESULT after its 1ms backoff fires.
1269+ select {
1270+ case msg := <- caller .Recv ():
1271+ res2 , ok := msg .(* wamp.Result )
1272+ require .True (t , ok , "expected RESULT, got %T" , msg )
1273+ require .Equal (t , 2 , res2 .Arguments [0 ], "expected second progressive RESULT arg=2" )
1274+ case <- time .After (time .Second ):
1275+ require .FailNow (t , "drain goroutine did not deliver queued YIELD" )
1276+ }
1277+
1278+ // Final non-progress YIELD ends the invocation.
1279+ dealer .yield (calleeSess , & wamp.Yield {
1280+ Request : inv .Request ,
1281+ Arguments : wamp.List {3 },
1282+ })
1283+ resFinal := (<- caller .Recv ()).(* wamp.Result )
1284+ require .Equal (t , 3 , resFinal .Arguments [0 ])
1285+ _ , hasProgress := resFinal .Details [wamp .OptProgress ]
1286+ require .False (t , hasProgress , "final RESULT must not carry progress=true" )
1287+ }
1288+
1289+ // TestDealerYieldRetriesPreservesOrder pins FIFO semantics of the
1290+ // pendingYields queue: when N YIELDs back up, draining delivers
1291+ // them in the order they arrived.
1292+ //
1293+ // Coverage: tryDrainOneYield's "more to drain" return path.
1294+ func TestDealerYieldRetriesPreservesOrder (t * testing.T ) {
1295+ dealer , _ := newTestDealer (t )
1296+
1297+ callee := newTestPeer ()
1298+ calleeSess := wamp .NewSession (callee , 0 , nil , nil )
1299+ dealer .register (calleeSess , & wamp.Register {Request : 1 , Procedure : testProcedure })
1300+ <- callee .Recv () // drain Registered
1301+
1302+ caller := newTestPeer ()
1303+ callerSess := wamp .NewSession (caller , 0 , nil , nil )
1304+ dealer .call (callerSess , & wamp.Call {Request : 200 , Procedure : testProcedure })
1305+ inv := (<- callee .Recv ()).(* wamp.Invocation )
1306+
1307+ // Send 5 progressive YIELDs back-to-back. First delivers
1308+ // directly (caller queue takes it). Subsequent four queue.
1309+ const n = 5
1310+ for i := 1 ; i <= n ; i ++ {
1311+ dealer .yield (calleeSess , & wamp.Yield {
1312+ Request : inv .Request ,
1313+ Options : wamp.Dict {wamp .OptProgress : true },
1314+ Arguments : wamp.List {i },
1315+ })
1316+ }
1317+
1318+ // Drain all five from caller — they MUST arrive in order
1319+ // 1, 2, 3, 4, 5.
1320+ for i := 1 ; i <= n ; i ++ {
1321+ select {
1322+ case msg := <- caller .Recv ():
1323+ res , ok := msg .(* wamp.Result )
1324+ require .Truef (t , ok , "expected RESULT %d, got %T" , i , msg )
1325+ require .Equalf (t , i , res .Arguments [0 ],
1326+ "out-of-order RESULT at position %d (got arg %v)" , i , res .Arguments [0 ])
1327+ case <- time .After (2 * time .Second ):
1328+ require .FailNowf (t , "timed out" , "drain did not deliver RESULT %d in time" , i )
1329+ }
1330+ }
1331+ }
1332+
1333+ // TestDealerDrainExitsOnInvocationCancel pins the cleanup path:
1334+ // while the drain goroutine is retrying, if the caller cancels the
1335+ // invocation (or the call is removed for any other reason),
1336+ // tryDrainOneYield observes the missing invocation entry and
1337+ // returns (keepGoing=false), which exits the drain goroutine.
1338+ //
1339+ // Coverage: tryDrainOneYield's "invocation gone" early return.
1340+ func TestDealerDrainExitsOnInvocationCancel (t * testing.T ) {
1341+ dealer , _ := newTestDealer (t )
1342+
1343+ callee := newTestPeer ()
1344+ calleeSess := wamp .NewSession (callee , 0 , nil , nil )
1345+ dealer .register (calleeSess , & wamp.Register {Request : 1 , Procedure : testProcedure ,
1346+ Options : wamp.Dict {"call_canceling" : true }})
1347+ <- callee .Recv () // Registered
1348+
1349+ caller := newTestPeer ()
1350+ calleeRoles := wamp.Dict {
1351+ "roles" : wamp.Dict {
1352+ "callee" : wamp.Dict {
1353+ "features" : wamp.Dict {
1354+ "call_canceling" : true ,
1355+ },
1356+ },
1357+ },
1358+ }
1359+ calleeSess2 := wamp .NewSession (callee , 0 , nil , calleeRoles )
1360+ calleeSess .Details = calleeSess2 .Details // attach features so cancel can be sent
1361+ callerSess := wamp .NewSession (caller , 0 , nil , nil )
1362+
1363+ dealer .call (callerSess , & wamp.Call {Request : 300 , Procedure : testProcedure })
1364+ inv := (<- callee .Recv ()).(* wamp.Invocation )
1365+
1366+ // Fill caller's queue with 1 YIELD, then queue a 2nd.
1367+ dealer .yield (calleeSess , & wamp.Yield {
1368+ Request : inv .Request , Options : wamp.Dict {wamp .OptProgress : true },
1369+ Arguments : wamp.List {"first" },
1370+ })
1371+ dealer .yield (calleeSess , & wamp.Yield {
1372+ Request : inv .Request , Options : wamp.Dict {wamp .OptProgress : true },
1373+ Arguments : wamp.List {"queued" },
1374+ })
1375+
1376+ // At this point the drain goroutine is retrying. Cancel the
1377+ // call from the caller side (mode=killnowait so we don't wait
1378+ // for callee to acknowledge).
1379+ dealer .cancel (callerSess , & wamp.Cancel {
1380+ Request : 300 ,
1381+ Options : wamp.Dict {wamp .OptMode : wamp .CancelModeKillNoWait },
1382+ })
1383+
1384+ // Caller may receive ERROR(canceled). Drain caller's queue
1385+ // (initial RESULT + ERROR) so we don't deadlock the dealer.
1386+ for range 3 {
1387+ select {
1388+ case <- caller .Recv ():
1389+ case <- time .After (time .Second ):
1390+ }
1391+ }
1392+
1393+ // Send another YIELD post-cancel — dealer.yield should NOT
1394+ // requeue (invocation entry is gone). This indirectly proves
1395+ // the drain goroutine has exited (otherwise it would still
1396+ // try to deliver and we'd see a fourth message).
1397+ dealer .yield (calleeSess , & wamp.Yield {
1398+ Request : inv .Request , Arguments : wamp.List {"too late" },
1399+ })
1400+ select {
1401+ case msg := <- caller .Recv ():
1402+ // syncYield's "no caller" path may send INTERRUPT to
1403+ // callee, but caller should not get a stale RESULT.
1404+ _ , isResult := msg .(* wamp.Result )
1405+ require .False (t , isResult , "should not receive RESULT after cancel; got %T" , msg )
1406+ case <- time .After (100 * time .Millisecond ):
1407+ }
1408+ }
1409+
1410+ // TestDealerDrainExitsOnDealerClose pins the drain goroutine's
1411+ // shutdown path: when the dealer is closing, drainPendingYields'
1412+ // sleep wakes via <-d.closing and the goroutine returns rather
1413+ // than blocking the realm.close ordering.
1414+ //
1415+ // Coverage: drainPendingYields' <-d.closing branch.
1416+ func TestDealerDrainExitsOnDealerClose (t * testing.T ) {
1417+ d := newDealer (logger , false , true , debug )
1418+ metaClient , rtr := transport .LinkedPeers ()
1419+ d .setMetaPeer (rtr )
1420+
1421+ // Register + call setup as before.
1422+ callee := newTestPeer ()
1423+ calleeSess := wamp .NewSession (callee , 0 , nil , nil )
1424+ d .register (calleeSess , & wamp.Register {Request : 1 , Procedure : testProcedure })
1425+ <- callee .Recv ()
1426+
1427+ caller := newTestPeer ()
1428+ callerSess := wamp .NewSession (caller , 0 , nil , nil )
1429+ d .call (callerSess , & wamp.Call {Request : 400 , Procedure : testProcedure })
1430+ inv := (<- callee .Recv ()).(* wamp.Invocation )
1431+
1432+ // Fill + queue one extra to spawn drain.
1433+ d .yield (calleeSess , & wamp.Yield {
1434+ Request : inv .Request , Options : wamp.Dict {wamp .OptProgress : true },
1435+ Arguments : wamp.List {1 },
1436+ })
1437+ d .yield (calleeSess , & wamp.Yield {
1438+ Request : inv .Request , Options : wamp.Dict {wamp .OptProgress : true },
1439+ Arguments : wamp.List {2 },
1440+ })
1441+
1442+ // drainPendingYields goroutine is now sleeping (or in
1443+ // actionChan) at backoff. d.close() closes d.closing — both
1444+ // the sleep and the actionChan submit have a <-d.closing case
1445+ // and exit cleanly.
1446+ d .close ()
1447+
1448+ // We can't easily assert on the goroutine count without
1449+ // goleak, but if d.close hung waiting on the drain goroutine,
1450+ // this test would time out. Reaching this point at all proves
1451+ // the drain goroutine yielded to closing.
1452+
1453+ // Consume any leftover messages so the test doesn't leak
1454+ // goroutines through the testPeer channels.
1455+ go func () {
1456+ for range caller .Recv () {
1457+ }
1458+ }()
1459+ metaClient .Close ()
1460+ rtr .Close ()
1461+ }
0 commit comments