-
Notifications
You must be signed in to change notification settings - Fork 25
Expand file tree
/
Copy pathbatch_sender.go
More file actions
86 lines (69 loc) · 1.8 KB
/
batch_sender.go
File metadata and controls
86 lines (69 loc) · 1.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package batchsender
import (
"sync"
"time"
"github.com/cloudquery/plugin-sdk/v4/helpers"
)
// Tuning knobs for BatchSender.
const (
// batchSize is the number of buffered items at which Send flushes
// immediately instead of waiting for the timer.
batchSize = 100
// batchTimeout is the delay given to the flush timer that Send arms
// while the buffer holds fewer than batchSize items.
batchTimeout = 100 * time.Millisecond
)
// BatchSender buffers items handed to Send and forwards them to sendFn in
// batches, either once batchSize items have accumulated or batchTimeout after
// the most recent Send.
//
// Each call to Send behaves as follows:
//
//   - The input is expanded with helpers.InterfaceSlice. If that yields more
//     than one element, those elements are sent right away together with
//     whatever is already buffered.
//   - Otherwise the result is appended to the current batch.
//   - If the batch now holds batchSize items or more, it is sent immediately.
//   - If not, a timer is armed that sends the batch after batchTimeout.
type BatchSender struct {
	// sendFn receives every flushed batch as a single []any value.
	sendFn func(any)

	// mu guards items and timer.
	mu    sync.Mutex
	items []any       // items buffered since the last flush
	timer *time.Timer // most recently armed flush timer; nil until first armed
}
// NewBatchSender returns a BatchSender with an empty buffer and no timer
// armed. sendFn is stored without validation and is later called with each
// flushed batch.
func NewBatchSender(sendFn func(any)) *BatchSender {
	sender := new(BatchSender)
	sender.sendFn = sendFn
	return sender
}
// Send queues item for delivery to sendFn.
//
// item is first expanded with helpers.InterfaceSlice:
//
//   - If the expansion is empty, Send is a no-op.
//   - If it holds more than one element, those elements are flushed
//     immediately together with anything already buffered.
//   - Otherwise the element is appended to the buffer, which is flushed at
//     once when it reaches batchSize, or batchTimeout after the latest
//     non-empty Send.
//
// Send holds bs.mu for its whole duration, including any call to sendFn made
// through flushLocked.
func (bs *BatchSender) Send(item any) {
	bs.mu.Lock()
	defer bs.mu.Unlock()

	items := helpers.InterfaceSlice(item)

	// Nothing to add (presumably a nil or empty-slice input — confirm against
	// helpers.InterfaceSlice). Return before touching the timer so an empty
	// Send neither postpones a pending flush nor arms a timer whose callback
	// would find nothing to send.
	if len(items) == 0 {
		return
	}

	// New data arrived: cancel the pending timed flush. It is either
	// superseded by an immediate flush below or re-armed at the end.
	if bs.timer != nil {
		bs.timer.Stop()
	}

	// A multi-element input is sent right away, together with the current
	// batch, rather than being buffered.
	if len(items) > 1 {
		bs.flushLocked(items...)
		return
	}

	// Single element: add it to the current batch.
	bs.items = append(bs.items, items...)

	// A full batch is sent immediately.
	if len(bs.items) >= batchSize {
		bs.flushLocked()
		return
	}

	// Partial batch: flush it after batchTimeout unless another Send
	// supersedes this timer first. The callback runs on its own goroutine and
	// therefore takes the lock itself.
	bs.timer = time.AfterFunc(batchTimeout, func() {
		bs.mu.Lock()
		defer bs.mu.Unlock()
		bs.flushLocked()
	})
}
// flushLocked appends items to the buffer and, if the buffer is then
// non-empty, passes the whole buffer to sendFn and resets it to nil.
// The caller must hold bs.mu.
func (bs *BatchSender) flushLocked(items ...any) {
	if bs.items = append(bs.items, items...); len(bs.items) > 0 {
		bs.sendFn(bs.items)
		bs.items = nil
	}
}
// Close stops the flush timer, if one was ever armed, and sends whatever is
// still buffered. It holds bs.mu for the duration of the call.
func (bs *BatchSender) Close() {
	bs.mu.Lock()
	defer bs.mu.Unlock()

	if pending := bs.timer; pending != nil {
		pending.Stop()
	}
	bs.flushLocked()
}