Skip to content

Commit 7fc4b52

Browse files
committed
[CRE][Limits] Handle limit flip to zero and back correctly
1 parent 7f47b3a commit 7fc4b52

3 files changed

Lines changed: 98 additions & 6 deletions

File tree

pkg/settings/limits/resource.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ type resourcePoolLimiter[N Number] struct {
9090
recordDenied func(context.Context, N, ...metric.RecordOption) // optional
9191
}
9292

93+
func (l *resourcePoolLimiter[N]) setOnLimitUpdate(fn func(ctx context.Context)) {
94+
l.updater.onLimitUpdate = fn
95+
}
96+
9397
func (l *resourcePoolLimiter[N]) createGauges(meter metric.Meter, unit string) error {
9498
if l.key == "" {
9599
return errors.New("metrics require Key to be set")
@@ -172,6 +176,14 @@ type resourcePoolUsage[N Number] struct {
172176
cancelSub func() // optional
173177
}
174178

179+
// onLimitUpdate is invoked when the configured limit changes. It attempts to
180+
// wake queued waiters using the new limit.
181+
func (u *resourcePoolUsage[N]) onLimitUpdate() {
182+
u.mu.Lock()
183+
defer u.mu.Unlock()
184+
u.tryWakeWaiters()
185+
}
186+
175187
func (l *resourcePoolLimiter[N]) newLimitUsage(opts ...metric.RecordOption) *resourcePoolUsage[N] {
176188
u := resourcePoolUsage[N]{
177189
resourcePoolLimiter: l,
@@ -367,6 +379,9 @@ func newUnscopedResourcePoolLimiter[N Number](defaultLimit N) *unscopedResourceP
367379
},
368380
}
369381
l.resourcePoolUsage = l.newLimitUsage()
382+
l.setOnLimitUpdate(func(context.Context) {
383+
l.resourcePoolUsage.onLimitUpdate()
384+
})
370385
return l
371386
}
372387

@@ -489,6 +504,15 @@ func newScopedResourcePoolLimiter[N Number](scope settings.Scope, key string, de
489504
},
490505
scope: scope,
491506
}
507+
l.setOnLimitUpdate(func(ctx context.Context) {
508+
tenant := l.scope.Value(ctx)
509+
if tenant == "" {
510+
return
511+
}
512+
if usage, ok := l.used.Load(tenant); ok {
513+
usage.(*resourcePoolUsage[N]).onLimitUpdate()
514+
}
515+
})
492516
return l
493517
}
494518

pkg/settings/limits/resource_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"testing"
99
"time"
1010

11+
"sync/atomic"
12+
1113
"github.com/stretchr/testify/assert"
1214
"github.com/stretchr/testify/require"
1315
"go.opentelemetry.io/otel/attribute"
@@ -453,6 +455,63 @@ func TestResourcePoolLimiter_BasicUsage(t *testing.T) {
453455
assert.Equal(t, 5, avail)
454456
}
455457

458+
// TestResourcePoolLimiter_LimitFlapToZeroDoesNotDeadlock verifies that a waiter
459+
// is woken up when the limit is reduced to zero and then increased again.
460+
func TestResourcePoolLimiter_LimitFlapToZeroDoesNotDeadlock(t *testing.T) {
461+
t.Parallel()
462+
463+
origPoll := pollPeriod
464+
pollPeriod = 10 * time.Millisecond
465+
t.Cleanup(func() { pollPeriod = origPoll })
466+
467+
var limit atomic.Int64
468+
limit.Store(1)
469+
470+
limiter := newUnscopedResourcePoolLimiter(1)
471+
limiter.getLimitFn = func(context.Context) (int, error) {
472+
return int(limit.Load()), nil
473+
}
474+
go limiter.updateLoop(contexts.CRE{})
475+
t.Cleanup(func() { assert.NoError(t, limiter.Close()) })
476+
477+
ctx := t.Context()
478+
479+
// Consume the single available resource to force the next waiter to enqueue.
480+
freeFirst, err := limiter.Wait(ctx, 1)
481+
require.NoError(t, err)
482+
483+
enqueued := make(chan struct{}, 1)
484+
limiter.resourcePoolUsage.setOnEnqueue(func() { enqueued <- struct{}{} })
485+
486+
waitErr := make(chan error, 1)
487+
go func() {
488+
waitCtx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
489+
defer cancel()
490+
_, err := limiter.Wait(waitCtx, 1)
491+
waitErr <- err
492+
}()
493+
494+
// Ensure the waiter is queued before mutating the limit.
495+
<-enqueued
496+
497+
// Drop the limit to zero, then free the first resource. The queued waiter
498+
// remains blocked because tryWakeWaiters sees a zero limit.
499+
limit.Store(0)
500+
freeFirst()
501+
502+
// Raise the limit again; the queued waiter should be woken by the update.
503+
limit.Store(1)
504+
505+
select {
506+
case err := <-waitErr:
507+
require.NoError(t, err)
508+
// release to avoid affecting subsequent waits
509+
_ = limiter.Free(ctx, 1)
510+
case <-time.After(time.Second):
511+
t.Fatal("waiter did not return after limit flap")
512+
}
513+
}
514+
456515
// setOnEnqueue sets a callback that is invoked each time a waiter is added to the queue.
457516
// The callback is called with the mutex held. Used for testing to synchronize without sleeps.
458517
func (u *resourcePoolUsage[N]) setOnEnqueue(fn func()) {

pkg/settings/limits/updater.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@ import (
1515
// updater monitors limit updates via subscriptions or polling and reports them via recordLimit.
1616
// If an updateLoop goroutine is spawned, then Close must be called.
1717
type updater[N any] struct {
18-
lggr logger.Logger
19-
getLimitFn func(context.Context) (N, error)
20-
subFn func(ctx context.Context) (<-chan settings.Update[N], func()) // optional
21-
recordLimit func(context.Context, N)
18+
lggr logger.Logger
19+
getLimitFn func(context.Context) (N, error)
20+
subFn func(ctx context.Context) (<-chan settings.Update[N], func()) // optional
21+
recordLimit func(context.Context, N)
22+
onLimitUpdate func(context.Context)
2223

2324
creCh chan struct{}
2425
cre atomic.Value
@@ -99,13 +100,21 @@ func (u *updater[N]) updateLoop(cre contexts.CRE) {
99100
if err != nil {
100101
u.lggr.Errorw("Failed to get limit. Using default value", "default", limit, "err", err)
101102
}
102-
u.recordLimit(contexts.WithCRE(ctx, cre), limit)
103+
rcCtx := contexts.WithCRE(ctx, cre)
104+
u.recordLimit(rcCtx, limit)
105+
if u.onLimitUpdate != nil {
106+
u.onLimitUpdate(rcCtx)
107+
}
103108

104109
case update := <-updates:
105110
if update.Err != nil {
106111
u.lggr.Errorw("Failed to update limit. Using default value", "default", update.Value, "err", update.Err)
107112
}
108-
u.recordLimit(contexts.WithCRE(ctx, cre), update.Value)
113+
rcCtx := contexts.WithCRE(ctx, cre)
114+
u.recordLimit(rcCtx, update.Value)
115+
if u.onLimitUpdate != nil {
116+
u.onLimitUpdate(rcCtx)
117+
}
109118

110119
case <-u.creCh:
111120
cre = u.cre.Load().(contexts.CRE)

0 commit comments

Comments
 (0)