Skip to content

Commit dd5051a

Browse files
committed
Add QueueBundle.Remove() to stop and remove queues
Adds dynamic queue removal at runtime. When removed, the queue's producer is stopped and jobs can no longer be worked on that queue. Queues can be re-added after removal. Signed-off-by: Tiago Silva <3629062+tigrato@users.noreply.github.com>
1 parent 7f7d703 commit dd5051a

3 files changed

Lines changed: 296 additions & 0 deletions

File tree

client.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -768,6 +768,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
768768

769769
client.queues = &QueueBundle{
770770
addProducer: client.addProducer,
771+
removeProducer: client.removeProducer,
771772
clientFetchCooldown: config.FetchCooldown,
772773
clientFetchPollInterval: config.FetchPollInterval,
773774
clientWillExecuteJobs: config.willExecuteJobs(),
@@ -2209,6 +2210,20 @@ func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) (*p
22092210
return producer, nil
22102211
}
22112212

2213+
func (c *Client[TTx]) removeProducer(queueName string) error {
2214+
c.producersMu.Lock()
2215+
producer, ok := c.producersByQueueName[queueName]
2216+
delete(c.producersByQueueName, queueName)
2217+
c.producersMu.Unlock()
2218+
2219+
if !ok {
2220+
return &QueueNotFoundError{Name: queueName}
2221+
}
2222+
2223+
producer.Stop()
2224+
return nil
2225+
}
2226+
22122227
var nameRegex = regexp.MustCompile(`^(?:[a-z0-9])+(?:[_|\-]?[a-z0-9]+)*$`)
22132228

