@@ -26,6 +26,7 @@ import (
2626 "math"
2727 "sync"
2828 "time"
29+ stdunsafe "unsafe"
2930
3031 "github.com/m3db/m3/src/msg/producer"
3132 "github.com/m3db/m3/src/msg/protocol/proto"
@@ -35,6 +36,7 @@ import (
3536 "github.com/m3db/m3/src/x/unsafe"
3637
3738 "github.com/uber-go/tally"
39+ "go.uber.org/atomic"
3840)
3941
4042// MessageRetryNanosFn returns the message backoff time for retry in nanoseconds.
@@ -179,7 +181,7 @@ type messageWriter struct {
179181 doneCh chan struct {}
180182 wg sync.WaitGroup
181183 // metrics can be updated when a consumer instance changes, so must be guarded with RLock
182- m * messageWriterMetrics
184+ metrics atomic. UnsafePointer // *messageWriterMetrics
183185 nextFullScan time.Time
184186 lastNewWrite * list.Element
185187
@@ -196,7 +198,7 @@ func newMessageWriter(
196198 opts = NewOptions ()
197199 }
198200 nowFn := time .Now
199- return & messageWriter {
201+ mw := & messageWriter {
200202 replicatedShardID : replicatedShardID ,
201203 mPool : mPool ,
202204 opts : opts ,
@@ -211,9 +213,10 @@ func newMessageWriter(
211213 msgsToWrite : make ([]* message , 0 , opts .MessageQueueScanBatchSize ()),
212214 isClosed : false ,
213215 doneCh : make (chan struct {}),
214- m : m ,
215216 nowFn : nowFn ,
216217 }
218+ mw .metrics .Store (stdunsafe .Pointer (m ))
219+ return mw
217220}
218221
219222// Write writes a message, messages not acknowledged in time will be retried.
@@ -222,9 +225,10 @@ func (w *messageWriter) Write(rm *producer.RefCountedMessage) {
222225 var (
223226 nowNanos = w .nowFn ().UnixNano ()
224227 msg = w .newMessage ()
228+ metrics = w .Metrics ()
225229 )
226230 w .Lock ()
227- if ! w .isValidWriteWithLock (nowNanos ) {
231+ if ! w .isValidWriteWithLock (nowNanos , metrics ) {
228232 w .Unlock ()
229233 w .close (msg )
230234 return
@@ -240,7 +244,7 @@ func (w *messageWriter) Write(rm *producer.RefCountedMessage) {
240244 msg .Set (meta , rm , nowNanos )
241245 w .acks .add (meta , msg )
242246 // Make sure all the new writes are ordered in queue.
243- w . m .enqueuedMessages .Inc (1 )
247+ metrics .enqueuedMessages .Inc (1 )
244248 if w .lastNewWrite != nil {
245249 w .lastNewWrite = w .queue .InsertAfter (msg , w .lastNewWrite )
246250 } else {
@@ -249,17 +253,17 @@ func (w *messageWriter) Write(rm *producer.RefCountedMessage) {
249253 w .Unlock ()
250254}
251255
252- func (w * messageWriter ) isValidWriteWithLock (nowNanos int64 ) bool {
256+ func (w * messageWriter ) isValidWriteWithLock (nowNanos int64 , metrics * messageWriterMetrics ) bool {
253257 if w .opts .IgnoreCutoffCutover () {
254258 return true
255259 }
256260
257261 if w .cutOffNanos > 0 && nowNanos >= w .cutOffNanos {
258- w . m .writeAfterCutoff .Inc (1 )
262+ metrics .writeAfterCutoff .Inc (1 )
259263 return false
260264 }
261265 if w .cutOverNanos > 0 && nowNanos < w .cutOverNanos {
262- w . m .writeBeforeCutover .Inc (1 )
266+ metrics .writeBeforeCutover .Inc (1 )
263267 return false
264268 }
265269
@@ -318,12 +322,10 @@ func (w *messageWriter) write(
318322
319323// Ack acknowledges the metadata.
320324func (w * messageWriter ) Ack (meta metadata ) bool {
321- acked , expectedProcessNanos := w .acks .ack (meta )
322- if acked {
323- w .RLock ()
324- defer w .RUnlock ()
325- w .m .messageConsumeLatency .Record (time .Duration (w .nowFn ().UnixNano () - expectedProcessNanos ))
326- w .m .messageAcked .Inc (1 )
325+ if acked , expectedProcessNanos := w .acks .ack (meta ); acked {
326+ m := w .Metrics ()
327+ m .messageConsumeLatency .Record (time .Duration (w .nowFn ().UnixNano () - expectedProcessNanos ))
328+ m .messageAcked .Inc (1 )
327329 return true
328330 }
329331 return false
@@ -367,8 +369,8 @@ func (w *messageWriter) scanMessageQueue() {
367369 e := w .queue .Front ()
368370 w .lastNewWrite = nil
369371 isClosed := w .isClosed
370- m := w .m
371372 w .RUnlock ()
373+
372374 var (
373375 nowFn = w .nowFn
374376 msgsToWrite []* message
@@ -378,6 +380,7 @@ func (w *messageWriter) scanMessageQueue() {
378380 consumerWriters []consumerWriter
379381 iterationIndexes []int
380382 fullScan = isClosed || beforeScan .After (w .nextFullScan )
383+ m = w .Metrics ()
381384 scanMetrics scanBatchMetrics
382385 skipWrites bool
383386 )
@@ -452,6 +455,7 @@ func (w *messageWriter) scanBatchWithLock(
452455 iterated int
453456 next * list.Element
454457 )
458+ metrics := w .Metrics ()
455459 w .msgsToWrite = w .msgsToWrite [:0 ]
456460 for e := start ; e != nil ; e = next {
457461 iterated ++
@@ -469,7 +473,7 @@ func (w *messageWriter) scanBatchWithLock(
469473 // do not stay in memory forever.
470474 // NB: The message must be added to the ack map to be acked here.
471475 w .acks .ack (m .Metadata ())
472- w .removeFromQueueWithLock (e , m )
476+ w .removeFromQueueWithLock (e , m , metrics )
473477 scanMetrics [_messageClosed ]++
474478 continue
475479 }
@@ -491,12 +495,12 @@ func (w *messageWriter) scanBatchWithLock(
491495 if acked , _ := w .acks .ack (m .Metadata ()); acked {
492496 scanMetrics [_messageDroppedTTLExpire ]++
493497 }
494- w .removeFromQueueWithLock (e , m )
498+ w .removeFromQueueWithLock (e , m , metrics )
495499 continue
496500 }
497501 if m .IsAcked () {
498502 scanMetrics [_processedAck ]++
499- w .removeFromQueueWithLock (e , m )
503+ w .removeFromQueueWithLock (e , m , metrics )
500504 continue
501505 }
502506 if m .IsDroppedOrConsumed () {
@@ -509,7 +513,7 @@ func (w *messageWriter) scanBatchWithLock(
509513 continue
510514 }
511515 w .acks .remove (m .Metadata ())
512- w .removeFromQueueWithLock (e , m )
516+ w .removeFromQueueWithLock (e , m , metrics )
513517 scanMetrics [_messageDroppedBufferFull ]++
514518 continue
515519 }
@@ -644,18 +648,14 @@ func (w *messageWriter) RemoveConsumerWriter(addr string) {
644648
645649// Metrics returns the metrics. These are dynamic and change if downstream consumer instance changes.
646650func (w * messageWriter ) Metrics () * messageWriterMetrics {
647- w .RLock ()
648- defer w .RUnlock ()
649- return w .m
651+ return (* messageWriterMetrics )(w .metrics .Load ())
650652}
651653
652654// SetMetrics sets the metrics
653655//
654656// This allows changing the labels of the metrics when the downstream consumer instance changes.
655657func (w * messageWriter ) SetMetrics (m * messageWriterMetrics ) {
656- w .Lock ()
657- w .m = m
658- w .Unlock ()
658+ w .metrics .Store (stdunsafe .Pointer (m ))
659659}
660660
661661// QueueSize returns the number of messages queued in the writer.
@@ -667,9 +667,9 @@ func (w *messageWriter) newMessage() *message {
667667 return w .mPool .Get ()
668668}
669669
670- func (w * messageWriter ) removeFromQueueWithLock (e * list.Element , m * message ) {
670+ func (w * messageWriter ) removeFromQueueWithLock (e * list.Element , m * message , metrics * messageWriterMetrics ) {
671671 w .queue .Remove (e )
672- w . m .dequeuedMessages .Inc (1 )
672+ metrics .dequeuedMessages .Inc (1 )
673673 w .close (m )
674674}
675675
@@ -679,51 +679,53 @@ func (w *messageWriter) close(m *message) {
679679}
680680
681681type acks struct {
682- sync.Mutex
683-
684- ackMap map [metadataKey ]* message
682+ mtx sync.Mutex
683+ acks map [uint64 ]* message
685684}
686685
687686// nolint: unparam
688687func newAckHelper (size int ) * acks {
689688 return & acks {
690- ackMap : make (map [metadataKey ]* message , size ),
689+ acks : make (map [uint64 ]* message , size ),
691690 }
692691}
693692
694693func (a * acks ) add (meta metadata , m * message ) {
695- a .Lock ()
696- a .ackMap [meta .metadataKey ] = m
697- a .Unlock ()
694+ a .mtx . Lock ()
695+ a .acks [meta .metadataKey . id ] = m
696+ a .mtx . Unlock ()
698697}
699698
700699func (a * acks ) remove (meta metadata ) {
701- a .Lock ()
702- delete (a .ackMap , meta .metadataKey )
703- a .Unlock ()
700+ a .mtx . Lock ()
701+ delete (a .acks , meta .metadataKey . id )
702+ a .mtx . Unlock ()
704703}
705704
706705// ack processes the ack. returns true if the message was not already acked. additionally returns the expected
707706// processing time for lag calculations.
708707func (a * acks ) ack (meta metadata ) (bool , int64 ) {
709- a .Lock ()
710- m , ok := a .ackMap [meta .metadataKey ]
708+ a .mtx . Lock ()
709+ m , ok := a .acks [meta .metadataKey . id ]
711710 if ! ok {
712- a .Unlock ()
711+ a .mtx . Unlock ()
713712 // Acking a message that is already acked, which is ok.
714713 return false , 0
715714 }
716- delete (a .ackMap , meta .metadataKey )
717- a .Unlock ()
715+
716+ delete (a .acks , meta .metadataKey .id )
717+ a .mtx .Unlock ()
718+
718719 expectedProcessAtNanos := m .ExpectedProcessAtNanos ()
719720 m .Ack ()
721+
720722 return true , expectedProcessAtNanos
721723}
722724
723725func (a * acks ) size () int {
724- a .Lock ()
725- l := len (a .ackMap )
726- a .Unlock ()
726+ a .mtx . Lock ()
727+ l := len (a .acks )
728+ a .mtx . Unlock ()
727729 return l
728730}
729731
0 commit comments