-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathratelimit.go
More file actions
140 lines (117 loc) · 3.52 KB
/
Copy pathratelimit.go
File metadata and controls
140 lines (117 loc) · 3.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package inspect
import (
"context"
"sync"
"time"
"golang.org/x/time/rate"
)
// hostLimit is the state RateLimiter keeps for a single host: its token-bucket
// limiter plus the moment that host was last looked up.
type hostLimit struct {
	// limiter paces requests sent to this host.
	limiter *rate.Limiter

	// lastUsed is refreshed on every lookup of the host; reapStale evicts the
	// entry once it is older than staleThreshold.
	lastUsed time.Time
}
// RateLimiter throttles crawl requests on a per-host basis. Every host gets an
// independent token-bucket limiter, so crawling one host aggressively never
// slows down requests to a different host. A background goroutine started by
// NewRateLimiter periodically discards limiters for hosts that have gone idle;
// Close stops that goroutine.
type RateLimiter struct {
	// limits maps each host to its limiter state. All access goes through mu.
	limits map[string]*hostLimit
	mu     sync.Mutex

	// Settings applied to every newly created per-host limiter.
	requestsPerSecond float64
	burst             int

	// Background cleanup cadence and the channel Close uses to stop it.
	cleanupInterval time.Duration
	stopCleanup     chan struct{}
}
// RateLimiterOption is a functional option for configuring a RateLimiter.
// NewRateLimiter applies options in order, after the built-in defaults have
// been set and before the background cleanup goroutine is started.
type RateLimiterOption func(*RateLimiter)
// WithCleanupInterval sets how often stale host limiters are reaped. A host is
// considered stale once it has gone unused for 5 minutes (staleThreshold). The
// default interval, set by NewRateLimiter, is one minute.
//
// Non-positive durations are ignored, and the interval already configured is
// kept. The interval is passed to time.NewTicker in the background cleanup
// goroutine. NewTicker panics for d <= 0, and a panic there cannot be recovered
// by the caller and would crash the whole process.
func WithCleanupInterval(d time.Duration) RateLimiterOption {
	return func(rl *RateLimiter) {
		if d <= 0 {
			return
		}
		rl.cleanupInterval = d
	}
}
// NewRateLimiter returns a RateLimiter that grants each host rps requests per
// second, with bursts of up to burst requests. Per-host limiters are created
// lazily the first time a host is seen. The defaults (including a one-minute
// cleanup interval) are set first and opts are applied on top of them, in
// order. A background goroutine is then started that evicts hosts idle for
// more than 5 minutes. Call Close to stop it.
func NewRateLimiter(rps float64, burst int, opts ...RateLimiterOption) *RateLimiter {
	const defaultCleanupInterval = time.Minute

	rl := new(RateLimiter)
	rl.limits = make(map[string]*hostLimit)
	rl.requestsPerSecond = rps
	rl.burst = burst
	rl.cleanupInterval = defaultCleanupInterval
	rl.stopCleanup = make(chan struct{})

	for i := range opts {
		opts[i](rl)
	}

	go rl.cleanupLoop()
	return rl
}
// getOrCreate looks up the state for host, lazily creating a fresh limiter
// (using the configured rate and burst) the first time host is seen. Either
// way the entry's lastUsed is stamped with the current time so the cleanup
// sweep treats the host as active. The caller must already hold rl.mu.
func (rl *RateLimiter) getOrCreate(host string) *hostLimit {
	if entry, found := rl.limits[host]; found {
		entry.lastUsed = time.Now()
		return entry
	}

	entry := &hostLimit{
		limiter:  rate.NewLimiter(rate.Limit(rl.requestsPerSecond), rl.burst),
		lastUsed: time.Now(),
	}
	rl.limits[host] = entry
	return entry
}
// Wait blocks until a request to host is permitted by that host's limiter, or
// until ctx is done. It returns whatever the underlying limiter's Wait
// returns, i.e. a non-nil error when the wait is abandoned.
func (rl *RateLimiter) Wait(ctx context.Context, host string) error {
	// Resolve the limiter under the map lock, but do the (possibly long)
	// blocking wait outside it so one slow host never stalls the others.
	rl.mu.Lock()
	lim := rl.getOrCreate(host).limiter
	rl.mu.Unlock()

	return lim.Wait(ctx)
}
// Allow is the non-blocking counterpart of Wait. It reports whether host's
// limiter permits a request at this instant; true means the request may go
// ahead (and consumes a token), false means it should be skipped or retried.
func (rl *RateLimiter) Allow(host string) bool {
	rl.mu.Lock()
	lim := rl.getOrCreate(host).limiter
	rl.mu.Unlock()

	return lim.Allow()
}
// ActiveHosts reports how many hosts currently have a limiter tracked, that is,
// hosts seen by Wait or Allow and not yet evicted by the cleanup sweep.
func (rl *RateLimiter) ActiveHosts() int {
	rl.mu.Lock()
	count := len(rl.limits)
	rl.mu.Unlock()
	return count
}
// Close stops the background cleanup goroutine started by NewRateLimiter.
// It is idempotent. Calling it more than once, including concurrently from
// several goroutines, is safe, and every call after the first is a no-op.
// Close signals the goroutine but does not wait for it to exit. After Close,
// idle hosts are no longer reaped, so the limiter should not be used further.
func (rl *RateLimiter) Close() {
	// Serialize Close calls on rl.mu so the "already closed?" check and the
	// close happen atomically. Closing an already-closed channel panics.
	rl.mu.Lock()
	defer rl.mu.Unlock()

	select {
	case <-rl.stopCleanup:
		// Already closed by an earlier Close call.
	default:
		close(rl.stopCleanup)
	}
}
// cleanupLoop is the body of the background goroutine started by
// NewRateLimiter. Every cleanupInterval it calls reapStale to evict hosts idle
// for longer than staleThreshold. It returns once Close closes stopCleanup,
// and it stops its ticker on the way out.
func (rl *RateLimiter) cleanupLoop() {
	tick := time.NewTicker(rl.cleanupInterval)
	defer tick.Stop()

	for {
		select {
		case <-rl.stopCleanup:
			return
		case <-tick.C:
			rl.reapStale()
		}
	}
}
// staleThreshold is how long a host may go without a Wait/Allow lookup before
// the cleanup sweep discards its limiter.
const staleThreshold time.Duration = 5 * time.Minute
// reapStale evicts every host whose limiter was last used more than
// staleThreshold ago. It holds rl.mu for the whole sweep, so lookups wait
// until it finishes.
func (rl *RateLimiter) reapStale() {
	rl.mu.Lock()
	defer rl.mu.Unlock()

	// An entry is stale when lastUsed falls before this instant, which is
	// equivalent to now.Sub(lastUsed) > staleThreshold.
	cutoff := time.Now().Add(-staleThreshold)
	for host, entry := range rl.limits {
		if entry.lastUsed.Before(cutoff) {
			// Deleting the current key while ranging over a map is allowed in Go.
			delete(rl.limits, host)
		}
	}
}