Skip to content

Commit faa87be

Browse files
paschal533 and claude committed
fix(kad-dht): reprovide CIDs in Kademlia key order to reduce new dials
Sort CIDs by their Kademlia key before queueing reprovides so that XOR-adjacent CIDs are processed consecutively. CIDs that are close together in the keyspace tend to share most of their K closest peers, so connections opened for one CID are likely to be reused for the next, reducing the number of new dials per reprovide run (the Amino DHT SweepingProvider optimisation).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 3fcd078 commit faa87be

2 files changed

Lines changed: 138 additions & 7 deletions

File tree

packages/kad-dht/src/reprovider.ts

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { AdaptiveTimeout, Queue } from '@libp2p/utils'
22
import drain from 'it-drain'
33
import { TypedEventEmitter, setMaxListeners } from 'main-event'
44
import { PROVIDERS_VALIDITY, REPROVIDE_CONCURRENCY, REPROVIDE_INTERVAL, REPROVIDE_MAX_QUEUE_SIZE, REPROVIDE_THRESHOLD, REPROVIDE_TIMEOUT } from './constants.js'
5-
import { parseProviderKey, readProviderTime, timeOperationMethod } from './utils.js'
5+
import { convertBuffer, parseProviderKey, readProviderTime, timeOperationMethod } from './utils.js'
66
import type { ContentRouting } from './content-routing/index.js'
77
import type { OperationMetrics } from './kad-dht.js'
88
import type { AbortOptions, ComponentLogger, Logger, Metrics, PeerId } from '@libp2p/interface'
@@ -116,11 +116,20 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
116116
/**
117117
* Check all provider records. Delete them if they have expired, reprovide
118118
* them if the provider is us and the expiry is within the reprovide window.
119+
*
120+
* CIDs are queued in Kademlia key order so that XOR-adjacent CIDs are
121+
* reprovided consecutively. Since nearby CIDs in the keyspace share the
122+
* same K closest peers, connections opened for one CID are likely to be
123+
* reused for the next, reducing the number of new dials per reprovide run.
119124
*/
120125
private async processRecords (options?: AbortOptions): Promise<void> {
121126
try {
122127
this.safeDispatchEvent('reprovide:start')
123128
this.log('starting reprovide/cleanup')
129+
130+
// collect CIDs that need reproviding so we can sort them before queueing
131+
const toReprovide: CID[] = []
132+
124133
// Get all provider entries from the datastore
125134
for await (const entry of this.datastore.query({
126135
prefix: this.datastorePrefix
@@ -143,19 +152,45 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
143152
}
144153

145154
// if the provider is us and we are within the reprovide threshold,
146-
// reprovide the record
155+
// collect for reprovision
147156
if (this.shouldReprovide(isSelf, expires)) {
148-
this.log('reproviding %c as it is within the reprovide threshold (%d)', cid, this.reprovideThreshold)
149-
this.queueReprovide(cid)
150-
.catch(err => {
151-
this.log.error('could not reprovide %c - %e', cid, err)
152-
})
157+
this.log('scheduling reprovide of %c', cid)
158+
toReprovide.push(cid)
153159
}
154160
} catch (err: any) {
155161
this.log.error('error processing datastore key %s - %s', entry.key, err.message)
156162
}
157163
}
158164

165+
// sort collected CIDs by their Kademlia key so XOR-adjacent CIDs are
166+
// queued consecutively — peers responsible for one CID are likely to
167+
// also be responsible for adjacent CIDs, so connections can be reused
168+
if (toReprovide.length > 1) {
169+
const kadKeys = await Promise.all(
170+
toReprovide.map(cid => convertBuffer(cid.multihash.bytes, options))
171+
)
172+
173+
const sortable = toReprovide.map((cid, i) => ({ cid, kadKey: kadKeys[i] }))
174+
sortable.sort((a, b) => {
175+
for (let i = 0; i < a.kadKey.length; i++) {
176+
if (a.kadKey[i] !== b.kadKey[i]) {
177+
return a.kadKey[i] - b.kadKey[i]
178+
}
179+
}
180+
return 0
181+
})
182+
183+
toReprovide.splice(0, toReprovide.length, ...sortable.map(({ cid }) => cid))
184+
}
185+
186+
// queue reprovides in Kademlia key order
187+
for (const cid of toReprovide) {
188+
this.queueReprovide(cid)
189+
.catch(err => {
190+
this.log.error('could not reprovide %c - %e', cid, err)
191+
})
192+
}
193+
159194
this.log('reprovide/cleanup successful')
160195
} finally {
161196
this.safeDispatchEvent('reprovide:end')

packages/kad-dht/test/reprovider.spec.ts

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { pEvent } from 'p-event'
88
import { stubInterface } from 'sinon-ts'
99
import { Providers } from '../src/providers.js'
1010
import { Reprovider } from '../src/reprovider.js'
11+
import { convertBuffer } from '../src/utils.js'
1112
import { createPeerIdWithPrivateKey, createPeerIdsWithPrivateKey } from './utils/create-peer-id.js'
1213
import type { PeerAndKey } from './utils/create-peer-id.js'
1314
import type { ContentRouting } from '../src/content-routing/index.js'
@@ -158,6 +159,101 @@ describe('reprovider', () => {
158159
expect(provsAfter[0].toString()).to.equal(components.peerId.toString())
159160
})
160161

162+
it('should reprovide in Kademlia key order', async function () {
163+
this.timeout(5000)
164+
165+
// five well-known IPFS CIDs — their Kademlia keys will be in some order
166+
// that is unlikely to match the insertion order below
167+
const cids = [
168+
CID.parse('QmZ8eiDPqQqWR17EPxiwCDgrKPVhCHLcyn6xSCNpFAdAZb'),
169+
CID.parse('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n'),
170+
CID.parse('QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL'),
171+
CID.parse('QmPZ9gcCEpqKTo6aq61g2nXGUhM4iCL3ewB6LDXZCtioEB'),
172+
CID.parse('QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN')
173+
]
174+
175+
// compute expected Kademlia key order — use multihash bytes as canonical
176+
// identity since parseProviderKey always reconstructs CIDs as CIDv1/raw
177+
const kadKeys = await Promise.all(cids.map(cid => convertBuffer(cid.multihash.bytes)))
178+
const expectedMultihashes = cids
179+
.map((cid, i) => ({ multihash: cid.multihash.bytes, kadKey: kadKeys[i] }))
180+
.sort((a, b) => {
181+
for (let i = 0; i < a.kadKey.length; i++) {
182+
if (a.kadKey[i] !== b.kadKey[i]) {
183+
return a.kadKey[i] - b.kadKey[i]
184+
}
185+
}
186+
return 0
187+
})
188+
.map(({ multihash }) => multihash)
189+
190+
// insert CIDs in REVERSE expected order to prove sorting overrides insertion order
191+
for (const { multihash } of [...expectedMultihashes].reverse().map((m, i) => ({ multihash: m, i }))) {
192+
const cid = cids.find(c => c.multihash.bytes === multihash) ??
193+
cids.find(c => c.multihash.bytes.every((b, j) => b === multihash[j]))
194+
if (cid != null) {
195+
await providers.addProvider(cid, components.peerId)
196+
}
197+
}
198+
199+
// recreate reprovider with concurrency=1 so provides are strictly sequential
200+
reprovider = new Reprovider(components, {
201+
logPrefix: 'libp2p',
202+
datastorePrefix: '/dht',
203+
metricsPrefix: '',
204+
contentRouting,
205+
threshold: 100,
206+
validity: 200,
207+
interval: 200,
208+
concurrency: 1,
209+
operationMetrics: {}
210+
})
211+
212+
const provisionMultihashes: Uint8Array[] = []
213+
214+
// resolve when all CIDs have been provided
215+
let resolveWhenDone!: () => void
216+
const whenAllDone = new Promise<void>(resolve => { resolveWhenDone = resolve })
217+
let provided = 0
218+
219+
contentRouting.provide.callsFake(async function * (cid: CID) {
220+
provisionMultihashes.push(cid.multihash.bytes)
221+
provided++
222+
if (provided === cids.length) {
223+
resolveWhenDone()
224+
}
225+
yield * []
226+
})
227+
228+
await start(reprovider)
229+
await pEvent(reprovider, 'reprovide:start')
230+
await pEvent(reprovider, 'reprovide:end')
231+
232+
// wait for the queue to finish processing all enqueued reprovides
233+
await whenAllDone
234+
235+
// verify CIDs were provided in Kademlia key order by checking each
236+
// adjacent pair maintains non-decreasing Kademlia key order
237+
expect(provisionMultihashes).to.have.lengthOf(cids.length)
238+
239+
for (let i = 1; i < provisionMultihashes.length; i++) {
240+
const prevKey = await convertBuffer(provisionMultihashes[i - 1])
241+
const currKey = await convertBuffer(provisionMultihashes[i])
242+
243+
let comparison = 0
244+
for (let j = 0; j < prevKey.length; j++) {
245+
if (prevKey[j] !== currKey[j]) {
246+
comparison = prevKey[j] - currKey[j]
247+
break
248+
}
249+
}
250+
251+
expect(comparison).to.be.lessThanOrEqual(0,
252+
`CID at position ${i - 1} should have a smaller or equal Kademlia key than position ${i}`
253+
)
254+
}
255+
})
256+
161257
describe('shouldReprovide', () => {
162258
it('should return false for non-self providers', () => {
163259
const expires = Date.now() + 50

0 commit comments

Comments
 (0)