Skip to content

Commit e5c8361

Browse files
committed
fix: Fix race condition on calling Send from multiple scheduler goroutines
1 parent b7188f1 commit e5c8361

1 file changed

Lines changed: 22 additions & 21 deletions

File tree

scheduler/batchsender/batch_sender.go

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,20 @@ const (
1919
// - If the current batch has reached the batch size, it will be sent immediately
2020
// - Otherwise, a timer will be started to send the current batch after the batch timeout
2121
type BatchSender struct {
22-
sendFn func(any)
23-
items []any
24-
timer *time.Timer
25-
itemsLock sync.Mutex
22+
sendFn func(any)
23+
items []any
24+
timer *time.Timer
25+
mu sync.Mutex
2626
}
2727

2828
func NewBatchSender(sendFn func(any)) *BatchSender {
2929
return &BatchSender{sendFn: sendFn}
3030
}
3131

3232
func (bs *BatchSender) Send(item any) {
33+
bs.mu.Lock()
34+
defer bs.mu.Unlock()
35+
3336
if bs.timer != nil {
3437
bs.timer.Stop()
3538
}
@@ -39,34 +42,29 @@ func (bs *BatchSender) Send(item any) {
3942
// If item is already a slice, send it directly
4043
// together with the current batch
4144
if len(items) > 1 {
42-
bs.flush(items...)
45+
bs.flushLocked(items...)
4346
return
4447
}
4548

4649
// Otherwise, add item to the current batch
47-
bs.appendToBatch(items...)
50+
bs.items = append(bs.items, items...)
4851

4952
// If the current batch has reached the batch size, send it
5053
if len(bs.items) >= batchSize {
51-
bs.flush()
54+
bs.flushLocked()
5255
return
5356
}
5457

5558
// Otherwise, start a timer to send the current batch after the batch timeout
56-
bs.timer = time.AfterFunc(batchTimeout, func() { bs.flush() })
59+
bs.timer = time.AfterFunc(batchTimeout, func() {
60+
bs.mu.Lock()
61+
defer bs.mu.Unlock()
62+
bs.flushLocked()
63+
})
5764
}
5865

59-
func (bs *BatchSender) appendToBatch(items ...any) {
60-
bs.itemsLock.Lock()
61-
defer bs.itemsLock.Unlock()
62-
63-
bs.items = append(bs.items, items...)
64-
}
65-
66-
func (bs *BatchSender) flush(items ...any) {
67-
bs.itemsLock.Lock()
68-
defer bs.itemsLock.Unlock()
69-
66+
// flushLocked sends all buffered items. Must be called with bs.mu held.
67+
func (bs *BatchSender) flushLocked(items ...any) {
7068
bs.items = append(bs.items, items...)
7169

7270
if len(bs.items) == 0 {
@@ -78,8 +76,11 @@ func (bs *BatchSender) flush(items ...any) {
7876
}
7977

8078
func (bs *BatchSender) Close() {
79+
bs.mu.Lock()
80+
defer bs.mu.Unlock()
81+
8182
if bs.timer != nil {
8283
bs.timer.Stop()
8384
}
84-
bs.flush()
85-
}
85+
bs.flushLocked()
86+
}

0 commit comments

Comments
 (0)