Skip to content

Commit d5d4903

Browse files
Authored by dferstay (Daniel Ferstay) and co-author
Provide lock-free access to partition consumer compression providers (#689)
The compression providers map in the partition consumer is a textbook case for using go's lock-free sync.Map: the set of map entries is stable and access is read-only. On machines with 4 cores or greater, read contention on the sync.RWMutex outweighs the cost of using a sync.Map. Below is an old article on the subject, but it still holds true today: https://medium.com/@deckarep/the-new-kid-in-town-gos-sync-map-de24a6bf7c2c Signed-off-by: Daniel Ferstay <dferstay@splunk.com> Co-authored-by: Daniel Ferstay <dferstay@splunk.com>
1 parent 310d480 commit d5d4903

2 files changed

Lines changed: 30 additions & 21 deletions

File tree

pulsar/consumer_partition.go

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,7 @@ type partitionConsumer struct {
144144

145145
log log.Logger
146146

147-
providersMutex sync.RWMutex
148-
compressionProviders map[pb.CompressionType]compression.Provider
147+
compressionProviders sync.Map //map[pb.CompressionType]compression.Provider
149148
metrics *internal.LeveledMetrics
150149
decryptor cryptointernal.Decryptor
151150
}
@@ -171,7 +170,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
171170
closeCh: make(chan struct{}),
172171
clearQueueCh: make(chan func(id trackingMessageID)),
173172
clearMessageQueuesCh: make(chan chan struct{}),
174-
compressionProviders: make(map[pb.CompressionType]compression.Provider),
173+
compressionProviders: sync.Map{},
175174
dlq: dlq,
176175
metrics: metrics,
177176
}
@@ -967,11 +966,15 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
967966
pc.log.Info("Closed consumer")
968967
}
969968

970-
pc.providersMutex.Lock()
971-
for _, provider := range pc.compressionProviders {
972-
provider.Close()
973-
}
974-
pc.providersMutex.Unlock()
969+
pc.compressionProviders.Range(func(_, v interface{}) bool {
970+
if provider, ok := v.(compression.Provider); ok {
971+
provider.Close()
972+
} else {
973+
err := fmt.Errorf("unexpected compression provider type: %T", v)
974+
pc.log.WithError(err).Warn("Failed to close compression provider")
975+
}
976+
return true
977+
})
975978

976979
pc.setConsumerState(consumerClosed)
977980
pc._getConn().DeleteConsumeHandler(pc.consumerID)
@@ -1192,19 +1195,26 @@ func getPreviousMessage(mid trackingMessageID) trackingMessageID {
11921195
}
11931196

11941197
func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload internal.Buffer) (internal.Buffer, error) {
1195-
pc.providersMutex.RLock()
1196-
provider, ok := pc.compressionProviders[msgMeta.GetCompression()]
1197-
pc.providersMutex.RUnlock()
1198+
providerEntry, ok := pc.compressionProviders.Load(msgMeta.GetCompression())
11981199
if !ok {
1199-
var err error
1200-
if provider, err = pc.initializeCompressionProvider(msgMeta.GetCompression()); err != nil {
1200+
newProvider, err := pc.initializeCompressionProvider(msgMeta.GetCompression())
1201+
if err != nil {
12011202
pc.log.WithError(err).Error("Failed to decompress message.")
12021203
return nil, err
12031204
}
12041205

1205-
pc.providersMutex.Lock()
1206-
pc.compressionProviders[msgMeta.GetCompression()] = provider
1207-
pc.providersMutex.Unlock()
1206+
var loaded bool
1207+
providerEntry, loaded = pc.compressionProviders.LoadOrStore(msgMeta.GetCompression(), newProvider)
1208+
if loaded {
1209+
// another thread already loaded this provider, so close the one we just initialized
1210+
newProvider.Close()
1211+
}
1212+
}
1213+
provider, ok := providerEntry.(compression.Provider)
1214+
if !ok {
1215+
err := fmt.Errorf("unexpected compression provider type: %T", providerEntry)
1216+
pc.log.WithError(err).Error("Failed to decompress message.")
1217+
return nil, err
12081218
}
12091219

12101220
uncompressed, err := provider.Decompress(nil, payload.ReadableSlice(), int(msgMeta.GetUncompressedSize()))

pulsar/consumer_partition_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,10 @@
1818
package pulsar
1919

2020
import (
21+
"sync"
2122
"testing"
2223

23-
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
2424
"github.com/apache/pulsar-client-go/pulsar/internal/crypto"
25-
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
2625

2726
"github.com/stretchr/testify/assert"
2827

@@ -34,7 +33,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
3433
pc := partitionConsumer{
3534
queueCh: make(chan []*message, 1),
3635
eventsCh: eventsCh,
37-
compressionProviders: make(map[pb.CompressionType]compression.Provider),
36+
compressionProviders: sync.Map{},
3837
options: &partitionConsumerOpts{},
3938
metrics: internal.NewMetricsProvider(4, map[string]string{}).GetLeveledMetrics("topic"),
4039
decryptor: crypto.NewNoopDecryptor(),
@@ -66,7 +65,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
6665
pc := partitionConsumer{
6766
queueCh: make(chan []*message, 1),
6867
eventsCh: eventsCh,
69-
compressionProviders: make(map[pb.CompressionType]compression.Provider),
68+
compressionProviders: sync.Map{},
7069
options: &partitionConsumerOpts{},
7170
metrics: internal.NewMetricsProvider(4, map[string]string{}).GetLeveledMetrics("topic"),
7271
decryptor: crypto.NewNoopDecryptor(),
@@ -98,7 +97,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
9897
pc := partitionConsumer{
9998
queueCh: make(chan []*message, 1),
10099
eventsCh: eventsCh,
101-
compressionProviders: make(map[pb.CompressionType]compression.Provider),
100+
compressionProviders: sync.Map{},
102101
options: &partitionConsumerOpts{},
103102
metrics: internal.NewMetricsProvider(4, map[string]string{}).GetLeveledMetrics("topic"),
104103
decryptor: crypto.NewNoopDecryptor(),

0 commit comments

Comments (0)