Skip to content
Open
5 changes: 5 additions & 0 deletions packages/protocol-identify/src/consts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ export const MULTICODEC_IDENTIFY_PUSH_PROTOCOL_VERSION = '1.0.0'
// https://github.com/libp2p/go-libp2p/blob/8d2e54e1637041d5cf4fac1e531287560bd1f4ac/p2p/protocol/identify/id.go#L52
export const MAX_IDENTIFY_MESSAGE_SIZE = 1024 * 8

// Large identify messages are split into smaller messages.
// The two size constants are byte budgets for the protobuf-encoded message:
// 2 KiB for the first message sent and 4 KiB for every message after it.
export const FIRST_IDENTIFY_MESSAGE_MAX_SIZE = 1024 * 2
export const SUBSEQUENT_IDENTIFY_MESSAGE_MAX_SIZE = 1024 * 4
// NOTE(review): presumably the cap on how many messages make up one split
// identify exchange - confirm that the readers use this constant rather than a
// hard-coded 10.
export const MAX_IDENTIFY_MESSAGES = 10

// https://github.com/libp2p/go-libp2p/blob/0385ec924bad172f74a74db09939e97c079b1420/p2p/protocol/identify/id.go#L47C7-L47C25
export const MAX_PUSH_CONCURRENCY = 32

Expand Down
35 changes: 27 additions & 8 deletions packages/protocol-identify/src/identify-push.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { serviceCapabilities } from '@libp2p/interface'
import { InvalidMessageError, serviceCapabilities } from '@libp2p/interface'
import { RecordEnvelope, PeerRecord } from '@libp2p/peer-record'
import { debounce, pbStream } from '@libp2p/utils'
import { UnexpectedEOFError, debounce, pbStream } from '@libp2p/utils'
import { CODE_P2P } from '@multiformats/multiaddr'
import drain from 'it-drain'
import parallel from 'it-parallel'
Expand All @@ -13,7 +13,7 @@ import {
PUSH_DEBOUNCE_MS
} from './consts.js'
import { Identify as IdentifyMessage } from './pb/message.js'
import { AbstractIdentify, consumeIdentifyMessage, defaultValues } from './utils.js'
import { AbstractIdentify, buildIdentifyMessages, consumeIdentifyMessage, defaultValues, mergeIdentifyMessages } from './utils.js'
import type { IdentifyPush as IdentifyPushInterface, IdentifyPushComponents, IdentifyPushInit } from './index.js'
import type { Stream, Startable, Connection } from '@libp2p/interface'
import type { ConnectionManager } from '@libp2p/interface-internal'
Expand Down Expand Up @@ -99,16 +99,18 @@ export class IdentifyPush extends AbstractIdentify implements Startable, Identif
maxDataLength: self.maxMessageSize
}).pb(IdentifyMessage)

await pb.write({
const msgs = buildIdentifyMessages({
listenAddrs: listenAddresses.map(ma => ma.bytes),
signedPeerRecord: signedPeerRecord.marshal(),
protocols: supportedProtocols,
agentVersion,
protocolVersion
}, {
signal
})

for (const msg of msgs) {
await pb.write(msg, { signal })
}

await stream.close({
signal
})
Expand Down Expand Up @@ -148,10 +150,27 @@ export class IdentifyPush extends AbstractIdentify implements Startable, Identif
maxDataLength: this.maxMessageSize
}).pb(IdentifyMessage)

const message = await pb.read(options)
const messages: IdentifyMessage[] = []

for (let i = 0; i < 10; i++) {
try {
messages.push(await pb.read(options))
} catch (err) {
if (messages.length > 0 && err instanceof UnexpectedEOFError) {
break
}

throw err
}
}

if (messages.length === 0) {
throw new InvalidMessageError('No identify message received')
}

await stream.close(options)

