diff --git a/pkg/settings/limits/resource.go b/pkg/settings/limits/resource.go index 02b2d27305..04f68ea512 100644 --- a/pkg/settings/limits/resource.go +++ b/pkg/settings/limits/resource.go @@ -90,6 +90,10 @@ type resourcePoolLimiter[N Number] struct { recordDenied func(context.Context, N, ...metric.RecordOption) // optional } +func (l *resourcePoolLimiter[N]) setOnLimitUpdate(fn func(ctx context.Context)) { + l.updater.onLimitUpdate = fn +} + func (l *resourcePoolLimiter[N]) createGauges(meter metric.Meter, unit string) error { if l.key == "" { return errors.New("metrics require Key to be set") @@ -172,6 +176,14 @@ type resourcePoolUsage[N Number] struct { cancelSub func() // optional } +// onLimitUpdate is invoked when the configured limit changes. It attempts to +// wake queued waiters using the new limit. +func (u *resourcePoolUsage[N]) onLimitUpdate() { + u.mu.Lock() + defer u.mu.Unlock() + u.tryWakeWaiters() +} + func (l *resourcePoolLimiter[N]) newLimitUsage(opts ...metric.RecordOption) *resourcePoolUsage[N] { u := resourcePoolUsage[N]{ resourcePoolLimiter: l, @@ -367,6 +379,9 @@ func newUnscopedResourcePoolLimiter[N Number](defaultLimit N) *unscopedResourceP }, } l.resourcePoolUsage = l.newLimitUsage() + l.setOnLimitUpdate(func(context.Context) { + l.resourcePoolUsage.onLimitUpdate() + }) return l } @@ -489,6 +504,15 @@ func newScopedResourcePoolLimiter[N Number](scope settings.Scope, key string, de }, scope: scope, } + l.setOnLimitUpdate(func(ctx context.Context) { + tenant := l.scope.Value(ctx) + if tenant == "" { + return + } + if usage, ok := l.used.Load(tenant); ok { + usage.(*resourcePoolUsage[N]).onLimitUpdate() + } + }) return l } diff --git a/pkg/settings/limits/resource_test.go b/pkg/settings/limits/resource_test.go index f5aed65444..ac568aa4ec 100644 --- a/pkg/settings/limits/resource_test.go +++ b/pkg/settings/limits/resource_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "sync/atomic" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" @@ -453,6 +455,57 @@ func TestResourcePoolLimiter_BasicUsage(t *testing.T) { assert.Equal(t, 5, avail) } +// TestResourcePoolLimiter_LimitFlapToZeroDoesNotDeadlock verifies that a waiter +// is woken up when the limit is reduced to zero and then increased again. +func TestResourcePoolLimiter_LimitFlapToZeroDoesNotDeadlock(t *testing.T) { + t.Parallel() + + var limit atomic.Int64 + limit.Store(1) + + limiter := newUnscopedResourcePoolLimiter(1) + limiter.getLimitFn = func(context.Context) (int, error) { + return int(limit.Load()), nil + } + go limiter.updateLoop(contexts.CRE{}) + t.Cleanup(func() { assert.NoError(t, limiter.Close()) }) + + ctx := t.Context() + + // Consume the single available resource to force the next waiter to enqueue. + freeFirst, err := limiter.Wait(ctx, 1) + require.NoError(t, err) + + enqueued := make(chan struct{}, 1) + limiter.resourcePoolUsage.setOnEnqueue(func() { enqueued <- struct{}{} }) + + waitErr := make(chan error, 1) + go func() { + _, err := limiter.Wait(t.Context(), 1) + waitErr <- err + }() + + // Ensure the waiter is queued before mutating the limit. + <-enqueued + + // Drop the limit to zero, then free the first resource. The queued waiter + // remains blocked because tryWakeWaiters sees a zero limit. + limit.Store(0) + freeFirst() + + // Raise the limit again; the queued waiter should be woken by the update. + limit.Store(1) + + select { + case err := <-waitErr: + require.NoError(t, err) + // release to avoid affecting subsequent waits + _ = limiter.Free(ctx, 1) + case <-time.After(pollPeriod * 3): + t.Fatal("waiter did not return after limit flap") + } +} + // setOnEnqueue sets a callback that is invoked each time a waiter is added to the queue. // The callback is called with the mutex held. Used for testing to synchronize without sleeps. func (u *resourcePoolUsage[N]) setOnEnqueue(fn func()) { diff --git a/pkg/settings/limits/updater.go b/pkg/settings/limits/updater.go index 3d2b9971a7..b1ac35343b 100644 --- a/pkg/settings/limits/updater.go +++ b/pkg/settings/limits/updater.go @@ -15,10 +15,11 @@ import ( // updater monitors limit updates via subscriptions or polling and reports them via recordLimit. // If an updateLoop goroutine is spawned, then Close must be called. type updater[N any] struct { - lggr logger.Logger - getLimitFn func(context.Context) (N, error) - subFn func(ctx context.Context) (<-chan settings.Update[N], func()) // optional - recordLimit func(context.Context, N) + lggr logger.Logger + getLimitFn func(context.Context) (N, error) + subFn func(ctx context.Context) (<-chan settings.Update[N], func()) // optional + recordLimit func(context.Context, N) + onLimitUpdate func(context.Context) creCh chan struct{} cre atomic.Value @@ -99,13 +100,21 @@ func (u *updater[N]) updateLoop(cre contexts.CRE) { if err != nil { u.lggr.Errorw("Failed to get limit. Using default value", "default", limit, "err", err) } - u.recordLimit(contexts.WithCRE(ctx, cre), limit) + rcCtx := contexts.WithCRE(ctx, cre) + u.recordLimit(rcCtx, limit) + if u.onLimitUpdate != nil { + u.onLimitUpdate(rcCtx) + } case update := <-updates: if update.Err != nil { u.lggr.Errorw("Failed to update limit. Using default value", "default", update.Value, "err", update.Err) } - u.recordLimit(contexts.WithCRE(ctx, cre), update.Value) + rcCtx := contexts.WithCRE(ctx, cre) + u.recordLimit(rcCtx, update.Value) + if u.onLimitUpdate != nil { + u.onLimitUpdate(rcCtx) + } case <-u.creCh: cre = u.cre.Load().(contexts.CRE)