Skip to content

Commit c6d65e5

Browse files
authored
Patch unsafe producer map access (#1236)
While doing a little work on #1235, my LLM flagged that the way that we're accessing `producersByQueueName` today is already a little unsafe as it may be read in multiple places concurrently. There's a mutex to protect it somewhat in the `QueueBundle`, but it may still race between a change there a "notify producer" send. Here, add one additional `RWMutex` that makes sure to synchronize read access on the map.
1 parent 6b98fb9 commit c6d65e5

2 files changed

Lines changed: 24 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Fixed
11+
12+
- Fix unsafe concurrent producer map access in client. [PR #1236](https://github.com/riverqueue/river/pull/1236).
13+
1014
## [0.35.1] - 2026-04-26
1115

1216
### Fixed

client.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,7 @@ type Client[TTx any] struct {
636636
periodicJobs *PeriodicJobBundle
637637
pilot riverpilot.Pilot
638638
producersByQueueName map[string]*producer
639+
producersMu sync.RWMutex
639640
queueMaintainer *maintenance.QueueMaintainer
640641
queueMaintainerLeader *maintenance.QueueMaintainerLeader
641642
queues *QueueBundle
@@ -1959,7 +1960,14 @@ func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*rivertype.
19591960
// transaction, the producer wouldn't yet be able to access the new jobs that
19601961
// triggered the notification because they're not committed yet.
19611962
func (c *Client[TTx]) notifyProducerWithoutListenerJobFetch(_ context.Context, res []*rivertype.JobInsertResult) {
1962-
if c.driver.SupportsListener() || len(c.producersByQueueName) < 1 {
1963+
if c.driver.SupportsListener() {
1964+
return
1965+
}
1966+
1967+
c.producersMu.RLock()
1968+
defer c.producersMu.RUnlock()
1969+
1970+
if len(c.producersByQueueName) < 1 {
19631971
return
19641972
}
19651973

@@ -2169,6 +2177,9 @@ func (c *Client[TTx]) validateJobArgs(args JobArgs) error {
21692177
}
21702178

21712179
func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) (*producer, error) {
2180+
c.producersMu.Lock()
2181+
defer c.producersMu.Unlock()
2182+
21722183
if _, alreadyExists := c.producersByQueueName[queueName]; alreadyExists {
21732184
return nil, &QueueAlreadyAddedError{Name: queueName}
21742185
}
@@ -2722,7 +2733,14 @@ func (c *Client[TTx]) QueueUpdateTx(ctx context.Context, tx TTx, name string, pa
27222733
// transaction, the producer wouldn't yet be able to access the state that
27232734
// triggered the notification because it's not committed yet.
27242735
func (c *Client[TTx]) notifyProducerWithoutListenerQueueControlEvent(queue string, controlEvent *controlEventPayload) {
2725-
if c.driver.SupportsListener() || len(c.producersByQueueName) < 1 {
2736+
if c.driver.SupportsListener() {
2737+
return
2738+
}
2739+
2740+
c.producersMu.RLock()
2741+
defer c.producersMu.RUnlock()
2742+
2743+
if len(c.producersByQueueName) < 1 {
27262744
return
27272745
}
27282746

0 commit comments

Comments
 (0)