@@ -34,6 +34,7 @@ import (
3434 "github.com/ipld/go-ipld-prime/traversal/selector/builder"
3535 "github.com/libp2p/go-libp2p-core/host"
3636 "github.com/libp2p/go-libp2p-core/peer"
37+ "github.com/libp2p/go-libp2p-core/protocol"
3738 mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
3839 "github.com/stretchr/testify/require"
3940
@@ -49,6 +50,22 @@ import (
4950 "github.com/ipfs/go-graphsync/testutil"
5051)
5152
53+ // nil means use the default protocols
54+ // tests data transfer for the following protocol combinations:
55+ // default protocol -> default protocols
56+ // old protocol -> default protocols
57+ // default protocols -> old protocol
58+ // old protocol -> old protocol
59+ var protocolsForTest = map [string ]struct {
60+ host1Protocols []protocol.ID
61+ host2Protocols []protocol.ID
62+ }{
63+ "(v1.1 -> v1.1)" : {nil , nil },
64+ "(v1.0 -> v1.1)" : {[]protocol.ID {gsnet .ProtocolGraphsync_1_0_0 }, nil },
65+ "(v1.1 -> v1.0)" : {nil , []protocol.ID {gsnet .ProtocolGraphsync_1_0_0 }},
66+ "(v1.0 -> v1.0)" : {[]protocol.ID {gsnet .ProtocolGraphsync_1_0_0 }, []protocol.ID {gsnet .ProtocolGraphsync_1_0_0 }},
67+ }
68+
5269func TestMakeRequestToNetwork (t * testing.T ) {
5370
5471 // create network
@@ -269,7 +286,6 @@ func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) {
269286}
270287
271288func TestGraphsyncRoundTripRequestBudgetResponder (t * testing.T ) {
272-
273289 // create network
274290 ctx := context .Background ()
275291 ctx , collectTracing := testutil .SetupTracing (ctx )
@@ -317,112 +333,115 @@ func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) {
317333}
318334
319335func TestGraphsyncRoundTrip (t * testing.T ) {
320-
321- // create network
322- ctx := context .Background ()
323- ctx , collectTracing := testutil .SetupTracing (ctx )
324- ctx , cancel := context .WithTimeout (ctx , 3 * time .Second )
325- defer cancel ()
326- td := newGsTestData (ctx , t )
327-
328- // initialize graphsync on first node to make requests
329- requestor := td .GraphSyncHost1 ()
330-
331- // setup receiving peer to just record message coming in
332- blockChainLength := 100
333- blockChain := testutil .SetupBlockChain (ctx , t , td .persistence2 , 100 , blockChainLength )
334-
335- // initialize graphsync on second node to response to requests
336- responder := td .GraphSyncHost2 ()
337- assertComplete := assertCompletionFunction (responder , 1 )
338-
339- var receivedResponseData []byte
340- var receivedRequestData []byte
341-
342- requestor .RegisterIncomingResponseHook (
343- func (p peer.ID , responseData graphsync.ResponseData , hookActions graphsync.IncomingResponseHookActions ) {
344- data , has := responseData .Extension (td .extensionName )
345- if has {
346- receivedResponseData = data
336+ for pname , ps := range protocolsForTest {
337+ t .Run (pname , func (t * testing.T ) {
338+ // create network
339+ ctx := context .Background ()
340+ ctx , collectTracing := testutil .SetupTracing (ctx )
341+ ctx , cancel := context .WithTimeout (ctx , 3 * time .Second )
342+ defer cancel ()
343+ td := newOptionalGsTestData (ctx , t , ps .host1Protocols , ps .host2Protocols )
344+
345+ // initialize graphsync on first node to make requests
346+ requestor := td .GraphSyncHost1 ()
347+
348+ // setup receiving peer to just record message coming in
349+ blockChainLength := 100
350+ blockChain := testutil .SetupBlockChain (ctx , t , td .persistence2 , 100 , blockChainLength )
351+
352+ // initialize graphsync on second node to response to requests
353+ responder := td .GraphSyncHost2 ()
354+ assertComplete := assertCompletionFunction (responder , 1 )
355+
356+ var receivedResponseData []byte
357+ var receivedRequestData []byte
358+
359+ requestor .RegisterIncomingResponseHook (
360+ func (p peer.ID , responseData graphsync.ResponseData , hookActions graphsync.IncomingResponseHookActions ) {
361+ data , has := responseData .Extension (td .extensionName )
362+ if has {
363+ receivedResponseData = data
364+ }
365+ })
366+
367+ responder .RegisterIncomingRequestHook (func (p peer.ID , requestData graphsync.RequestData , hookActions graphsync.IncomingRequestHookActions ) {
368+ var has bool
369+ receivedRequestData , has = requestData .Extension (td .extensionName )
370+ if ! has {
371+ hookActions .TerminateWithError (errors .New ("Missing extension" ))
372+ } else {
373+ hookActions .SendExtensionData (td .extensionResponse )
374+ }
375+ })
376+
377+ finalResponseStatusChan := make (chan graphsync.ResponseStatusCode , 1 )
378+ responder .RegisterCompletedResponseListener (func (p peer.ID , request graphsync.RequestData , status graphsync.ResponseStatusCode ) {
379+ select {
380+ case finalResponseStatusChan <- status :
381+ default :
382+ }
383+ })
384+ progressChan , errChan := requestor .Request (ctx , td .host2 .ID (), blockChain .TipLink , blockChain .Selector (), td .extension )
385+
386+ blockChain .VerifyWholeChain (ctx , progressChan )
387+ testutil .VerifyEmptyErrors (ctx , t , errChan )
388+ require .Len (t , td .blockStore1 , blockChainLength , "did not store all blocks" )
389+
390+ // verify extension roundtrip
391+ require .Equal (t , td .extensionData , receivedRequestData , "did not receive correct extension request data" )
392+ require .Equal (t , td .extensionResponseData , receivedResponseData , "did not receive correct extension response data" )
393+
394+ // verify listener
395+ var finalResponseStatus graphsync.ResponseStatusCode
396+ testutil .AssertReceive (ctx , t , finalResponseStatusChan , & finalResponseStatus , "should receive status" )
397+ require .Equal (t , graphsync .RequestCompletedFull , finalResponseStatus )
398+
399+ drain (requestor )
400+ drain (responder )
401+ assertComplete (ctx , t )
402+
403+ tracing := collectTracing (t )
404+
405+ traceStrings := tracing .TracesToStrings ()
406+ require .Contains (t , traceStrings , "response(0)->executeTask(0)->processBlock(0)->loadBlock(0)" )
407+ require .Contains (t , traceStrings , "response(0)->executeTask(0)->processBlock(0)->sendBlock(0)->processBlockHooks(0)" )
408+ require .Contains (t , traceStrings , "request(0)->newRequest(0)" )
409+ require .Contains (t , traceStrings , "request(0)->executeTask(0)" )
410+ require .Contains (t , traceStrings , "request(0)->terminateRequest(0)" )
411+ require .Contains (t , traceStrings , "processResponses(0)->loaderProcess(0)->cacheProcess(0)" ) // should have one of these per response
412+ require .Contains (t , traceStrings , "request(0)->verifyBlock(0)" ) // should have one of these per block
413+
414+ processUpdateSpan := tracing .FindSpanByTraceString ("response(0)" )
415+ require .Equal (t , int64 (0 ), testutil .AttributeValueInTraceSpan (t , * processUpdateSpan , "priority" ).AsInt64 ())
416+ require .Equal (t , []string {string (td .extensionName )}, testutil .AttributeValueInTraceSpan (t , * processUpdateSpan , "extensions" ).AsStringSlice ())
417+
418+ // each verifyBlock span should link to a cacheProcess span that stored it
419+
420+ cacheProcessSpans := tracing .FindSpans ("cacheProcess" )
421+ cacheProcessLinks := make (map [string ]int64 )
422+ verifyBlockSpans := tracing .FindSpans ("verifyBlock" )
423+
424+ for _ , verifyBlockSpan := range verifyBlockSpans {
425+ require .Len (t , verifyBlockSpan .Links , 1 , "verifyBlock span should have one link" )
426+ found := false
427+ for _ , cacheProcessSpan := range cacheProcessSpans {
428+ sid := cacheProcessSpan .SpanContext .SpanID ().String ()
429+ if verifyBlockSpan .Links [0 ].SpanContext .SpanID ().String () == sid {
430+ found = true
431+ cacheProcessLinks [sid ] = cacheProcessLinks [sid ] + 1
432+ break
433+ }
434+ }
435+ require .True (t , found , "verifyBlock should link to a known cacheProcess span" )
347436 }
348- })
349437
350- responder .RegisterIncomingRequestHook (func (p peer.ID , requestData graphsync.RequestData , hookActions graphsync.IncomingRequestHookActions ) {
351- var has bool
352- receivedRequestData , has = requestData .Extension (td .extensionName )
353- if ! has {
354- hookActions .TerminateWithError (errors .New ("Missing extension" ))
355- } else {
356- hookActions .SendExtensionData (td .extensionResponse )
357- }
358- })
438+ // each cacheProcess span should be linked to one verifyBlock span per block it stored
359439
360- finalResponseStatusChan := make (chan graphsync.ResponseStatusCode , 1 )
361- responder .RegisterCompletedResponseListener (func (p peer.ID , request graphsync.RequestData , status graphsync.ResponseStatusCode ) {
362- select {
363- case finalResponseStatusChan <- status :
364- default :
365- }
366- })
367- progressChan , errChan := requestor .Request (ctx , td .host2 .ID (), blockChain .TipLink , blockChain .Selector (), td .extension )
368-
369- blockChain .VerifyWholeChain (ctx , progressChan )
370- testutil .VerifyEmptyErrors (ctx , t , errChan )
371- require .Len (t , td .blockStore1 , blockChainLength , "did not store all blocks" )
372-
373- // verify extension roundtrip
374- require .Equal (t , td .extensionData , receivedRequestData , "did not receive correct extension request data" )
375- require .Equal (t , td .extensionResponseData , receivedResponseData , "did not receive correct extension response data" )
376-
377- // verify listener
378- var finalResponseStatus graphsync.ResponseStatusCode
379- testutil .AssertReceive (ctx , t , finalResponseStatusChan , & finalResponseStatus , "should receive status" )
380- require .Equal (t , graphsync .RequestCompletedFull , finalResponseStatus )
381-
382- drain (requestor )
383- drain (responder )
384- assertComplete (ctx , t )
385-
386- tracing := collectTracing (t )
387-
388- traceStrings := tracing .TracesToStrings ()
389- require .Contains (t , traceStrings , "response(0)->executeTask(0)->processBlock(0)->loadBlock(0)" )
390- require .Contains (t , traceStrings , "response(0)->executeTask(0)->processBlock(0)->sendBlock(0)->processBlockHooks(0)" )
391- require .Contains (t , traceStrings , "request(0)->newRequest(0)" )
392- require .Contains (t , traceStrings , "request(0)->executeTask(0)" )
393- require .Contains (t , traceStrings , "request(0)->terminateRequest(0)" )
394- require .Contains (t , traceStrings , "processResponses(0)->loaderProcess(0)->cacheProcess(0)" ) // should have one of these per response
395- require .Contains (t , traceStrings , "request(0)->verifyBlock(0)" ) // should have one of these per block
396-
397- processUpdateSpan := tracing .FindSpanByTraceString ("response(0)" )
398- require .Equal (t , int64 (0 ), testutil .AttributeValueInTraceSpan (t , * processUpdateSpan , "priority" ).AsInt64 ())
399- require .Equal (t , []string {string (td .extensionName )}, testutil .AttributeValueInTraceSpan (t , * processUpdateSpan , "extensions" ).AsStringSlice ())
400-
401- // each verifyBlock span should link to a cacheProcess span that stored it
402-
403- cacheProcessSpans := tracing .FindSpans ("cacheProcess" )
404- cacheProcessLinks := make (map [string ]int64 )
405- verifyBlockSpans := tracing .FindSpans ("verifyBlock" )
406-
407- for _ , verifyBlockSpan := range verifyBlockSpans {
408- require .Len (t , verifyBlockSpan .Links , 1 , "verifyBlock span should have one link" )
409- found := false
410- for _ , cacheProcessSpan := range cacheProcessSpans {
411- sid := cacheProcessSpan .SpanContext .SpanID ().String ()
412- if verifyBlockSpan .Links [0 ].SpanContext .SpanID ().String () == sid {
413- found = true
414- cacheProcessLinks [sid ] = cacheProcessLinks [sid ] + 1
415- break
440+ for _ , cacheProcessSpan := range cacheProcessSpans {
441+ blockCount := testutil .AttributeValueInTraceSpan (t , cacheProcessSpan , "blockCount" ).AsInt64 ()
442+ require .Equal (t , cacheProcessLinks [cacheProcessSpan .SpanContext .SpanID ().String ()], blockCount , "cacheProcess span should be linked to one verifyBlock span per block it processed" )
416443 }
417- }
418- require .True (t , found , "verifyBlock should link to a known cacheProcess span" )
419- }
420-
421- // each cacheProcess span should be linked to one verifyBlock span per block it stored
422-
423- for _ , cacheProcessSpan := range cacheProcessSpans {
424- blockCount := testutil .AttributeValueInTraceSpan (t , cacheProcessSpan , "blockCount" ).AsInt64 ()
425- require .Equal (t , cacheProcessLinks [cacheProcessSpan .SpanContext .SpanID ().String ()], blockCount , "cacheProcess span should be linked to one verifyBlock span per block it processed" )
444+ })
426445 }
427446}
428447
@@ -1701,6 +1720,10 @@ func assertCancelOrCompleteFunction(gs graphsync.GraphExchange, requestCount int
17011720}
17021721
17031722func newGsTestData (ctx context.Context , t * testing.T ) * gsTestData {
1723+ return newOptionalGsTestData (ctx , t , nil , nil )
1724+ }
1725+
1726+ func newOptionalGsTestData (ctx context.Context , t * testing.T , network1Protocols []protocol.ID , network2Protocols []protocol.ID ) * gsTestData {
17041727 t .Helper ()
17051728 td := & gsTestData {ctx : ctx }
17061729 td .mn = mocknet .New (ctx )
@@ -1713,8 +1736,16 @@ func newGsTestData(ctx context.Context, t *testing.T) *gsTestData {
17131736 err = td .mn .LinkAll ()
17141737 require .NoError (t , err , "error linking hosts" )
17151738
1716- td .gsnet1 = gsnet .NewFromLibp2pHost (td .host1 )
1717- td .gsnet2 = gsnet .NewFromLibp2pHost (td .host2 )
1739+ opts := make ([]gsnet.Option , 0 )
1740+ if network1Protocols != nil {
1741+ opts = append (opts , gsnet .GraphsyncProtocols (network1Protocols ))
1742+ }
1743+ td .gsnet1 = gsnet .NewFromLibp2pHost (td .host1 , opts ... )
1744+ opts = make ([]gsnet.Option , 0 )
1745+ if network2Protocols != nil {
1746+ opts = append (opts , gsnet .GraphsyncProtocols (network2Protocols ))
1747+ }
1748+ td .gsnet2 = gsnet .NewFromLibp2pHost (td .host2 , opts ... )
17181749 td .blockStore1 = make (map [ipld.Link ][]byte )
17191750 td .persistence1 = testutil .NewTestStore (td .blockStore1 )
17201751 td .blockStore2 = make (map [ipld.Link ][]byte )
@@ -1784,7 +1815,7 @@ func processResponsesTraces(t *testing.T, tracing *testutil.Collector, responseC
17841815 finalStub := tracing .FindSpanByTraceString (fmt .Sprintf ("processResponses(%d)->loaderProcess(0)" , responseCount - 1 ))
17851816 require .NotNil (t , finalStub )
17861817 if len (testutil .AttributeValueInTraceSpan (t , * finalStub , "requestIDs" ).AsStringSlice ()) == 0 {
1787- return append (traces , fmt .Sprintf ("responseMessage (%d)->loaderProcess(0)" , responseCount - 1 ))
1818+ return append (traces , fmt .Sprintf ("processResponses (%d)->loaderProcess(0)" , responseCount - 1 ))
17881819 }
17891820 return append (traces , fmt .Sprintf ("processResponses(%d)->loaderProcess(0)->cacheProcess(0)" , responseCount - 1 ))
17901821}
0 commit comments