-
Notifications
You must be signed in to change notification settings - Fork 29
[CRE][Limits] Handle limit flip to zero and back correctly #1762
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,8 @@ import ( | |
| "testing" | ||
| "time" | ||
|
|
||
| "sync/atomic" | ||
|
|
||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
| "go.opentelemetry.io/otel/attribute" | ||
|
|
@@ -453,6 +455,57 @@ func TestResourcePoolLimiter_BasicUsage(t *testing.T) { | |
| assert.Equal(t, 5, avail) | ||
| } | ||
|
|
||
| // TestResourcePoolLimiter_LimitFlapToZeroDoesNotDeadlock verifies that a waiter | ||
| // is woken up when the limit is reduced to zero and then increased again. | ||
| func TestResourcePoolLimiter_LimitFlapToZeroDoesNotDeadlock(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| var limit atomic.Int64 | ||
| limit.Store(1) | ||
|
|
||
| limiter := newUnscopedResourcePoolLimiter(1) | ||
| limiter.getLimitFn = func(context.Context) (int, error) { | ||
| return int(limit.Load()), nil | ||
| } | ||
| go limiter.updateLoop(contexts.CRE{}) | ||
| t.Cleanup(func() { assert.NoError(t, limiter.Close()) }) | ||
|
|
||
| ctx := t.Context() | ||
|
|
||
| // Consume the single available resource to force the next waiter to enqueue. | ||
| freeFirst, err := limiter.Wait(ctx, 1) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is there still a corner case here?: we set the limit to zero
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Waiters don't only block on context, they block on both their context AND the queue channel. In this case it will get its channel unblocked when the limit is set to 1. I don't believe there's a problem but if you disagree can you try proving it in a unit test (tell an LLM to do it)?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as long as the Wait is happening in a different go routine than the set we are good; i assume that must be true in the core node. the test as written will block until ctx expiration in wait and never get to Set, but iiuc, that is simply test artifact |
||
| require.NoError(t, err) | ||
|
|
||
| enqueued := make(chan struct{}, 1) | ||
| limiter.resourcePoolUsage.setOnEnqueue(func() { enqueued <- struct{}{} }) | ||
|
|
||
| waitErr := make(chan error, 1) | ||
| go func() { | ||
| _, err := limiter.Wait(t.Context(), 1) | ||
| waitErr <- err | ||
| }() | ||
|
|
||
| // Ensure the waiter is queued before mutating the limit. | ||
| <-enqueued | ||
|
|
||
| // Drop the limit to zero, then free the first resource. The queued waiter | ||
| // remains blocked because tryWakeWaiters sees a zero limit. | ||
| limit.Store(0) | ||
| freeFirst() | ||
|
|
||
| // Raise the limit again; the queued waiter should be woken by the update. | ||
| limit.Store(1) | ||
|
|
||
| select { | ||
| case err := <-waitErr: | ||
| require.NoError(t, err) | ||
| // release to avoid affecting subsequent waits | ||
| _ = limiter.Free(ctx, 1) | ||
| case <-time.After(pollPeriod * 3): | ||
| t.Fatal("waiter did not return after limit flap") | ||
| } | ||
| } | ||
|
|
||
| // setOnEnqueue sets a callback that is invoked each time a waiter is added to the queue. | ||
| // The callback is called with the mutex held. Used for testing to synchronize without sleeps. | ||
| func (u *resourcePoolUsage[N]) setOnEnqueue(fn func()) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this need a write lock?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it's internal and only used by constructors.