Skip to content

Commit 4056ea7

Browse files
author
Daniel Ferstay
committed
Remove races reading and updating the default router state
Previously, we used atomic operations to read and update parts of the default router state. Unfortunately, the reads and updates could race under concurrent calls, which led to unnecessary clock reads and an associated slowdown in performance. Now, we use atomic addition to increment the message count and batch size. This removes the race condition by ensuring that each goroutine will have a unique messageCount, and hence only one will perform the clock read.

Signed-off-by: Daniel Ferstay <dferstay@splunk.com>
1 parent d5d4903 commit 4056ea7

1 file changed

Lines changed: 14 additions & 20 deletions

File tree

pulsar/default_router.go

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package pulsar
1919

2020
import (
21-
"math"
2221
"math/rand"
2322
"sync/atomic"
2423
"time"
@@ -27,7 +26,7 @@ import (
2726
type defaultRouter struct {
2827
currentPartitionCursor uint32
2928

30-
lastChangeTimestamp int64
29+
lastBatchTimestamp int64
3130
msgCounter uint32
3231
cumulativeBatchSize uint32
3332
}
@@ -45,7 +44,7 @@ func NewDefaultRouter(
4544
disableBatching bool) func(*ProducerMessage, uint32) int {
4645
state := &defaultRouter{
4746
currentPartitionCursor: rand.Uint32(),
48-
lastChangeTimestamp: math.MinInt64,
47+
lastBatchTimestamp: time.Now().UnixNano(),
4948
}
5049

5150
readClockAfterNumMessages := uint32(maxBatchingMessages / 10)
@@ -80,32 +79,27 @@ func NewDefaultRouter(
8079
// spread the data on different partitions but not necessarily in a specific sequence.
8180
var now int64
8281
size := uint32(len(message.Payload))
83-
previousMessageCount := atomic.LoadUint32(&state.msgCounter)
84-
previousBatchingMaxSize := atomic.LoadUint32(&state.cumulativeBatchSize)
85-
previousLastChange := atomic.LoadInt64(&state.lastChangeTimestamp)
82+
messageCount := atomic.AddUint32(&state.msgCounter, 1)
83+
batchSize := atomic.AddUint32(&state.cumulativeBatchSize, size)
8684

87-
messageCountReached := previousMessageCount >= uint32(maxBatchingMessages-1)
88-
sizeReached := (size >= uint32(maxBatchingSize)-previousBatchingMaxSize)
85+
messageCountReached := messageCount%uint32(maxBatchingMessages) == 0
86+
sizeReached := (batchSize >= uint32(maxBatchingSize))
8987
durationReached := false
90-
if readClockAfterNumMessages == 0 || previousMessageCount%readClockAfterNumMessages == 0 {
88+
if readClockAfterNumMessages == 0 || messageCount%readClockAfterNumMessages == 0 {
9189
now = time.Now().UnixNano()
92-
durationReached = now-previousLastChange >= maxBatchingDelay.Nanoseconds()
90+
previousBatch := atomic.LoadInt64(&state.lastBatchTimestamp)
91+
durationReached = now-previousBatch >= maxBatchingDelay.Nanoseconds()
9392
}
9493
if messageCountReached || sizeReached || durationReached {
95-
atomic.AddUint32(&state.currentPartitionCursor, 1)
96-
atomic.StoreUint32(&state.msgCounter, 0)
94+
cursor := atomic.AddUint32(&state.currentPartitionCursor, 1)
9795
atomic.StoreUint32(&state.cumulativeBatchSize, 0)
98-
if now != 0 {
99-
atomic.StoreInt64(&state.lastChangeTimestamp, now)
96+
if now == 0 {
97+
now = time.Now().UnixNano()
10098
}
101-
return int(state.currentPartitionCursor % numPartitions)
99+
atomic.StoreInt64(&state.lastBatchTimestamp, now)
100+
return int(cursor % numPartitions)
102101
}
103102

104-
atomic.AddUint32(&state.msgCounter, 1)
105-
atomic.AddUint32(&state.cumulativeBatchSize, size)
106-
if now != 0 {
107-
atomic.StoreInt64(&state.lastChangeTimestamp, now)
108-
}
109103
return int(state.currentPartitionCursor % numPartitions)
110104
}
111105
}

0 commit comments

Comments (0)