22142229
func validateQueueName(queueName string) error {
@@ -2799,6 +2814,8 @@ type QueueBundle struct {
27992814
// Function that adds a producer to the associated client.
28002815
addProducer func(queueName string, queueConfig QueueConfig) (*producer, error)
28012816

2817+
removeProducer func(queueName string) error
2818+
28022819
clientFetchCooldown time.Duration
28032820
clientFetchPollInterval time.Duration
28042821

@@ -2844,6 +2861,24 @@ func (b *QueueBundle) Add(queueName string, queueConfig QueueConfig) error {
28442861
return nil
28452862
}
28462863

2864+
// Remove removes a queue from the client, stopping the producer if the client
2865+
// is running. The function will block until all jobs currently being worked in
2866+
// the queue have completed. This blocking behavior may affect other operations,
2867+
// including shutdown timing.
2868+
//
2869+
// Returns an error if the client is not configured to execute jobs or if the
2870+
// specified queue does not exist.
2871+
func (b *QueueBundle) Remove(queueName string) error {
2872+
if !b.clientWillExecuteJobs {
2873+
return errors.New("client is not configured to execute jobs, cannot remove queue")
2874+
}
2875+
2876+
b.startStopMu.Lock()
2877+
defer b.startStopMu.Unlock()
2878+
2879+
return b.removeProducer(queueName)
2880+
}
2881+
28472882
// Generates a default client ID using the current hostname and time.
28482883
func defaultClientID(startedAt time.Time) string {
28492884
host, _ := os.Hostname()

client_test.go

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,252 @@ func Test_Client_Common(t *testing.T) {
406406
wg.Wait()
407407
})
408408

409+
t.Run("Queues_Remove_BeforeStart", func(t *testing.T) {
410+
t.Parallel()
411+
412+
client, _ := setup(t)
413+
414+
type JobArgs struct {
415+
testutil.JobArgsReflectKind[JobArgs]
416+
}
417+
418+
subscribeChan := subscribe(t, client)
419+
420+
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
421+
return nil
422+
}))
423+
424+
queueName := "remove_before_start_queue"
425+
err := client.Queues().Add(queueName, QueueConfig{
426+
MaxWorkers: 2,
427+
})
428+
require.NoError(t, err)
429+
430+
err = client.Queues().Remove(queueName)
431+
require.NoError(t, err)
432+
433+
startClient(ctx, t, client)
434+
435+
insertRes, err := client.Insert(ctx, &JobArgs{}, &InsertOpts{
436+
Queue: queueName,
437+
})
438+
require.NoError(t, err)
439+
440+
// Verify job stays available by checking another queue's job completes
441+
_, err = client.Insert(ctx, &noOpArgs{}, nil)
442+
require.NoError(t, err)
443+
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
444+
require.Equal(t, EventKindJobCompleted, event.Kind)
445+
require.NotEqual(t, insertRes.Job.ID, event.Job.ID)
446+
447+
// Job on removed queue should still be available
448+
job, err := client.JobGet(ctx, insertRes.Job.ID)
449+
require.NoError(t, err)
450+
require.Equal(t, rivertype.JobStateAvailable, job.State)
451+
})
452+
453+
t.Run("Queues_Remove_AfterStart", func(t *testing.T) {
454+
t.Parallel()
455+
456+
client, _ := setup(t)
457+
458+
type JobArgs struct {
459+
testutil.JobArgsReflectKind[JobArgs]
460+
}
461+
462+
subscribeChan := subscribe(t, client)
463+
464+
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
465+
return nil
466+
}))
467+
468+
queueName := "remove_after_start_queue"
469+
err := client.Queues().Add(queueName, QueueConfig{
470+
MaxWorkers: 2,
471+
})
472+
require.NoError(t, err)
473+
474+
startClient(ctx, t, client)
475+
riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started())
476+
477+
_, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{
478+
Queue: queueName,
479+
})
480+
require.NoError(t, err)
481+
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
482+
require.Equal(t, EventKindJobCompleted, event.Kind)
483+
484+
err = client.Queues().Remove(queueName)
485+
require.NoError(t, err)
486+
487+
insertRes, err := client.Insert(ctx, &JobArgs{}, &InsertOpts{
488+
Queue: queueName,
489+
})
490+
require.NoError(t, err)
491+
492+
// Verify job stays available by checking another queue's job completes
493+
_, err = client.Insert(ctx, &noOpArgs{}, nil)
494+
require.NoError(t, err)
495+
event = riversharedtest.WaitOrTimeout(t, subscribeChan)
496+
require.Equal(t, EventKindJobCompleted, event.Kind)
497+
require.NotEqual(t, insertRes.Job.ID, event.Job.ID)
498+
499+
// Job on removed queue should still be available
500+
job, err := client.JobGet(ctx, insertRes.Job.ID)
501+
require.NoError(t, err)
502+
require.Equal(t, rivertype.JobStateAvailable, job.State)
503+
})
504+
505+
t.Run("Queues_Remove_NonExistentQueue", func(t *testing.T) {
506+
t.Parallel()
507+
508+
client, _ := setup(t)
509+
510+
err := client.Queues().Remove("non_existent_queue")
511+
require.Error(t, err)
512+
var queueNotFoundErr *QueueNotFoundError
513+
require.ErrorAs(t, err, &queueNotFoundErr)
514+
require.Equal(t, "non_existent_queue", queueNotFoundErr.Name)
515+
})
516+
517+
t.Run("Queues_Remove_WhenClientWontExecuteJobs", func(t *testing.T) {
518+
t.Parallel()
519+
520+
config, bundle := setupConfig(t)
521+
config.Queues = nil
522+
config.Workers = nil
523+
client := newTestClient(t, bundle.dbPool, config)
524+
525+
err := client.Queues().Remove("any_queue")
526+
require.Error(t, err)
527+
require.Contains(t, err.Error(), "client is not configured to execute jobs, cannot remove queue")
528+
})
529+
530+
t.Run("Queues_Remove_DefaultQueue", func(t *testing.T) {
531+
t.Parallel()
532+
533+
config, bundle := setupConfig(t)
534+
config.Queues = map[string]QueueConfig{QueueDefault: {MaxWorkers: 2}}
535+
client := newTestClient(t, bundle.dbPool, config)
536+
537+
type JobArgs struct {
538+
testutil.JobArgsReflectKind[JobArgs]
539+
}
540+
541+
subscribeChan := subscribe(t, client)
542+
543+
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
544+
return nil
545+
}))
546+
547+
startClient(ctx, t, client)
548+
549+
_, err := client.Insert(ctx, &JobArgs{}, nil)
550+
require.NoError(t, err)
551+
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
552+
require.Equal(t, EventKindJobCompleted, event.Kind)
553+
554+
err = client.Queues().Remove(QueueDefault)
555+
require.NoError(t, err)
556+
557+
insertRes, err := client.Insert(ctx, &JobArgs{}, nil)
558+
require.NoError(t, err)
559+
560+
// Verify no more jobs complete by using a short timeout
561+
select {
562+
case <-subscribeChan:
563+
t.Fatal("expected job to not be worked after default queue removal")
564+
case <-time.After(500 * time.Millisecond):
565+
}
566+
567+
// Job should still be available
568+
job, err := client.JobGet(ctx, insertRes.Job.ID)
569+
require.NoError(t, err)
570+
require.Equal(t, rivertype.JobStateAvailable, job.State)
571+
})
572+
573+
t.Run("Queues_Remove_ThenAddAgain", func(t *testing.T) {
574+
t.Parallel()
575+
576+
client, _ := setup(t)
577+
578+
type JobArgs struct {
579+
testutil.JobArgsReflectKind[JobArgs]
580+
}
581+
582+
subscribeChan := subscribe(t, client)
583+
584+
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
585+
return nil
586+
}))
587+
588+
queueName := "remove_then_add_queue"
589+
err := client.Queues().Add(queueName, QueueConfig{
590+
MaxWorkers: 2,
591+
})
592+
require.NoError(t, err)
593+
594+
startClient(ctx, t, client)
595+
riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started())
596+
597+
_, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{
598+
Queue: queueName,
599+
})
600+
require.NoError(t, err)
601+
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
602+
require.Equal(t, EventKindJobCompleted, event.Kind)
603+
604+
err = client.Queues().Remove(queueName)
605+
require.NoError(t, err)
606+
607+
err = client.Queues().Add(queueName, QueueConfig{
608+
MaxWorkers: 2,
609+
})
610+
require.NoError(t, err)
611+
612+
_, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{
613+
Queue: queueName,
614+
})
615+
require.NoError(t, err)
616+
event = riversharedtest.WaitOrTimeout(t, subscribeChan)
617+
require.Equal(t, EventKindJobCompleted, event.Kind)
618+
})
619+
620+
t.Run("Queues_Remove_JobWaitsUntilReAdded", func(t *testing.T) {
621+
t.Parallel()
622+
623+
config, bundle := setupConfig(t)
624+
config.Queues = map[string]QueueConfig{"test_queue": {MaxWorkers: 10}}
625+
client := newTestClient(t, bundle.dbPool, config)
626+
627+
subscribeChan := subscribe(t, client)
628+
startClient(ctx, t, client)
629+
630+
insertRes1, err := client.Insert(ctx, &noOpArgs{}, &InsertOpts{Queue: "test_queue"})
631+
require.NoError(t, err)
632+
633+
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
634+
require.Equal(t, EventKindJobCompleted, event.Kind)
635+
require.Equal(t, insertRes1.Job.ID, event.Job.ID)
636+
637+
require.NoError(t, client.Queues().Remove("test_queue"))
638+
639+
insertRes2, err := client.Insert(ctx, &noOpArgs{}, &InsertOpts{Queue: "test_queue"})
640+
require.NoError(t, err)
641+
642+
select {
643+
case <-subscribeChan:
644+
t.Fatal("expected job 2 to not start on removed queue")
645+
case <-time.After(500 * time.Millisecond):
646+
}
647+
648+
require.NoError(t, client.Queues().Add("test_queue", QueueConfig{MaxWorkers: 10}))
649+
650+
event = riversharedtest.WaitOrTimeout(t, subscribeChan)
651+
require.Equal(t, EventKindJobCompleted, event.Kind)
652+
require.Equal(t, insertRes2.Job.ID, event.Job.ID)
653+
})
654+
409655
t.Run("JobCancelErrorReturned", func(t *testing.T) {
410656
t.Parallel()
411657

error.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,21 @@ func (e *QueueAlreadyAddedError) Is(target error) bool {
5959
return ok
6060
}
6161

62+
// QueueNotFoundError is returned when attempting to remove a queue that does
63+
// not exist on the Client.
64+
type QueueNotFoundError struct {
65+
Name string
66+
}
67+
68+
func (e *QueueNotFoundError) Error() string {
69+
return fmt.Sprintf("queue %q not found", e.Name)
70+
}
71+
72+
func (e *QueueNotFoundError) Is(target error) bool {
73+
_, ok := target.(*QueueNotFoundError)
74+
return ok
75+
}
76+
6277
// UnknownJobKindError is returned when a Client fetches and attempts to
6378
// work a job that has not been registered on the Client's Workers bundle (using AddWorker).
6479
type UnknownJobKindError = rivertype.UnknownJobKindError

0 commit comments

Comments
 (0)