Skip to content

Commit 2f43a06

Browse files
committed
Add QueueBundle.Remove() to stop and remove queues
Adds dynamic queue removal at runtime. When removed, the queue's producer is stopped and jobs can no longer be worked on that queue. Queues can be re-added after removal. Signed-off-by: Tiago Silva <3629062+tigrato@users.noreply.github.com>
1 parent 6b98fb9 commit 2f43a06

2 files changed

Lines changed: 250 additions & 0 deletions

File tree

client.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -767,6 +767,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
767767

768768
client.queues = &QueueBundle{
769769
addProducer: client.addProducer,
770+
removeProducer: client.removeProducer,
770771
clientFetchCooldown: config.FetchCooldown,
771772
clientFetchPollInterval: config.FetchPollInterval,
772773
clientWillExecuteJobs: config.willExecuteJobs(),
@@ -2198,6 +2199,17 @@ func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) (*p
21982199
return producer, nil
21992200
}
22002201

2202+
func (c *Client[TTx]) removeProducer(queueName string) {
2203+
producer, ok := c.producersByQueueName[queueName]
2204+
if !ok {
2205+
return
2206+
}
2207+
2208+
producer.Stop()
2209+
2210+
delete(c.producersByQueueName, queueName)
2211+
}
2212+
22012213
var nameRegex = regexp.MustCompile(`^(?:[a-z0-9])+(?:[_|\-]?[a-z0-9]+)*$`)
22022214

