Skip to content

Commit e2d6d61

Browse files
committed
fix(kad-dht): sort reprovide CIDs in bounded batches to avoid OOM
Replaces the unbounded toReprovide array with a fixed-size batch approach. CIDs are accumulated into a batch of at most sortBatchSize (default 512) entries; each full batch is sorted by Kademlia key and queued before the next batch begins. A final flush handles the remainder. This bounds peak memory to O(sortBatchSize) regardless of the total number of stored CIDs, while still delivering the connection-reuse benefit for XOR-adjacent CIDs within each batch. Also adds sortBatchSize to ReproviderInit and a test that verifies within-batch Kademlia ordering when batches are smaller than the CID count.
1 parent 9d387dd commit e2d6d61

3 files changed

Lines changed: 124 additions & 31 deletions

File tree

packages/kad-dht/src/constants.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ export const REPROVIDE_INTERVAL = hour
2828
// How long to reprovide for
2929
export const REPROVIDE_TIMEOUT = hour
3030

31+
// How many CIDs to sort at once during reprovide (bounds memory use)
32+
export const REPROVIDE_SORT_BATCH_SIZE = 512
33+
3134
export const READ_MESSAGE_TIMEOUT = 10 * second
3235

3336
// How long to process newly connected peers for

packages/kad-dht/src/reprovider.ts

