Skip to content

Commit 7e86732

Browse files
authored
fix(blocking): drop popped references so pointer T is reclaimable (#54)
* test(blocking): add failing tests for popped-element retention Get advances the backing slice with [1:] without zeroing the head slot, and Clear truncates with [:0] without zeroing slots. For pointer T the backing array still references popped elements, so they are not GC'd while the queue is alive. Two runtime-finalizer tests that fail on current main and demonstrate the retention. Refs #43 * fix(blocking): zero popped slots so the backing array drops references Get advanced the slice with elems[:1:] and Clear truncated with elems[:0]. Neither cleared the underlying array slots, so for pointer T the queue kept popped elements reachable until the slice reallocated. Long-lived queues over *T leaked memory. Zero the popped slot in get() and all slots in Clear(). Fixes #43 * fix: appease wsl_v5 and revive linters wsl_v5 wants a blank line between the var declaration and the zero assignment. revive's call-to-gc rule flags runtime.GC() in the finalizer-driven tests; the calls are intentional so silence them per-line.
1 parent 3437db5 commit 7e86732

2 files changed

Lines changed: 127 additions & 0 deletions

File tree

blocking.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,15 @@ func (bq *Blocking[T]) Clear() []T {
154154

155155
removed := make([]T, len(bq.elems))
156156
copy(removed, bq.elems)
157+
158+
// Drop references into the backing array so popped elements can be
159+
// GC'd while the queue outlives them. elems = elems[:0] alone keeps
160+
// the underlying slots populated.
161+
var zero T
162+
for i := range bq.elems {
163+
bq.elems[i] = zero
164+
}
165+
157166
bq.elems = bq.elems[:0]
158167

159168
return removed
@@ -274,6 +283,13 @@ func (bq *Blocking[T]) get() (v T, _ error) {
274283
}
275284

276285
elem := bq.elems[0]
286+
287+
// Zero the popped slot so the backing array no longer references the
288+
// popped element; otherwise pointer T leaks until the slice eventually
289+
// reallocates.
290+
var zero T
291+
292+
bq.elems[0] = zero
277293
bq.elems = bq.elems[1:]
278294

279295
bq.notFullCond.Signal()

blocking_test.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"fmt"
88
"reflect"
9+
"runtime"
910
"sort"
1011
"sync"
1112
"testing"
@@ -32,6 +33,116 @@ func TestBlocking(t *testing.T) {
3233
t.Run("CondWaitWithCapacity", testBlockingCondWaitWithCapacity)
3334
t.Run("MarshalJSON", testBlockingMarshalJSON)
3435
t.Run("NewDoesNotAliasCallerSlice", testBlockingNewDoesNotAliasCallerSlice)
36+
t.Run("GetReleasesReference", testBlockingGetReleasesReference)
37+
t.Run("ClearReleasesReferences", testBlockingClearReleasesReferences)
38+
}
39+
40+
func testBlockingGetReleasesReference(t *testing.T) {
41+
t.Parallel()
42+
43+
type payload struct{ id int }
44+
45+
var bq *queue.Blocking[*payload]
46+
47+
finalized := make(chan struct{}, 1)
48+
49+
// Create the payload in a nested scope so p/got don't keep it alive
50+
// after the offer/get cycle.
51+
func() {
52+
p := &payload{id: 42}
53+
runtime.SetFinalizer(p, func(*payload) {
54+
finalized <- struct{}{}
55+
})
56+
57+
bq = queue.NewBlocking[*payload](nil)
58+
59+
if err := bq.Offer(p); err != nil {
60+
t.Fatalf("offer: %v", err)
61+
}
62+
63+
got, err := bq.Get()
64+
if err != nil {
65+
t.Fatalf("get: %v", err)
66+
}
67+
68+
if got != p {
69+
t.Fatalf("unexpected element %v", got)
70+
}
71+
}()
72+
73+
// Queue is still live. After a successful Get, the queue must not
74+
// retain the popped element. Nudge GC until the finalizer fires or
75+
// we give up.
76+
deadline := time.After(time.Second)
77+
78+
for {
79+
runtime.GC() //nolint:revive // explicit GC needed to drive finalizer
80+
81+
select {
82+
case <-finalized:
83+
runtime.KeepAlive(bq)
84+
return
85+
case <-deadline:
86+
runtime.KeepAlive(bq)
87+
t.Fatal("popped element not finalized; backing array still holds it")
88+
default:
89+
time.Sleep(10 * time.Millisecond)
90+
}
91+
}
92+
}
93+
94+
func testBlockingClearReleasesReferences(t *testing.T) {
95+
t.Parallel()
96+
97+
type payload struct{ id int }
98+
99+
var bq *queue.Blocking[*payload]
100+
101+
finalized := make(chan struct{}, 3)
102+
103+
func() {
104+
items := []*payload{{id: 1}, {id: 2}, {id: 3}}
105+
106+
for _, it := range items {
107+
runtime.SetFinalizer(it, func(*payload) {
108+
finalized <- struct{}{}
109+
})
110+
}
111+
112+
bq = queue.NewBlocking[*payload](nil)
113+
114+
for _, it := range items {
115+
if err := bq.Offer(it); err != nil {
116+
t.Fatalf("offer: %v", err)
117+
}
118+
}
119+
120+
cleared := bq.Clear()
121+
if len(cleared) != len(items) {
122+
t.Fatalf("expected %d cleared, got %d", len(items), len(cleared))
123+
}
124+
}()
125+
126+
// All items should be eligible for finalization now — the queue is
127+
// empty and the temporary `cleared` slice has gone out of scope.
128+
deadline := time.After(time.Second)
129+
130+
count := 0
131+
for count < 3 {
132+
runtime.GC() //nolint:revive // explicit GC needed to drive finalizer
133+
134+
select {
135+
case <-finalized:
136+
count++
137+
case <-deadline:
138+
runtime.KeepAlive(bq)
139+
t.Fatalf("only %d/3 payloads finalized; Clear kept references", count)
140+
default:
141+
time.Sleep(10 * time.Millisecond)
142+
}
143+
}
144+
145+
runtime.KeepAlive(bq)
35146
}
36147

37148
func testBlockingNewDoesNotAliasCallerSlice(t *testing.T) {

0 commit comments

Comments
 (0)