diff --git a/packages/kad-dht/src/content-routing/index.ts b/packages/kad-dht/src/content-routing/index.ts index 8039c7c968..921af99e33 100644 --- a/packages/kad-dht/src/content-routing/index.ts +++ b/packages/kad-dht/src/content-routing/index.ts @@ -151,16 +151,24 @@ export class ContentRouting { finalPeerEvents.push(event) } - finalPeerEvents.forEach(event => { - queue.add(async () => { - for await (const notifyEvent of publishProviderRecord(event)) { - events.push(notifyEvent) - } + // sort connected peers first to reuse existing connections before + // opening new ones when sending ADD_PROVIDER records + finalPeerEvents + .sort((a, b) => { + const aConnected = (self.components.connectionManager.getConnections(a.peer.id) ?? []).length > 0 ? 0 : 1 + const bConnected = (self.components.connectionManager.getConnections(b.peer.id) ?? []).length > 0 ? 0 : 1 + return aConnected - bConnected }) - .catch(err => { - this.log.error('error publishing provider record to peer - %e', err) + .forEach(event => { + queue.add(async () => { + for await (const notifyEvent of publishProviderRecord(event)) { + events.push(notifyEvent) + } }) - }) + .catch(err => { + this.log.error('error publishing provider record to peer - %e', err) + }) + }) }) .catch(err => { events.end(err) diff --git a/packages/kad-dht/src/query/manager.ts b/packages/kad-dht/src/query/manager.ts index 681c682fcf..6e3eee91c0 100644 --- a/packages/kad-dht/src/query/manager.ts +++ b/packages/kad-dht/src/query/manager.ts @@ -172,13 +172,19 @@ export class QueryManager implements Startable { count: this.routingTable.kBucketSize }) - // split peers into d buckets evenly(ish) - const peersToQuery = peers.sort(() => { - if (Math.random() > 0.5) { - return 1 + // split peers into d buckets evenly(ish), preferring already-connected + // peers to avoid opening new connections unnecessarily + const peersToQuery = peers.sort((a, b) => { + const aConnected = (this.connectionManager.getConnections(a) ?? []).length > 0 ? 0 : 1 + const bConnected = (this.connectionManager.getConnections(b) ?? []).length > 0 ? 0 : 1 + + if (aConnected !== bConnected) { + // put connected peers first + return aConnected - bConnected } - return -1 + // randomize within each group to maintain path diversity + return Math.random() > 0.5 ? 1 : -1 }) .reduce((acc: PeerId[][], curr, index) => { acc[index % this.disjointPaths].push(curr) diff --git a/packages/kad-dht/src/reprovider.ts b/packages/kad-dht/src/reprovider.ts index a38e86164a..983249249b 100644 --- a/packages/kad-dht/src/reprovider.ts +++ b/packages/kad-dht/src/reprovider.ts @@ -2,7 +2,7 @@ import { AdaptiveTimeout, Queue } from '@libp2p/utils' import drain from 'it-drain' import { TypedEventEmitter, setMaxListeners } from 'main-event' import { PROVIDERS_VALIDITY, REPROVIDE_CONCURRENCY, REPROVIDE_INTERVAL, REPROVIDE_MAX_QUEUE_SIZE, REPROVIDE_THRESHOLD, REPROVIDE_TIMEOUT } from './constants.js' -import { parseProviderKey, readProviderTime, timeOperationMethod } from './utils.js' +import { convertBuffer, parseProviderKey, readProviderTime, timeOperationMethod } from './utils.js' import type { ContentRouting } from './content-routing/index.js' import type { OperationMetrics } from './kad-dht.js' import type { AbortOptions, ComponentLogger, Logger, Metrics, PeerId } from '@libp2p/interface' @@ -116,11 +116,20 @@ export class Reprovider extends TypedEventEmitter { /** * Check all provider records. Delete them if they have expired, reprovide * them if the provider is us and the expiry is within the reprovide window. + * + * CIDs are queued in Kademlia key order so that XOR-adjacent CIDs are + * reprovided consecutively. Since nearby CIDs in the keyspace share the + * same K closest peers, connections opened for one CID are likely to be + * reused for the next, reducing the number of new dials per reprovide run. */ private async processRecords (options?: AbortOptions): Promise { try { this.safeDispatchEvent('reprovide:start') this.log('starting reprovide/cleanup') + + // collect CIDs that need reproviding so we can sort them before queueing + const toReprovide: CID[] = [] + // Get all provider entries from the datastore for await (const entry of this.datastore.query({ prefix: this.datastorePrefix @@ -143,19 +152,45 @@ export class Reprovider extends TypedEventEmitter { } // if the provider is us and we are within the reprovide threshold, - // reprovide the record + // collect for reproviding if (this.shouldReprovide(isSelf, expires)) { - this.log('reproviding %c as it is within the reprovide threshold (%d)', cid, this.reprovideThreshold) - this.queueReprovide(cid) - .catch(err => { - this.log.error('could not reprovide %c - %e', cid, err) - }) + this.log('scheduling reprovide of %c', cid) + toReprovide.push(cid) } } catch (err: any) { this.log.error('error processing datastore key %s - %s', entry.key, err.message) } } + // sort collected CIDs by their Kademlia key so XOR-adjacent CIDs are + // queued consecutively — peers responsible for one CID are likely to + // also be responsible for adjacent CIDs, so connections can be reused + if (toReprovide.length > 1) { + const kadKeys = await Promise.all( + toReprovide.map(cid => convertBuffer(cid.multihash.bytes, options)) + ) + + const sortable = toReprovide.map((cid, i) => ({ cid, kadKey: kadKeys[i] })) + sortable.sort((a, b) => { + for (let i = 0; i < a.kadKey.length; i++) { + if (a.kadKey[i] !== b.kadKey[i]) { + return a.kadKey[i] - b.kadKey[i] + } + } + return 0 + }) + + toReprovide.splice(0, toReprovide.length, ...sortable.map(({ cid }) => cid)) + } + + // queue reprovides in Kademlia key order + for (const cid of toReprovide) { + this.queueReprovide(cid) + .catch(err => { + this.log.error('could not reprovide %c - %e', cid, err) + }) + } + this.log('reprovide/cleanup successful') } finally { this.safeDispatchEvent('reprovide:end') diff --git a/packages/kad-dht/test/content-routing.spec.ts b/packages/kad-dht/test/content-routing.spec.ts index dbe42a24de..9cc760c47a 100644 --- a/packages/kad-dht/test/content-routing.spec.ts +++ b/packages/kad-dht/test/content-routing.spec.ts @@ -354,4 +354,34 @@ describe('content routing', () => { expect(sendMessageSpy.callCount).to.equal(initialMessageCalls, 'No new network calls should be made after abort') }) + + it('queries connection status to prioritize connected peers when sending ADD_PROVIDER', async function () { + this.timeout(20 * 1000) + + const dhts = await sortDHTs(await Promise.all([ + testDHT.spawn(), + testDHT.spawn(), + testDHT.spawn(), + testDHT.spawn() + ]), await kadUtils.convertBuffer(cid.multihash.bytes)) + + // connect all peers to the provider (dhts[3]) so they appear in its routing table + await Promise.all([ + testDHT.connect(dhts[0], dhts[3]), + testDHT.connect(dhts[1], dhts[3]), + testDHT.connect(dhts[2], dhts[3]) + ]) + + // getConnections is already a sinon stub from stubInterface — reset its history + // and verify it gets called during provide() for the connected-peers-first sort + dhts[3].components.connectionManager.getConnections.resetHistory() + + await drain(dhts[3].dht.provide(cid)) + + // getConnections should have been called to check peer connection status + // when sorting the closest peers for ADD_PROVIDER fan-out + expect(dhts[3].components.connectionManager.getConnections.called).to.be.true( + 'getConnections should be called to prioritize connected peers during provide()' + ) + }) }) diff --git a/packages/kad-dht/test/query.spec.ts b/packages/kad-dht/test/query.spec.ts index d0c7c522f9..041a30425f 100644 --- a/packages/kad-dht/test/query.spec.ts +++ b/packages/kad-dht/test/query.spec.ts @@ -998,4 +998,63 @@ describe('QueryManager', () => { await manager.stop() }) + + it('should prefer connected peers when distributing to disjoint paths', async () => { + // pick peers at higher XOR indices as "connected" and lower-XOR as "disconnected" + // so XOR-distance sorting alone would put disconnected peers first — only our + // connected-first bucket assignment can ensure each path gets a connected peer + const connectedPeers = peers.slice(10, 12).map(p => p.peerId) + const disconnectedPeers = peers.slice(0, 2).map(p => p.peerId) + // routing table returns the 4 peers in arbitrary order + const startingPeers = [...connectedPeers, ...disconnectedPeers] + + const connectionManager = stubInterface({ + isDialable: async () => true + }) + connectionManager.getConnections.callsFake((peerId?: PeerId) => { + if (peerId != null && connectedPeers.some(p => p.equals(peerId))) { + return [{}] as any + } + return [] + }) + + const manager = new QueryManager({ + peerId: ourPeerId, + logger: defaultLogger(), + connectionManager + }, { + ...defaultInit(), + disjointPaths: 2, + alpha: 1 + }) + await manager.start() + + routingTable.closestPeers.returns(startingPeers) + + // track which peers are queried on each disjoint path + const peersByPath = new Map([[0, []], [1, []]]) + + const queryFunc: QueryFunc = async function * ({ peer, path }) { + peersByPath.get(path.index)?.push(peer.id) + + yield peerResponseEvent({ + from: peer.id, + messageType: MessageType.GET_VALUE, + path + }) + } + + await drain(manager.run(key, queryFunc)) + + // with connected-peers-first sort, even though disconnected peers have lower + // XOR distance, each disjoint path should get at least one connected starting peer + for (const [pathIndex, queriedPeers] of peersByPath) { + const hasConnectedPeer = queriedPeers.some(p => connectedPeers.some(c => c.equals(p))) + expect(hasConnectedPeer).to.be.true( + `path ${pathIndex} should have at least one connected starting peer` + ) + } + + await manager.stop() + }) }) diff --git a/packages/kad-dht/test/reprovider.spec.ts b/packages/kad-dht/test/reprovider.spec.ts index 803599bd9b..97af65f31c 100644 --- a/packages/kad-dht/test/reprovider.spec.ts +++ b/packages/kad-dht/test/reprovider.spec.ts @@ -8,6 +8,7 @@ import { pEvent } from 'p-event' import { stubInterface } from 'sinon-ts' import { Providers } from '../src/providers.js' import { Reprovider } from '../src/reprovider.js' +import { convertBuffer } from '../src/utils.js' import { createPeerIdWithPrivateKey, createPeerIdsWithPrivateKey } from './utils/create-peer-id.js' import type { PeerAndKey } from './utils/create-peer-id.js' import type { ContentRouting } from '../src/content-routing/index.js' @@ -158,6 +159,101 @@ describe('reprovider', () => { expect(provsAfter[0].toString()).to.equal(components.peerId.toString()) }) + it('should reprovide in Kademlia key order', async function () { + this.timeout(5000) + + // five well-known IPFS CIDs — their Kademlia keys will be in some order + // that is unlikely to match the insertion order below + const cids = [ + CID.parse('QmZ8eiDPqQqWR17EPxiwCDgrKPVhCHLcyn6xSCNpFAdAZb'), + CID.parse('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n'), + CID.parse('QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL'), + CID.parse('QmPZ9gcCEpqKTo6aq61g2nXGUhM4iCL3ewB6LDXZCtioEB'), + CID.parse('QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN') + ] + + // compute expected Kademlia key order — use multihash bytes as canonical + // identity since parseProviderKey always reconstructs CIDs as CIDv1/raw + const kadKeys = await Promise.all(cids.map(cid => convertBuffer(cid.multihash.bytes))) + const expectedMultihashes = cids + .map((cid, i) => ({ multihash: cid.multihash.bytes, kadKey: kadKeys[i] })) + .sort((a, b) => { + for (let i = 0; i < a.kadKey.length; i++) { + if (a.kadKey[i] !== b.kadKey[i]) { + return a.kadKey[i] - b.kadKey[i] + } + } + return 0 + }) + .map(({ multihash }) => multihash) + + // insert CIDs in REVERSE expected order to prove sorting overrides insertion order + for (const { multihash } of [...expectedMultihashes].reverse().map((m, i) => ({ multihash: m, i }))) { + const cid = cids.find(c => c.multihash.bytes === multihash) ?? + cids.find(c => c.multihash.bytes.every((b, j) => b === multihash[j])) + if (cid != null) { + await providers.addProvider(cid, components.peerId) + } + } + + // recreate reprovider with concurrency=1 so provides are strictly sequential + reprovider = new Reprovider(components, { + logPrefix: 'libp2p', + datastorePrefix: '/dht', + metricsPrefix: '', + contentRouting, + threshold: 100, + validity: 200, + interval: 200, + concurrency: 1, + operationMetrics: {} + }) + + const provisionMultihashes: Uint8Array[] = [] + + // resolve when all CIDs have been provided + let resolveWhenDone!: () => void + const whenAllDone = new Promise(resolve => { resolveWhenDone = resolve }) + let provided = 0 + + contentRouting.provide.callsFake(async function * (cid: CID) { + provisionMultihashes.push(cid.multihash.bytes) + provided++ + if (provided === cids.length) { + resolveWhenDone() + } + yield * [] + }) + + await start(reprovider) + await pEvent(reprovider, 'reprovide:start') + await pEvent(reprovider, 'reprovide:end') + + // wait for the queue to finish processing all enqueued reprovides + await whenAllDone + + // verify CIDs were provided in Kademlia key order by checking each + // adjacent pair maintains non-decreasing Kademlia key order + expect(provisionMultihashes).to.have.lengthOf(cids.length) + + for (let i = 1; i < provisionMultihashes.length; i++) { + const prevKey = await convertBuffer(provisionMultihashes[i - 1]) + const currKey = await convertBuffer(provisionMultihashes[i]) + + let comparison = 0 + for (let j = 0; j < prevKey.length; j++) { + if (prevKey[j] !== currKey[j]) { + comparison = prevKey[j] - currKey[j] + break + } + } + + expect(comparison).to.be.lessThanOrEqual(0, + `CID at position ${i - 1} should have a smaller or equal Kademlia key than position ${i}` + ) + } + }) + describe('shouldReprovide', () => { it('should return false for non-self providers', () => { const expires = Date.now() + 50