await consumeIdentifyMessage(this.components.peerStore, this.components.events, log, connection, message)
await consumeIdentifyMessage(this.components.peerStore, this.components.events, log, connection, mergeIdentifyMessages(messages))

log.trace('handled push from %p', connection.remotePeer)
}
Expand Down
36 changes: 29 additions & 7 deletions packages/protocol-identify/src/identify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { publicKeyFromProtobuf, publicKeyToProtobuf } from '@libp2p/crypto/keys'
import { InvalidMessageError, serviceCapabilities } from '@libp2p/interface'
import { peerIdFromCID } from '@libp2p/peer-id'
import { RecordEnvelope, PeerRecord } from '@libp2p/peer-record'
import { isGlobalUnicast, isPrivate, pbStream } from '@libp2p/utils'
import { UnexpectedEOFError, isGlobalUnicast, isPrivate, pbStream } from '@libp2p/utils'
import { CODE_IP6, CODE_IP6ZONE, CODE_P2P } from '@multiformats/multiaddr'
import { IP_OR_DOMAIN, TCP } from '@multiformats/multiaddr-matcher'
import { setMaxListeners } from 'main-event'
Expand All @@ -11,7 +11,7 @@ import {
MULTICODEC_IDENTIFY_PROTOCOL_VERSION
} from './consts.js'
import { Identify as IdentifyMessage } from './pb/message.js'
import { AbstractIdentify, consumeIdentifyMessage, defaultValues, getCleanMultiaddr } from './utils.js'
import { AbstractIdentify, buildIdentifyMessages, consumeIdentifyMessage, defaultValues, getCleanMultiaddr, mergeIdentifyMessages } from './utils.js'
import type { Identify as IdentifyInterface, IdentifyComponents, IdentifyInit } from './index.js'
import type { IdentifyResult, AbortOptions, Connection, Stream, Startable, Logger, NewStreamOptions } from '@libp2p/interface'

Expand Down Expand Up @@ -64,10 +64,30 @@ export class Identify extends AbstractIdentify implements Startable, IdentifyInt
maxDataLength: this.maxMessageSize
}).pb(IdentifyMessage)

const message = await pb.read(options)
// Large responses can be subdivided.
// Read all messages until the stream closes.
const MAX_IDENTIFY_MESSAGES = 10
const messages: IdentifyMessage[] = []

for (let i = 0; i < MAX_IDENTIFY_MESSAGES; i++) {
try {
messages.push(await pb.read(options))
} catch (err) {
if (messages.length > 0 && err instanceof UnexpectedEOFError) {
break
}

throw err
}
}

if (messages.length === 0) {
throw new InvalidMessageError('No identify message received')
}

await pb.unwrap().unwrap().close(options)

