@@ -406,6 +406,34 @@ func Test_Client_Common(t *testing.T) {
406406 wg .Wait ()
407407 })
408408
409+ t .Run ("Queues_Remove_Stress" , func (t * testing.T ) {
410+ t .Parallel ()
411+
412+ client , _ := setup (t )
413+
414+ startClient (ctx , t , client )
415+ riversharedtest .WaitOrTimeout (t , client .baseStartStop .Started ())
416+
417+ var wg sync.WaitGroup
418+
419+ for i := range 5 {
420+ wg .Add (1 )
421+ workerNum := i
422+ go func () {
423+ defer wg .Done ()
424+
425+ for j := range 5 {
426+ queueName := fmt .Sprintf ("stress_queue_%d_%d" , workerNum , j )
427+
428+ require .NoError (t , client .Queues ().Add (queueName , QueueConfig {MaxWorkers : 1 }))
429+ require .NoError (t , client .Queues ().Remove (ctx , queueName ))
430+ }
431+ }()
432+ }
433+
434+ wg .Wait ()
435+ })
436+
409437 t .Run ("Queues_Remove_BeforeStart" , func (t * testing.T ) {
410438 t .Parallel ()
411439
@@ -427,7 +455,7 @@ func Test_Client_Common(t *testing.T) {
427455 })
428456 require .NoError (t , err )
429457
430- err = client .Queues ().Remove (queueName )
458+ err = client .Queues ().Remove (ctx , queueName )
431459 require .NoError (t , err )
432460
433461 startClient (ctx , t , client )
@@ -481,7 +509,7 @@ func Test_Client_Common(t *testing.T) {
481509 event := riversharedtest .WaitOrTimeout (t , subscribeChan )
482510 require .Equal (t , EventKindJobCompleted , event .Kind )
483511
484- err = client .Queues ().Remove (queueName )
512+ err = client .Queues ().Remove (ctx , queueName )
485513 require .NoError (t , err )
486514
487515 insertRes , err := client .Insert (ctx , & JobArgs {}, & InsertOpts {
@@ -502,12 +530,110 @@ func Test_Client_Common(t *testing.T) {
502530 require .Equal (t , rivertype .JobStateAvailable , job .State )
503531 })
504532
533+ t .Run ("Queues_Remove_ContextDone" , func (t * testing.T ) {
534+ t .Parallel ()
535+
536+ client , _ := setup (t )
537+
538+ type JobArgs struct {
539+ testutil.JobArgsReflectKind [JobArgs ]
540+ }
541+
542+ jobStartedChan := make (chan struct {})
543+ AddWorker (client .config .Workers , WorkFunc (func (ctx context.Context , job * Job [JobArgs ]) error {
544+ close (jobStartedChan )
545+ <- ctx .Done ()
546+ return nil
547+ }))
548+
549+ queueName := "remove_context_done_queue"
550+ require .NoError (t , client .Queues ().Add (queueName , QueueConfig {MaxWorkers : 2 }))
551+
552+ startClient (ctx , t , client )
553+ riversharedtest .WaitOrTimeout (t , client .baseStartStop .Started ())
554+
555+ _ , err := client .Insert (ctx , & JobArgs {}, & InsertOpts {Queue : queueName })
556+ require .NoError (t , err )
557+
558+ riversharedtest .WaitOrTimeout (t , jobStartedChan )
559+
560+ // Remove with an already-cancelled context should return immediately
561+ // without removing the queue.
562+ cancelledCtx , cancel := context .WithCancel (ctx )
563+ cancel ()
564+
565+ err = client .Queues ().Remove (cancelledCtx , queueName )
566+ require .ErrorIs (t , err , context .Canceled )
567+
568+ // Queue should still exist and be functional since Remove bailed out.
569+ // Verify by successfully removing it with a valid context after
570+ // cancelling the job via StopAndCancel.
571+ require .NoError (t , client .StopAndCancel (ctx ))
572+
573+ // Re-start so startClient's cleanup Stop doesn't fail.
574+ require .NoError (t , client .Start (ctx ))
575+ })
576+
577+ t .Run ("Queues_Remove_ContextCancelledThenRetry" , func (t * testing.T ) {
578+ t .Parallel ()
579+
580+ client , _ := setup (t )
581+
582+ type JobArgs struct {
583+ testutil.JobArgsReflectKind [JobArgs ]
584+ }
585+
586+ jobStartedChan := make (chan struct {})
587+ jobFinishChan := make (chan struct {})
588+ AddWorker (client .config .Workers , WorkFunc (func (ctx context.Context , job * Job [JobArgs ]) error {
589+ close (jobStartedChan )
590+ <- jobFinishChan // blocks until test releases it
591+ return nil
592+ }))
593+
594+ queueName := "remove_retry_queue"
595+ require .NoError (t , client .Queues ().Add (queueName , QueueConfig {MaxWorkers : 2 }))
596+
597+ startClient (ctx , t , client )
598+ riversharedtest .WaitOrTimeout (t , client .baseStartStop .Started ())
599+
600+ _ , err := client .Insert (ctx , & JobArgs {}, & InsertOpts {Queue : queueName })
601+ require .NoError (t , err )
602+
603+ riversharedtest .WaitOrTimeout (t , jobStartedChan )
604+
605+ // First Remove: use a very short timeout so that StopInit is called
606+ // (which cancels the producer's fetch context) but the producer
607+ // can't fully stop because the job is still running. Remove returns
608+ // context.DeadlineExceeded and the queue stays in the map.
609+ shortCtx , cancel := context .WithTimeout (ctx , time .Millisecond )
610+ defer cancel ()
611+
612+ err = client .Queues ().Remove (shortCtx , queueName )
613+ require .ErrorIs (t , err , context .DeadlineExceeded )
614+
615+ // Let the job finish so the producer can fully stop.
616+ close (jobFinishChan )
617+
618+ // Second Remove: the producer's fetch context was already cancelled
619+ // by the first attempt and the job has now finished, so the
620+ // producer should stop promptly. Remove succeeds and deletes the
621+ // queue from the map.
622+ err = client .Queues ().Remove (ctx , queueName )
623+ require .NoError (t , err )
624+
625+ // A third remove should return QueueNotFoundError since it's gone.
626+ err = client .Queues ().Remove (ctx , queueName )
627+ var queueNotFoundErr * QueueNotFoundError
628+ require .ErrorAs (t , err , & queueNotFoundErr )
629+ })
630+
505631 t .Run ("Queues_Remove_NonExistentQueue" , func (t * testing.T ) {
506632 t .Parallel ()
507633
508634 client , _ := setup (t )
509635
510- err := client .Queues ().Remove ("non_existent_queue" )
636+ err := client .Queues ().Remove (ctx , "non_existent_queue" )
511637 require .Error (t , err )
512638 var queueNotFoundErr * QueueNotFoundError
513639 require .ErrorAs (t , err , & queueNotFoundErr )
@@ -522,7 +648,7 @@ func Test_Client_Common(t *testing.T) {
522648 config .Workers = nil
523649 client := newTestClient (t , bundle .dbPool , config )
524650
525- err := client .Queues ().Remove ("any_queue" )
651+ err := client .Queues ().Remove (ctx , "any_queue" )
526652 require .Error (t , err )
527653 require .Contains (t , err .Error (), "client is not configured to execute jobs, cannot remove queue" )
528654 })
@@ -551,7 +677,7 @@ func Test_Client_Common(t *testing.T) {
551677 event := riversharedtest .WaitOrTimeout (t , subscribeChan )
552678 require .Equal (t , EventKindJobCompleted , event .Kind )
553679
554- err = client .Queues ().Remove (QueueDefault )
680+ err = client .Queues ().Remove (ctx , QueueDefault )
555681 require .NoError (t , err )
556682
557683 insertRes , err := client .Insert (ctx , & JobArgs {}, nil )
@@ -601,7 +727,7 @@ func Test_Client_Common(t *testing.T) {
601727 event := riversharedtest .WaitOrTimeout (t , subscribeChan )
602728 require .Equal (t , EventKindJobCompleted , event .Kind )
603729
604- err = client .Queues ().Remove (queueName )
730+ err = client .Queues ().Remove (ctx , queueName )
605731 require .NoError (t , err )
606732
607733 err = client .Queues ().Add (queueName , QueueConfig {
@@ -634,7 +760,7 @@ func Test_Client_Common(t *testing.T) {
634760 require .Equal (t , EventKindJobCompleted , event .Kind )
635761 require .Equal (t , insertRes1 .Job .ID , event .Job .ID )
636762
637- require .NoError (t , client .Queues ().Remove ("test_queue" ))
763+ require .NoError (t , client .Queues ().Remove (ctx , "test_queue" ))
638764
639765 insertRes2 , err := client .Insert (ctx , & noOpArgs {}, & InsertOpts {Queue : "test_queue" })
640766 require .NoError (t , err )
0 commit comments