Skip to content

Commit 6a8e7f3

Browse files
authored
[issue #752] replace go-rate rate limiter with a buffered channel implementation (#799)
* replace go-rate rate limiter with channel implementation * fix linter * update based on review comments * stop ratelimiter goroutine if the rate is unthrottled
1 parent ea6eccf commit 6a8e7f3

3 files changed

Lines changed: 13 additions & 13 deletions

File tree

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ require (
66
github.com/AthenZ/athenz v1.10.39
77
github.com/DataDog/zstd v1.5.0
88
github.com/apache/pulsar-client-go/oauth2 v0.0.0-20220120090717-25e59572242e
9-
github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0
109
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
1110
github.com/davecgh/go-spew v1.1.1
1211
github.com/gogo/protobuf v1.3.2

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,6 @@ github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hC
6060
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
6161
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
6262
github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
63-
github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0 h1:0b2vaepXIfMsG++IsjHiI2p4bxALD1Y2nQKGMR5zDQM=
64-
github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
6563
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
6664
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
6765
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=

perf/perf-producer.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"encoding/json"
2323
"time"
2424

25-
"github.com/beefsack/go-rate"
2625
"github.com/bmizerany/perks/quantile"
2726
"github.com/spf13/cobra"
2827

@@ -101,25 +100,29 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
101100
payload := make([]byte, produceArgs.MessageSize)
102101

103102
ch := make(chan float64)
104-
105-
go func(stopCh <-chan struct{}) {
106-
var rateLimiter *rate.RateLimiter
107-
if produceArgs.Rate > 0 {
108-
rateLimiter = rate.New(produceArgs.Rate, time.Second)
103+
rateLimitCh := make(chan time.Time, produceArgs.Rate)
104+
go func(rateLimit int, interval time.Duration) {
105+
if rateLimit <= 0 { // 0 as no limit enforced
106+
return
107+
}
108+
for {
109+
oldest := <-rateLimitCh
110+
time.Sleep(interval - time.Since(oldest))
109111
}
112+
}(produceArgs.Rate, time.Second)
110113

114+
go func(stopCh <-chan struct{}) {
111115
for {
112116
select {
113117
case <-stopCh:
114118
return
115119
default:
116120
}
117121

118-
if rateLimiter != nil {
119-
rateLimiter.Wait()
120-
}
121-
122122
start := time.Now()
123+
if produceArgs.Rate > 0 {
124+
rateLimitCh <- start
125+
}
123126

124127
producer.SendAsync(ctx, &pulsar.ProducerMessage{
125128
Payload: payload,

0 commit comments

Comments
 (0)