Lines changed: 44 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { AdaptiveTimeout, Queue } from '@libp2p/utils'
22
import drain from 'it-drain'
33
import { TypedEventEmitter, setMaxListeners } from 'main-event'
4-
import { PROVIDERS_VALIDITY, REPROVIDE_CONCURRENCY, REPROVIDE_INTERVAL, REPROVIDE_MAX_QUEUE_SIZE, REPROVIDE_THRESHOLD, REPROVIDE_TIMEOUT } from './constants.ts'
4+
import { PROVIDERS_VALIDITY, REPROVIDE_CONCURRENCY, REPROVIDE_INTERVAL, REPROVIDE_MAX_QUEUE_SIZE, REPROVIDE_SORT_BATCH_SIZE, REPROVIDE_THRESHOLD, REPROVIDE_TIMEOUT } from './constants.ts'
55
import { convertBuffer, parseProviderKey, readProviderTime, timeOperationMethod } from './utils.ts'
66
import type { ContentRouting } from './content-routing/index.ts'
77
import type { OperationMetrics } from './kad-dht.ts'
@@ -27,6 +27,7 @@ export interface ReproviderInit {
2727
operationMetrics: OperationMetrics
2828
concurrency?: number
2929
maxQueueSize?: number
30+
sortBatchSize?: number
3031
threshold?: number
3132
validity?: number
3233
interval?: number
@@ -52,6 +53,7 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
5253
private running: boolean
5354
private shutdownController?: AbortController
5455
private readonly reprovideThreshold: number
56+
private readonly sortBatchSize: number
5557
private readonly contentRouting: ContentRouting
5658
private readonly datastorePrefix: string
5759
private readonly addressManager: AddressManager
@@ -78,6 +80,7 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
7880
this.addressManager = components.addressManager
7981
this.datastorePrefix = `${init.datastorePrefix}/provider`
8082
this.reprovideThreshold = init.threshold ?? REPROVIDE_THRESHOLD
83+
this.sortBatchSize = init.sortBatchSize ?? REPROVIDE_SORT_BATCH_SIZE
8184
this.maxQueueSize = init.maxQueueSize ?? REPROVIDE_MAX_QUEUE_SIZE
8285
this.validity = init.validity ?? PROVIDERS_VALIDITY
8386
this.interval = init.interval ?? REPROVIDE_INTERVAL
@@ -121,14 +124,18 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
121124
* reprovided consecutively. Since nearby CIDs in the keyspace share the
122125
* same K closest peers, connections opened for one CID are likely to be
123126
* reused for the next, reducing the number of new dials per reprovide run.
127+
*
128+
* To avoid unbounded memory growth, CIDs are sorted and queued in batches
129+
* of at most `sortBatchSize` entries.
124130
*/
125131
private async processRecords (options?: AbortOptions): Promise<void> {
126132
try {
127133
this.safeDispatchEvent('reprovide:start')
128134
this.log('starting reprovide/cleanup')
129135

130-
// collect CIDs that need reproviding so we can sort them before queueing
131-
const toReprovide: CID[] = []
136+
// Accumulate CIDs for batched Kademlia-key sorting. Flushed every
137+
// sortBatchSize entries so the array never grows without bound.
138+
const batch: CID[] = []
132139

133140
// Get all provider entries from the datastore
134141
for await (const entry of this.datastore.query({
@@ -155,40 +162,19 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
155162
// collect for reproviding
156163
if (this.shouldReprovide(isSelf, expires)) {
157164
this.log('scheduling reprovide of %c', cid)
158-
toReprovide.push(cid)
165+
batch.push(cid)
166+
167+
if (batch.length >= this.sortBatchSize) {
168+
await this.sortAndQueueBatch(batch.splice(0), options)
169+
}
159170
}
160171
} catch (err: any) {
161172
this.log.error('error processing datastore key %s - %s', entry.key, err.message)
162173
}
163174
}
164175

165-
// sort collected CIDs by their Kademlia key so XOR-adjacent CIDs are
166-
// queued consecutively — peers responsible for one CID are likely to
167-
// also be responsible for adjacent CIDs, so connections can be reused
168-
if (toReprovide.length > 1) {
169-
const kadKeys = await Promise.all(
170-
toReprovide.map(cid => convertBuffer(cid.multihash.bytes, options))
171-
)
172-
173-
const sortable = toReprovide.map((cid, i) => ({ cid, kadKey: kadKeys[i] }))
174-
sortable.sort((a, b) => {
175-
for (let i = 0; i < a.kadKey.length; i++) {
176-
if (a.kadKey[i] !== b.kadKey[i]) {
177-
return a.kadKey[i] - b.kadKey[i]
178-
}
179-
}
180-
return 0
181-
})
182-
183-
toReprovide.splice(0, toReprovide.length, ...sortable.map(({ cid }) => cid))
184-
}
185-
186-
// queue reprovides in Kademlia key order
187-
for (const cid of toReprovide) {
188-
this.queueReprovide(cid)
189-
.catch(err => {
190-
this.log.error('could not reprovide %c - %e', cid, err)
191-
})
176+
if (batch.length > 0) {
177+
await this.sortAndQueueBatch(batch, options)
192178
}
193179

194180
this.log('reprovide/cleanup successful')
@@ -208,6 +194,33 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
208194
}
209195
}
210196

197+
/**
 * Orders one batch of CIDs by Kademlia key and hands every CID in the batch
 * to `queueReprovide`.
 *
 * The Kademlia key of each CID is derived by passing its multihash bytes to
 * `convertBuffer`; keys are compared byte by byte, lowest first. Batches
 * with fewer than two entries are queued as-is without computing any keys.
 *
 * Queueing is fire-and-forget: the promise returned by `queueReprovide` is
 * not awaited, and a rejection is only logged.
 *
 * @param batch - the CIDs to sort and queue; the array itself is not modified
 * @param options - abort options forwarded to `convertBuffer`
 */
private async sortAndQueueBatch (batch: CID[], options?: AbortOptions): Promise<void> {
  let ordered = batch

  if (batch.length > 1) {
    // derive every key up front so the comparator below stays synchronous
    const kadKeys = await Promise.all(
      batch.map(cid => convertBuffer(cid.multihash.bytes, options))
    )

    const keyed = batch.map((cid, index) => ({ cid, kadKey: kadKeys[index] }))

    keyed.sort((left, right) => {
      // advance to the first byte where the two keys disagree
      let pos = 0

      while (pos < left.kadKey.length && left.kadKey[pos] === right.kadKey[pos]) {
        pos++
      }

      return pos < left.kadKey.length ? left.kadKey[pos] - right.kadKey[pos] : 0
    })

    ordered = keyed.map(entry => entry.cid)
  }

  ordered.forEach(cid => {
    // not awaited - failures are reported through the log only
    this.queueReprovide(cid)
      .catch(err => {
        this.log.error('could not reprovide %c - %e', cid, err)
      })
  })
}
223+
211224
/**
212225
* Determines if a record should be reprovided
213226
*/

packages/kad-dht/test/reprovider.spec.ts

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ describe('reprovider', () => {
197197
}
198198

199199
// recreate reprovider with concurrency=1 so provides are strictly sequential
200+
// sortBatchSize >= cids.length so all CIDs are sorted in one batch
200201
reprovider = new Reprovider(components, {
201202
logPrefix: 'libp2p',
202203
datastorePrefix: '/dht',
@@ -206,6 +207,7 @@ describe('reprovider', () => {
206207
validity: 200,
207208
interval: 200,
208209
concurrency: 1,
210+
sortBatchSize: cids.length,
209211
operationMetrics: {}
210212
})
211213

@@ -254,6 +256,81 @@ describe('reprovider', () => {
254256
}
255257
})
256258

259+
it('should sort within each batch when CID count exceeds sortBatchSize', async function () {
260+
this.timeout(5000)
261+
262+
const cids = [
263+
CID.parse('QmZ8eiDPqQqWR17EPxiwCDgrKPVhCHLcyn6xSCNpFAdAZb'),
264+
CID.parse('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n'),
265+
CID.parse('QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL'),
266+
CID.parse('QmPZ9gcCEpqKTo6aq61g2nXGUhM4iCL3ewB6LDXZCtioEB'),
267+
CID.parse('QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN')
268+
]
269+
270+
for (const cid of cids) {
271+
await providers.addProvider(cid, components.peerId)
272+
}
273+
274+
// sortBatchSize=2: CIDs are sorted in batches of 2. Ordering within each
275+
// batch is verified; cross-batch ordering is intentionally not guaranteed.
276+
reprovider = new Reprovider(components, {
277+
logPrefix: 'libp2p',
278+
datastorePrefix: '/dht',
279+
metricsPrefix: '',
280+
contentRouting,
281+
threshold: 100,
282+
validity: 200,
283+
interval: 200,
284+
concurrency: 1,
285+
sortBatchSize: 2,
286+
operationMetrics: {}
287+
})
288+
289+
const provisionMultihashes: Uint8Array[] = []
290+
291+
let resolveWhenDone!: () => void
292+
const whenAllDone = new Promise<void>(resolve => { resolveWhenDone = resolve })
293+
let provided = 0
294+
295+
contentRouting.provide.callsFake(async function * (cid: CID) {
296+
provisionMultihashes.push(cid.multihash.bytes)
297+
provided++
298+
if (provided === cids.length) {
299+
resolveWhenDone()
300+
}
301+
yield * []
302+
})
303+
304+
await start(reprovider)
305+
await pEvent(reprovider, 'reprovide:start')
306+
await pEvent(reprovider, 'reprovide:end')
307+
await whenAllDone
308+
309+
expect(provisionMultihashes).to.have.lengthOf(cids.length)
310+
311+
// verify ordering within each batch of sortBatchSize
312+
const batchSize = 2
313+
for (let b = 0; b < provisionMultihashes.length; b += batchSize) {
314+
const batchEnd = Math.min(b + batchSize, provisionMultihashes.length)
315+
for (let i = b + 1; i < batchEnd; i++) {
316+
const prevKey = await convertBuffer(provisionMultihashes[i - 1])
317+
const currKey = await convertBuffer(provisionMultihashes[i])
318+
319+
let comparison = 0
320+
for (let j = 0; j < prevKey.length; j++) {
321+
if (prevKey[j] !== currKey[j]) {
322+
comparison = prevKey[j] - currKey[j]
323+
break
324+
}
325+
}
326+
327+
expect(comparison).to.be.lessThanOrEqual(0,
328+
`within batch: CID at position ${i - 1} should have a smaller or equal Kademlia key than position ${i}`
329+
)
330+
}
331+
}
332+
})
333+
257334
describe('shouldReprovide', () => {
258335
it('should return false for non-self providers', () => {
259336
const expires = Date.now() + 50

0 commit comments

Comments
 (0)