Skip to content

Commit 49889c4

Browse files
authored
fix(blocking): wake producers waiting on capacity after Reset (#49)
* test(blocking): add failing test for Reset not waking OfferWait Asserts that OfferWait returns after Reset frees capacity. Fails on current main because Reset only broadcasts notEmptyCond, leaving producers stuck on notFullCond.Wait(). Refs #39 * fix(blocking): broadcast notFullCond on Reset Reset can free capacity slots when it restores the initial (smaller) element set. Without broadcasting notFullCond, any goroutine blocked in OfferWait stays stuck until another consumer runs. Fixes #39
1 parent aabd29a commit 49889c4

2 files changed

Lines changed: 41 additions & 0 deletions

File tree

blocking.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ func (bq *Blocking[T]) Reset() {
107107
copy(bq.elems, bq.initialElems)
108108

109109
bq.notEmptyCond.Broadcast()
110+
bq.notFullCond.Broadcast()
110111
}
111112

112113
// ===================================Removal==================================

blocking_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,46 @@ func testBlockingReset(t *testing.T) {
311311
t.Fatalf("expected elem to be %d, got %d", 5, e)
312312
}
313313
})
314+
315+
t.Run("UnblocksProducersWaitingOnCapacity", func(t *testing.T) {
316+
t.Parallel()
317+
318+
// Start with fewer elements than capacity so Reset frees a slot.
319+
blockingQueue := queue.NewBlocking(
320+
[]int{1, 2},
321+
queue.WithCapacity(3),
322+
)
323+
324+
if err := blockingQueue.Offer(3); err != nil {
325+
t.Fatalf("unexpected err filling queue: %v", err)
326+
}
327+
328+
offerReturned := make(chan struct{})
329+
330+
go func() {
331+
blockingQueue.OfferWait(4)
332+
close(offerReturned)
333+
}()
334+
335+
// Give the producer a moment to enter notFullCond.Wait().
336+
time.Sleep(10 * time.Millisecond)
337+
338+
select {
339+
case <-offerReturned:
340+
t.Fatal("OfferWait returned before Reset; queue was full")
341+
default:
342+
}
343+
344+
// Reset shrinks back to the initial two elements, leaving a free slot.
345+
// Without broadcasting notFullCond, the producer never wakes.
346+
blockingQueue.Reset()
347+
348+
select {
349+
case <-offerReturned:
350+
case <-time.After(time.Second):
351+
t.Fatal("OfferWait was not unblocked by Reset")
352+
}
353+
})
314354
}
315355

316356
func testBlockingOfferWait(t *testing.T) {

0 commit comments

Comments
 (0)