Skip to content

Commit a868ee9

Browse files
authored
feat(requestmanager): read request from context (#381)
allow a request id to be set in the context for a request
1 parent 847159a commit a868ee9

5 files changed

Lines changed: 31 additions & 4 deletions

File tree

graphsync.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,10 @@ type RequestState uint64
438438
// RequestStates describe a set of request IDs and their current state
439439
type RequestStates map[RequestID]RequestState
440440

441+
// RequestIDContextKey is used to the desired request id in context when
442+
// initializing a request
443+
type RequestIDContextKey struct{}
444+
441445
const (
442446
// Queued means a request has been received and is queued for processing
443447
Queued RequestState = iota

requestmanager/client.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,9 +178,15 @@ func (rm *RequestManager) NewRequest(ctx context.Context,
178178
return rm.singleErrorResponse(err)
179179
}
180180

181+
requestID := graphsync.NewRequestID()
182+
idFromContext := ctx.Value(graphsync.RequestIDContextKey{})
183+
if existingRequestID, ok := idFromContext.(graphsync.RequestID); ok {
184+
requestID = existingRequestID
185+
}
186+
181187
inProgressRequestChan := make(chan inProgressRequest)
182188

183-
rm.send(&newRequestMessage{span, p, root, selectorNode, extensions, inProgressRequestChan}, ctx.Done())
189+
rm.send(&newRequestMessage{requestID, span, p, root, selectorNode, extensions, inProgressRequestChan}, ctx.Done())
184190
var receivedInProgressRequest inProgressRequest
185191
select {
186192
case <-rm.ctx.Done():

requestmanager/messages.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ func (trm *releaseRequestTaskMessage) handle(rm *RequestManager) {
104104
}
105105

106106
type newRequestMessage struct {
107+
requestID graphsync.RequestID
107108
span trace.Span
108109
p peer.ID
109110
root ipld.Link
@@ -115,7 +116,7 @@ type newRequestMessage struct {
115116
func (nrm *newRequestMessage) handle(rm *RequestManager) {
116117
var ipr inProgressRequest
117118

118-
ipr.request, ipr.incoming, ipr.incomingError = rm.newRequest(nrm.span, nrm.p, nrm.root, nrm.selector, nrm.extensions)
119+
ipr.request, ipr.incoming, ipr.incomingError = rm.newRequest(nrm.requestID, nrm.span, nrm.p, nrm.root, nrm.selector, nrm.extensions)
119120
ipr.requestID = ipr.request.ID()
120121

121122
select {

requestmanager/requestmanager_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -988,6 +988,23 @@ func TestStats(t *testing.T) {
988988
require.Len(t, peerState.Pending, 0)
989989
}
990990

991+
func TestCaptureRequestIDFromContext(t *testing.T) {
992+
ctx := context.Background()
993+
td := newTestData(ctx, t)
994+
995+
requestCtx, cancel := context.WithTimeout(ctx, time.Second)
996+
defer cancel()
997+
expectedID := graphsync.NewRequestID()
998+
requestCtx = context.WithValue(requestCtx, graphsync.RequestIDContextKey{}, expectedID)
999+
1000+
peers := testutil.GeneratePeers(1)
1001+
1002+
_, _ = td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
1003+
1004+
requestRecords := readNNetworkRequests(requestCtx, t, td, 1)
1005+
require.Equal(t, expectedID, requestRecords[0].gsr.ID())
1006+
}
1007+
9911008
type requestRecord struct {
9921009
gsr gsmsg.GraphSyncRequest
9931010
p peer.ID

requestmanager/server.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,7 @@ func (rm *RequestManager) cleanupInProcessRequests() {
6060
}
6161
}
6262

63-
func (rm *RequestManager) newRequest(parentSpan trace.Span, p peer.ID, root ipld.Link, selector ipld.Node, extensions []graphsync.ExtensionData) (gsmsg.GraphSyncRequest, chan graphsync.ResponseProgress, chan error) {
64-
requestID := graphsync.NewRequestID()
63+
func (rm *RequestManager) newRequest(requestID graphsync.RequestID, parentSpan trace.Span, p peer.ID, root ipld.Link, selector ipld.Node, extensions []graphsync.ExtensionData) (gsmsg.GraphSyncRequest, chan graphsync.ResponseProgress, chan error) {
6564

6665
parentSpan.SetAttributes(attribute.String("requestID", requestID.String()))
6766
ctx, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(rm.ctx, parentSpan), "newRequest")

0 commit comments

Comments
 (0)