Skip to content

Commit 276e05b

Browse files
committed
A few tweaks to QueueBundle.Remove implementation
This builds on #1235 to bring in a few tweaks: * Removing a queue is a blocking operation because it needs to wait for the producer to finish up its jobs and shut down. It'd be better to provide a way for this not to block forever, so here we add a context parameter to `QueueBundle.Remove` similar to the one taken by `Client.Stop`. If the context becomes done before the producer resolves, `QueueBundle.Remove` falls through with the error. * Add a "stress" test case for `QueueBundle.Remove`. It's meant to detect a deadlock or other concurrency bug in case there is one and gives us a little more confidence that what we have here is right. * Renamed `addProducer` and `removeProducer` to `producerAdd` and `producerRemove` so they sort more nicely against each other. * Add changelogentry.
1 parent 2109122 commit 276e05b

3 files changed

Lines changed: 109 additions & 25 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Added
11+
12+
- Add `QeueueBundle.Remove` to remove an already added queue/producer. [PR #1235](https://github.com/riverqueue/river/pull/1235) and [PR #1240](https://github.com/riverqueue/river/pull/1240).
13+
1014
### Fixed
1115

1216
- Fix unsafe concurrent producer map access in client. [PR #1236](https://github.com/riverqueue/river/pull/1236).

client.go

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -767,11 +767,11 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
767767
}
768768

769769
client.queues = &QueueBundle{
770-
addProducer: client.addProducer,
771-
removeProducer: client.removeProducer,
772770
clientFetchCooldown: config.FetchCooldown,
773771
clientFetchPollInterval: config.FetchPollInterval,
774772
clientWillExecuteJobs: config.willExecuteJobs(),
773+
producerAdd: client.producerAdd,
774+
producerRemove: client.producerRemove,
775775
}
776776

777777
baseservice.Init(archetype, &client.baseService)
@@ -879,7 +879,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
879879
client.services = append(client.services, client.elector)
880880

881881
for queue, queueConfig := range config.Queues {
882-
if _, err := client.addProducer(queue, queueConfig); err != nil {
882+
if _, err := client.producerAdd(queue, queueConfig); err != nil {
883883
return nil, err
884884
}
885885
}
@@ -2177,7 +2177,7 @@ func (c *Client[TTx]) validateJobArgs(args JobArgs) error {
21772177
return nil
21782178
}
21792179

2180-
func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) (*producer, error) {
2180+
func (c *Client[TTx]) producerAdd(queueName string, queueConfig QueueConfig) (*producer, error) {
21812181
c.producersMu.Lock()
21822182
defer c.producersMu.Unlock()
21832183

@@ -2210,7 +2210,7 @@ func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) (*p
22102210
return producer, nil
22112211
}
22122212

2213-
func (c *Client[TTx]) removeProducer(queueName string) error {
2213+
func (c *Client[TTx]) producerRemove(ctx context.Context, queueName string) error {
22142214
c.producersMu.Lock()
22152215
defer c.producersMu.Unlock()
22162216

@@ -2219,7 +2219,17 @@ func (c *Client[TTx]) removeProducer(queueName string) error {
22192219
return &QueueNotFoundError{Name: queueName}
22202220
}
22212221

2222-
producer.Stop()
2222+
shouldStop, stopped, finalizeStop := producer.StopInit()
2223+
if shouldStop {
2224+
select {
2225+
case <-ctx.Done():
2226+
finalizeStop(false)
2227+
return ctx.Err()
2228+
case <-stopped:
2229+
finalizeStop(true)
2230+
}
2231+
}
2232+
22232233
delete(c.producersByQueueName, queueName)
22242234

22252235
return nil
@@ -2812,17 +2822,14 @@ func (c *Client[TTx]) Schema() string { return c.config.Schema }
28122822
// QueueBundle is a bundle for adding additional queues. It's made accessible
28132823
// through Client.Queues.
28142824
type QueueBundle struct {
2815-
// Function that adds a producer to the associated client.
2816-
addProducer func(queueName string, queueConfig QueueConfig) (*producer, error)
2817-
2818-
removeProducer func(queueName string) error
2819-
28202825
clientFetchCooldown time.Duration
28212826
clientFetchPollInterval time.Duration
28222827

28232828
clientWillExecuteJobs bool
28242829

2825-
fetchCtx context.Context //nolint:containedctx
2830+
fetchCtx context.Context //nolint:containedctx
2831+
producerAdd func(queueName string, queueConfig QueueConfig) (*producer, error) // add producer to associated client
2832+
producerRemove func(ctx context.Context, queueName string) error // remove producer from associated client
28262833

28272834
// Mutex that's acquired when client is starting and stopping and when a
28282835
// queue is being added so that we can be sure that a client is fully
@@ -2847,7 +2854,7 @@ func (b *QueueBundle) Add(queueName string, queueConfig QueueConfig) error {
28472854
b.startStopMu.Lock()
28482855
defer b.startStopMu.Unlock()
28492856

2850-
producer, err := b.addProducer(queueName, queueConfig)
2857+
producer, err := b.producerAdd(queueName, queueConfig)
28512858
if err != nil {
28522859
return err
28532860
}
@@ -2863,21 +2870,22 @@ func (b *QueueBundle) Add(queueName string, queueConfig QueueConfig) error {
28632870
}
28642871

28652872
// Remove removes a queue from the client, stopping the producer if the client
2866-
// is running. The function will block until all jobs currently being worked in
2867-
// the queue have completed. This blocking behavior may affect other operations,
2868-
// including shutdown timing.
2873+
// is running. It waits for any jobs currently being worked in the queue to
2874+
// complete before returning. If the provided context is done before the
2875+
// producer has stopped, Remove returns the context's error and does not remove
2876+
// the queue.
28692877
//
28702878
// Returns an error if the client is not configured to execute jobs or if the
28712879
// specified queue does not exist.
2872-
func (b *QueueBundle) Remove(queueName string) error {
2880+
func (b *QueueBundle) Remove(ctx context.Context, queueName string) error {
28732881
if !b.clientWillExecuteJobs {
28742882
return errors.New("client is not configured to execute jobs, cannot remove queue")
28752883
}
28762884

28772885
b.startStopMu.Lock()
28782886
defer b.startStopMu.Unlock()
28792887

2880-
return b.removeProducer(queueName)
2888+
return b.producerRemove(ctx, queueName)
28812889
}
28822890

28832891
// Generates a default client ID using the current hostname and time.

client_test.go

Lines changed: 79 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,34 @@ func Test_Client_Common(t *testing.T) {
406406
wg.Wait()
407407
})
408408

409+
t.Run("Queues_Remove_Stress", func(t *testing.T) {
410+
t.Parallel()
411+
412+
client, _ := setup(t)
413+
414+
startClient(ctx, t, client)
415+
riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started())
416+
417+
var wg sync.WaitGroup
418+
419+
for i := range 5 {
420+
wg.Add(1)
421+
workerNum := i
422+
go func() {
423+
defer wg.Done()
424+
425+
for j := range 5 {
426+
queueName := fmt.Sprintf("stress_queue_%d_%d", workerNum, j)
427+
428+
require.NoError(t, client.Queues().Add(queueName, QueueConfig{MaxWorkers: 1}))
429+
require.NoError(t, client.Queues().Remove(ctx, queueName))
430+
}
431+
}()
432+
}
433+
434+
wg.Wait()
435+
})
436+
409437
t.Run("Queues_Remove_BeforeStart", func(t *testing.T) {
410438
t.Parallel()
411439

@@ -427,7 +455,7 @@ func Test_Client_Common(t *testing.T) {
427455
})
428456
require.NoError(t, err)
429457

430-
err = client.Queues().Remove(queueName)
458+
err = client.Queues().Remove(ctx, queueName)
431459
require.NoError(t, err)
432460

433461
startClient(ctx, t, client)
@@ -481,7 +509,7 @@ func Test_Client_Common(t *testing.T) {
481509
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
482510
require.Equal(t, EventKindJobCompleted, event.Kind)
483511

484-
err = client.Queues().Remove(queueName)
512+
err = client.Queues().Remove(ctx, queueName)
485513
require.NoError(t, err)
486514

487515
insertRes, err := client.Insert(ctx, &JobArgs{}, &InsertOpts{
@@ -502,12 +530,56 @@ func Test_Client_Common(t *testing.T) {
502530
require.Equal(t, rivertype.JobStateAvailable, job.State)
503531
})
504532

533+
t.Run("Queues_Remove_ContextDone", func(t *testing.T) {
534+
t.Parallel()
535+
536+
client, _ := setup(t)
537+
538+
type JobArgs struct {
539+
testutil.JobArgsReflectKind[JobArgs]
540+
}
541+
542+
jobStartedChan := make(chan struct{})
543+
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
544+
close(jobStartedChan)
545+
<-ctx.Done()
546+
return nil
547+
}))
548+
549+
queueName := "remove_context_done_queue"
550+
require.NoError(t, client.Queues().Add(queueName, QueueConfig{MaxWorkers: 2}))
551+
552+
startClient(ctx, t, client)
553+
riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started())
554+
555+
_, err := client.Insert(ctx, &JobArgs{}, &InsertOpts{Queue: queueName})
556+
require.NoError(t, err)
557+
558+
riversharedtest.WaitOrTimeout(t, jobStartedChan)
559+
560+
// Remove with an already-cancelled context should return immediately
561+
// without removing the queue.
562+
cancelledCtx, cancel := context.WithCancel(ctx)
563+
cancel()
564+
565+
err = client.Queues().Remove(cancelledCtx, queueName)
566+
require.ErrorIs(t, err, context.Canceled)
567+
568+
// Queue should still exist and be functional since Remove bailed out.
569+
// Verify by successfully removing it with a valid context after
570+
// cancelling the job via StopAndCancel.
571+
require.NoError(t, client.StopAndCancel(ctx))
572+
573+
// Re-start so startClient's cleanup Stop doesn't fail.
574+
require.NoError(t, client.Start(ctx))
575+
})
576+
505577
t.Run("Queues_Remove_NonExistentQueue", func(t *testing.T) {
506578
t.Parallel()
507579

508580
client, _ := setup(t)
509581

510-
err := client.Queues().Remove("non_existent_queue")
582+
err := client.Queues().Remove(ctx, "non_existent_queue")
511583
require.Error(t, err)
512584
var queueNotFoundErr *QueueNotFoundError
513585
require.ErrorAs(t, err, &queueNotFoundErr)
@@ -522,7 +594,7 @@ func Test_Client_Common(t *testing.T) {
522594
config.Workers = nil
523595
client := newTestClient(t, bundle.dbPool, config)
524596

525-
err := client.Queues().Remove("any_queue")
597+
err := client.Queues().Remove(ctx, "any_queue")
526598
require.Error(t, err)
527599
require.Contains(t, err.Error(), "client is not configured to execute jobs, cannot remove queue")
528600
})
@@ -551,7 +623,7 @@ func Test_Client_Common(t *testing.T) {
551623
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
552624
require.Equal(t, EventKindJobCompleted, event.Kind)
553625

554-
err = client.Queues().Remove(QueueDefault)
626+
err = client.Queues().Remove(ctx, QueueDefault)
555627
require.NoError(t, err)
556628

557629
insertRes, err := client.Insert(ctx, &JobArgs{}, nil)
@@ -601,7 +673,7 @@ func Test_Client_Common(t *testing.T) {
601673
event := riversharedtest.WaitOrTimeout(t, subscribeChan)
602674
require.Equal(t, EventKindJobCompleted, event.Kind)
603675

604-
err = client.Queues().Remove(queueName)
676+
err = client.Queues().Remove(ctx, queueName)
605677
require.NoError(t, err)
606678

607679
err = client.Queues().Add(queueName, QueueConfig{
@@ -634,7 +706,7 @@ func Test_Client_Common(t *testing.T) {
634706
require.Equal(t, EventKindJobCompleted, event.Kind)
635707
require.Equal(t, insertRes1.Job.ID, event.Job.ID)
636708

637-
require.NoError(t, client.Queues().Remove("test_queue"))
709+
require.NoError(t, client.Queues().Remove(ctx, "test_queue"))
638710

639711
insertRes2, err := client.Insert(ctx, &noOpArgs{}, &InsertOpts{Queue: "test_queue"})
640712
require.NoError(t, err)

0 commit comments

Comments
 (0)