Skip to content

Commit d331795

Browse files
authored
Add mutex to fix some data race (#227)
1 parent 04d4d36 commit d331795

14 files changed

Lines changed: 68 additions & 33 deletions

File tree

.github/workflows/plugin-tests.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ jobs:
8080
- go-redisv9
8181
- go-restfulv3
8282
- gorm
83-
- kratosv2
83+
# - kratosv2 temporary disable because it's not stable
8484
- microv4
8585
- mongo
8686
- mysql

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ Release Notes.
99
* Support Windows plugin test.
1010
* Support Kafka reporter.
1111
* Add recover to goroutine to prevent unexpected panics.
12+
* Add mutex to fix some data race.
1213

1314
#### Plugins
1415

plugins/core/context.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717

1818
package core
1919

20-
import "reflect"
20+
import (
21+
"reflect"
22+
"sync"
23+
)
2124

2225
var (
2326
GetGLS = func() interface{} { return nil }
@@ -29,16 +32,21 @@ var (
2932
)
3033

3134
type ContextSnapshoter interface {
32-
TakeSnapShot(val interface{}) interface{}
35+
TakeSnapShot() interface{}
3336
}
3437

3538
type TracingContext struct {
3639
activeSpan TracingSpan
3740
Runtime *RuntimeContext
3841
ID *IDContext
42+
43+
activeSpanLock sync.RWMutex
3944
}
4045

41-
func (t *TracingContext) TakeSnapShot(val interface{}) interface{} {
46+
func (t *TracingContext) TakeSnapShot() interface{} {
47+
if t == nil {
48+
return nil
49+
}
4250
snapshot := newSnapshotSpan(t.ActiveSpan())
4351
return &TracingContext{
4452
activeSpan: snapshot,
@@ -48,13 +56,17 @@ func (t *TracingContext) TakeSnapShot(val interface{}) interface{} {
4856
}
4957

5058
func (t *TracingContext) ActiveSpan() TracingSpan {
59+
t.activeSpanLock.RLock()
60+
defer t.activeSpanLock.RUnlock()
5161
if t.activeSpan == nil || reflect.ValueOf(t.activeSpan).IsZero() {
5262
return nil
5363
}
5464
return t.activeSpan
5565
}
5666

5767
func (t *TracingContext) SaveActiveSpan(s TracingSpan) {
68+
t.activeSpanLock.Lock()
69+
defer t.activeSpanLock.Unlock()
5870
t.activeSpan = s
5971
}
6072

plugins/core/metrics.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func (t *Tracer) reachNotInitMetrics() {
7575
func (t *Tracer) sendMetrics() {
7676
meters := make([]reporter.ReportedMeter, 0)
7777
// call collect hook
78-
for _, hook := range t.meterCollectListeners {
78+
for _, hook := range t.allMeterCollectListeners() {
7979
hook()
8080
}
8181
t.meterMap.Range(func(key, value interface{}) bool {
@@ -90,6 +90,14 @@ func (t *Tracer) sendMetrics() {
9090
t.Reporter.SendMetrics(meters)
9191
}
9292

93+
func (t *Tracer) allMeterCollectListeners() []func() {
94+
t.meterCollectListenersLock.RLock()
95+
defer t.meterCollectListenersLock.RUnlock()
96+
listeners := make([]func(), 0, len(t.meterCollectListeners))
97+
listeners = append(listeners, t.meterCollectListeners...)
98+
return listeners
99+
}
100+
93101
func (t *Tracer) NewCounter(name string, opt interface{}) interface{} {
94102
counter := newCounter(name, nil, 0)
95103
if o, ok := opt.(meterOpts); ok && o != nil {
@@ -118,6 +126,8 @@ func (t *Tracer) NewHistogram(name string, minValue float64, steps []float64, op
118126
}
119127

120128
func (t *Tracer) AddCollectHook(f func()) {
129+
t.meterCollectListenersLock.Lock()
130+
defer t.meterCollectListenersLock.Unlock()
121131
t.meterCollectListeners = append(t.meterCollectListeners, f)
122132
}
123133

@@ -347,7 +357,7 @@ func (h *histogramBucket) Bucket() float64 {
347357
}
348358

349359
func (h *histogramBucket) Count() int64 {
350-
return *h.value
360+
return atomic.LoadInt64(h.value)
351361
}
352362

353363
func (h *histogramBucket) IsNegativeInfinity() bool {

plugins/core/operator/invocation.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ type realInvocation struct {
2727
returnValues []interface{}
2828

2929
context interface{}
30+
31+
// self obs
32+
interTimeCost int64
33+
beforeInterStart int64
3034
}
3135

3236
func (i *realInvocation) CallerInstance() interface{} {

plugins/core/sampler.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,13 @@ type DynamicSampler struct {
107107
currentRate float64
108108
defaultRate float64
109109
sampler Sampler
110+
locker sync.RWMutex
110111
}
111112

112113
// IsSampled implements IsSampled() of Sampler.
113114
func (s *DynamicSampler) IsSampled(operation string) bool {
115+
s.locker.RLock()
116+
defer s.locker.RUnlock()
114117
return s.sampler.IsSampled(operation)
115118
}
116119

@@ -136,11 +139,15 @@ func (s *DynamicSampler) Notify(eventType reporter.AgentConfigEventType, newValu
136139
} else {
137140
sampler = NewRandomSampler(samplingRate)
138141
}
142+
s.locker.Lock()
143+
defer s.locker.Unlock()
139144
s.sampler = sampler
140145
s.currentRate = samplingRate
141146
}
142147

143148
func (s *DynamicSampler) Value() string {
149+
s.locker.RLock()
150+
defer s.locker.RUnlock()
144151
return fmt.Sprintf("%f", s.currentRate)
145152
}
146153

plugins/core/span_tracing.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -255,12 +255,12 @@ func (rs *RootSegmentSpan) AsyncFinish() {
255255
}
256256

257257
func (rs *RootSegmentSpan) end0() {
258-
go func() {
259-
defer func() {
260-
_ = recover()
261-
}()
262-
rs.doneCh <- atomic.SwapInt32(rs.SegmentContext.refNum, -1)
258+
defer func() {
259+
_ = recover()
263260
}()
261+
if rs != nil && rs.doneCh != nil && rs.SegmentContext.refNum != nil {
262+
rs.doneCh <- atomic.SwapInt32(rs.SegmentContext.refNum, -1)
263+
}
264264
}
265265

266266
func (rs *RootSegmentSpan) createRootSegmentContext(ctx *TracingContext, _ SegmentSpan) (err error) {

plugins/core/test_base.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func SetAsNewGoroutine() {
5757
return
5858
}
5959
if e := gls.(ContextSnapshoter); e != nil {
60-
SetGLS(e.TakeSnapShot(GetGLS()))
60+
SetGLS(e.TakeSnapShot())
6161
}
6262
}
6363

plugins/core/tracer.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,12 @@ type Tracer struct {
4949
// for plugin tools
5050
tools *TracerTools
5151
// for all metrics
52-
meterMap *sync.Map
53-
meterCollectListeners []func()
54-
ignoreSuffix []string
55-
traceIgnorePath []string
56-
mu sync.Mutex
52+
meterMap *sync.Map
53+
meterCollectListeners []func()
54+
meterCollectListenersLock sync.RWMutex
55+
ignoreSuffix []string
56+
traceIgnorePath []string
57+
mu sync.Mutex
5758
}
5859

5960
func (t *Tracer) Init(entity *reporter.Entity, rep reporter.Reporter, samp Sampler, logger operator.LogOperator,

plugins/core/tracing.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ func (t *Tracer) ContinueContext(snapshot interface{}) {
172172
ctx = NewTracingContext()
173173
SetGLS(ctx)
174174
}
175+
ctx.activeSpanLock.Lock()
176+
defer ctx.activeSpanLock.Unlock()
175177
ctx.activeSpan = snap.activeSpan
176178
ctx.Runtime = snap.runtime
177179
}

0 commit comments

Comments
 (0)