22032215
func validateQueueName(queueName string) error {
@@ -2781,6 +2793,8 @@ type QueueBundle struct {
27812793
// Function that adds a producer to the associated client.
27822794
addProducer func(queueName string, queueConfig QueueConfig) (*producer, error)
27832795

2796+
removeProducer func(queueName string)
2797+
27842798
clientFetchCooldown time.Duration
27852799
clientFetchPollInterval time.Duration
27862800

@@ -2826,6 +2840,17 @@ func (b *QueueBundle) Add(queueName string, queueConfig QueueConfig) error {
28262840
return nil
28272841
}
28282842

2843+
// Remove removes a queue from the client. If the client is already started,
2844+
// the producer for the queue is stopped.
2845+
func (b *QueueBundle) Remove(queueName string) error {
2846+
b.startStopMu.Lock()
2847+
defer b.startStopMu.Unlock()
2848+
2849+
b.removeProducer(queueName)
2850+
2851+
return nil
2852+
}
2853+
28292854
// Generates a default client ID using the current hostname and time.
28302855
func defaultClientID(startedAt time.Time) string {
28312856
host, _ := os.Hostname()

client_test.go

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,231 @@ func Test_Client_Common(t *testing.T) {
406406
wg.Wait()
407407
})
408408

409+
t.Run("Queues_Remove_BeforeStart", func(t *testing.T) {
410+
t.Parallel()
411+
412+
client, _ := setup(t)
413+
414+
type JobArgs struct {
415+
testutil.JobArgsReflectKind[JobArgs]
416+
}
417+
418+
workedChan := make(chan struct{})
419+
420+
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
421+
workedChan <- struct{}{}
422+
return nil
423+
}))
424+
425+
queueName := "remove_before_start_queue"
426+
err := client.Queues().Add(queueName, QueueConfig{
427+
MaxWorkers: 2,
428+
})
429+
require.NoError(t, err)
430+
431+
// Remove the queue before starting the client
432+
err = client.Queues().Remove(queueName)
433+
require.NoError(t, err)
434+
435+
startClient(ctx, t, client)
436+
437+
// Jobs inserted to removed queue should not be worked
438+
_, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{
439+
Queue: queueName,
440+
})
441+
require.NoError(t, err)
442+
443+
select {
444+
case <-workedChan:
445+
t.Fatal("expected job to not be worked after queue removal")
446+
case <-time.After(500 * time.Millisecond):
447+
}
448+
})
449+
450+
t.Run("Queues_Remove_AfterStart", func(t *testing.T) {
451+
t.Parallel()
452+
453+
client, _ := setup(t)
454+
455+
type JobArgs struct {
456+
testutil.JobArgsReflectKind[JobArgs]
457+
}
458+
459+
workedChan := make(chan struct{})
460+
461+
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
462+
workedChan <- struct{}{}
463+
return nil
464+
}))
465+
466+
queueName := "remove_after_start_queue"
467+
err := client.Queues().Add(queueName, QueueConfig{
468+
MaxWorkers: 2,
469+
})
470+
require.NoError(t, err)
471+
472+
startClient(ctx, t, client)
473+
riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started())
474+
475+
// Verify queue works before removal
476+
_, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{
477+
Queue: queueName,
478+
})
479+
require.NoError(t, err)
480+
riversharedtest.WaitOrTimeout(t, workedChan)
481+
482+
// Remove the queue after start
483+
err = client.Queues().Remove(queueName)
484+
require.NoError(t, err)
485+
486+
// Jobs inserted to removed queue should not be worked
487+
_, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{
488+
Queue: queueName,
489+
})
490+
require.NoError(t, err)
491+
492+
select {
493+
case <-workedChan:
494+
t.Fatal("expected job to not be worked after queue removal")
495+
case <-time.After(500 * time.Millisecond):
496+
}
497+
})
498+
499+
t.Run("Queues_Remove_NonExistentQueue", func(t *testing.T) {
500+
t.Parallel()
501+
502+
client, _ := setup(t)
503+
504+
// Removing a non-existent queue should not error
505+
err := client.Queues().Remove("non_existent_queue")
506+
require.NoError(t, err)
507+
})
508+
509+
t.Run("Queues_Remove_DefaultQueue", func(t *testing.T) {
510+
t.Parallel()
511+
512+
config, bundle := setupConfig(t)
513+
config.Queues = map[string]QueueConfig{QueueDefault: {MaxWorkers: 2}}
514+
client := newTestClient(t, bundle.dbPool, config)
515+
516+
type JobArgs struct {
517+
testutil.JobArgsReflectKind[JobArgs]
518+
}
519+
520+
workedChan := make(chan struct{})
521+
522+
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
523+
workedChan <- struct{}{}
524+
return nil
525+
}))
526+
527+
startClient(ctx, t, client)
528+
529+
// Verify default queue works
530+
_, err := client.Insert(ctx, &JobArgs{}, nil)
531+
require.NoError(t, err)
532+
riversharedtest.WaitOrTimeout(t, workedChan)
533+
534+
// Remove default queue
535+
err = client.Queues().Remove(QueueDefault)
536+
require.NoError(t, err)
537+
538+
// Jobs should not be worked after removal
539+
_, err = client.Insert(ctx, &JobArgs{}, nil)
540+
require.NoError(t, err)
541+
542+
select {
543+
case <-workedChan:
544+
t.Fatal("expected job to not be worked after default queue removal")
545+
case <-time.After(500 * time.Millisecond):
546+
}
547+
})
548+
549+
t.Run("Queues_Remove_ThenAddAgain", func(t *testing.T) {
550+
t.Parallel()
551+
552+
client, _ := setup(t)
553+
554+
type JobArgs struct {
555+
testutil.JobArgsReflectKind[JobArgs]
556+
}
557+
558+
workedChan := make(chan struct{})
559+
560+
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
561+
workedChan <- struct{}{}
562+
return nil
563+
}))
564+
565+
queueName := "remove_then_add_queue"
566+
err := client.Queues().Add(queueName, QueueConfig{
567+
MaxWorkers: 2,
568+
})
569+
require.NoError(t, err)
570+
571+
startClient(ctx, t, client)
572+
riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started())
573+
574+
// Verify queue works
575+
_, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{
576+
Queue: queueName,
577+
})
578+
require.NoError(t, err)
579+
riversharedtest.WaitOrTimeout(t, workedChan)
580+
581+
// Remove the queue
582+
err = client.Queues().Remove(queueName)
583+
require.NoError(t, err)
584+
585+
// Add it back again
586+
err = client.Queues().Add(queueName, QueueConfig{
587+
MaxWorkers: 2,
588+
})
589+
require.NoError(t, err)
590+
591+
// Verify queue works again after re-adding
592+
_, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{
593+
Queue: queueName,
594+
})
595+
require.NoError(t, err)
596+
riversharedtest.WaitOrTimeout(t, workedChan)
597+
})
598+
599+
t.Run("Queues_Remove_JobWaitsUntilReAdded", func(t *testing.T) {
600+
t.Parallel()
601+
602+
config, bundle := setupConfig(t)
603+
config.Queues = map[string]QueueConfig{"test_queue": {MaxWorkers: 10}}
604+
client := newTestClient(t, bundle.dbPool, config)
605+
606+
subscribeChan := subscribe(t, client)
607+
startClient(ctx, t, client)
608+
609+
insertRes1, err := client.Insert(ctx, &noOpArgs{}, &InsertOpts{Queue: "test_queue"})
610+
require.NoError(t, err)
611+
612+
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
613+
require.Equal(t, EventKindJobCompleted, event.Kind)
614+
require.Equal(t, insertRes1.Job.ID, event.Job.ID)
615+
616+
require.NoError(t, client.Queues().Remove("test_queue"))
617+
618+
insertRes2, err := client.Insert(ctx, &noOpArgs{}, &InsertOpts{Queue: "test_queue"})
619+
require.NoError(t, err)
620+
621+
select {
622+
case <-subscribeChan:
623+
t.Fatal("expected job 2 to not start on removed queue")
624+
case <-time.After(500 * time.Millisecond):
625+
}
626+
627+
require.NoError(t, client.Queues().Add("test_queue", QueueConfig{MaxWorkers: 10}))
628+
629+
event = riversharedtest.WaitOrTimeout(t, subscribeChan)
630+
require.Equal(t, EventKindJobCompleted, event.Kind)
631+
require.Equal(t, insertRes2.Job.ID, event.Job.ID)
632+
})
633+
409634
t.Run("JobCancelErrorReturned", func(t *testing.T) {
410635
t.Parallel()
411636

0 commit comments

Comments
 (0)