File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change 1+ package batchsender
2+
3+ import (
4+ "sync"
5+ "testing"
6+ "time"
7+ )
8+
9+ // This test verifies there is no data race between Send() and the timer-triggered flush.
10+ func TestSend_ConcurrentWithTimerFlush (t * testing.T ) {
11+ // The race occurs when:
12+ // 1. a Send() call schedules a timer via time.AfterFunc
13+ // 2. the timer fires and calls flush() on a separate goroutine
14+ // 3. another Send() reads bs.items concurrently.
15+ //
16+ // To trigger this, we send items from multiple goroutines with delays around batchTimeout so the timer fires between Sends.
17+ var mu sync.Mutex
18+ var received []any
19+
20+ const numGoroutines = 5
21+ const sendsPerGoroutine = 20
22+
23+ bs := NewBatchSender (func (items any ) {
24+ mu .Lock ()
25+ defer mu .Unlock ()
26+ received = append (received , items )
27+ })
28+
29+ var wg sync.WaitGroup
30+ wg .Add (numGoroutines )
31+ for range numGoroutines {
32+ go func () {
33+ defer wg .Done ()
34+ for range sendsPerGoroutine {
35+ bs .Send ("item" )
36+ time .Sleep (batchTimeout + 10 * time .Millisecond )
37+ }
38+ }()
39+ }
40+
41+ wg .Wait ()
42+ bs .Close ()
43+ }
You can’t perform that action at this time.
0 commit comments