Skip to content

Commit 99b9b00

Browse files
authored
Response message tracing (#327)
* feat(queryexecutor): trace per block spans * feat(responseassembler): add timings in response assembler * feat(messagequeue): add message tracing * test(fix tests and cleanup): N * style(lint): fix staticcheck * test(impl): fix flaky test impossible to know if one or two messages will be sent * feat(messagequeue): record error * fix(graphsync): test cleanups and refactors * fix(impl): fix data race
1 parent 4dfb817 commit 99b9b00

3 files changed

Lines changed: 39 additions & 3 deletions

File tree

messagequeue/builder.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package messagequeue
22

33
import (
4+
"context"
45
"io"
56

67
"github.com/ipfs/go-graphsync"
@@ -11,6 +12,7 @@ import (
1112
// Builder wraps a message builder with additional functions related to metadata
1213
// and notifications in the message queue
1314
type Builder struct {
15+
ctx context.Context
1416
*gsmsg.Builder
1517
topic Topic
1618
responseStreams map[graphsync.RequestID]io.Closer
@@ -19,8 +21,9 @@ type Builder struct {
1921
}
2022

2123
// NewBuilder sets up a new builder for the given topic
22-
func NewBuilder(topic Topic) *Builder {
24+
func NewBuilder(ctx context.Context, topic Topic) *Builder {
2325
return &Builder{
26+
ctx: ctx,
2427
Builder: gsmsg.NewBuilder(),
2528
topic: topic,
2629
responseStreams: make(map[graphsync.RequestID]io.Closer),
@@ -29,6 +32,10 @@ func NewBuilder(topic Topic) *Builder {
2932
}
3033
}
3134

35+
func (b *Builder) Context() context.Context {
36+
return b.ctx
37+
}
38+
3239
// SetResponseStream sets the given response stream to close should the message fail to send
3340
func (b *Builder) SetResponseStream(requestID graphsync.RequestID, stream io.Closer) {
3441
b.responseStreams[requestID] = stream
@@ -82,6 +89,7 @@ func (b *Builder) build(publisher notifications.Publisher) (gsmsg.GraphSyncMessa
8289
BlockData: b.blockData,
8390
ResponseCodes: message.ResponseCodes(),
8491
},
92+
ctx: b.ctx,
8593
topic: b.topic,
8694
msgSize: b.BlockSize(),
8795
responseStreams: b.responseStreams,

messagequeue/messagequeue.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ import (
1111
"github.com/ipfs/go-graphsync"
1212
logging "github.com/ipfs/go-log/v2"
1313
"github.com/libp2p/go-libp2p-core/peer"
14+
"go.opentelemetry.io/otel"
15+
"go.opentelemetry.io/otel/attribute"
16+
"go.opentelemetry.io/otel/codes"
17+
"go.opentelemetry.io/otel/trace"
1418

1519
gsmsg "github.com/ipfs/go-graphsync/message"
1620
gsnet "github.com/ipfs/go-graphsync/network"
@@ -112,7 +116,10 @@ func (mq *MessageQueue) buildMessage(size uint64, buildMessageFn func(*Builder))
112116
if shouldBeginNewResponse(mq.builders, size) {
113117
topic := mq.nextBuilderTopic
114118
mq.nextBuilderTopic++
115-
mq.builders = append(mq.builders, NewBuilder(topic))
119+
ctx, _ := otel.Tracer("graphsync").Start(mq.ctx, "message", trace.WithAttributes(
120+
attribute.Int64("topic", int64(topic)),
121+
))
122+
mq.builders = append(mq.builders, NewBuilder(ctx, topic))
116123
}
117124
builder := mq.builders[len(mq.builders)-1]
118125
buildMessageFn(builder)
@@ -156,7 +163,12 @@ func (mq *MessageQueue) runQueue() {
156163
for {
157164
_, metadata, err := mq.extractOutgoingMessage()
158165
if err == nil {
159-
mq.publishError(metadata, fmt.Errorf("message queue shutdown"))
166+
span := trace.SpanFromContext(metadata.ctx)
167+
err := fmt.Errorf("message queue shutdown")
168+
span.RecordError(err)
169+
span.SetStatus(codes.Error, err.Error())
170+
span.End()
171+
mq.publishError(metadata, err)
160172
mq.eventPublisher.Close(metadata.topic)
161173
} else {
162174
break
@@ -211,12 +223,20 @@ func (mq *MessageQueue) extractOutgoingMessage() (gsmsg.GraphSyncMessage, intern
211223

212224
func (mq *MessageQueue) sendMessage() {
213225
message, metadata, err := mq.extractOutgoingMessage()
226+
214227
if err != nil {
215228
if err != errEmptyMessage {
216229
log.Errorf("Unable to assemble GraphSync message: %s", err.Error())
217230
}
218231
return
219232
}
233+
span := trace.SpanFromContext(metadata.ctx)
234+
defer span.End()
235+
_, sendSpan := otel.Tracer("graphsync").Start(metadata.ctx, "sendMessage", trace.WithAttributes(
236+
attribute.Int64("topic", int64(metadata.topic)),
237+
attribute.Int64("size", int64(metadata.msgSize)),
238+
))
239+
defer sendSpan.End()
220240
mq.publishQueued(metadata)
221241
defer mq.eventPublisher.Close(metadata.topic)
222242

@@ -337,6 +357,7 @@ func openSender(ctx context.Context, network MessageNetwork, p peer.ID, sendTime
337357
}
338358

339359
type internalMetadata struct {
360+
ctx context.Context
340361
public Metadata
341362
topic Topic
342363
msgSize uint64

messagequeue/messagequeue_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ func TestProcessingNotification(t *testing.T) {
183183

184184
func TestDedupingMessages(t *testing.T) {
185185
ctx := context.Background()
186+
ctx, collectTracing := testutil.SetupTracing(ctx)
186187
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
187188
defer cancel()
188189

@@ -251,6 +252,12 @@ func TestDedupingMessages(t *testing.T) {
251252
t.Fatal("incorrect request added to message")
252253
}
253254
}
255+
256+
tracing := collectTracing(t)
257+
require.ElementsMatch(t, []string{
258+
"message(0)->sendMessage(0)",
259+
"message(1)->sendMessage(0)",
260+
}, tracing.TracesToStrings())
254261
}
255262

256263
func TestSendsVeryLargeBlocksResponses(t *testing.T) {

0 commit comments

Comments
 (0)