@@ -17,7 +17,7 @@ export class RandomSampler {
1717}
1818
1919/**
20- * A class that samples peers from the libp2p node and returns a peer that we don't already have a connection open to.
20+ * A class that samples peers from the libp2p node and returns a peer that we don't already have a reqresp connection open to.
2121 * If we already have a connection open, we try to sample a different peer.
2222 * We do this MAX_SAMPLE_ATTEMPTS times, if we still don't find a peer we just go for it.
2323 *
@@ -69,32 +69,65 @@ export class ConnectionSampler {
6969 getPeer ( excluding ?: Map < string , boolean > ) : PeerId | undefined {
7070 // In libp2p getPeers performs a shallow copy, so this array can be sliced from safetly
7171 const peers = this . libp2p . getPeers ( ) ;
72+ const { peer } = this . getPeerFromList ( peers , excluding ) ;
73+ return peer ;
74+ }
7275
76+ /**
77+ * Samples a peer from a list of peers, excluding those that have active (reqresp) connections or are in the exclusion list
78+ *
79+ * @param peers - The list of peers to sample from
80+ * @param excluding - The peers to exclude from the sampling
81+ * @returns - A peer from the list, or undefined if no peers are available,
82+ * - a boolean indicating if the peer has active connections, and
83+ * - all sampled peers - to enable optional resampling
84+ *
85+ * @dev The provided list peers, should be mutated by this function. This allows batch sampling
86+ * to be performed without making extra copies of the list.
87+ */
88+ getPeerFromList (
89+ peers : PeerId [ ] ,
90+ excluding ?: Map < string , boolean > ,
91+ ) : {
92+ peer : PeerId | undefined ;
93+ sampledPeers : PeerId [ ] ;
94+ } {
7395 if ( peers . length === 0 ) {
74- return undefined ;
96+ return { peer : undefined , sampledPeers : [ ] } ;
7597 }
7698
77- let randomIndex = this . sampler . random ( peers . length ) ;
78- let attempts = 0 ;
79-
80- // Keep sampling while:
81- // - we haven't exceeded max attempts AND
82- // - either the peer has active connections OR is in the exclusion list
83- while (
84- attempts < MAX_SAMPLE_ATTEMPTS &&
85- ( ( this . activeConnectionsCount . get ( peers [ randomIndex ] ) ?? 0 ) > 0 ||
86- ( excluding ?. get ( peers [ randomIndex ] ?. toString ( ) ) ?? false ) )
87- ) {
99+ const sampledPeers : PeerId [ ] = [ ] ;
100+ // Try to find a peer that has no active connections and is not in the exclusion list
101+ for ( let attempts = 0 ; attempts < MAX_SAMPLE_ATTEMPTS && peers . length > 0 ; attempts ++ ) {
102+ const randomIndex = this . sampler . random ( peers . length ) ;
103+ const peer = peers [ randomIndex ] ;
104+ const hasActiveConnections = ( this . activeConnectionsCount . get ( peer ) ?? 0 ) > 0 ;
105+ const isExcluded = excluding ?. get ( peer . toString ( ) ) ?? false ;
106+
107+ // Remove this peer from consideration
88108 peers . splice ( randomIndex , 1 ) ;
89- randomIndex = this . sampler . random ( peers . length ) ;
90- attempts ++ ;
109+
110+ // If peer is suitable (no active connections and not excluded), return it
111+ if ( ! hasActiveConnections && ! isExcluded ) {
112+ this . logger . trace ( 'Sampled peer' , {
113+ attempts,
114+ peer,
115+ } ) ;
116+ return { peer, sampledPeers } ;
117+ }
118+
119+ // Keep track of peers that have active reqresp channels, batch sampling will use these to resample
120+ sampledPeers . push ( peer ) ;
91121 }
92122
93- this . logger . trace ( `Sampled peer in ${ attempts } attempts` , {
94- attempts,
95- peer : peers [ randomIndex ] ?. toString ( ) ,
123+ // If we've exhausted our attempts or peers list is empty, return the last peer if available
124+ const lastPeer = peers . length > 0 ? peers [ this . sampler . random ( peers . length ) ] : undefined ;
125+
126+ this . logger . trace ( 'Sampled peer' , {
127+ attempts : MAX_SAMPLE_ATTEMPTS ,
128+ peer : lastPeer ?. toString ( ) ,
96129 } ) ;
97- return peers [ randomIndex ] ;
130+ return { peer : lastPeer , sampledPeers } ;
98131 }
99132
100133 /**
@@ -105,34 +138,41 @@ export class ConnectionSampler {
105138 */
106139 samplePeersBatch ( numberToSample : number ) : PeerId [ ] {
107140 const peers = this . libp2p . getPeers ( ) ;
108- const sampledPeers : PeerId [ ] = [ ] ;
109- const peersWithConnections : PeerId [ ] = [ ] ; // Hold onto peers with active connections incase we need to sample more
141+ this . logger . debug ( 'Sampling peers batch' , { numberToSample, peers } ) ;
110142
111- for ( const peer of peers ) {
112- const activeConnections = this . activeConnectionsCount . get ( peer ) ?? 0 ;
113- if ( activeConnections === 0 ) {
114- if ( sampledPeers . push ( peer ) === numberToSample ) {
115- return sampledPeers ;
116- }
117- } else {
118- peersWithConnections . push ( peer ) ;
143+ // Only sample as many peers as we have available
144+ numberToSample = Math . min ( numberToSample , peers . length ) ;
145+
146+ const batch : PeerId [ ] = [ ] ;
147+ const withActiveConnections : Set < PeerId > = new Set ( ) ;
148+ for ( let i = 0 ; i < numberToSample ; i ++ ) {
149+ const { peer, sampledPeers } = this . getPeerFromList ( peers , undefined ) ;
150+ if ( peer ) {
151+ batch . push ( peer ) ;
152+ }
153+ if ( sampledPeers . length > 0 ) {
154+ sampledPeers . forEach ( peer => withActiveConnections . add ( peer ) ) ;
119155 }
120156 }
157+ const lengthWithoutConnections = batch . length ;
121158
122159 // If we still need more peers, sample from those with connections
123- while ( sampledPeers . length < numberToSample && peersWithConnections . length > 0 ) {
124- const randomIndex = this . sampler . random ( peersWithConnections . length ) ;
125- const [ peer ] = peersWithConnections . splice ( randomIndex , 1 ) ;
126- sampledPeers . push ( peer ) ;
160+ while ( batch . length < numberToSample && withActiveConnections . size > 0 ) {
161+ const randomIndex = this . sampler . random ( withActiveConnections . size ) ;
162+
163+ const peer = Array . from ( withActiveConnections ) [ randomIndex ] ;
164+ withActiveConnections . delete ( peer ) ;
165+ batch . push ( peer ) ;
127166 }
128167
129- this . logger . trace ( `Batch sampled ${ sampledPeers . length } unique peers` , {
130- peers : sampledPeers ,
131- withoutConnections : sampledPeers . length - peersWithConnections . length ,
132- withConnections : peersWithConnections . length ,
168+ this . logger . trace ( 'Batch sampled peers' , {
169+ length : batch . length ,
170+ peers : batch ,
171+ withoutConnections : lengthWithoutConnections ,
172+ withConnections : numberToSample - lengthWithoutConnections ,
133173 } ) ;
134174
135- return sampledPeers ;
175+ return batch ;
136176 }
137177
138178 // Set of passthrough functions to keep track of active connections
@@ -155,8 +195,9 @@ export class ConnectionSampler {
155195 const updatedActiveConnectionsCount = ( this . activeConnectionsCount . get ( peerId ) ?? 0 ) + 1 ;
156196 this . activeConnectionsCount . set ( peerId , updatedActiveConnectionsCount ) ;
157197
158- this . logger . trace ( ` Dialed protocol ${ protocol } with peer ${ peerId . toString ( ) } ` , {
198+ this . logger . trace ( ' Dialed protocol' , {
159199 streamId : stream . id ,
200+ protocol,
160201 peerId : peerId . toString ( ) ,
161202 activeConnectionsCount : updatedActiveConnectionsCount ,
162203 } ) ;
@@ -181,7 +222,7 @@ export class ConnectionSampler {
181222 const updatedActiveConnectionsCount = ( this . activeConnectionsCount . get ( peerId ) ?? 1 ) - 1 ;
182223 this . activeConnectionsCount . set ( peerId , updatedActiveConnectionsCount ) ;
183224
184- this . logger . trace ( ` Closing connection to peer ${ peerId . toString ( ) } ` , {
225+ this . logger . trace ( ' Closing connection' , {
185226 streamId,
186227 peerId : peerId . toString ( ) ,
187228 protocol : stream . protocol ,
@@ -207,7 +248,7 @@ export class ConnectionSampler {
207248 // Check if we have lost track of accounting
208249 if ( this . activeConnectionsCount . get ( peerId ) === 0 ) {
209250 await this . close ( streamId ) ;
210- this . logger . debug ( ` Cleaned up stale connection ${ streamId } to peer ${ peerId . toString ( ) } ` ) ;
251+ this . logger . debug ( ' Cleaned up stale connection' , { streamId, peerId : peerId . toString ( ) } ) ;
211252 }
212253 } catch ( error ) {
213254 this . logger . error ( `Error cleaning up stale connection ${ streamId } ` , { error } ) ;
0 commit comments