Skip to content

Commit 8bde3ee

Browse files
authored
[CRE][Limits] Handle limit flip to zero and back correctly (#1762)
1 parent 7f47b3a commit 8bde3ee

3 files changed

Lines changed: 92 additions & 6 deletions

File tree

pkg/settings/limits/resource.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ type resourcePoolLimiter[N Number] struct {
9090
recordDenied func(context.Context, N, ...metric.RecordOption) // optional
9191
}
9292

93+
func (l *resourcePoolLimiter[N]) setOnLimitUpdate(fn func(ctx context.Context)) {
94+
l.updater.onLimitUpdate = fn
95+
}
96+
9397
func (l *resourcePoolLimiter[N]) createGauges(meter metric.Meter, unit string) error {
9498
if l.key == "" {
9599
return errors.New("metrics require Key to be set")
@@ -172,6 +176,14 @@ type resourcePoolUsage[N Number] struct {
172176
cancelSub func() // optional
173177
}
174178

179+
// onLimitUpdate is invoked each time the updater records a limit, whether it was
180+
// obtained by polling or from a subscription update. It attempts to wake queued waiters using that limit.
181+
func (u *resourcePoolUsage[N]) onLimitUpdate() {
182+
u.mu.Lock()
183+
defer u.mu.Unlock()
184+
u.tryWakeWaiters()
185+
}
186+
175187
func (l *resourcePoolLimiter[N]) newLimitUsage(opts ...metric.RecordOption) *resourcePoolUsage[N] {
176188
u := resourcePoolUsage[N]{
177189
resourcePoolLimiter: l,
@@ -367,6 +379,9 @@ func newUnscopedResourcePoolLimiter[N Number](defaultLimit N) *unscopedResourceP
367379
},
368380
}
369381
l.resourcePoolUsage = l.newLimitUsage()
382+
l.setOnLimitUpdate(func(context.Context) {
383+
l.resourcePoolUsage.onLimitUpdate()
384+
})
370385
return l
371386
}
372387

@@ -489,6 +504,15 @@ func newScopedResourcePoolLimiter[N Number](scope settings.Scope, key string, de
489504
},
490505
scope: scope,
491506
}
507+
l.setOnLimitUpdate(func(ctx context.Context) {
508+
tenant := l.scope.Value(ctx)
509+
if tenant == "" {
510+
return
511+
}
512+
if usage, ok := l.used.Load(tenant); ok {
513+
usage.(*resourcePoolUsage[N]).onLimitUpdate()
514+
}
515+
})
492516
return l
493517
}
494518

pkg/settings/limits/resource_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"testing"
99
"time"
1010

11+
"sync/atomic"
12+
1113
"github.com/stretchr/testify/assert"
1214
"github.com/stretchr/testify/require"
1315
"go.opentelemetry.io/otel/attribute"
@@ -453,6 +455,57 @@ func TestResourcePoolLimiter_BasicUsage(t *testing.T) {
453455
assert.Equal(t, 5, avail)
454456
}
455457

458+
// TestResourcePoolLimiter_LimitFlapToZeroDoesNotDeadlock verifies that a waiter
459+
// is woken up when the limit is reduced to zero and then increased again.
460+
func TestResourcePoolLimiter_LimitFlapToZeroDoesNotDeadlock(t *testing.T) {
461+
t.Parallel()
462+
463+
var limit atomic.Int64
464+
limit.Store(1)
465+
466+
limiter := newUnscopedResourcePoolLimiter(1)
467+
limiter.getLimitFn = func(context.Context) (int, error) {
468+
return int(limit.Load()), nil
469+
}
470+
go limiter.updateLoop(contexts.CRE{})
471+
t.Cleanup(func() { assert.NoError(t, limiter.Close()) })
472+
473+
ctx := t.Context()
474+
475+
// Consume the single available resource to force the next waiter to enqueue.
476+
freeFirst, err := limiter.Wait(ctx, 1)
477+
require.NoError(t, err)
478+
479+
enqueued := make(chan struct{}, 1)
480+
limiter.resourcePoolUsage.setOnEnqueue(func() { enqueued <- struct{}{} })
481+
482+
waitErr := make(chan error, 1)
483+
go func() {
484+
_, err := limiter.Wait(t.Context(), 1)
485+
waitErr <- err
486+
}()
487+
488+
// Ensure the waiter is queued before mutating the limit.
489+
<-enqueued
490+
491+
// Drop the limit to zero, then free the first resource. The queued waiter
492+
// remains blocked because tryWakeWaiters sees a zero limit.
493+
limit.Store(0)
494+
freeFirst()
495+
496+
// Raise the limit again; the queued waiter should be woken by the update.
497+
limit.Store(1)
498+
499+
select {
500+
case err := <-waitErr:
501+
require.NoError(t, err)
502+
// release to avoid affecting subsequent waits
503+
_ = limiter.Free(ctx, 1)
504+
case <-time.After(pollPeriod * 3):
505+
t.Fatal("waiter did not return after limit flap")
506+
}
507+
}
508+
456509
// setOnEnqueue sets a callback that is invoked each time a waiter is added to the queue.
457510
// The callback is called with the mutex held. Used for testing to synchronize without sleeps.
458511
func (u *resourcePoolUsage[N]) setOnEnqueue(fn func()) {

pkg/settings/limits/updater.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@ import (
1515
// updater monitors limit updates via subscriptions or polling, reports them via recordLimit, and then calls onLimitUpdate if it is set.
1616
// If an updateLoop goroutine is spawned, then Close must be called.
1717
type updater[N any] struct {
18-
lggr logger.Logger
19-
getLimitFn func(context.Context) (N, error)
20-
subFn func(ctx context.Context) (<-chan settings.Update[N], func()) // optional
21-
recordLimit func(context.Context, N)
18+
lggr logger.Logger
19+
getLimitFn func(context.Context) (N, error)
20+
subFn func(ctx context.Context) (<-chan settings.Update[N], func()) // optional
21+
recordLimit func(context.Context, N)
22+
onLimitUpdate func(context.Context)
2223

2324
creCh chan struct{}
2425
cre atomic.Value
@@ -99,13 +100,21 @@ func (u *updater[N]) updateLoop(cre contexts.CRE) {
99100
if err != nil {
100101
u.lggr.Errorw("Failed to get limit. Using default value", "default", limit, "err", err)
101102
}
102-
u.recordLimit(contexts.WithCRE(ctx, cre), limit)
103+
rcCtx := contexts.WithCRE(ctx, cre)
104+
u.recordLimit(rcCtx, limit)
105+
if u.onLimitUpdate != nil {
106+
u.onLimitUpdate(rcCtx)
107+
}
103108

104109
case update := <-updates:
105110
if update.Err != nil {
106111
u.lggr.Errorw("Failed to update limit. Using default value", "default", update.Value, "err", update.Err)
107112
}
108-
u.recordLimit(contexts.WithCRE(ctx, cre), update.Value)
113+
rcCtx := contexts.WithCRE(ctx, cre)
114+
u.recordLimit(rcCtx, update.Value)
115+
if u.onLimitUpdate != nil {
116+
u.onLimitUpdate(rcCtx)
117+
}
109118

110119
case <-u.creCh:
111120
cre = u.cre.Load().(contexts.CRE)

0 commit comments

Comments
 (0)