return message
return mergeIdentifyMessages(messages)
} catch (err: any) {
log?.error('identify failed - %e', err)
stream?.abort(err)
Expand Down Expand Up @@ -178,18 +198,20 @@ export class Identify extends AbstractIdentify implements Startable, IdentifyInt
const pb = pbStream(stream).pb(IdentifyMessage)

log('send response')
await pb.write({
const msgs = buildIdentifyMessages({
protocolVersion: this.host.protocolVersion,
agentVersion: this.host.agentVersion,
publicKey: publicKeyToProtobuf(this.components.privateKey.publicKey),
listenAddrs: multiaddrs.map(addr => addr.bytes),
signedPeerRecord,
observedAddr,
protocols: peerData.protocols
}, {
signal
})

for (const msg of msgs) {
await pb.write(msg, { signal })
}

log('close write')
await pb.unwrap().unwrap().close({
signal
Expand Down
149 changes: 147 additions & 2 deletions packages/protocol-identify/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ import { publicKeyFromProtobuf } from '@libp2p/crypto/keys'
import { InvalidMessageError } from '@libp2p/interface'
import { peerIdFromCID, peerIdFromPublicKey } from '@libp2p/peer-id'
import { RecordEnvelope, PeerRecord } from '@libp2p/peer-record'
import { isPrivate } from '@libp2p/utils'
import { multiaddr } from '@multiformats/multiaddr'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { IDENTIFY_PROTOCOL_VERSION, MAX_IDENTIFY_MESSAGE_SIZE, MAX_PUSH_CONCURRENCY } from './consts.js'
import { FIRST_IDENTIFY_MESSAGE_MAX_SIZE, IDENTIFY_PROTOCOL_VERSION, MAX_IDENTIFY_MESSAGE_SIZE, MAX_PUSH_CONCURRENCY, SUBSEQUENT_IDENTIFY_MESSAGE_MAX_SIZE } from './consts.js'
import { Identify as IdentifyMessage } from './pb/message.js'
import type { IdentifyComponents, IdentifyInit } from './index.js'
import type { Identify as IdentifyMessage } from './pb/message.js'
import type { Libp2pEvents, IdentifyResult, SignedPeerRecord, Logger, Connection, Peer, PeerData, PeerStore, Startable, Stream } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { TypedEventTarget } from 'main-event'
Expand Down Expand Up @@ -171,6 +172,150 @@ export async function consumeIdentifyMessage (peerStore: PeerStore, events: Type
return result
}

/**
 * Merge multiple received Identify messages into one.
 *
 * Messages are applied in the order given:
 * - scalar fields (`protocolVersion`, `agentVersion`, `publicKey`,
 *   `observedAddr`, `signedPeerRecord`) take the value from the last message
 *   that sets them
 * - `listenAddrs` are concatenated in order
 * - `protocols` are concatenated in order and, when more than one message was
 *   merged, de-duplicated keeping the first occurrence
 *
 * The returned message never shares its `listenAddrs`/`protocols` arrays with
 * the input messages, so callers can mutate the result safely.
 *
 * @param messages - the messages to merge, in the order they were received
 * @returns a single message; if `messages` is empty the result carries empty
 * `listenAddrs` and `protocols` arrays and no scalar fields
 */
export function mergeIdentifyMessages (messages: IdentifyMessage[]): IdentifyMessage {
  const [first, ...rest] = messages

  // Copy the repeated fields so the result does not alias the first message,
  // and so an empty input still yields a structurally valid message.
  const merged: IdentifyMessage = {
    ...first,
    listenAddrs: [...(first?.listenAddrs ?? [])],
    protocols: [...(first?.protocols ?? [])]
  }

  for (const msg of rest) {
    // last write wins for scalar fields
    if (msg.protocolVersion != null) {
      merged.protocolVersion = msg.protocolVersion
    }

    if (msg.agentVersion != null) {
      merged.agentVersion = msg.agentVersion
    }

    if (msg.publicKey != null) {
      merged.publicKey = msg.publicKey
    }

    if (msg.observedAddr != null) {
      merged.observedAddr = msg.observedAddr
    }

    if (msg.signedPeerRecord != null) {
      merged.signedPeerRecord = msg.signedPeerRecord
    }

    // repeated fields accumulate
    merged.listenAddrs.push(...msg.listenAddrs)
    merged.protocols.push(...msg.protocols)
  }

  // De-duplicate once at the end instead of rebuilding a Set per message. A
  // single message is passed through untouched.
  if (rest.length > 0) {
    merged.protocols = [...new Set(merged.protocols)]
  }

  return merged
}

/**
* Greedily pack items from `remaining` into `current` as long as the encoded
* message produced by `buildCandidate` stays within `maxSize`.
*
* Both arrays are mutated in place: items are removed from the front of
* `remaining` and appended to `current`, preserving their order. Packing stops
* at the first item that does not fit - later, smaller items are not tried.
*
* When `guaranteeFirst` is true the size check is skipped for the very first
* item so that the caller always makes progress (used when `current` must end
* up non-empty regardless of size).
*
* @param current - the list being filled; appended to in place
* @param remaining - the items still to be placed; consumed from the front
* @param maxSize - the largest allowed length of the encoded candidate message
* @param buildCandidate - builds the message that would result from `items`
* @param guaranteeFirst - take the first item without a size check while
* `current` is still empty
*/
function packItems<T> (
current: T[],
remaining: T[],
maxSize: number,
buildCandidate: (items: T[]) => IdentifyMessage,
guaranteeFirst = false
): void {
while (remaining.length > 0) {
// the size check is skipped only for the first item of a guaranteed pack
if (!guaranteeFirst || current.length > 0) {
// encode the whole candidate message to measure its size with this item added
const candidate = buildCandidate([...current, remaining[0]])
if (IdentifyMessage.encode(candidate).length > maxSize) {
break
}
}
// the item fits (or is guaranteed), move it from `remaining` to `current`
current.push(remaining.shift()!)
Comment on lines +219 to +226
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also tried a binary search approach. The speedup was negligible until there were an unreasonable number of addresses so I don't think it justifies the additional complexity.

  ⎿  --- 10 addresses, 20 protocols ---
     linear                         5000 iterations      43.8 ms  (8.8 µs/op)
     binary search                  5000 iterations      32.2 ms  (6.4 µs/op)
       speedup: 1.36x
       address count matches: 10

     --- 20 addresses, 20 protocols ---
     linear                         5000 iterations      37.3 ms  (7.5 µs/op)
     binary search                  5000 iterations      38.5 ms  (7.7 µs/op)
       speedup: 0.97x
       address count matches: 20

     --- 100 addresses, 20 protocols ---
     linear                         2000 iterations      35.1 ms  (17.6 µs/op)
     binary search                  2000 iterations      33.3 ms  (16.6 µs/op)
       speedup: 1.06x
       address count matches: 100

     --- 500 addresses, 20 protocols ---
     linear                         500 iterations    4445.9 ms  (8891.8 µs/op)
     binary search                  500 iterations     199.6 ms  (399.1 µs/op)
       speedup: 22.28x
       address count matches: 500

     --- 1000 addresses, 20 protocols ---
     linear                         100 iterations    2066.3 ms  (20662.7 µs/op)
     binary search                  100 iterations     157.9 ms  (1578.8 µs/op)
       speedup: 13.09x
       address count matches: 1000

}
}

/**
 * Split an outgoing Identify message into chunks that respect the per-message
 * size limits:
 * - first message SHOULD NOT exceed 2 KB
 * - subsequent messages SHOULD NOT exceed 4 KB
 *
 * Addresses are sorted so that publicly-reachable ones appear in the first
 * message, improving backwards-compatibility with old receivers that stop
 * after the first message. Addresses that cannot be parsed are sorted together
 * with the private ones.
 *
 * A single item that is too large for any message (an address, a protocol or
 * the signed peer record) cannot be subdivided, so it is sent in a message
 * that exceeds the limit rather than being dropped.
 *
 * NOTE(review): the number of returned messages is not capped here, while the
 * readers in this package stop after a fixed number of messages - confirm the
 * two agree for very large address/protocol lists.
 *
 * @param msg - the complete Identify message to send; it is not modified
 * @returns one or more messages that together carry every field of `msg`
 */
export function buildIdentifyMessages (msg: IdentifyMessage): IdentifyMessage[] {
  // Classify every address once up front. This avoids re-parsing multiaddrs on
  // each comparison and keeps the comparator consistent when an address fails
  // to parse (it is ranked like a private address instead of "equal to all").
  const publicRank = new Map<Uint8Array, number>()

  for (const addr of msg.listenAddrs) {
    let rank = 0

    try {
      rank = isPrivate(multiaddr(addr)) ? 0 : 1
    } catch {
      // unparseable address - leave it ranked with the private addresses
    }

    publicRank.set(addr, rank)
  }

  // Sort: non-private (public + circuit-relay-via-public-relay) first
  const sortedAddrs = [...msg.listenAddrs].sort((a, b) => {
    return (publicRank.get(b) ?? 0) - (publicRank.get(a) ?? 0)
  })

  // Fast path: everything fits in a single first message
  const full: IdentifyMessage = { ...msg, listenAddrs: sortedAddrs }
  if (IdentifyMessage.encode(full).length <= FIRST_IDENTIFY_MESSAGE_MAX_SIZE) {
    return [full]
  }

  const messages: IdentifyMessage[] = []
  const remainingAddrs = [...sortedAddrs]
  const remainingProtocols = [...msg.protocols]

  // First message carries all scalar fields + signedPeerRecord, then as many
  // addresses and protocols as fit within 2 KB.
  const first: IdentifyMessage = {
    protocolVersion: msg.protocolVersion,
    agentVersion: msg.agentVersion,
    publicKey: msg.publicKey,
    observedAddr: msg.observedAddr,
    signedPeerRecord: msg.signedPeerRecord,
    listenAddrs: [],
    protocols: []
  }

  packItems(first.listenAddrs, remainingAddrs, FIRST_IDENTIFY_MESSAGE_MAX_SIZE,
    items => ({ ...first, listenAddrs: items }))

  // If signedPeerRecord is so large that no address fits alongside it, defer it
  // to its own standalone message so the first message can carry addresses instead.
  // The deferred record may exceed 4 KB but cannot be subdivided.
  let deferredSignedPeerRecord: Uint8Array | undefined
  if (first.listenAddrs.length === 0 && remainingAddrs.length > 0 && first.signedPeerRecord != null) {
    deferredSignedPeerRecord = first.signedPeerRecord
    first.signedPeerRecord = undefined

    // Re-pack without signedPeerRecord, guaranteeing at least one address.
    packItems(first.listenAddrs, remainingAddrs, FIRST_IDENTIFY_MESSAGE_MAX_SIZE,
      items => ({ ...first, listenAddrs: items }), true)
  }

  packItems(first.protocols, remainingProtocols, FIRST_IDENTIFY_MESSAGE_MAX_SIZE,
    items => ({ ...first, protocols: items }))

  messages.push(first)

  if (deferredSignedPeerRecord != null) {
    const spr: IdentifyMessage = { listenAddrs: [], protocols: [], signedPeerRecord: deferredSignedPeerRecord }

    packItems(spr.listenAddrs, remainingAddrs, SUBSEQUENT_IDENTIFY_MESSAGE_MAX_SIZE,
      items => ({ ...spr, listenAddrs: items }))
    packItems(spr.protocols, remainingProtocols, SUBSEQUENT_IDENTIFY_MESSAGE_MAX_SIZE,
      items => ({ ...spr, protocols: items }))

    messages.push(spr)
  }

  // Subsequent messages carry the remaining addresses and protocols in ≤4 KB chunks.
  while (remainingAddrs.length > 0 || remainingProtocols.length > 0) {
    const subsequent: IdentifyMessage = { listenAddrs: [], protocols: [] }

    packItems(subsequent.listenAddrs, remainingAddrs, SUBSEQUENT_IDENTIFY_MESSAGE_MAX_SIZE,
      items => ({ ...subsequent, listenAddrs: items }), true)

    // Every iteration must consume at least one item or the loop never ends.
    // If no address was packed there are none left, so force the first
    // protocol in even when it alone exceeds the limit.
    packItems(subsequent.protocols, remainingProtocols, SUBSEQUENT_IDENTIFY_MESSAGE_MAX_SIZE,
      items => ({ ...subsequent, protocols: items }), subsequent.listenAddrs.length === 0)

    messages.push(subsequent)
  }

  return messages
}

export interface AbstractIdentifyInit extends IdentifyInit {
protocol: string
log: Logger
Expand Down
Loading
Loading