Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions pkg/settings/limits/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ type resourcePoolLimiter[N Number] struct {
recordDenied func(context.Context, N, ...metric.RecordOption) // optional
}

func (l *resourcePoolLimiter[N]) setOnLimitUpdate(fn func(ctx context.Context)) {
l.updater.onLimitUpdate = fn
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this need a write lock?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's internal and only used by constructors.

}

func (l *resourcePoolLimiter[N]) createGauges(meter metric.Meter, unit string) error {
if l.key == "" {
return errors.New("metrics require Key to be set")
Expand Down Expand Up @@ -172,6 +176,14 @@ type resourcePoolUsage[N Number] struct {
cancelSub func() // optional
}

// onLimitUpdate is invoked when the configured limit changes. It attempts to
// wake queued waiters using the new limit.
func (u *resourcePoolUsage[N]) onLimitUpdate() {
u.mu.Lock()
defer u.mu.Unlock()
u.tryWakeWaiters()
}

func (l *resourcePoolLimiter[N]) newLimitUsage(opts ...metric.RecordOption) *resourcePoolUsage[N] {
u := resourcePoolUsage[N]{
resourcePoolLimiter: l,
Expand Down Expand Up @@ -367,6 +379,9 @@ func newUnscopedResourcePoolLimiter[N Number](defaultLimit N) *unscopedResourceP
},
}
l.resourcePoolUsage = l.newLimitUsage()
l.setOnLimitUpdate(func(context.Context) {
l.resourcePoolUsage.onLimitUpdate()
})
return l
}

Expand Down Expand Up @@ -489,6 +504,15 @@ func newScopedResourcePoolLimiter[N Number](scope settings.Scope, key string, de
},
scope: scope,
}
l.setOnLimitUpdate(func(ctx context.Context) {
tenant := l.scope.Value(ctx)
if tenant == "" {
return
}
if usage, ok := l.used.Load(tenant); ok {
usage.(*resourcePoolUsage[N]).onLimitUpdate()
}
})
return l
}

Expand Down
53 changes: 53 additions & 0 deletions pkg/settings/limits/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"testing"
"time"

"sync/atomic"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -453,6 +455,57 @@ func TestResourcePoolLimiter_BasicUsage(t *testing.T) {
assert.Equal(t, 5, avail)
}

// TestResourcePoolLimiter_LimitFlapToZeroDoesNotDeadlock verifies that a waiter
// is woken up when the limit is reduced to zero and then increased again.
func TestResourcePoolLimiter_LimitFlapToZeroDoesNotDeadlock(t *testing.T) {
t.Parallel()

var limit atomic.Int64
limit.Store(1)

limiter := newUnscopedResourcePoolLimiter(1)
limiter.getLimitFn = func(context.Context) (int, error) {
return int(limit.Load()), nil
}
go limiter.updateLoop(contexts.CRE{})
t.Cleanup(func() { assert.NoError(t, limiter.Close()) })

ctx := t.Context()

// Consume the single available resource to force the next waiter to enqueue.
freeFirst, err := limiter.Wait(ctx, 1)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there still a corner case here?:

we set the limit to zero
the node restarts
this wait blocks on ctx, which in core node is forever atm
we set the limit to 1
whatever else was waiting starts
the original is stuck forever

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Waiters don't only block on context, they block on both their context AND the queue channel. In this case it will get its channel unblocked when the limit is set to 1.

I don't believe there's a problem but if you disagree can you try proving it in a unit test (tell an LLM to do it)?

Copy link
Copy Markdown
Contributor

@krehermann krehermann Jan 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as long as the Wait is happening in a different go routine than the set we are good; i assume that must be true in the core node. the test as written
~

Wait(ctx, foo)
Set(foo)

will block until ctx expiration in wait and never get to Set, but iiuc, that is simply test artifact

require.NoError(t, err)

enqueued := make(chan struct{}, 1)
limiter.resourcePoolUsage.setOnEnqueue(func() { enqueued <- struct{}{} })

waitErr := make(chan error, 1)
go func() {
_, err := limiter.Wait(t.Context(), 1)
waitErr <- err
}()

// Ensure the waiter is queued before mutating the limit.
<-enqueued

// Drop the limit to zero, then free the first resource. The queued waiter
// remains blocked because tryWakeWaiters sees a zero limit.
limit.Store(0)
freeFirst()

// Raise the limit again; the queued waiter should be woken by the update.
limit.Store(1)

select {
case err := <-waitErr:
require.NoError(t, err)
// release to avoid affecting subsequent waits
_ = limiter.Free(ctx, 1)
case <-time.After(pollPeriod * 3):
t.Fatal("waiter did not return after limit flap")
}
}

// setOnEnqueue sets a callback that is invoked each time a waiter is added to the queue.
// The callback is called with the mutex held. Used for testing to synchronize without sleeps.
func (u *resourcePoolUsage[N]) setOnEnqueue(fn func()) {
Expand Down
21 changes: 15 additions & 6 deletions pkg/settings/limits/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ import (
// updater monitors limit updates via subscriptions or polling and reports them via recordLimit.
// If an updateLoop goroutine is spawned, then Close must be called.
type updater[N any] struct {
lggr logger.Logger
getLimitFn func(context.Context) (N, error)
subFn func(ctx context.Context) (<-chan settings.Update[N], func()) // optional
recordLimit func(context.Context, N)
lggr logger.Logger
getLimitFn func(context.Context) (N, error)
subFn func(ctx context.Context) (<-chan settings.Update[N], func()) // optional
recordLimit func(context.Context, N)
onLimitUpdate func(context.Context)

creCh chan struct{}
cre atomic.Value
Expand Down Expand Up @@ -99,13 +100,21 @@ func (u *updater[N]) updateLoop(cre contexts.CRE) {
if err != nil {
u.lggr.Errorw("Failed to get limit. Using default value", "default", limit, "err", err)
}
u.recordLimit(contexts.WithCRE(ctx, cre), limit)
rcCtx := contexts.WithCRE(ctx, cre)
u.recordLimit(rcCtx, limit)
if u.onLimitUpdate != nil {
u.onLimitUpdate(rcCtx)
}

case update := <-updates:
if update.Err != nil {
u.lggr.Errorw("Failed to update limit. Using default value", "default", update.Value, "err", update.Err)
}
u.recordLimit(contexts.WithCRE(ctx, cre), update.Value)
rcCtx := contexts.WithCRE(ctx, cre)
u.recordLimit(rcCtx, update.Value)
if u.onLimitUpdate != nil {
u.onLimitUpdate(rcCtx)
}

case <-u.creCh:
cre = u.cre.Load().(contexts.CRE)
Expand Down
Loading