Skip to content

Commit 9a7385f

Browse files
authored
Fix data race when sending trace data to reporter. (#245)
1 parent 052bf4b commit 9a7385f

7 files changed

Lines changed: 327 additions & 19 deletions

File tree

.github/workflows/skywalking-go.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ jobs:
3737
uses: apache/skywalking-eyes@9bd5feb86b5817aa6072b008f9866a2c3bbc8587
3838
- name: Test
3939
run: make test
40+
- name: Test Race
41+
run: make test-race
4042
- name: Lint
4143
run: make lint
4244

CHANGES.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,16 @@ Release Notes.
1212
* Add mutex to fix some data races.
1313
* Replace external `goapi` dependency with in-repo generated protocols.
1414
* Support pprof profiling.
15+
1516
#### Plugins
1617

1718
#### Documentation
19+
1820
#### Bug Fixes
1921

2022
* Fix plugin interceptors bypassed on Windows.
2123
* Fix wrong tracing context switch when trace ignore plugin activated.
24+
* Fix data race when sending trace data to reporter.
2225

2326
#### Issues and PR
2427
- All issues are [here](https://github.com/apache/skywalking/milestone/238?closed=1)

Makefile

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,24 @@ test: ## Run E2E scenario tests
8989
fi; \
9090
done
9191

92+
.PHONY: test-race
93+
test-race: ## Run data-race regression tests (TestRace*) under the race detector
94+
@$(LOG_TARGET)
95+
@for dir in $$(find . -name go.mod -exec dirname {} \; ); do \
96+
if [[ $$dir == "./test/"* ]]; then \
97+
continue; \
98+
fi; \
99+
cd $$dir; \
100+
echo "Race testing $$dir"; \
101+
go test -race -run '^TestRace' ./...; \
102+
test_status=$$?; \
103+
cd ${REPODIR}; \
104+
if [ $$test_status -ne 0 ]; then \
105+
echo "Error occurred during race test, exiting..."; \
106+
exit $$test_status; \
107+
fi; \
108+
done
109+
92110
.PHONY: lint
93111
lint: linter ## Run golangci-lint linter
94112
@$(LOG_TARGET)

plugins/core/reporter/grpc/grpc.go

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,27 @@ func (r *gRPCReporter) closeGRPCConn() {
182182
}
183183
}
184184

185+
// sendWithRecover invokes send and recovers from a panic raised while encoding or
186+
// transmitting a single message, so that one corrupted payload cannot tear down the
187+
// whole send pipeline. On a recovered panic it logs via the existing logger and
188+
// returns recovered=true, telling the caller to skip the current message and keep
189+
// streaming the rest.
190+
//
191+
// Such a panic originates in protobuf size/marshal computation (the #13885 crash),
192+
// which runs before any bytes are written to the stream, so the stream stays valid
193+
// and may be reused for the next message. Should a panic ever leave the stream
194+
// inconsistent, the following send returns an error and the caller reconnects.
195+
func (r *gRPCReporter) sendWithRecover(send func() error) (recovered bool, err error) {
196+
defer func() {
197+
if rec := recover(); rec != nil {
198+
r.logger.Errorf("gRPCReporter recovered from panic while sending, skip current message: %v", rec)
199+
recovered = true
200+
}
201+
}()
202+
err = send()
203+
return recovered, err
204+
}
205+
185206
// nolint
186207
func (r *gRPCReporter) initSendPipeline() {
187208
if r.traceClient == nil {
@@ -210,9 +231,12 @@ func (r *gRPCReporter) initSendPipeline() {
210231
continue StreamLoop
211232
}
212233
for s := range r.tracingSendCh {
213-
err = stream.Send(s)
214-
if err != nil {
215-
r.logger.Errorf("send segment error %v", err)
234+
recovered, sendErr := r.sendWithRecover(func() error { return stream.Send(s) })
235+
if recovered {
236+
continue
237+
}
238+
if sendErr != nil {
239+
r.logger.Errorf("send segment error %v", sendErr)
216240
r.closeTracingStream(stream)
217241
continue StreamLoop
218242
}
@@ -245,11 +269,14 @@ func (r *gRPCReporter) initSendPipeline() {
245269
continue StreamLoop
246270
}
247271
for s := range r.metricsSendCh {
248-
err = stream.Send(&agentv3.MeterDataCollection{
249-
MeterData: s,
272+
recovered, sendErr := r.sendWithRecover(func() error {
273+
return stream.Send(&agentv3.MeterDataCollection{MeterData: s})
250274
})
251-
if err != nil {
252-
r.logger.Errorf("send metrics error %v", err)
275+
if recovered {
276+
continue
277+
}
278+
if sendErr != nil {
279+
r.logger.Errorf("send metrics error %v", sendErr)
253280
r.closeMetricsStream(stream)
254281
continue StreamLoop
255282
}
@@ -281,9 +308,12 @@ func (r *gRPCReporter) initSendPipeline() {
281308
continue StreamLoop
282309
}
283310
for s := range r.logSendCh {
284-
err = stream.Send(s)
285-
if err != nil {
286-
r.logger.Errorf("send log error %v", err)
311+
recovered, sendErr := r.sendWithRecover(func() error { return stream.Send(s) })
312+
if recovered {
313+
continue
314+
}
315+
if sendErr != nil {
316+
r.logger.Errorf("send log error %v", sendErr)
287317
r.closeLogStream(stream)
288318
continue StreamLoop
289319
}
@@ -325,9 +355,12 @@ func (r *gRPCReporter) initSendPipeline() {
325355
}
326356
r.logger.Infof("Sending profile task: TaskID='%s', PayloadSize=%d, IsLast=%v",
327357
task.TaskID, len(task.Payload), task.IsLast)
328-
err = stream.Send(profileData)
329-
if err != nil {
330-
r.logger.Errorf("send profile data error %v", err)
358+
recovered, sendErr := r.sendWithRecover(func() error { return stream.Send(profileData) })
359+
if recovered {
360+
continue
361+
}
362+
if sendErr != nil {
363+
r.logger.Errorf("send profile data error %v", sendErr)
331364
r.closeProfileStream(stream)
332365
continue StreamLoop
333366
}

plugins/core/reporter/kafka/kafka.go

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,29 @@ func (r *kafkaReporter) initSendPipeline() {
141141
go r.logSendLoop()
142142
}
143143

144+
// marshalWithRecover invokes marshal and recovers from a panic raised while encoding
145+
// a single message, so that one corrupted payload cannot tear down the whole send
146+
// loop. On a recovered panic it logs via the existing logger and returns
147+
// recovered=true, telling the caller to skip the current message and keep going.
148+
func (r *kafkaReporter) marshalWithRecover(marshal func() ([]byte, error)) (payload []byte, recovered bool, err error) {
149+
defer func() {
150+
if rec := recover(); rec != nil {
151+
r.logger.Errorf("kafkaReporter recovered from panic while marshaling, skip current message: %v", rec)
152+
recovered = true
153+
}
154+
}()
155+
payload, err = marshal()
156+
return payload, recovered, err
157+
}
158+
144159
func (r *kafkaReporter) tracingSendLoop() {
145160
consecutiveErrors := 0
146161
logFrequency := 30
147162
for s := range r.tracingSendCh {
148-
payload, err := proto.Marshal(s)
163+
payload, recovered, err := r.marshalWithRecover(func() ([]byte, error) { return proto.Marshal(s) })
164+
if recovered {
165+
continue
166+
}
149167
if err != nil {
150168
r.logger.Errorf("marshal segment error %v", err)
151169
continue
@@ -172,9 +190,12 @@ func (r *kafkaReporter) metricsSendLoop() {
172190
consecutiveErrors := 0
173191
logFrequency := 30
174192
for s := range r.metricsSendCh {
175-
payload, err := proto.Marshal(&agentv3.MeterDataCollection{
176-
MeterData: s,
193+
payload, recovered, err := r.marshalWithRecover(func() ([]byte, error) {
194+
return proto.Marshal(&agentv3.MeterDataCollection{MeterData: s})
177195
})
196+
if recovered {
197+
continue
198+
}
178199
if err != nil {
179200
r.logger.Errorf("marshal metrics error %v", err)
180201
continue
@@ -201,7 +222,10 @@ func (r *kafkaReporter) logSendLoop() {
201222
consecutiveErrors := 0
202223
logFrequency := 30
203224
for s := range r.logSendCh {
204-
payload, err := proto.Marshal(s)
225+
payload, recovered, err := r.marshalWithRecover(func() ([]byte, error) { return proto.Marshal(s) })
226+
if recovered {
227+
continue
228+
}
205229
if err != nil {
206230
r.logger.Errorf("marshal log error %v", err)
207231
continue

plugins/core/reporter/transform.go

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package reporter
2020
import (
2121
"time"
2222

23+
commonv3 "github.com/apache/skywalking-go/protocols/collect/common/v3"
2324
agentv3 "github.com/apache/skywalking-go/protocols/collect/language/agent/v3"
2425
)
2526

@@ -60,8 +61,8 @@ func (r *Transform) TransformSegmentObject(spans []ReportedSpan) *agentv3.Segmen
6061
SpanLayer: s.SpanLayer(),
6162
ComponentId: s.ComponentID(),
6263
IsError: s.IsError(),
63-
Tags: s.Tags(),
64-
Logs: s.Logs(),
64+
Tags: copyKeyStringValuePairs(s.Tags()),
65+
Logs: copyLogs(s.Logs()),
6566
}
6667
srr := make([]*agentv3.SegmentReference, 0)
6768
if i == (spanSize-1) && spanCtx.GetParentSpanID() > -1 {
@@ -93,6 +94,42 @@ func (r *Transform) TransformSegmentObject(spans []ReportedSpan) *agentv3.Segmen
9394
return segmentObject
9495
}
9596

97+
// copyLogs returns a deep copy of the span logs for the same reason as
98+
// copyKeyStringValuePairs.
99+
func copyLogs(logs []*agentv3.Log) []*agentv3.Log {
100+
if len(logs) == 0 {
101+
return nil
102+
}
103+
cp := make([]*agentv3.Log, 0, len(logs))
104+
for _, log := range logs {
105+
if log == nil {
106+
continue
107+
}
108+
cp = append(cp, &agentv3.Log{Time: log.Time, Data: copyKeyStringValuePairs(log.Data)})
109+
}
110+
return cp
111+
}
112+
113+
// copyKeyStringValuePairs returns a deep copy of a key/value pair slice (span tags,
114+
// or a log's data) so that the reported SegmentObject never shares mutable backing
115+
// storage with the live span. Without this copy the reporter marshals the slice in a
116+
// different goroutine while the span may still be mutated (e.g. when a span is
117+
// mistakenly shared across goroutines), corrupting the protobuf message during encode
118+
// (apache/skywalking#13885). Nil elements are skipped and an empty input returns nil.
119+
func copyKeyStringValuePairs(pairs []*commonv3.KeyStringValuePair) []*commonv3.KeyStringValuePair {
120+
if len(pairs) == 0 {
121+
return nil
122+
}
123+
cp := make([]*commonv3.KeyStringValuePair, 0, len(pairs))
124+
for _, p := range pairs {
125+
if p == nil {
126+
continue
127+
}
128+
cp = append(cp, &commonv3.KeyStringValuePair{Key: p.Key, Value: p.Value})
129+
}
130+
return cp
131+
}
132+
96133
func (r *Transform) TransformMeterData(metrics []ReportedMeter) []*agentv3.MeterData {
97134
if len(metrics) == 0 {
98135
return nil

0 commit comments

Comments
 (0)