Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions packages/kad-dht/src/content-routing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,24 @@ export class ContentRouting {
finalPeerEvents.push(event)
}

finalPeerEvents.forEach(event => {
queue.add(async () => {
for await (const notifyEvent of publishProviderRecord(event)) {
events.push(notifyEvent)
}
// sort connected peers first to reuse existing connections before
// opening new ones when sending ADD_PROVIDER records
finalPeerEvents
.sort((a, b) => {
const aConnected = (self.components.connectionManager.getConnections(a.peer.id) ?? []).length > 0 ? 0 : 1
const bConnected = (self.components.connectionManager.getConnections(b.peer.id) ?? []).length > 0 ? 0 : 1
return aConnected - bConnected
})
.catch(err => {
this.log.error('error publishing provider record to peer - %e', err)
.forEach(event => {
queue.add(async () => {
for await (const notifyEvent of publishProviderRecord(event)) {
events.push(notifyEvent)
}
})
})
.catch(err => {
this.log.error('error publishing provider record to peer - %e', err)
})
})
})
.catch(err => {
events.end(err)
Expand Down
16 changes: 11 additions & 5 deletions packages/kad-dht/src/query/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,19 @@ export class QueryManager implements Startable {
count: this.routingTable.kBucketSize
})

// split peers into d buckets evenly(ish)
const peersToQuery = peers.sort(() => {
if (Math.random() > 0.5) {
return 1
// split peers into d buckets evenly(ish), preferring already-connected
// peers to avoid opening new connections unnecessarily
const peersToQuery = peers.sort((a, b) => {
const aConnected = (this.connectionManager.getConnections(a) ?? []).length > 0 ? 0 : 1
const bConnected = (this.connectionManager.getConnections(b) ?? []).length > 0 ? 0 : 1

if (aConnected !== bConnected) {
// put connected peers first
return aConnected - bConnected
}

return -1
// randomize within each group to maintain path diversity
return Math.random() > 0.5 ? 1 : -1
})
.reduce((acc: PeerId[][], curr, index) => {
acc[index % this.disjointPaths].push(curr)
Expand Down
49 changes: 42 additions & 7 deletions packages/kad-dht/src/reprovider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { AdaptiveTimeout, Queue } from '@libp2p/utils'
import drain from 'it-drain'
import { TypedEventEmitter, setMaxListeners } from 'main-event'
import { PROVIDERS_VALIDITY, REPROVIDE_CONCURRENCY, REPROVIDE_INTERVAL, REPROVIDE_MAX_QUEUE_SIZE, REPROVIDE_THRESHOLD, REPROVIDE_TIMEOUT } from './constants.js'
import { parseProviderKey, readProviderTime, timeOperationMethod } from './utils.js'
import { convertBuffer, parseProviderKey, readProviderTime, timeOperationMethod } from './utils.js'
import type { ContentRouting } from './content-routing/index.js'
import type { OperationMetrics } from './kad-dht.js'
import type { AbortOptions, ComponentLogger, Logger, Metrics, PeerId } from '@libp2p/interface'
Expand Down Expand Up @@ -116,11 +116,20 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
/**
* Check all provider records. Delete them if they have expired, reprovide
* them if the provider is us and the expiry is within the reprovide window.
*
* CIDs are queued in Kademlia key order so that XOR-adjacent CIDs are
* reprovided consecutively. Since nearby CIDs in the keyspace share the
* same K closest peers, connections opened for one CID are likely to be
* reused for the next, reducing the number of new dials per reprovide run.
*/
private async processRecords (options?: AbortOptions): Promise<void> {
try {
this.safeDispatchEvent('reprovide:start')
this.log('starting reprovide/cleanup')

// collect CIDs that need reproviding so we can sort them before queueing
const toReprovide: CID[] = []

// Get all provider entries from the datastore
for await (const entry of this.datastore.query({
prefix: this.datastorePrefix
Expand All @@ -143,19 +152,45 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
}

// if the provider is us and we are within the reprovide threshold,
// reprovide the record
// collect for reproviding
if (this.shouldReprovide(isSelf, expires)) {
this.log('reproviding %c as it is within the reprovide threshold (%d)', cid, this.reprovideThreshold)
this.queueReprovide(cid)
.catch(err => {
this.log.error('could not reprovide %c - %e', cid, err)
})
this.log('scheduling reprovide of %c', cid)
toReprovide.push(cid)
}
} catch (err: any) {
this.log.error('error processing datastore key %s - %s', entry.key, err.message)
}
}

// sort collected CIDs by their Kademlia key so XOR-adjacent CIDs are
// queued consecutively — peers responsible for one CID are likely to
// also be responsible for adjacent CIDs, so connections can be reused
if (toReprovide.length > 1) {
const kadKeys = await Promise.all(
toReprovide.map(cid => convertBuffer(cid.multihash.bytes, options))
)

const sortable = toReprovide.map((cid, i) => ({ cid, kadKey: kadKeys[i] }))
sortable.sort((a, b) => {
for (let i = 0; i < a.kadKey.length; i++) {
if (a.kadKey[i] !== b.kadKey[i]) {
return a.kadKey[i] - b.kadKey[i]
}
}
return 0
})

toReprovide.splice(0, toReprovide.length, ...sortable.map(({ cid }) => cid))
}

// queue reprovides in Kademlia key order
for (const cid of toReprovide) {
this.queueReprovide(cid)
.catch(err => {
this.log.error('could not reprovide %c - %e', cid, err)
})
}

this.log('reprovide/cleanup successful')
} finally {
this.safeDispatchEvent('reprovide:end')
Expand Down
30 changes: 30 additions & 0 deletions packages/kad-dht/test/content-routing.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,4 +354,34 @@ describe('content routing', () => {
expect(sendMessageSpy.callCount).to.equal(initialMessageCalls,
'No new network calls should be made after abort')
})

it('queries connection status to prioritize connected peers when sending ADD_PROVIDER', async function () {
  this.timeout(20 * 1000)

  // spawn four nodes and order them by XOR distance to the target key
  const spawned = await Promise.all([
    testDHT.spawn(),
    testDHT.spawn(),
    testDHT.spawn(),
    testDHT.spawn()
  ])
  const kadKey = await kadUtils.convertBuffer(cid.multihash.bytes)
  const dhts = await sortDHTs(spawned, kadKey)

  // the provider under test is the furthest node from the key
  const provider = dhts[3]

  // connect every other peer to the provider so they appear in its routing table
  await Promise.all(
    [dhts[0], dhts[1], dhts[2]].map(async dht => testDHT.connect(dht, provider))
  )

  // getConnections is already a sinon stub from stubInterface — reset its history
  // and verify it gets called during provide() for the connected-peers-first sort
  provider.components.connectionManager.getConnections.resetHistory()

  await drain(provider.dht.provide(cid))

  // getConnections should have been called to check peer connection status
  // when sorting the closest peers for ADD_PROVIDER fan-out
  expect(provider.components.connectionManager.getConnections.called).to.be.true(
    'getConnections should be called to prioritize connected peers during provide()'
  )
})
})
59 changes: 59 additions & 0 deletions packages/kad-dht/test/query.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -998,4 +998,63 @@ describe('QueryManager', () => {

await manager.stop()
})

it('should prefer connected peers when distributing to disjoint paths', async () => {
  // pick peers at higher XOR indices as "connected" and lower-XOR as "disconnected"
  // so XOR-distance sorting alone would put disconnected peers first — only our
  // connected-first bucket assignment can ensure each path gets a connected peer
  const online = peers.slice(10, 12).map(p => p.peerId)
  const offline = peers.slice(0, 2).map(p => p.peerId)
  // routing table returns the 4 peers in arbitrary order
  const initialPeers = online.concat(offline)

  const connectionManager = stubInterface<ConnectionManager>({
    isDialable: async () => true
  })
  connectionManager.getConnections.callsFake((peerId?: PeerId) => {
    const isOnline = peerId != null && online.some(p => p.equals(peerId))
    return isOnline ? ([{}] as any) : []
  })

  const manager = new QueryManager({
    peerId: ourPeerId,
    logger: defaultLogger(),
    connectionManager
  }, {
    ...defaultInit(),
    disjointPaths: 2,
    alpha: 1
  })
  await manager.start()

  routingTable.closestPeers.returns(initialPeers)

  // record which peers each disjoint path queried
  const queriedByPath = new Map<number, PeerId[]>([[0, []], [1, []]])

  const queryFunc: QueryFunc = async function * ({ peer, path }) {
    queriedByPath.get(path.index)?.push(peer.id)

    yield peerResponseEvent({
      from: peer.id,
      messageType: MessageType.GET_VALUE,
      path
    })
  }

  await drain(manager.run(key, queryFunc))

  // with connected-peers-first sort, even though disconnected peers have lower
  // XOR distance, each disjoint path should get at least one connected starting peer
  for (const [pathIndex, queried] of queriedByPath) {
    const sawConnected = queried.some(p => online.some(c => c.equals(p)))
    expect(sawConnected).to.be.true(
      `path ${pathIndex} should have at least one connected starting peer`
    )
  }

  await manager.stop()
})
})
96 changes: 96 additions & 0 deletions packages/kad-dht/test/reprovider.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { pEvent } from 'p-event'
import { stubInterface } from 'sinon-ts'
import { Providers } from '../src/providers.js'
import { Reprovider } from '../src/reprovider.js'
import { convertBuffer } from '../src/utils.js'
import { createPeerIdWithPrivateKey, createPeerIdsWithPrivateKey } from './utils/create-peer-id.js'
import type { PeerAndKey } from './utils/create-peer-id.js'
import type { ContentRouting } from '../src/content-routing/index.js'
Expand Down Expand Up @@ -158,6 +159,101 @@ describe('reprovider', () => {
expect(provsAfter[0].toString()).to.equal(components.peerId.toString())
})

it('should reprovide in Kademlia key order', async function () {
  this.timeout(5000)

  // five well-known IPFS CIDs — their Kademlia keys will be in some order
  // that is unlikely to match the insertion order below
  const cids = [
    CID.parse('QmZ8eiDPqQqWR17EPxiwCDgrKPVhCHLcyn6xSCNpFAdAZb'),
    CID.parse('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n'),
    CID.parse('QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL'),
    CID.parse('QmPZ9gcCEpqKTo6aq61g2nXGUhM4iCL3ewB6LDXZCtioEB'),
    CID.parse('QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN')
  ]

  // compute expected Kademlia key order — use multihash bytes as canonical
  // identity since parseProviderKey always reconstructs CIDs as CIDv1/raw
  const kadKeys = await Promise.all(cids.map(cid => convertBuffer(cid.multihash.bytes)))
  const expectedMultihashes = cids
    .map((cid, i) => ({ multihash: cid.multihash.bytes, kadKey: kadKeys[i] }))
    .sort((a, b) => {
      // lexicographic byte comparison of the Kademlia keys
      for (let i = 0; i < a.kadKey.length; i++) {
        if (a.kadKey[i] !== b.kadKey[i]) {
          return a.kadKey[i] - b.kadKey[i]
        }
      }
      return 0
    })
    .map(({ multihash }) => multihash)

  // full byte-array equality — checks length as well as content, since a
  // prefix-only comparison would wrongly match a shorter array
  const bytesEqual = (a: Uint8Array, b: Uint8Array): boolean => {
    return a.length === b.length && a.every((byte, i) => byte === b[i])
  }

  // insert CIDs in REVERSE expected order to prove sorting overrides insertion order
  for (const multihash of [...expectedMultihashes].reverse()) {
    // entries in expectedMultihashes are the same Uint8Array instances as
    // cids[i].multihash.bytes, so the reference check finds the CID; keep a
    // content comparison as a safety net
    const cid = cids.find(c => c.multihash.bytes === multihash) ??
      cids.find(c => bytesEqual(c.multihash.bytes, multihash))

    if (cid != null) {
      await providers.addProvider(cid, components.peerId)
    }
  }

  // recreate reprovider with concurrency=1 so provides are strictly sequential
  reprovider = new Reprovider(components, {
    logPrefix: 'libp2p',
    datastorePrefix: '/dht',
    metricsPrefix: '',
    contentRouting,
    threshold: 100,
    validity: 200,
    interval: 200,
    concurrency: 1,
    operationMetrics: {}
  })

  // multihashes in the order contentRouting.provide was invoked
  const provisionMultihashes: Uint8Array[] = []

  // resolve when all CIDs have been provided
  let resolveWhenDone!: () => void
  const whenAllDone = new Promise<void>(resolve => { resolveWhenDone = resolve })
  let provided = 0

  contentRouting.provide.callsFake(async function * (cid: CID) {
    provisionMultihashes.push(cid.multihash.bytes)
    provided++
    if (provided === cids.length) {
      resolveWhenDone()
    }
    yield * []
  })

  await start(reprovider)
  await pEvent(reprovider, 'reprovide:start')
  await pEvent(reprovider, 'reprovide:end')

  // wait for the queue to finish processing all enqueued reprovides
  await whenAllDone

  // verify CIDs were provided in Kademlia key order by checking each
  // adjacent pair maintains non-decreasing Kademlia key order
  expect(provisionMultihashes).to.have.lengthOf(cids.length)

  for (let i = 1; i < provisionMultihashes.length; i++) {
    const prevKey = await convertBuffer(provisionMultihashes[i - 1])
    const currKey = await convertBuffer(provisionMultihashes[i])

    let comparison = 0
    for (let j = 0; j < prevKey.length; j++) {
      if (prevKey[j] !== currKey[j]) {
        comparison = prevKey[j] - currKey[j]
        break
      }
    }

    expect(comparison).to.be.lessThanOrEqual(0,
      `CID at position ${i - 1} should have a smaller or equal Kademlia key than position ${i}`
    )
  }
})

describe('shouldReprovide', () => {
it('should return false for non-self providers', () => {
const expires = Date.now() + 50
Expand Down
Loading