diff --git a/packages/gossipsub/package.json b/packages/gossipsub/package.json index b7e3e4710a..4d36f8f5b6 100644 --- a/packages/gossipsub/package.json +++ b/packages/gossipsub/package.json @@ -30,6 +30,10 @@ "./types": { "types": "./dist/src/types.d.ts", "import": "./dist/src/types.js" + }, + "./partial": { + "types": "./dist/src/partial/index.d.ts", + "import": "./dist/src/partial/index.js" } }, "typesVersions": { @@ -51,7 +55,7 @@ "pretest:e2e": "npm run build", "benchmark": "yarn benchmark:files 'test/benchmark/**/*.test.ts'", "benchmark:files": "NODE_OPTIONS='--max-old-space-size=4096 --loader=ts-node/esm' benchmark --config .benchrc.yaml --defaultBranch master", - "test": "aegir test -f './dist/test/*.spec.js'", + "test": "aegir test -f './dist/test/*.spec.js' -f './dist/test/partial-messages/*.spec.js'", "test:unit": "aegir test -f './dist/test/unit/*.test.js' --target node", "test:e2e": "aegir test -f './dist/test/e2e/*.spec.js'", "test:browser": "npm run test -- --target browser" diff --git a/packages/gossipsub/src/constants.ts b/packages/gossipsub/src/constants.ts index dc5ef80124..cae1b74393 100644 --- a/packages/gossipsub/src/constants.ts +++ b/packages/gossipsub/src/constants.ts @@ -259,3 +259,29 @@ export const BACKOFF_SLACK = 1 export const GossipsubIdontwantMinDataSize = 512 export const GossipsubIdontwantMaxMessages = 512 + +// Partial Messages Extension constants + +/** + * Maximum number of groups to track per topic in PartialMessageState. + * When exceeded, oldest groups are evicted (LRU). + * + * @default 128 + */ +export const PartialMessagesMaxGroups = 128 + +/** + * Time-to-live for partial message groups in milliseconds. + * Groups older than this are pruned during heartbeat. + * + * @default 120000 (2 minutes) + */ +export const PartialMessagesGroupTTLMs = 2 * minute + +/** + * Maximum size in bytes for parts metadata. + * Metadata larger than this will be rejected. 
+ * + * @default 1024 + */ +export const PartialMessagesMaxMetadataSize = 1024 diff --git a/packages/gossipsub/src/gossipsub.ts b/packages/gossipsub/src/gossipsub.ts index 69f5b55ce0..7d22e70ebd 100644 --- a/packages/gossipsub/src/gossipsub.ts +++ b/packages/gossipsub/src/gossipsub.ts @@ -9,7 +9,10 @@ import { ACCEPT_FROM_WHITELIST_DURATION_MS, ACCEPT_FROM_WHITELIST_MAX_MESSAGES, ACCEPT_FROM_WHITELIST_THRESHOLD_SCORE, - BACKOFF_SLACK + BACKOFF_SLACK, + PartialMessagesMaxGroups, + PartialMessagesGroupTTLMs, + PartialMessagesMaxMetadataSize } from './constants.js' import { StrictNoSign, StrictSign, TopicValidatorResult } from './index.ts' import { defaultDecodeRpcLimits } from './message/decodeRpc.js' @@ -52,12 +55,14 @@ import { multiaddrToIPStr } from './utils/multiaddr.js' import { getPublishConfigFromPeerId } from './utils/publishConfig.js' import { removeFirstNItemsFromSet, removeItemsFromSet } from './utils/set.js' import { SimpleTimeCache } from './utils/time-cache.js' +import { BitwiseOrMerger } from './partial/bitwise-or-merger.js' +import { PartialMessageState } from './partial/partial-message-state.js' import type { GossipSubComponents, GossipSubEvents, GossipsubMessage, GossipsubOpts, MeshPeer, Message, PublishResult, SubscriptionChangeData, TopicValidatorFn } from './index.ts' import type { DecodeRPCLimits } from './message/decodeRpc.js' import type { MessageCacheRecord } from './message-cache.js' import type { Metrics, ToSendGroupCount } from './metrics.js' import type { PeerScoreParams, PeerScoreThresholds, PeerScoreStatsDump } from './score/index.js' -import type { MsgIdFn, PublishConfig, TopicStr, MsgIdStr, PeerIdStr, RejectReasonObj, FastMsgIdFn, DataTransform, MsgIdToStrFn, MessageId, PublishOpts } from './types.js' +import type { MsgIdFn, PublishConfig, TopicStr, MsgIdStr, PeerIdStr, RejectReasonObj, FastMsgIdFn, DataTransform, MsgIdToStrFn, MessageId, PublishOpts, PartialSubscriptionOpts, PartialMessage, PartsMetadataMerger } from './types.js' 
import type { Connection, Stream, PeerId, Peer, Logger, @@ -254,6 +259,29 @@ export class GossipSub extends TypedEventEmitter implements Typ */ private readonly idontwants = new Map>() + // Partial Messages Extension fields + + /** Topics we subscribe to with partial message support */ + public readonly partialTopics = new Map() + + /** Per-topic partial message group tracking */ + private readonly partialMessageState = new Map() + + /** What partial opts each peer has signaled per topic */ + private readonly peerPartialOpts = new Map>() + + /** Peers that have received our extension handshake */ + private readonly sentExtensions = new Set() + + /** Configurable merger for parts metadata (default: BitwiseOrMerger) */ + private readonly partsMetadataMerger: PartsMetadataMerger + + /** Shared TextEncoder for topicID string->bytes conversion */ + private readonly textEncoder = new TextEncoder() + + /** Shared TextDecoder for topicID bytes->string conversion */ + private readonly textDecoder = new TextDecoder() + private readonly components: GossipSubComponents private directPeerInitial: ReturnType | null = null @@ -407,6 +435,9 @@ export class GossipSub extends TypedEventEmitter implements Typ this.runOnLimitedConnection = options.runOnLimitedConnection this.allowedTopics = (opts.allowedTopics != null) ? new Set(opts.allowedTopics) : null + + // Partial Messages Extension + this.partsMetadataMerger = options.partsMetadataMerger ?? 
new BitwiseOrMerger() } readonly [Symbol.toStringTag] = '@chainsafe/libp2p-gossipsub' @@ -600,6 +631,15 @@ export class GossipSub extends TypedEventEmitter implements Typ this.idontwantCounts.clear() this.idontwants.clear() + // Partial Messages Extension cleanup + this.partialTopics.clear() + for (const state of this.partialMessageState.values()) { + state.clear() + } + this.partialMessageState.clear() + this.peerPartialOpts.clear() + this.sentExtensions.clear() + this.log('stopped') } @@ -807,6 +847,13 @@ export class GossipSub extends TypedEventEmitter implements Typ this.idontwantCounts.delete(id) this.idontwants.delete(id) + // Remove from partial messages tracking + this.peerPartialOpts.delete(id) + this.sentExtensions.delete(id) + for (const state of this.partialMessageState.values()) { + state.removePeer(id) + } + // Remove from peer scoring this.score.removePeer(id) @@ -964,6 +1011,29 @@ export class GossipSub extends TypedEventEmitter implements Typ this.handleReceivedSubscription(from, topic, subscribe) + // Track partial message opts from peer + const fromId = from.toString() + const normalizedOpts = this.normalizePartialSubscriptionOpts({ + requestsPartial: subOpt.requestsPartial === true, + supportsSendingPartial: subOpt.supportsSendingPartial === true + }) + const hasPartialOpts = normalizedOpts.requestsPartial || normalizedOpts.supportsSendingPartial + + if (subscribe && hasPartialOpts) { + let peerOpts = this.peerPartialOpts.get(fromId) + if (peerOpts == null) { + peerOpts = new Map() + this.peerPartialOpts.set(fromId, peerOpts) + } + peerOpts.set(topic, normalizedOpts) + } else { + const peerOpts = this.peerPartialOpts.get(fromId) + peerOpts?.delete(topic) + if (peerOpts?.size === 0) { + this.peerPartialOpts.delete(fromId) + } + } + subscriptions.push({ topic, subscribe }) } }) @@ -998,6 +1068,11 @@ export class GossipSub extends TypedEventEmitter implements Typ if (rpc.control != null) { await this.handleControlMessage(from.toString(), 
rpc.control) } + + // Handle partial messages extension + if (rpc.partial != null) { + this.handleReceivedPartial(from, rpc.partial) + } } /** @@ -1204,7 +1279,17 @@ export class GossipSub extends TypedEventEmitter implements Typ */ private sendSubscriptions (toPeer: PeerIdStr, topics: string[], subscribe: boolean): void { this.sendRpc(toPeer, { - subscriptions: topics.map((topic) => ({ topic, subscribe })), + subscriptions: topics.map((topic) => { + const subOpts: RPC.SubOpts = { topic, subscribe } + // Include partial message flags if this topic has partial support + const partialOpts = this.partialTopics.get(topic) + if (partialOpts != null) { + const normalizedOpts = this.normalizePartialSubscriptionOpts(partialOpts) + subOpts.requestsPartial = normalizedOpts.requestsPartial + subOpts.supportsSendingPartial = normalizedOpts.supportsSendingPartial + } + return subOpts + }), messages: [] }) } @@ -1223,6 +1308,11 @@ export class GossipSub extends TypedEventEmitter implements Typ ;(controlMsg.prune?.length > 0) && (await this.handlePrune(id, controlMsg.prune)) ;(controlMsg.idontwant?.length > 0) && this.handleIdontwant(id, controlMsg.idontwant) + // Handle extensions handshake - log that peer supports partial messages + if (controlMsg.extensions?.partialMessages === true) { + this.log('peer %s supports partial messages extension', id) + } + if ((iwant.length === 0) && (ihave.length === 0) && (prune.length === 0)) { return } @@ -1763,6 +1853,223 @@ export class GossipSub extends TypedEventEmitter implements Typ this.leave(topic) } + // PARTIAL MESSAGES EXTENSION METHODS + + /** + * Subscribe to a topic with partial message support. + * Sends updated SubOpts to peers indicating partial capabilities. 
+ */ + subscribePartial (topic: TopicStr, opts: PartialSubscriptionOpts): void { + if (this.status.code !== GossipStatusCode.started) { + throw new Error('Pubsub has not started') + } + + this.partialTopics.set(topic, this.normalizePartialSubscriptionOpts(opts)) + + // Ensure we have a PartialMessageState for this topic + if (!this.partialMessageState.has(topic)) { + this.partialMessageState.set(topic, new PartialMessageState( + this.partsMetadataMerger, + this.opts.partialMessagesMaxGroups ?? PartialMessagesMaxGroups, + this.opts.partialMessagesGroupTTLMs ?? PartialMessagesGroupTTLMs + )) + } + + // Subscribe to the topic normally if not already subscribed + if (!this.subscriptions.has(topic)) { + this.subscribe(topic) + } else { + // Re-send subscriptions to peers with partial flags + for (const peerId of this.peers.keys()) { + this.sendSubscriptions(peerId, [topic], true) + } + } + } + + /** + * Remove partial message support for a topic. + * The topic remains subscribed for normal messages. + */ + unsubscribePartial (topic: TopicStr): void { + if (this.status.code !== GossipStatusCode.started) { + throw new Error('Pubsub is not started') + } + + const hadPartial = this.partialTopics.delete(topic) + + if (hadPartial) { + // Clean up state + const state = this.partialMessageState.get(topic) + if (state != null) { + state.clear() + this.partialMessageState.delete(topic) + } + + // Re-send subscription without partial flags + if (this.subscriptions.has(topic)) { + for (const peerId of this.peers.keys()) { + this.sendSubscriptions(peerId, [topic], true) + } + } + } + } + + /** + * Publish a partial message to peers that support the partial messages extension. + * Sends the full partial (with partialMessage data) to peers that requestsPartial. + * Sends metadata-only to peers that supportsSendingPartial but don't requestsPartial. 
+ */ + publishPartial (partialMsg: PartialMessage): void { + if (this.status.code !== GossipStatusCode.started) { + throw new Error('Pubsub has not started') + } + + const { topic, groupID, partialMessage, partsMetadata } = partialMsg + + // Update our own state + let state = this.partialMessageState.get(topic) + if (state == null) { + state = new PartialMessageState( + this.partsMetadataMerger, + this.opts.partialMessagesMaxGroups ?? PartialMessagesMaxGroups, + this.opts.partialMessagesGroupTTLMs ?? PartialMessagesGroupTTLMs + ) + this.partialMessageState.set(topic, state) + } + state.updateMetadata(groupID, this.components.peerId.toString(), partsMetadata) + + const topicIDBytes = this.textEncoder.encode(topic) + + // Send to all peers subscribed to this topic with partial support + const topicPeers = this.topics.get(topic) + if (topicPeers == null) { + return + } + + for (const peerId of topicPeers) { + const peerOpts = this.peerPartialOpts.get(peerId)?.get(topic) + if (peerOpts == null) { + continue + } + + if (peerOpts.requestsPartial) { + // Send full partial message (with data + metadata) + this.sendRpc(peerId, createGossipRpc([], undefined, { + topicID: topicIDBytes, + groupID, + partialMessage, + partsMetadata + })) + } else if (peerOpts.supportsSendingPartial) { + // Send metadata only + this.sendRpc(peerId, createGossipRpc([], undefined, { + topicID: topicIDBytes, + groupID, + partsMetadata + })) + } + } + } + + /** + * Handle a received partial message from a peer. + * Updates state and dispatches event to the application. 
+ */ + private handleReceivedPartial (from: PeerId, partial: RPC.PartialMessagesExtension): void { + if (partial.topicID == null || partial.groupID == null || partial.partsMetadata == null) { + this.log('received incomplete partial message from %p, ignoring', from) + return + } + + // Validate metadata size + if (partial.partsMetadata.length > PartialMessagesMaxMetadataSize) { + this.log('received oversized partsMetadata from %p (%d bytes), ignoring', from, partial.partsMetadata.length) + return + } + + // Validate partial message payload size (if present) + if (partial.partialMessage != null && partial.partialMessage.length > this.decodeRpcLimits.maxPartialMessageSize) { + this.log('received oversized partialMessage from %p (%d bytes), ignoring', from, partial.partialMessage.length) + return + } + + const topic = this.textDecoder.decode(partial.topicID) + const fromId = from.toString() + + // Only process if we're subscribed with partial support for this topic + if (!this.partialTopics.has(topic)) { + return + } + + if ((this.allowedTopics != null) && !this.allowedTopics.has(topic)) { + return + } + + // Update state + let state = this.partialMessageState.get(topic) + if (state == null) { + state = new PartialMessageState( + this.partsMetadataMerger, + this.opts.partialMessagesMaxGroups ?? PartialMessagesMaxGroups, + this.opts.partialMessagesGroupTTLMs ?? PartialMessagesGroupTTLMs + ) + this.partialMessageState.set(topic, state) + } + state.updateMetadata(partial.groupID, fromId, partial.partsMetadata) + + // Dispatch event to application + const partialMsg: PartialMessage = { + topic, + groupID: partial.groupID, + partialMessage: partial.partialMessage, + partsMetadata: partial.partsMetadata + } + + this.safeDispatchEvent('gossipsub:partial-message', { + detail: partialMsg + }) + } + + /** + * During heartbeat, gossip parts metadata to non-mesh peers that support partial messages. 
+ */ + private emitPartialGossip (peersToGossipByTopic: Map>): void { + for (const [topic, state] of this.partialMessageState) { + const groups = state.getGroupsForGossip() + if (groups.size === 0) { + continue + } + + const topicIDBytes = this.textEncoder.encode(topic) + const peersToGossip = peersToGossipByTopic.get(topic) + if (peersToGossip == null) { + continue + } + + for (const peerId of peersToGossip) { + const peerOpts = this.peerPartialOpts.get(peerId)?.get(topic) + if (peerOpts == null || !peerOpts.supportsSendingPartial) { + continue + } + + // Send metadata for each group to this peer + for (const [_groupKey, metadata] of groups) { + // Convert hex key back to bytes for groupID + const groupIDBytes = new Uint8Array(_groupKey.length / 2) + for (let i = 0; i < groupIDBytes.length; i++) { + groupIDBytes[i] = parseInt(_groupKey.substring(i * 2, i * 2 + 2), 16) + } + + this.sendRpc(peerId, createGossipRpc([], undefined, { + topicID: topicIDBytes, + groupID: groupIDBytes, + partsMetadata: metadata + })) + } + } + } + } + /** * Join topic */ @@ -2287,6 +2594,14 @@ export class GossipSub extends TypedEventEmitter implements Typ this.gossip.delete(id) } + // Extension handshake: on first successful RPC to peer, include partialMessages=true if we have partial topics + let shouldMarkExtensionAsSent = false + if (this.partialTopics.size > 0 && !this.sentExtensions.has(id)) { + const rpcWithControl = ensureControl(rpc) + rpcWithControl.control.extensions = { partialMessages: true } + shouldMarkExtensionAsSent = true + } + const rpcBytes = RPC.encode(rpc) try { outboundStream.push(rpcBytes) @@ -2304,6 +2619,10 @@ export class GossipSub extends TypedEventEmitter implements Typ return false } + if (shouldMarkExtensionAsSent) { + this.sentExtensions.add(id) + } + this.metrics?.onRpcSent(rpc, rpcBytes.length) if (rpc.control?.graft != null) { @@ -2418,19 +2737,23 @@ export class GossipSub extends TypedEventEmitter implements Typ this.log('too many messages for gossip; 
will truncate IHAVE list (%d messages)', messageIDs.length) } - if (candidateToGossip.size === 0) { return } + const eligiblePeers = Array.from(candidateToGossip).filter((id) => { + return this.peerPartialOpts.get(id)?.get(topic)?.requestsPartial !== true + }) + + if (eligiblePeers.length === 0) { return } let target = this.opts.Dlazy const gossipFactor = this.opts.gossipFactor - const factor = gossipFactor * candidateToGossip.size - let peersToGossip: Set | PeerIdStr[] = candidateToGossip + const factor = gossipFactor * eligiblePeers.length + let peersToGossip = eligiblePeers if (factor > target) { target = factor } - if (target > peersToGossip.size) { - target = peersToGossip.size + if (target > peersToGossip.length) { + target = peersToGossip.length } else { // only shuffle if needed - peersToGossip = shuffle(Array.from(peersToGossip)).slice(0, target) + peersToGossip = shuffle(peersToGossip.slice()).slice(0, target) } // Emit the IHAVE gossip to the selected peers up to the target @@ -2475,6 +2798,21 @@ export class GossipSub extends TypedEventEmitter implements Typ this.gossip.set(id, gossip.concat(controlIHaveMsgs)) } + private normalizePartialSubscriptionOpts (opts: PartialSubscriptionOpts): PartialSubscriptionOpts { + if (opts.requestsPartial) { + return { + requestsPartial: true, + // Spec invariant: requesting partial implies ability to send partial + supportsSendingPartial: true + } + } + + return { + requestsPartial: false, + supportsSendingPartial: opts.supportsSendingPartial === true + } + } + /** * Make a PRUNE control message for a peer in a topic */ @@ -2896,6 +3234,14 @@ export class GossipSub extends TypedEventEmitter implements Typ this.emitGossip(peersToGossipByTopic) + // Emit partial message gossip (metadata) to non-mesh partial peers + this.emitPartialGossip(peersToGossipByTopic) + + // Prune expired partial message groups + for (const state of this.partialMessageState.values()) { + state.pruneExpired() + } + // send coalesced GRAFT/PRUNE 
messages (will piggyback gossip) await this.sendGraftPrune(tograft, toprune, noPX) diff --git a/packages/gossipsub/src/index.ts b/packages/gossipsub/src/index.ts index 70dc4d7344..8378088fc4 100644 --- a/packages/gossipsub/src/index.ts +++ b/packages/gossipsub/src/index.ts @@ -5,7 +5,7 @@ import type { GossipsubOptsSpec } from './config.js' import type { DecodeRPCLimits } from './message/decodeRpc.js' import type { MetricsRegister, TopicStrToLabel } from './metrics.js' import type { PeerScoreParams, PeerScoreThresholds } from './score/index.js' -import type { MsgIdFn, MsgIdStr, FastMsgIdFn, AddrInfo, DataTransform, MsgIdToStrFn } from './types.js' +import type { MsgIdFn, MsgIdStr, FastMsgIdFn, AddrInfo, DataTransform, MsgIdToStrFn, PartialMessage, PartialSubscriptionOpts, PartsMetadataMerger } from './types.js' import type { PeerId, PeerStore, ComponentLogger, @@ -255,6 +255,28 @@ export interface GossipsubOpts extends GossipsubOptsSpec { * handle this many incoming pubsub messages concurrently */ messageProcessingConcurrency?: number + + /** + * Custom parts metadata merger for partial messages. + * Defaults to BitwiseOrMerger which uses bitwise OR to combine bitmasks. + */ + partsMetadataMerger?: PartsMetadataMerger + + /** + * Maximum number of groups to track per topic in PartialMessageState. + * When exceeded, oldest groups are evicted (LRU). + * + * @default 128 + */ + partialMessagesMaxGroups?: number + + /** + * Time-to-live for partial message groups in milliseconds. + * Groups older than this are pruned during heartbeat. 
+ * + * @default 120000 + */ + partialMessagesGroupTTLMs?: number } export interface GossipsubMessage { @@ -276,6 +298,7 @@ export interface GossipSubEvents { 'gossipsub:message': CustomEvent 'gossipsub:graft': CustomEvent 'gossipsub:prune': CustomEvent + 'gossipsub:partial-message': CustomEvent } export interface GossipSubComponents { @@ -395,6 +418,22 @@ export interface GossipSub extends TypedEventTarget { * ``` */ publish(topic: string, data?: Uint8Array): Promise + + /** + * Subscribe to a topic with partial message support. + * Sends updated SubOpts to peers indicating partial capabilities. + */ + subscribePartial(topic: string, opts: PartialSubscriptionOpts): void + + /** + * Remove partial message support for a topic. + */ + unsubscribePartial(topic: string): void + + /** + * Publish a partial message to peers that support the partial messages extension. + */ + publishPartial(partialMsg: PartialMessage): void } export function gossipsub ( @@ -402,3 +441,5 @@ export function gossipsub ( ): (components: GossipSubComponents) => GossipSub { return (components: GossipSubComponents) => new GossipSubClass(components, init) } + +export type { PartialMessage, PartialSubscriptionOpts, PartsMetadataMerger } from './types.js' diff --git a/packages/gossipsub/src/message/decodeRpc.ts b/packages/gossipsub/src/message/decodeRpc.ts index 7b9ecd99a5..c7beecfb37 100644 --- a/packages/gossipsub/src/message/decodeRpc.ts +++ b/packages/gossipsub/src/message/decodeRpc.ts @@ -6,6 +6,7 @@ export interface DecodeRPCLimits { maxIdontwantMessageIDs: number maxControlMessages: number maxPeerInfos: number + maxPartialMessageSize: number } export const defaultDecodeRpcLimits: DecodeRPCLimits = { @@ -15,5 +16,6 @@ export const defaultDecodeRpcLimits: DecodeRPCLimits = { maxIwantMessageIDs: Infinity, maxIdontwantMessageIDs: Infinity, maxControlMessages: Infinity, - maxPeerInfos: Infinity + maxPeerInfos: Infinity, + maxPartialMessageSize: Infinity } diff --git 
a/packages/gossipsub/src/message/rpc.proto b/packages/gossipsub/src/message/rpc.proto index efe8fafe37..ea000bc682 100644 --- a/packages/gossipsub/src/message/rpc.proto +++ b/packages/gossipsub/src/message/rpc.proto @@ -8,6 +8,8 @@ message RPC { message SubOpts { optional bool subscribe = 1; // subscribe or unsubcribe optional string topic = 2; + optional bool requestsPartial = 3; + optional bool supportsSendingPartial = 4; } message Message { @@ -25,6 +27,11 @@ message RPC { repeated ControlGraft graft = 3; repeated ControlPrune prune = 4; repeated ControlIDontWant idontwant = 5; + optional ControlExtensions extensions = 6; + } + + message ControlExtensions { + optional bool partialMessages = 10; } message ControlIHave { @@ -55,4 +62,13 @@ message RPC { repeated bytes messageIDs = 1; } + optional PartialMessagesExtension partial = 10; + + message PartialMessagesExtension { + optional bytes topicID = 1; + optional bytes groupID = 2; + optional bytes partialMessage = 3; + optional bytes partsMetadata = 4; + } + } \ No newline at end of file diff --git a/packages/gossipsub/src/message/rpc.ts b/packages/gossipsub/src/message/rpc.ts index eb018dd6e1..612ac095b5 100644 --- a/packages/gossipsub/src/message/rpc.ts +++ b/packages/gossipsub/src/message/rpc.ts @@ -8,12 +8,15 @@ export interface RPC { subscriptions: RPC.SubOpts[] messages: RPC.Message[] control?: RPC.ControlMessage + partial?: RPC.PartialMessagesExtension } export namespace RPC { export interface SubOpts { subscribe?: boolean topic?: string + requestsPartial?: boolean + supportsSendingPartial?: boolean } export namespace SubOpts { @@ -36,6 +39,16 @@ export namespace RPC { w.string(obj.topic) } + if (obj.requestsPartial != null) { + w.uint32(24) + w.bool(obj.requestsPartial) + } + + if (obj.supportsSendingPartial != null) { + w.uint32(32) + w.bool(obj.supportsSendingPartial) + } + if (opts.lengthDelimited !== false) { w.ldelim() } @@ -56,6 +69,14 @@ export namespace RPC { obj.topic = reader.string() break } + 
case 3: { + obj.requestsPartial = reader.bool() + break + } + case 4: { + obj.supportsSendingPartial = reader.bool() + break + } default: { reader.skipType(tag & 7) break @@ -195,6 +216,7 @@ export namespace RPC { graft: RPC.ControlGraft[] prune: RPC.ControlPrune[] idontwant: RPC.ControlIDontWant[] + extensions?: RPC.ControlExtensions } export namespace ControlMessage { @@ -242,6 +264,11 @@ export namespace RPC { } } + if (obj.extensions != null) { + w.uint32(50) + RPC.ControlExtensions.codec().encode(obj.extensions, w) + } + if (opts.lengthDelimited !== false) { w.ldelim() } @@ -310,6 +337,12 @@ export namespace RPC { })) break } + case 6: { + obj.extensions = RPC.ControlExtensions.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.extensions$ + }) + break + } default: { reader.skipType(tag & 7) break @@ -755,6 +788,152 @@ export namespace RPC { } } + export interface ControlExtensions { + partialMessages?: boolean + } + + export namespace ControlExtensions { + let _codec: Codec + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message((obj, w, opts = {}) => { + if (opts.lengthDelimited !== false) { + w.fork() + } + + if (obj.partialMessages != null) { + w.uint32(80) + w.bool(obj.partialMessages) + } + + if (opts.lengthDelimited !== false) { + w.ldelim() + } + }, (reader, length, opts = {}) => { + const obj: any = {} + + const end = length == null ? 
reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 10: { + obj.partialMessages = reader.bool() + break + } + default: { + reader.skipType(tag & 7) + break + } + } + } + + return obj + }) + } + + return _codec + } + + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, ControlExtensions.codec()) + } + + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): ControlExtensions => { + return decodeMessage(buf, ControlExtensions.codec(), opts) + } + } + + export interface PartialMessagesExtension { + topicID?: Uint8Array + groupID?: Uint8Array + partialMessage?: Uint8Array + partsMetadata?: Uint8Array + } + + export namespace PartialMessagesExtension { + let _codec: Codec + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message((obj, w, opts = {}) => { + if (opts.lengthDelimited !== false) { + w.fork() + } + + if (obj.topicID != null) { + w.uint32(10) + w.bytes(obj.topicID) + } + + if (obj.groupID != null) { + w.uint32(18) + w.bytes(obj.groupID) + } + + if (obj.partialMessage != null) { + w.uint32(26) + w.bytes(obj.partialMessage) + } + + if (obj.partsMetadata != null) { + w.uint32(34) + w.bytes(obj.partsMetadata) + } + + if (opts.lengthDelimited !== false) { + w.ldelim() + } + }, (reader, length, opts = {}) => { + const obj: any = {} + + const end = length == null ? 
reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: { + obj.topicID = reader.bytes() + break + } + case 2: { + obj.groupID = reader.bytes() + break + } + case 3: { + obj.partialMessage = reader.bytes() + break + } + case 4: { + obj.partsMetadata = reader.bytes() + break + } + default: { + reader.skipType(tag & 7) + break + } + } + } + + return obj + }) + } + + return _codec + } + + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, PartialMessagesExtension.codec()) + } + + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): PartialMessagesExtension => { + return decodeMessage(buf, PartialMessagesExtension.codec(), opts) + } + } + let _codec: Codec export const codec = (): Codec => { @@ -783,6 +962,11 @@ export namespace RPC { RPC.ControlMessage.codec().encode(obj.control, w) } + if (obj.partial != null) { + w.uint32(82) + RPC.PartialMessagesExtension.codec().encode(obj.partial, w) + } + if (opts.lengthDelimited !== false) { w.ldelim() } @@ -824,6 +1008,12 @@ export namespace RPC { }) break } + case 10: { + obj.partial = RPC.PartialMessagesExtension.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.partial$ + }) + break + } default: { reader.skipType(tag & 7) break diff --git a/packages/gossipsub/src/partial/bitwise-or-merger.ts b/packages/gossipsub/src/partial/bitwise-or-merger.ts new file mode 100644 index 0000000000..f666e286db --- /dev/null +++ b/packages/gossipsub/src/partial/bitwise-or-merger.ts @@ -0,0 +1,21 @@ +import type { PartsMetadataMerger } from '../types.js' + +/** + * Default PartsMetadataMerger that combines metadata using bitwise OR. + * This is appropriate when parts metadata is a bitmask where each bit + * represents whether a particular part is available. 
+ */ +export class BitwiseOrMerger implements PartsMetadataMerger { + /** + * Merge two parts metadata bitmasks by applying bitwise OR byte-by-byte. + * The result length is the max of the two inputs; missing bytes are treated as 0. + */ + merge (a: Uint8Array, b: Uint8Array): Uint8Array { + const len = Math.max(a.length, b.length) + const result = new Uint8Array(len) + for (let i = 0; i < len; i++) { + result[i] = (a[i] ?? 0) | (b[i] ?? 0) + } + return result + } +} diff --git a/packages/gossipsub/src/partial/index.ts b/packages/gossipsub/src/partial/index.ts new file mode 100644 index 0000000000..5da1b3b407 --- /dev/null +++ b/packages/gossipsub/src/partial/index.ts @@ -0,0 +1,2 @@ +export { BitwiseOrMerger } from './bitwise-or-merger.js' +export { PartialMessageState } from './partial-message-state.js' diff --git a/packages/gossipsub/src/partial/partial-message-state.ts b/packages/gossipsub/src/partial/partial-message-state.ts new file mode 100644 index 0000000000..a13ad7cb90 --- /dev/null +++ b/packages/gossipsub/src/partial/partial-message-state.ts @@ -0,0 +1,168 @@ +import type { PartsMetadataMerger } from '../types.js' +import type { PeerIdStr } from '../types.js' + +interface GroupState { + /** Combined local metadata (merged from all received metadata) */ + localMetadata: Uint8Array + /** Per-peer metadata tracking what each peer has reported */ + peerMetadata: Map + /** Timestamp when this group was first seen */ + createdAt: number + /** Timestamp of last access (for LRU eviction) */ + lastAccessedAt: number +} + +/** + * Tracks partial message state for a single topic. + * Maintains per-group state with LRU eviction and TTL-based pruning. 
+ */ +export class PartialMessageState { + private readonly groups = new Map() + private readonly maxGroups: number + private readonly groupTTLMs: number + private readonly merger: PartsMetadataMerger + + constructor (merger: PartsMetadataMerger, maxGroups: number, groupTTLMs: number) { + this.merger = merger + this.maxGroups = maxGroups + this.groupTTLMs = groupTTLMs + } + + /** + * Convert a groupID Uint8Array to a string key for the map + */ + private groupKey (groupID: Uint8Array): string { + // Use hex encoding for consistent string keys + let key = '' + for (let i = 0; i < groupID.length; i++) { + key += groupID[i].toString(16).padStart(2, '0') + } + return key + } + + /** + * Update state with received metadata for a group from a peer. + * Merges the metadata into the local combined metadata. + */ + updateMetadata (groupID: Uint8Array, peerId: PeerIdStr, metadata: Uint8Array): void { + const key = this.groupKey(groupID) + const now = Date.now() + + let group = this.groups.get(key) + if (group == null) { + // Evict oldest if at capacity + if (this.groups.size >= this.maxGroups) { + this.evictOldest() + } + group = { + localMetadata: new Uint8Array(metadata.length), + peerMetadata: new Map(), + createdAt: now, + lastAccessedAt: now + } + this.groups.set(key, group) + } + + group.lastAccessedAt = now + group.peerMetadata.set(peerId, metadata) + group.localMetadata = this.merger.merge(group.localMetadata, metadata) + } + + /** + * Get the combined local metadata for a group. + */ + getLocalMetadata (groupID: Uint8Array): Uint8Array | undefined { + const key = this.groupKey(groupID) + const group = this.groups.get(key) + if (group != null) { + group.lastAccessedAt = Date.now() + } + return group?.localMetadata + } + + /** + * Get the metadata a specific peer has reported for a group. 
+ */ + getPeerMetadata (groupID: Uint8Array, peerId: PeerIdStr): Uint8Array | undefined { + const key = this.groupKey(groupID) + return this.groups.get(key)?.peerMetadata.get(peerId) + } + + /** + * Get all groups that have metadata, for gossip during heartbeat. + * Returns groupID (as hex key) => localMetadata pairs. + */ + getGroupsForGossip (): Map<string, Uint8Array> { + const result = new Map<string, Uint8Array>() + for (const [key, group] of this.groups) { + if (group.localMetadata.length > 0) { + result.set(key, group.localMetadata) + } + } + return result + } + + /** + * Check if we have any state for a group. + */ + hasGroup (groupID: Uint8Array): boolean { + return this.groups.has(this.groupKey(groupID)) + } + + /** + * Remove all entries for a peer across all groups. + */ + removePeer (peerId: PeerIdStr): void { + for (const group of this.groups.values()) { + group.peerMetadata.delete(peerId) + } + } + + /** + * Prune groups older than the TTL. + */ + pruneExpired (): number { + const now = Date.now() + let pruned = 0 + for (const [key, group] of this.groups) { + if (now - group.createdAt > this.groupTTLMs) { + this.groups.delete(key) + pruned++ + } + } + return pruned + } + + /** + * Evict the least recently accessed group. + */ + private evictOldest (): void { + let oldestKey: string | undefined + let oldestTime = Infinity + + for (const [key, group] of this.groups) { + if (group.lastAccessedAt < oldestTime) { + oldestTime = group.lastAccessedAt + oldestKey = key + } + } + + if (oldestKey != null) { + this.groups.delete(oldestKey) + } + } + + /** + * Get the number of tracked groups. + */ + get size (): number { + return this.groups.size + } + + /** + * Clear all state. 
+ */ + clear (): void { + this.groups.clear() + } +} diff --git a/packages/gossipsub/src/types.ts b/packages/gossipsub/src/types.ts index 025085b26d..87f7a58325 100644 --- a/packages/gossipsub/src/types.ts +++ b/packages/gossipsub/src/types.ts @@ -176,3 +176,38 @@ export function rejectReasonFromAcceptance ( throw new Error('Unreachable') } } + +/** + * A partial message to be sent or received via the partial messages extension. + * Contains either a partial message payload, parts metadata, or both. + */ +export interface PartialMessage { + /** The topic this partial message belongs to */ + topic: TopicStr + /** Unique identifier for the group of partial messages */ + groupID: Uint8Array + /** The partial message data (a subset of the full message) */ + partialMessage?: Uint8Array + /** Metadata describing which parts are available */ + partsMetadata: Uint8Array +} + +/** + * Options for partial message subscription signaling. + * Sent in SubOpts to indicate partial message capabilities. + */ +export interface PartialSubscriptionOpts { + /** Whether this peer wants to receive partial messages */ + requestsPartial: boolean + /** Whether this peer can send partial messages */ + supportsSendingPartial: boolean +} + +/** + * Merges parts metadata from multiple sources. + * The default implementation uses bitwise OR to combine bitmasks. 
+ */ +export interface PartsMetadataMerger { + /** Merge two parts metadata buffers into a combined result */ + merge(a: Uint8Array, b: Uint8Array): Uint8Array +} diff --git a/packages/gossipsub/src/utils/create-gossip-rpc.ts b/packages/gossipsub/src/utils/create-gossip-rpc.ts index efd59fcb1c..4779475a32 100644 --- a/packages/gossipsub/src/utils/create-gossip-rpc.ts +++ b/packages/gossipsub/src/utils/create-gossip-rpc.ts @@ -3,7 +3,7 @@ import type { RPC } from '../message/rpc.js' /** * Create a gossipsub RPC object */ -export function createGossipRpc (messages: RPC.Message[] = [], control?: Partial<RPC.ControlMessage>): RPC { +export function createGossipRpc (messages: RPC.Message[] = [], control?: Partial<RPC.ControlMessage>, partial?: RPC.PartialMessagesExtension): RPC { return { subscriptions: [], messages, @@ -15,11 +15,12 @@ export function createGossipRpc (messages: RPC.Message[] = [], control?: Partial<RPC.ControlMessage> iwant: control.iwant ?? [], idontwant: control.idontwant ?? [] } - : undefined + : undefined, + partial } } -export function ensureControl (rpc: RPC): Required<Pick<RPC, 'control'>> { +export function ensureControl (rpc: RPC): Required<Pick<RPC, 'control'>> & RPC { if (rpc.control === undefined) { rpc.control = { graft: [], @@ -30,5 +31,5 @@ } } - return rpc as Required<Pick<RPC, 'control'>> + return rpc as Required<Pick<RPC, 'control'>> & RPC } diff --git a/packages/gossipsub/test/partial-messages/bitwise-or-merger.spec.ts b/packages/gossipsub/test/partial-messages/bitwise-or-merger.spec.ts new file mode 100644 index 0000000000..297572dd39 --- /dev/null +++ b/packages/gossipsub/test/partial-messages/bitwise-or-merger.spec.ts @@ -0,0 +1,55 @@ +import { expect } from 'aegir/chai' +import { BitwiseOrMerger } from '../../src/partial/bitwise-or-merger.js' + +describe('BitwiseOrMerger', () => { + const merger = new BitwiseOrMerger() + + it('should merge two equal-length buffers with bitwise OR', () => { + const a = new Uint8Array([0b1010, 0b0011]) + const b = new Uint8Array([0b0101, 0b1100]) + const result = merger.merge(a, b) + 
expect(result).to.deep.equal(new Uint8Array([0b1111, 0b1111])) + }) + + it('should handle different-length buffers by padding shorter one with zeros', () => { + const a = new Uint8Array([0b1010]) + const b = new Uint8Array([0b0101, 0b1100]) + const result = merger.merge(a, b) + expect(result).to.deep.equal(new Uint8Array([0b1111, 0b1100])) + }) + + it('should handle empty buffer with non-empty buffer', () => { + const a = new Uint8Array([]) + const b = new Uint8Array([0b1010, 0b0101]) + const result = merger.merge(a, b) + expect(result).to.deep.equal(new Uint8Array([0b1010, 0b0101])) + }) + + it('should handle two empty buffers', () => { + const a = new Uint8Array([]) + const b = new Uint8Array([]) + const result = merger.merge(a, b) + expect(result).to.deep.equal(new Uint8Array([])) + }) + + it('should handle identical buffers', () => { + const a = new Uint8Array([0b1010, 0b0101]) + const b = new Uint8Array([0b1010, 0b0101]) + const result = merger.merge(a, b) + expect(result).to.deep.equal(new Uint8Array([0b1010, 0b0101])) + }) + + it('should merge with all zeros', () => { + const a = new Uint8Array([0, 0]) + const b = new Uint8Array([0b1111, 0b1111]) + const result = merger.merge(a, b) + expect(result).to.deep.equal(new Uint8Array([0b1111, 0b1111])) + }) + + it('should merge with all ones', () => { + const a = new Uint8Array([0xFF, 0xFF]) + const b = new Uint8Array([0b1010, 0b0101]) + const result = merger.merge(a, b) + expect(result).to.deep.equal(new Uint8Array([0xFF, 0xFF])) + }) +}) diff --git a/packages/gossipsub/test/partial-messages/cleanup.spec.ts b/packages/gossipsub/test/partial-messages/cleanup.spec.ts new file mode 100644 index 0000000000..7d9977b43a --- /dev/null +++ b/packages/gossipsub/test/partial-messages/cleanup.spec.ts @@ -0,0 +1,133 @@ +import { stop } from '@libp2p/interface' +import { expect } from 'aegir/chai' +import { createComponents } from '../utils/create-pubsub.js' +import { setupTwoNodes, teardownTwoNodes } from './utils.js' +import 
type { TwoNodeContext } from './utils.js' + +describe('partial messages - cleanup', () => { + let ctx: TwoNodeContext + + beforeEach(async () => { + ctx = await setupTwoNodes() + }) + + afterEach(async () => { + await teardownTwoNodes(ctx) + }) + + it('should clean up partial state when unsubscribePartial is called', () => { + const topic = 'test-topic' + + ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + // Verify state exists + const gsA = ctx.nodeA.pubsub as any + expect(gsA.partialMessageState.has(topic)).to.be.true() + expect(ctx.nodeA.pubsub.partialTopics.has(topic)).to.be.true() + + ctx.nodeA.pubsub.unsubscribePartial(topic) + + // Verify state cleaned up + expect(gsA.partialMessageState.has(topic)).to.be.false() + expect(ctx.nodeA.pubsub.partialTopics.has(topic)).to.be.false() + }) + + it('should clean up peer partial opts when peer is removed', () => { + const peerAId = ctx.nodeA.components.peerId.toString() + const gsB = ctx.nodeB.pubsub as any + + // Manually set peer opts to verify they get cleaned up + gsB.peerPartialOpts.set(peerAId, new Map()) + gsB.peerPartialOpts.get(peerAId).set('test-topic', { + requestsPartial: true, + supportsSendingPartial: true + }) + + expect(gsB.peerPartialOpts.has(peerAId)).to.be.true() + + // Directly trigger removePeer (simulates disconnect) + gsB.removePeer(ctx.nodeA.components.peerId) + + // After peer removal, partial opts should be cleaned + expect(gsB.peerPartialOpts.has(peerAId)).to.be.false() + }) + + it('should clean up sentExtensions when peer is removed', () => { + const peerAId = ctx.nodeA.components.peerId.toString() + const gsB = ctx.nodeB.pubsub as any + + // Manually set sentExtensions + gsB.sentExtensions.add(peerAId) + expect(gsB.sentExtensions.has(peerAId)).to.be.true() + + // Directly trigger removePeer + gsB.removePeer(ctx.nodeA.components.peerId) + + expect(gsB.sentExtensions.has(peerAId)).to.be.false() + }) + + it('should clean up 
partialMessageState peer entries when peer is removed', () => { + const topic = 'test-topic' + const peerAId = ctx.nodeA.components.peerId.toString() + const gsB = ctx.nodeB.pubsub as any + + // Subscribe nodeB with partial support + ctx.nodeB.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + // Simulate having received partial metadata from nodeA + const state = gsB.partialMessageState.get(topic) + state.updateMetadata(new Uint8Array([1]), peerAId, new Uint8Array([0b1010])) + expect(state.getPeerMetadata(new Uint8Array([1]), peerAId)).to.not.be.undefined() + + // Directly trigger removePeer + gsB.removePeer(ctx.nodeA.components.peerId) + + // Peer metadata should be cleaned up + expect(state.getPeerMetadata(new Uint8Array([1]), peerAId)).to.be.undefined() + }) + + it('should clear all partial state on stop', async () => { + const topic = 'test-topic' + const gsA = ctx.nodeA.pubsub as any + + // Set up various partial state + ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + const bId = ctx.nodeB.components.peerId.toString() + gsA.peerPartialOpts.set(bId, new Map()) + gsA.peerPartialOpts.get(bId).set(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + gsA.sentExtensions.add(bId) + + // Verify state exists + expect(ctx.nodeA.pubsub.partialTopics.size).to.be.greaterThan(0) + expect(gsA.partialMessageState.size).to.be.greaterThan(0) + expect(gsA.peerPartialOpts.size).to.be.greaterThan(0) + expect(gsA.sentExtensions.size).to.be.greaterThan(0) + + // Stop the node (including components to avoid resource leaks) + await stop(ctx.nodeA.pubsub, ...Object.entries(ctx.nodeA.components)) + + // All partial state should be cleared + expect(ctx.nodeA.pubsub.partialTopics.size).to.equal(0) + expect(gsA.partialMessageState.size).to.equal(0) + expect(gsA.peerPartialOpts.size).to.equal(0) + expect(gsA.sentExtensions.size).to.equal(0) + + // Re-create nodeA 
for afterEach cleanup (old components already stopped above) + ctx.nodeA = await createComponents({ + init: { emitSelf: false } + }) + }) +}) diff --git a/packages/gossipsub/test/partial-messages/extension-handshake.spec.ts b/packages/gossipsub/test/partial-messages/extension-handshake.spec.ts new file mode 100644 index 0000000000..bc0cea54da --- /dev/null +++ b/packages/gossipsub/test/partial-messages/extension-handshake.spec.ts @@ -0,0 +1,180 @@ +import { expect } from 'aegir/chai' +import { RPC } from '../../src/message/rpc.js' +import { setupTwoNodes, teardownTwoNodes } from './utils.js' +import type { TwoNodeContext } from './utils.js' + +describe('partial messages - extension handshake', () => { + let ctx: TwoNodeContext + + beforeEach(async () => { + ctx = await setupTwoNodes() + }) + + afterEach(async () => { + await teardownTwoNodes(ctx) + }) + + it('should include extensions handshake in first RPC when node has partial topics', () => { + const topic = 'test-topic' + const bId = ctx.nodeB.components.peerId.toString() + const gsA = ctx.nodeA.pubsub as any + + // Subscribe with partial to set up partialTopics + ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + // Clear sentExtensions to ensure we can test the first-RPC behavior + gsA.sentExtensions.clear() + + // Capture encoded RPCs + const sentRpcBytes: Uint8Array[] = [] + const outboundStream = gsA.streamsOutbound.get(bId) + if (outboundStream != null) { + const origPush = outboundStream.push.bind(outboundStream) + outboundStream.push = (bytes: Uint8Array) => { + sentRpcBytes.push(bytes) + return origPush(bytes) + } + } + + // Send an RPC to nodeB + gsA.sendRpc(bId, { + subscriptions: [], + messages: [] + }) + + // Verify RPC bytes were actually captured and decode them + expect(sentRpcBytes.length).to.be.greaterThan(0, 'expected RPC bytes to be captured') + const decoded = RPC.decode(sentRpcBytes[0]) + 
expect(decoded.control?.extensions?.partialMessages).to.equal(true) + + // sentExtensions should now include this peer + expect(gsA.sentExtensions.has(bId)).to.be.true() + }) + + it('should not include extensions handshake on subsequent RPCs', () => { + const topic = 'test-topic' + const bId = ctx.nodeB.components.peerId.toString() + const gsA = ctx.nodeA.pubsub as any + + ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + // Mark that we already sent extensions to this peer + gsA.sentExtensions.add(bId) + + // Capture encoded RPCs + const sentRpcBytes: Uint8Array[] = [] + const outboundStream = gsA.streamsOutbound.get(bId) + if (outboundStream != null) { + const origPush = outboundStream.push.bind(outboundStream) + outboundStream.push = (bytes: Uint8Array) => { + sentRpcBytes.push(bytes) + return origPush(bytes) + } + } + + // Send another RPC + gsA.sendRpc(bId, { + subscriptions: [], + messages: [] + }) + + // Verify RPC bytes were captured and second RPC does NOT include extensions + expect(sentRpcBytes.length).to.be.greaterThan(0, 'expected RPC bytes to be captured') + const decoded = RPC.decode(sentRpcBytes[0]) + expect(decoded.control?.extensions).to.be.undefined() + }) + + it('should not include extensions handshake when no partial topics', () => { + const bId = ctx.nodeB.components.peerId.toString() + const gsA = ctx.nodeA.pubsub as any + + // Ensure no partial topics + expect(ctx.nodeA.pubsub.partialTopics.size).to.equal(0) + + // Capture encoded RPCs + const sentRpcBytes: Uint8Array[] = [] + const outboundStream = gsA.streamsOutbound.get(bId) + if (outboundStream != null) { + const origPush = outboundStream.push.bind(outboundStream) + outboundStream.push = (bytes: Uint8Array) => { + sentRpcBytes.push(bytes) + return origPush(bytes) + } + } + + // Send an RPC + gsA.sendRpc(bId, { + subscriptions: [], + messages: [] + }) + + // Verify RPC bytes were captured and no extensions are included + 
expect(sentRpcBytes.length).to.be.greaterThan(0, 'expected RPC bytes to be captured') + const decoded = RPC.decode(sentRpcBytes[0]) + expect(decoded.control?.extensions).to.be.undefined() + + // sentExtensions should NOT include this peer + expect(gsA.sentExtensions.has(bId)).to.be.false() + }) + + it('should not mark extension as sent when first RPC send fails', () => { + const topic = 'test-topic' + const bId = ctx.nodeB.components.peerId.toString() + const gsA = ctx.nodeA.pubsub as any + + ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + gsA.sentExtensions.clear() + + const outboundStream = gsA.streamsOutbound.get(bId) + expect(outboundStream).to.not.be.undefined() + + const originalPush = outboundStream.push.bind(outboundStream) + outboundStream.push = () => { + throw new Error('boom') + } + + const sent = gsA.sendRpc(bId, { + subscriptions: [], + messages: [] + }) + + expect(sent).to.be.false() + expect(gsA.sentExtensions.has(bId)).to.be.false() + + outboundStream.push = originalPush + }) + + it('should log peer support when receiving extension handshake', async () => { + const gsB = ctx.nodeB.pubsub as any + const aId = ctx.nodeA.components.peerId.toString() + + // Send an RPC with extension handshake from nodeA + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, { + subscriptions: [], + messages: [], + control: { + ihave: [], + iwant: [], + graft: [], + prune: [], + idontwant: [], + extensions: { partialMessages: true } + } + }) + + // The implementation logs "peer %s supports partial messages extension" + // We just verify the RPC was processed without error - the logging is internal + // The peer should still be tracked (no crash) + expect(gsB.peers.has(aId)).to.be.true() + }) +}) diff --git a/packages/gossipsub/test/partial-messages/gossip-heartbeat.spec.ts b/packages/gossipsub/test/partial-messages/gossip-heartbeat.spec.ts new file mode 100644 index 0000000000..2e3333ed71 --- /dev/null +++ 
b/packages/gossipsub/test/partial-messages/gossip-heartbeat.spec.ts @@ -0,0 +1,204 @@ +import { expect } from 'aegir/chai' +import sinon from 'sinon' +import { RPC } from '../../src/message/rpc.js' +import { setupTwoNodes, teardownTwoNodes } from './utils.js' +import type { TwoNodeContext } from './utils.js' +import type { PartialSubscriptionOpts } from '../../src/types.js' + +describe('partial messages - gossip and heartbeat', () => { + let ctx: TwoNodeContext + + beforeEach(async () => { + ctx = await setupTwoNodes() + }) + + afterEach(async () => { + await teardownTwoNodes(ctx) + }) + + it('should gossip partial metadata to non-mesh partial peers during heartbeat', async () => { + const topic = 'test-topic' + const bId = ctx.nodeB.components.peerId.toString() + const gsA = ctx.nodeA.pubsub as any + + // Subscribe nodeA with partial support + ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + // Add some state to gossip + const state = gsA.partialMessageState.get(topic) + state.updateMetadata(new Uint8Array([1]), ctx.nodeA.components.peerId.toString(), new Uint8Array([0b1010])) + + // Set nodeB as a peer with partial support for the topic + gsA.peerPartialOpts.set(bId, new Map()) + gsA.peerPartialOpts.get(bId).set(topic, { + requestsPartial: false, + supportsSendingPartial: true + } as PartialSubscriptionOpts) + + // Spy on sendRpc + const sentRpcs: Array<{ peerId: string, rpc: RPC }> = [] + const origSendRpc = gsA.sendRpc.bind(gsA) + gsA.sendRpc = (id: string, rpc: RPC): boolean => { + sentRpcs.push({ peerId: id, rpc }) + return origSendRpc(id, rpc) + } + + // Call emitPartialGossip directly with nodeB in the gossip set + const peersToGossipByTopic = new Map>() + peersToGossipByTopic.set(topic, new Set([bId])) + + gsA.emitPartialGossip(peersToGossipByTopic) + + // Verify partial gossip was sent to nodeB + const sentToB = sentRpcs.filter(s => s.peerId === bId && s.rpc.partial != null) + 
expect(sentToB.length).to.be.greaterThan(0) + expect(sentToB[0].rpc.partial?.partsMetadata).to.not.be.undefined() + }) + + it('should skip peers without partial support during partial gossip', async () => { + const topic = 'test-topic' + const bId = ctx.nodeB.components.peerId.toString() + const gsA = ctx.nodeA.pubsub as any + + ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + const state = gsA.partialMessageState.get(topic) + state.updateMetadata(new Uint8Array([1]), ctx.nodeA.components.peerId.toString(), new Uint8Array([0b1010])) + + // Do NOT set any peerPartialOpts for nodeB + + // Spy on sendRpc + const sentRpcs: Array<{ peerId: string, rpc: RPC }> = [] + const origSendRpc = gsA.sendRpc.bind(gsA) + gsA.sendRpc = (id: string, rpc: RPC): boolean => { + sentRpcs.push({ peerId: id, rpc }) + return origSendRpc(id, rpc) + } + + const peersToGossipByTopic = new Map>() + peersToGossipByTopic.set(topic, new Set([bId])) + + gsA.emitPartialGossip(peersToGossipByTopic) + + // No partial RPC should be sent + const sentToB = sentRpcs.filter(s => s.peerId === bId && s.rpc.partial != null) + expect(sentToB.length).to.equal(0) + }) + + it('should prune expired partial message groups during heartbeat', async () => { + const sandbox = sinon.createSandbox() + try { + sandbox.useFakeTimers() + + const topic = 'test-topic' + const gsA = ctx.nodeA.pubsub as any + + ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + const state = gsA.partialMessageState.get(topic) + state.updateMetadata(new Uint8Array([1]), 'peer1', new Uint8Array([0b1010])) + + expect(state.size).to.equal(1) + + // Advance past the default group TTL (2 minutes) + sandbox.clock.tick(3 * 60 * 1000) + + // Pruning happens in heartbeat - call pruneExpired directly + state.pruneExpired() + + expect(state.size).to.equal(0) + } finally { + sandbox.restore() + } + }) + + it('should gossip metadata for all 
tracked groups per topic', async () => { + const topic = 'test-topic' + const bId = ctx.nodeB.components.peerId.toString() + const gsA = ctx.nodeA.pubsub as any + + ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + const state = gsA.partialMessageState.get(topic) + // Add two groups + state.updateMetadata(new Uint8Array([1]), ctx.nodeA.components.peerId.toString(), new Uint8Array([0b1010])) + state.updateMetadata(new Uint8Array([2]), ctx.nodeA.components.peerId.toString(), new Uint8Array([0b0101])) + + // Set nodeB as partial-supporting + gsA.peerPartialOpts.set(bId, new Map()) + gsA.peerPartialOpts.get(bId).set(topic, { + requestsPartial: false, + supportsSendingPartial: true + } as PartialSubscriptionOpts) + + // Spy on sendRpc + const sentRpcs: Array<{ peerId: string, rpc: RPC }> = [] + const origSendRpc = gsA.sendRpc.bind(gsA) + gsA.sendRpc = (id: string, rpc: RPC): boolean => { + sentRpcs.push({ peerId: id, rpc }) + return origSendRpc(id, rpc) + } + + const peersToGossipByTopic = new Map>() + peersToGossipByTopic.set(topic, new Set([bId])) + + gsA.emitPartialGossip(peersToGossipByTopic) + + // Should have sent one partial RPC per group (2 groups) + const sentToB = sentRpcs.filter(s => s.peerId === bId && s.rpc.partial != null) + expect(sentToB.length).to.equal(2) + }) + + it('should not send IHAVE to peers that request partial messages', () => { + const topic = 'test-topic' + const bId = ctx.nodeB.components.peerId.toString() + const gsA = ctx.nodeA.pubsub as any + + gsA.peerPartialOpts.set(bId, new Map()) + gsA.peerPartialOpts.get(bId).set(topic, { + requestsPartial: true, + supportsSendingPartial: true + } as PartialSubscriptionOpts) + + const pushedIHave: Array<{ peerId: string }> = [] + const originalPushGossip = gsA.pushGossip.bind(gsA) + gsA.pushGossip = (peerId: string, controlIHaveMsgs: unknown) => { + pushedIHave.push({ peerId }) + return originalPushGossip(peerId, controlIHaveMsgs) + } + + 
gsA.doEmitGossip(topic, new Set([bId]), [new Uint8Array([1, 2, 3])]) + + expect(pushedIHave).to.have.length(0) + }) + + it('should prune partial group state during heartbeat execution', async () => { + const topic = 'test-topic' + const gsA = ctx.nodeA.pubsub as any + + ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + const state = gsA.partialMessageState.get(topic) + state.updateMetadata(new Uint8Array([1]), 'peer1', new Uint8Array([0b1010])) + + const pruneSpy = sinon.spy(state, 'pruneExpired') + await gsA.heartbeat() + + expect(pruneSpy.called).to.be.true() + }) +}) diff --git a/packages/gossipsub/test/partial-messages/handle-received-partial.spec.ts b/packages/gossipsub/test/partial-messages/handle-received-partial.spec.ts new file mode 100644 index 0000000000..6f1d15bb54 --- /dev/null +++ b/packages/gossipsub/test/partial-messages/handle-received-partial.spec.ts @@ -0,0 +1,477 @@ +import { stop } from '@libp2p/interface' +import { expect } from 'aegir/chai' +import { PartialMessagesMaxMetadataSize } from '../../src/constants.js' +import { defaultDecodeRpcLimits } from '../../src/message/decodeRpc.js' +import { RPC } from '../../src/message/rpc.js' +import { createComponents } from '../utils/create-pubsub.js' +import { setupTwoNodes, teardownTwoNodes } from './utils.js' +import type { TwoNodeContext } from './utils.js' +import type { PartialMessage } from '../../src/types.js' + +describe('partial messages - handleReceivedPartial', () => { + let ctx: TwoNodeContext + + beforeEach(async () => { + ctx = await setupTwoNodes() + }) + + afterEach(async () => { + await teardownTwoNodes(ctx) + }) + + it('should dispatch partial-message event when receiving partial RPC', async () => { + const topic = 'test-topic' + const gsB = ctx.nodeB.pubsub as any + + // Subscribe nodeB with partial support so it has the state + ctx.nodeB.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true 
+ }) + + const topicIDBytes = new TextEncoder().encode(topic) + + // Set up listener for the partial-message event + const received = new Promise((resolve) => { + ctx.nodeB.pubsub.addEventListener('gossipsub:partial-message', (evt: CustomEvent) => { + resolve(evt.detail) + }, { once: true }) + }) + + // Simulate receiving a partial RPC from nodeA + const partialRpc: RPC = { + subscriptions: [], + messages: [], + partial: { + topicID: topicIDBytes, + groupID: new Uint8Array([1, 2, 3]), + partialMessage: new Uint8Array([4, 5, 6]), + partsMetadata: new Uint8Array([0b1010]) + } + } + + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, partialRpc) + + const msg = await received + expect(msg.topic).to.equal(topic) + expect(msg.groupID).to.deep.equal(new Uint8Array([1, 2, 3])) + expect(msg.partsMetadata).to.deep.equal(new Uint8Array([0b1010])) + }) + + it('should update PartialMessageState when receiving partial RPC', async () => { + const topic = 'test-topic' + const gsB = ctx.nodeB.pubsub as any + + // Subscribe nodeB with partial support + ctx.nodeB.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + const topicIDBytes = new TextEncoder().encode(topic) + + // Simulate receiving a partial RPC + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, { + subscriptions: [], + messages: [], + partial: { + topicID: topicIDBytes, + groupID: new Uint8Array([1, 2]), + partsMetadata: new Uint8Array([0b1010]) + } + }) + + // Verify the PartialMessageState was updated + const state = gsB.partialMessageState.get(topic) + expect(state).to.not.be.undefined() + expect(state.hasGroup(new Uint8Array([1, 2]))).to.be.true() + expect(state.getLocalMetadata(new Uint8Array([1, 2]))).to.deep.equal(new Uint8Array([0b1010])) + }) + + it('should reject partial messages with missing topicID', () => { + const gsB = ctx.nodeB.pubsub as any + + const partialRpc: RPC = { + subscriptions: [], + messages: [], + partial: { + // Missing topicID + groupID: new 
Uint8Array([1, 2]), + partsMetadata: new Uint8Array([0b1010]) + } + } + + // Should not throw, just silently return + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, partialRpc) + + // No state should be created + expect(gsB.partialMessageState.size).to.equal(0) + }) + + it('should reject partial messages with missing groupID', () => { + const topic = 'test-topic' + const gsB = ctx.nodeB.pubsub as any + + ctx.nodeB.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + const topicIDBytes = new TextEncoder().encode(topic) + + const partialRpc: RPC = { + subscriptions: [], + messages: [], + partial: { + topicID: topicIDBytes, + // Missing groupID + partsMetadata: new Uint8Array([0b1010]) + } + } + + let eventFired = false + ctx.nodeB.pubsub.addEventListener('gossipsub:partial-message', () => { + eventFired = true + }, { once: true }) + + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, partialRpc) + + // No event should be dispatched + expect(eventFired).to.be.false() + }) + + it('should reject partial messages with missing partsMetadata', () => { + const topic = 'test-topic' + const gsB = ctx.nodeB.pubsub as any + + ctx.nodeB.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + const topicIDBytes = new TextEncoder().encode(topic) + + const partialRpc: RPC = { + subscriptions: [], + messages: [], + partial: { + topicID: topicIDBytes, + groupID: new Uint8Array([1, 2]) + // Missing partsMetadata + } + } + + let eventFired = false + ctx.nodeB.pubsub.addEventListener('gossipsub:partial-message', () => { + eventFired = true + }, { once: true }) + + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, partialRpc) + + // No event should be dispatched + expect(eventFired).to.be.false() + }) + + it('should reject partial messages with oversized partsMetadata', () => { + const topic = 'test-topic' + const gsB = ctx.nodeB.pubsub as any + + ctx.nodeB.pubsub.subscribePartial(topic, { + 
requestsPartial: true, + supportsSendingPartial: true + }) + + const topicIDBytes = new TextEncoder().encode(topic) + + // Create metadata that exceeds the max size + const oversizedMetadata = new Uint8Array(PartialMessagesMaxMetadataSize + 1) + + const partialRpc: RPC = { + subscriptions: [], + messages: [], + partial: { + topicID: topicIDBytes, + groupID: new Uint8Array([1, 2]), + partsMetadata: oversizedMetadata + } + } + + let eventFired = false + ctx.nodeB.pubsub.addEventListener('gossipsub:partial-message', () => { + eventFired = true + }, { once: true }) + + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, partialRpc) + + // Should be silently ignored + expect(eventFired).to.be.false() + const state = gsB.partialMessageState.get(topic) + expect(state?.hasGroup(new Uint8Array([1, 2]))).to.not.be.true() + }) + + it('should reject partial messages with oversized partialMessage', async () => { + const limitedNode = await createComponents({ + init: { + emitSelf: false, + decodeRpcLimits: { + ...defaultDecodeRpcLimits, + maxPartialMessageSize: 1 + } + } + }) + + try { + const gsLimited = limitedNode.pubsub as any + const topic = 'test-topic' + + limitedNode.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + const topicIDBytes = new TextEncoder().encode(topic) + let eventFired = false + limitedNode.pubsub.addEventListener('gossipsub:partial-message', () => { + eventFired = true + }, { once: true }) + + await gsLimited.handleReceivedRpc(ctx.nodeA.components.peerId, { + subscriptions: [], + messages: [], + partial: { + topicID: topicIDBytes, + groupID: new Uint8Array([1, 2]), + partialMessage: new Uint8Array([4, 5]), // exceeds maxPartialMessageSize + partsMetadata: new Uint8Array([0b1010]) + } + }) + + expect(eventFired).to.be.false() + const state = gsLimited.partialMessageState.get(topic) + expect(state?.hasGroup(new Uint8Array([1, 2]))).to.not.be.true() + } finally { + await stop(limitedNode.pubsub, 
...Object.entries(limitedNode.components)) + } + }) + + it('should ignore partial for topic not subscribed with partial support', () => { + const topic = 'test-topic' + const gsB = ctx.nodeB.pubsub as any + + // Subscribe normally, not via subscribePartial + ctx.nodeB.pubsub.subscribe(topic) + + const topicIDBytes = new TextEncoder().encode(topic) + + let eventFired = false + ctx.nodeB.pubsub.addEventListener('gossipsub:partial-message', () => { + eventFired = true + }, { once: true }) + + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, { + subscriptions: [], + messages: [], + partial: { + topicID: topicIDBytes, + groupID: new Uint8Array([1, 2]), + partsMetadata: new Uint8Array([0b1010]) + } + }) + + // Should be ignored - topic is not in partialTopics + expect(eventFired).to.be.false() + expect(gsB.partialMessageState.has(topic)).to.be.false() + }) + + it('should ignore partial for disallowed topic', async () => { + // Create a node with allowedTopics + const restrictedNode = await createComponents({ + init: { + emitSelf: false, + allowedTopics: ['allowed-topic'] + } + }) + + try { + const gsRestricted = restrictedNode.pubsub as any + const topic = 'disallowed-topic' + + // Subscribe with partial to the disallowed topic (bypassing normal checks for test) + restrictedNode.pubsub.partialTopics.set(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + const topicIDBytes = new TextEncoder().encode(topic) + + let eventFired = false + restrictedNode.pubsub.addEventListener('gossipsub:partial-message', () => { + eventFired = true + }, { once: true }) + + gsRestricted.handleReceivedRpc(ctx.nodeA.components.peerId, { + subscriptions: [], + messages: [], + partial: { + topicID: topicIDBytes, + groupID: new Uint8Array([1, 2]), + partsMetadata: new Uint8Array([0b1010]) + } + }) + + expect(eventFired).to.be.false() + } finally { + await stop(restrictedNode.pubsub, ...Object.entries(restrictedNode.components)) + } + }) + + it('should dispatch event for 
metadata-only partial (no partialMessage)', async () => { + const topic = 'test-topic' + const gsB = ctx.nodeB.pubsub as any + + ctx.nodeB.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + const topicIDBytes = new TextEncoder().encode(topic) + + const received = new Promise((resolve) => { + ctx.nodeB.pubsub.addEventListener('gossipsub:partial-message', (evt: CustomEvent) => { + resolve(evt.detail) + }, { once: true }) + }) + + // Send partial with partsMetadata only, no partialMessage + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, { + subscriptions: [], + messages: [], + partial: { + topicID: topicIDBytes, + groupID: new Uint8Array([1, 2, 3]), + partsMetadata: new Uint8Array([0b1010]) + } + }) + + const msg = await received + expect(msg.topic).to.equal(topic) + expect(msg.groupID).to.deep.equal(new Uint8Array([1, 2, 3])) + expect(msg.partsMetadata).to.deep.equal(new Uint8Array([0b1010])) + expect(msg.partialMessage).to.be.undefined() + }) + + it('should include partialMessage data in dispatched event when present', async () => { + const topic = 'test-topic' + const gsB = ctx.nodeB.pubsub as any + + ctx.nodeB.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + const topicIDBytes = new TextEncoder().encode(topic) + + const received = new Promise((resolve) => { + ctx.nodeB.pubsub.addEventListener('gossipsub:partial-message', (evt: CustomEvent) => { + resolve(evt.detail) + }, { once: true }) + }) + + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, { + subscriptions: [], + messages: [], + partial: { + topicID: topicIDBytes, + groupID: new Uint8Array([1, 2, 3]), + partialMessage: new Uint8Array([4, 5, 6]), + partsMetadata: new Uint8Array([0b1010]) + } + }) + + const msg = await received + expect(msg.partialMessage).to.deep.equal(new Uint8Array([4, 5, 6])) + }) + + it('should dispatch event from non-mesh peer', async () => { + const topic = 'test-topic' + const gsB = 
ctx.nodeB.pubsub as any + + ctx.nodeB.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + // Ensure nodeA is NOT in nodeB's mesh for this topic + const aId = ctx.nodeA.components.peerId.toString() + const mesh = gsB.mesh.get(topic) + if (mesh != null) { + mesh.delete(aId) + } + + const topicIDBytes = new TextEncoder().encode(topic) + + const received = new Promise((resolve) => { + ctx.nodeB.pubsub.addEventListener('gossipsub:partial-message', (evt: CustomEvent) => { + resolve(evt.detail) + }, { once: true }) + }) + + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, { + subscriptions: [], + messages: [], + partial: { + topicID: topicIDBytes, + groupID: new Uint8Array([1, 2, 3]), + partsMetadata: new Uint8Array([0b1010]) + } + }) + + // Event should still be dispatched regardless of mesh membership + const msg = await received + expect(msg.topic).to.equal(topic) + expect(msg.groupID).to.deep.equal(new Uint8Array([1, 2, 3])) + }) + + it('should replace peer metadata on subsequent updates for same group', async () => { + const topic = 'test-topic' + const gsB = ctx.nodeB.pubsub as any + const aId = ctx.nodeA.components.peerId.toString() + + ctx.nodeB.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + const topicIDBytes = new TextEncoder().encode(topic) + + // First update + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, { + subscriptions: [], + messages: [], + partial: { + topicID: topicIDBytes, + groupID: new Uint8Array([1, 2]), + partsMetadata: new Uint8Array([0b1010]) + } + }) + + const state = gsB.partialMessageState.get(topic) + expect(state.getPeerMetadata(new Uint8Array([1, 2]), aId)).to.deep.equal(new Uint8Array([0b1010])) + + // Second update from same peer, same group, different metadata + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, { + subscriptions: [], + messages: [], + partial: { + topicID: topicIDBytes, + groupID: new Uint8Array([1, 2]), + 
partsMetadata: new Uint8Array([0b0101]) + } + }) + + // Peer metadata should be replaced with the latest + expect(state.getPeerMetadata(new Uint8Array([1, 2]), aId)).to.deep.equal(new Uint8Array([0b0101])) + // Local metadata should be the merge of all received + expect(state.getLocalMetadata(new Uint8Array([1, 2]))).to.deep.equal(new Uint8Array([0b1111])) + }) +}) diff --git a/packages/gossipsub/test/partial-messages/mixed-config-eager.spec.ts b/packages/gossipsub/test/partial-messages/mixed-config-eager.spec.ts new file mode 100644 index 0000000000..0742f06a74 --- /dev/null +++ b/packages/gossipsub/test/partial-messages/mixed-config-eager.spec.ts @@ -0,0 +1,340 @@ +import { stop } from '@libp2p/interface' +import { expect } from 'aegir/chai' +import sinon from 'sinon' +import { RPC } from '../../src/message/rpc.js' +import { createComponents } from '../utils/create-pubsub.js' +import { setupTwoNodes, teardownTwoNodes } from './utils.js' +import type { TwoNodeContext } from './utils.js' +import type { PartialSubscriptionOpts, PartsMetadataMerger } from '../../src/types.js' + +describe('partial messages - mixed network and upgrade path', () => { + let ctx: TwoNodeContext + + beforeEach(async () => { + ctx = await setupTwoNodes() + }) + + afterEach(async () => { + await teardownTwoNodes(ctx) + }) + + it('should still process full messages when subscribed with partial', () => { + const topic = 'test-topic' + const gsB = ctx.nodeB.pubsub as any + + // Subscribe nodeB with partial support + ctx.nodeB.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + // Subscribe nodeA to topic to make it a valid sender + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, { + subscriptions: [{ subscribe: true, topic }], + messages: [] + }) + + // Simulate a regular full message RPC (not partial) from nodeA + // This is what happens when a non-partial-supporting peer sends a message + const rpc: RPC = { + subscriptions: [], + messages: [{ + 
topic, + data: new TextEncoder().encode('hello world'), + from: ctx.nodeA.components.peerId.toMultihash().bytes, + seqno: new Uint8Array(8) + }] + } + + // Process the full message - should not throw + // This verifies that partial support doesn't break full message reception + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, rpc) + + // Node should still be functional after processing the full message + expect(ctx.nodeB.pubsub.getTopics()).to.include(topic) + }) + + it('should handle supportsSendingPartial-only subscription correctly', () => { + const topic = 'test-topic' + const aId = ctx.nodeA.components.peerId.toString() + const gsB = ctx.nodeB.pubsub as any + + // Simulate nodeA subscribing with supportsSendingPartial only (no requestsPartial) + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, { + subscriptions: [{ + subscribe: true, + topic, + requestsPartial: false, + supportsSendingPartial: true + }], + messages: [] + }) + + // Verify nodeB tracked nodeA's opts correctly + const peerOpts = gsB.peerPartialOpts.get(aId)?.get(topic) + expect(peerOpts).to.not.be.undefined() + expect(peerOpts?.requestsPartial).to.be.false() + expect(peerOpts?.supportsSendingPartial).to.be.true() + }) + + it('should still process full messages when supportsSendingPartial-only is set', () => { + const topic = 'test-topic' + const gsB = ctx.nodeB.pubsub as any + + ctx.nodeB.pubsub.subscribePartial(topic, { + requestsPartial: false, + supportsSendingPartial: true + }) + + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, { + subscriptions: [{ subscribe: true, topic }], + messages: [] + }) + + const rpc: RPC = { + subscriptions: [], + messages: [{ + topic, + data: new TextEncoder().encode('full message'), + from: ctx.nodeA.components.peerId.toMultihash().bytes, + seqno: new Uint8Array(8) + }] + } + + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, rpc) + expect(ctx.nodeB.pubsub.getTopics()).to.include(topic) + }) + + it('should update peer behavior when upgrading from 
supports-only to requestsPartial', () => { + const topic = 'test-topic' + const aId = ctx.nodeA.components.peerId.toString() + const gsB = ctx.nodeB.pubsub as any + + // Step 1: peer advertises supports-only + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, { + subscriptions: [{ + subscribe: true, + topic, + requestsPartial: false, + supportsSendingPartial: true + }], + messages: [] + }) + + expect(gsB.peerPartialOpts.get(aId)?.get(topic)?.requestsPartial).to.be.false() + expect(gsB.peerPartialOpts.get(aId)?.get(topic)?.supportsSendingPartial).to.be.true() + + // Step 2: same peer upgrades to request partials + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, { + subscriptions: [{ + subscribe: true, + topic, + requestsPartial: true, + supportsSendingPartial: true + }], + messages: [] + }) + + expect(gsB.peerPartialOpts.get(aId)?.get(topic)?.requestsPartial).to.be.true() + expect(gsB.peerPartialOpts.get(aId)?.get(topic)?.supportsSendingPartial).to.be.true() + + // After upgrade, publishPartial should include data (not metadata-only) + const sentRpcs: Array<{ peerId: string, rpc: RPC }> = [] + const originalSendRpc = gsB.sendRpc.bind(gsB) + gsB.sendRpc = (peerId: string, rpc: RPC): boolean => { + sentRpcs.push({ peerId, rpc }) + return originalSendRpc(peerId, rpc) + } + + ctx.nodeB.pubsub.publishPartial({ + topic, + groupID: new Uint8Array([1, 2, 3]), + partialMessage: new Uint8Array([9, 9, 9]), + partsMetadata: new Uint8Array([0b1010]) + }) + + const sentToA = sentRpcs.find((entry) => entry.peerId === aId && entry.rpc.partial != null) + expect(sentToA).to.not.be.undefined() + expect(sentToA?.rpc.partial?.partialMessage).to.deep.equal(new Uint8Array([9, 9, 9])) + }) +}) + +describe('partial messages - configuration', () => { + it('should use custom PartsMetadataMerger when provided', async () => { + let mergeCallCount = 0 + + // Create a custom merger that just returns the longer buffer + const customMerger: PartsMetadataMerger = { + merge (a: Uint8Array, b: 
Uint8Array): Uint8Array { + mergeCallCount++ + return a.length >= b.length ? a : b + } + } + + const customNode = await createComponents({ + init: { + emitSelf: false, + partsMetadataMerger: customMerger + } + }) + + try { + const topic = 'test-topic' + const gsCustom = customNode.pubsub as any + + customNode.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + // Publish partial to trigger the merger + customNode.pubsub.publishPartial({ + topic, + groupID: new Uint8Array([1]), + partialMessage: new Uint8Array([2]), + partsMetadata: new Uint8Array([0b1010]) + }) + + // The custom merger should have been called + expect(mergeCallCount).to.be.greaterThan(0) + + // State exists after publishing through the custom merger (its longer-buffer result is not asserted here) + const state = gsCustom.partialMessageState.get(topic) + expect(state).to.not.be.undefined() + } finally { + await stop(customNode.pubsub, ...Object.entries(customNode.components)) + } + }) + + it('should respect custom maxGroups configuration', async () => { + const customNode = await createComponents({ + init: { + emitSelf: false, + partialMessagesMaxGroups: 2 + } + }) + + try { + const topic = 'test-topic' + const gsCustom = customNode.pubsub as any + + customNode.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + const state = gsCustom.partialMessageState.get(topic) + + // Add 3 groups to a state configured with maxGroups=2 + state.updateMetadata(new Uint8Array([1]), 'peer1', new Uint8Array([0b1010])) + state.updateMetadata(new Uint8Array([2]), 'peer1', new Uint8Array([0b0101])) + state.updateMetadata(new Uint8Array([3]), 'peer1', new Uint8Array([0b1100])) + + // Should not exceed maxGroups + expect(state.size).to.equal(2) + // Oldest group should have been evicted + expect(state.hasGroup(new Uint8Array([1]))).to.be.false() + expect(state.hasGroup(new Uint8Array([2]))).to.be.true() + expect(state.hasGroup(new Uint8Array([3]))).to.be.true() + }
finally { + await stop(customNode.pubsub, ...Object.entries(customNode.components)) + } + }) + + it('should respect custom groupTTLMs configuration', async () => { + const sandbox = sinon.createSandbox() + try { + sandbox.useFakeTimers() + + const customNode = await createComponents({ + init: { + emitSelf: false, + partialMessagesGroupTTLMs: 1000 // 1 second TTL + } + }) + + try { + const topic = 'test-topic' + const gsCustom = customNode.pubsub as any + + customNode.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + const state = gsCustom.partialMessageState.get(topic) + state.updateMetadata(new Uint8Array([1]), 'peer1', new Uint8Array([0b1010])) + + expect(state.size).to.equal(1) + + // Advance past custom TTL + sandbox.clock.tick(1500) + + const pruned = state.pruneExpired() + expect(pruned).to.equal(1) + expect(state.size).to.equal(0) + } finally { + await stop(customNode.pubsub, ...Object.entries(customNode.components)) + } + } finally { + sandbox.restore() + } + }) +}) + +describe('partial messages - eager data pushing', () => { + let ctx: TwoNodeContext + + beforeEach(async () => { + ctx = await setupTwoNodes() + }) + + afterEach(async () => { + await teardownTwoNodes(ctx) + }) + + it('should allow publishing partial data before receiving partsMetadata from peer', () => { + const topic = 'test-topic' + const bId = ctx.nodeB.components.peerId.toString() + const gsA = ctx.nodeA.pubsub as any + + ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + // Add nodeB to the topic's peer set with requestsPartial=true + if (!gsA.topics.has(topic)) { + gsA.topics.set(topic, new Set()) + } + gsA.topics.get(topic).add(bId) + + gsA.peerPartialOpts.set(bId, new Map()) + gsA.peerPartialOpts.get(bId).set(topic, { + requestsPartial: true, + supportsSendingPartial: true + } as PartialSubscriptionOpts) + + // Spy on sendRpc + const sentRpcs: Array<{ peerId: string, rpc: RPC }> = 
[] + const origSendRpc = gsA.sendRpc.bind(gsA) + gsA.sendRpc = (id: string, rpc: RPC): boolean => { + sentRpcs.push({ peerId: id, rpc }) + return origSendRpc(id, rpc) + } + + // Publish partial data without any prior partsMetadata exchange + // This tests eager push - the spec says implementations SHOULD support this + ctx.nodeA.pubsub.publishPartial({ + topic, + groupID: new Uint8Array([1, 2, 3]), + partialMessage: new Uint8Array([4, 5, 6]), + partsMetadata: new Uint8Array([0b1010]) + }) + + // Data should be sent successfully + const sentToB = sentRpcs.find(s => s.peerId === bId && s.rpc.partial != null) + expect(sentToB).to.not.be.undefined() + expect(sentToB?.rpc.partial?.partialMessage).to.deep.equal(new Uint8Array([4, 5, 6])) + }) +}) diff --git a/packages/gossipsub/test/partial-messages/partial-message-state.spec.ts b/packages/gossipsub/test/partial-messages/partial-message-state.spec.ts new file mode 100644 index 0000000000..7cb963bbe2 --- /dev/null +++ b/packages/gossipsub/test/partial-messages/partial-message-state.spec.ts @@ -0,0 +1,181 @@ +import { expect } from 'aegir/chai' +import sinon from 'sinon' +import { BitwiseOrMerger } from '../../src/partial/bitwise-or-merger.js' +import { PartialMessageState } from '../../src/partial/partial-message-state.js' + +describe('PartialMessageState', () => { + const merger = new BitwiseOrMerger() + const maxGroups = 3 + const groupTTLMs = 5000 + const sandbox = sinon.createSandbox() + + afterEach(() => { + sandbox.restore() + }) + + function makeGroupID (id: number): Uint8Array { + return new Uint8Array([id]) + } + + it('should track metadata for a group', () => { + const state = new PartialMessageState(merger, maxGroups, groupTTLMs) + const groupID = makeGroupID(1) + const metadata = new Uint8Array([0b1010]) + + state.updateMetadata(groupID, 'peer1', metadata) + + expect(state.size).to.equal(1) + expect(state.hasGroup(groupID)).to.be.true() + expect(state.getLocalMetadata(groupID)).to.deep.equal(new 
Uint8Array([0b1010])) + expect(state.getPeerMetadata(groupID, 'peer1')).to.deep.equal(new Uint8Array([0b1010])) + }) + + it('should merge metadata from multiple peers', () => { + const state = new PartialMessageState(merger, maxGroups, groupTTLMs) + const groupID = makeGroupID(1) + + state.updateMetadata(groupID, 'peer1', new Uint8Array([0b1010])) + state.updateMetadata(groupID, 'peer2', new Uint8Array([0b0101])) + + const local = state.getLocalMetadata(groupID) + expect(local).to.deep.equal(new Uint8Array([0b1111])) + + // Each peer's individual metadata is tracked + expect(state.getPeerMetadata(groupID, 'peer1')).to.deep.equal(new Uint8Array([0b1010])) + expect(state.getPeerMetadata(groupID, 'peer2')).to.deep.equal(new Uint8Array([0b0101])) + }) + + it('should evict oldest group when at capacity', () => { + sandbox.useFakeTimers() + + const state = new PartialMessageState(merger, maxGroups, groupTTLMs) + + // Fill up to capacity + state.updateMetadata(makeGroupID(1), 'peer1', new Uint8Array([1])) + sandbox.clock.tick(10) + state.updateMetadata(makeGroupID(2), 'peer1', new Uint8Array([2])) + sandbox.clock.tick(10) + state.updateMetadata(makeGroupID(3), 'peer1', new Uint8Array([3])) + + expect(state.size).to.equal(3) + + // Adding a 4th should evict the oldest (group 1) + sandbox.clock.tick(10) + state.updateMetadata(makeGroupID(4), 'peer1', new Uint8Array([4])) + + expect(state.size).to.equal(3) + expect(state.hasGroup(makeGroupID(1))).to.be.false() + expect(state.hasGroup(makeGroupID(2))).to.be.true() + expect(state.hasGroup(makeGroupID(3))).to.be.true() + expect(state.hasGroup(makeGroupID(4))).to.be.true() + }) + + it('should prune expired groups', () => { + sandbox.useFakeTimers() + + const state = new PartialMessageState(merger, maxGroups, groupTTLMs) + + state.updateMetadata(makeGroupID(1), 'peer1', new Uint8Array([1])) + sandbox.clock.tick(1000) + state.updateMetadata(makeGroupID(2), 'peer1', new Uint8Array([2])) + + expect(state.size).to.equal(2) + + // 
Advance past TTL for group 1 + sandbox.clock.tick(groupTTLMs) + + const pruned = state.pruneExpired() + expect(pruned).to.equal(1) + expect(state.size).to.equal(1) + expect(state.hasGroup(makeGroupID(1))).to.be.false() + expect(state.hasGroup(makeGroupID(2))).to.be.true() + }) + + it('should remove a peer from all groups', () => { + const state = new PartialMessageState(merger, maxGroups, groupTTLMs) + + state.updateMetadata(makeGroupID(1), 'peer1', new Uint8Array([0b1010])) + state.updateMetadata(makeGroupID(1), 'peer2', new Uint8Array([0b0101])) + state.updateMetadata(makeGroupID(2), 'peer1', new Uint8Array([0b1100])) + + state.removePeer('peer1') + + expect(state.getPeerMetadata(makeGroupID(1), 'peer1')).to.be.undefined() + expect(state.getPeerMetadata(makeGroupID(1), 'peer2')).to.deep.equal(new Uint8Array([0b0101])) + expect(state.getPeerMetadata(makeGroupID(2), 'peer1')).to.be.undefined() + }) + + it('should return groups for gossip', () => { + const state = new PartialMessageState(merger, maxGroups, groupTTLMs) + + state.updateMetadata(makeGroupID(1), 'peer1', new Uint8Array([0b1010])) + state.updateMetadata(makeGroupID(2), 'peer1', new Uint8Array([0b0101])) + + const gossipGroups = state.getGroupsForGossip() + expect(gossipGroups.size).to.equal(2) + }) + + it('should clear all state', () => { + const state = new PartialMessageState(merger, maxGroups, groupTTLMs) + + state.updateMetadata(makeGroupID(1), 'peer1', new Uint8Array([1])) + state.updateMetadata(makeGroupID(2), 'peer1', new Uint8Array([2])) + + state.clear() + + expect(state.size).to.equal(0) + expect(state.hasGroup(makeGroupID(1))).to.be.false() + }) + + it('should return undefined for unknown group/peer', () => { + const state = new PartialMessageState(merger, maxGroups, groupTTLMs) + + expect(state.getLocalMetadata(makeGroupID(99))).to.be.undefined() + expect(state.getPeerMetadata(makeGroupID(99), 'peer1')).to.be.undefined() + expect(state.hasGroup(makeGroupID(99))).to.be.false() + }) + + 
it('should replace peer metadata on second update for same group', () => { + const state = new PartialMessageState(merger, maxGroups, groupTTLMs) + const groupID = makeGroupID(1) + + // First update + state.updateMetadata(groupID, 'peer1', new Uint8Array([0b1010])) + expect(state.getPeerMetadata(groupID, 'peer1')).to.deep.equal(new Uint8Array([0b1010])) + + // Second update from same peer, same group, different metadata + state.updateMetadata(groupID, 'peer1', new Uint8Array([0b0101])) + expect(state.getPeerMetadata(groupID, 'peer1')).to.deep.equal(new Uint8Array([0b0101])) + + // Local metadata is cumulative (bitwise OR of all received) + // First 0b1010 | initial 0b0000 = 0b1010, then 0b1010 | 0b0101 = 0b1111 + expect(state.getLocalMetadata(groupID)).to.deep.equal(new Uint8Array([0b1111])) + }) + + it('should evict by access time not creation time (LRU)', () => { + sandbox.useFakeTimers() + + const state = new PartialMessageState(merger, maxGroups, groupTTLMs) + + // Create groups 1, 2, 3 in order + state.updateMetadata(makeGroupID(1), 'peer1', new Uint8Array([1])) + sandbox.clock.tick(10) + state.updateMetadata(makeGroupID(2), 'peer1', new Uint8Array([2])) + sandbox.clock.tick(10) + state.updateMetadata(makeGroupID(3), 'peer1', new Uint8Array([3])) + + // Access group 1 again, making group 2 the least-recently-accessed + sandbox.clock.tick(10) + state.getLocalMetadata(makeGroupID(1)) + + // Adding group 4 should evict group 2 (least recently accessed), not group 1 + sandbox.clock.tick(10) + state.updateMetadata(makeGroupID(4), 'peer1', new Uint8Array([4])) + + expect(state.size).to.equal(3) + expect(state.hasGroup(makeGroupID(1))).to.be.true() + expect(state.hasGroup(makeGroupID(2))).to.be.false() + expect(state.hasGroup(makeGroupID(3))).to.be.true() + expect(state.hasGroup(makeGroupID(4))).to.be.true() + }) +}) diff --git a/packages/gossipsub/test/partial-messages/protobuf-round-trip.spec.ts b/packages/gossipsub/test/partial-messages/protobuf-round-trip.spec.ts 
new file mode 100644 index 0000000000..23faa347c9 --- /dev/null +++ b/packages/gossipsub/test/partial-messages/protobuf-round-trip.spec.ts @@ -0,0 +1,161 @@ +import { expect } from 'aegir/chai' +import { RPC } from '../../src/message/rpc.js' + +describe('partial messages - protobuf round-trip', () => { + it('should encode and decode SubOpts with only requestsPartial set', () => { + const subOpts: RPC.SubOpts = { + subscribe: true, + topic: 'test-topic', + requestsPartial: true + } + + const encoded = RPC.SubOpts.encode(subOpts) + const decoded = RPC.SubOpts.decode(encoded) + + expect(decoded.subscribe).to.equal(true) + expect(decoded.topic).to.equal('test-topic') + expect(decoded.requestsPartial).to.equal(true) + expect(decoded.supportsSendingPartial).to.be.undefined() + }) + + it('should encode and decode SubOpts with only supportsSendingPartial set', () => { + const subOpts: RPC.SubOpts = { + subscribe: true, + topic: 'test-topic', + supportsSendingPartial: true + } + + const encoded = RPC.SubOpts.encode(subOpts) + const decoded = RPC.SubOpts.decode(encoded) + + expect(decoded.subscribe).to.equal(true) + expect(decoded.topic).to.equal('test-topic') + expect(decoded.requestsPartial).to.be.undefined() + expect(decoded.supportsSendingPartial).to.equal(true) + }) + + it('should encode and decode SubOpts with partial fields', () => { + const subOpts: RPC.SubOpts = { + subscribe: true, + topic: 'test-topic', + requestsPartial: true, + supportsSendingPartial: false + } + + const encoded = RPC.SubOpts.encode(subOpts) + const decoded = RPC.SubOpts.decode(encoded) + + expect(decoded.subscribe).to.equal(true) + expect(decoded.topic).to.equal('test-topic') + expect(decoded.requestsPartial).to.equal(true) + expect(decoded.supportsSendingPartial).to.equal(false) + }) + + it('should encode and decode ControlExtensions', () => { + const extensions: RPC.ControlExtensions = { + partialMessages: true + } + + const encoded = RPC.ControlExtensions.encode(extensions) + const decoded = 
RPC.ControlExtensions.decode(encoded) + + expect(decoded.partialMessages).to.equal(true) + }) + + it('should encode and decode PartialMessagesExtension', () => { + const partial: RPC.PartialMessagesExtension = { + topicID: new Uint8Array([1, 2, 3]), + groupID: new Uint8Array([4, 5, 6]), + partialMessage: new Uint8Array([7, 8, 9]), + partsMetadata: new Uint8Array([10, 11, 12]) + } + + const encoded = RPC.PartialMessagesExtension.encode(partial) + const decoded = RPC.PartialMessagesExtension.decode(encoded) + + expect(decoded.topicID).to.deep.equal(new Uint8Array([1, 2, 3])) + expect(decoded.groupID).to.deep.equal(new Uint8Array([4, 5, 6])) + expect(decoded.partialMessage).to.deep.equal(new Uint8Array([7, 8, 9])) + expect(decoded.partsMetadata).to.deep.equal(new Uint8Array([10, 11, 12])) + }) + + it('should encode and decode RPC with partial field', () => { + const rpc: RPC = { + subscriptions: [{ + subscribe: true, + topic: 'test', + requestsPartial: true, + supportsSendingPartial: true + }], + messages: [], + control: { + ihave: [], + iwant: [], + graft: [], + prune: [], + idontwant: [], + extensions: { partialMessages: true } + }, + partial: { + topicID: new Uint8Array([1]), + groupID: new Uint8Array([2]), + partsMetadata: new Uint8Array([3]) + } + } + + const encoded = RPC.encode(rpc) + const decoded = RPC.decode(encoded) + + expect(decoded.subscriptions[0].requestsPartial).to.equal(true) + expect(decoded.subscriptions[0].supportsSendingPartial).to.equal(true) + expect(decoded.control?.extensions?.partialMessages).to.equal(true) + expect(decoded.partial?.topicID).to.deep.equal(new Uint8Array([1])) + expect(decoded.partial?.groupID).to.deep.equal(new Uint8Array([2])) + expect(decoded.partial?.partsMetadata).to.deep.equal(new Uint8Array([3])) + }) + + it('should encode and decode PartialMessagesExtension without partialMessage', () => { + const partial: RPC.PartialMessagesExtension = { + topicID: new Uint8Array([1, 2, 3]), + groupID: new Uint8Array([4, 5, 6]), + 
partsMetadata: new Uint8Array([10, 11, 12]) + } + + const encoded = RPC.PartialMessagesExtension.encode(partial) + const decoded = RPC.PartialMessagesExtension.decode(encoded) + + expect(decoded.topicID).to.deep.equal(new Uint8Array([1, 2, 3])) + expect(decoded.groupID).to.deep.equal(new Uint8Array([4, 5, 6])) + expect(decoded.partialMessage).to.be.undefined() + expect(decoded.partsMetadata).to.deep.equal(new Uint8Array([10, 11, 12])) + }) + + it('should encode and decode PartialMessagesExtension without partsMetadata', () => { + const partial: RPC.PartialMessagesExtension = { + topicID: new Uint8Array([1, 2, 3]), + groupID: new Uint8Array([4, 5, 6]) + } + + const encoded = RPC.PartialMessagesExtension.encode(partial) + const decoded = RPC.PartialMessagesExtension.decode(encoded) + + expect(decoded.topicID).to.deep.equal(new Uint8Array([1, 2, 3])) + expect(decoded.groupID).to.deep.equal(new Uint8Array([4, 5, 6])) + expect(decoded.partsMetadata).to.be.undefined() + }) + + it('should be backward compatible - old format decodes without partial fields', () => { + const rpc: RPC = { + subscriptions: [{ subscribe: true, topic: 'test' }], + messages: [] + } + + const encoded = RPC.encode(rpc) + const decoded = RPC.decode(encoded) + + expect(decoded.subscriptions[0].requestsPartial).to.be.undefined() + expect(decoded.subscriptions[0].supportsSendingPartial).to.be.undefined() + expect(decoded.control).to.be.undefined() + expect(decoded.partial).to.be.undefined() + }) +}) diff --git a/packages/gossipsub/test/partial-messages/publish-partial.spec.ts b/packages/gossipsub/test/partial-messages/publish-partial.spec.ts new file mode 100644 index 0000000000..5ee338e1a6 --- /dev/null +++ b/packages/gossipsub/test/partial-messages/publish-partial.spec.ts @@ -0,0 +1,288 @@ +import { expect } from 'aegir/chai' +import { RPC } from '../../src/message/rpc.js' +import { setupTwoNodes, teardownTwoNodes } from './utils.js' +import type { TwoNodeContext } from './utils.js' +import type { 
PartialSubscriptionOpts } from '../../src/types.js' + +describe('partial messages - publishPartial', () => { + let ctx: TwoNodeContext + + beforeEach(async () => { + ctx = await setupTwoNodes() + }) + + afterEach(async () => { + await teardownTwoNodes(ctx) + }) + + it('should update local PartialMessageState', () => { + const topic = 'test-topic' + const gsA = ctx.nodeA.pubsub as any + + // Subscribe nodeA with partial support + ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + ctx.nodeA.pubsub.publishPartial({ + topic, + groupID: new Uint8Array([1, 2, 3]), + partialMessage: new Uint8Array([4, 5, 6]), + partsMetadata: new Uint8Array([0b1010]) + }) + + // Verify local state was updated + const state = gsA.partialMessageState.get(topic) + expect(state).to.not.be.undefined() + expect(state.hasGroup(new Uint8Array([1, 2, 3]))).to.be.true() + }) + + it('should send partial RPC to peers with requestsPartial', () => { + const topic = 'test-topic' + const bId = ctx.nodeB.components.peerId.toString() + const gsA = ctx.nodeA.pubsub as any + + // Subscribe nodeA + ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + // Add nodeB to the topic's peer set + if (!gsA.topics.has(topic)) { + gsA.topics.set(topic, new Set()) + } + gsA.topics.get(topic).add(bId) + + // Set nodeB's partial opts indicating it requests partial messages + gsA.peerPartialOpts.set(bId, new Map()) + gsA.peerPartialOpts.get(bId).set(topic, { + requestsPartial: true, + supportsSendingPartial: true + } as PartialSubscriptionOpts) + + // Spy on sendRpc to capture what gets sent + const sentRpcs: Array<{ peerId: string, rpc: RPC }> = [] + const origSendRpc = gsA.sendRpc.bind(gsA) + gsA.sendRpc = (id: string, rpc: RPC): boolean => { + sentRpcs.push({ peerId: id, rpc }) + return origSendRpc(id, rpc) + } + + // Publish partial from nodeA + ctx.nodeA.pubsub.publishPartial({ + topic, + groupID: new 
Uint8Array([1, 2, 3]), + partialMessage: new Uint8Array([4, 5, 6]), + partsMetadata: new Uint8Array([0b1010]) + }) + + // Verify the partial RPC was sent to nodeB + const sentToB = sentRpcs.find(s => s.peerId === bId) + expect(sentToB).to.not.be.undefined() + expect(sentToB?.rpc.partial).to.not.be.undefined() + expect(sentToB?.rpc.partial?.topicID).to.deep.equal(new TextEncoder().encode(topic)) + expect(sentToB?.rpc.partial?.groupID).to.deep.equal(new Uint8Array([1, 2, 3])) + expect(sentToB?.rpc.partial?.partialMessage).to.deep.equal(new Uint8Array([4, 5, 6])) + expect(sentToB?.rpc.partial?.partsMetadata).to.deep.equal(new Uint8Array([0b1010])) + }) + + it('should send metadata-only to peers with supportsSendingPartial but not requestsPartial', () => { + const topic = 'test-topic' + const bId = ctx.nodeB.components.peerId.toString() + const gsA = ctx.nodeA.pubsub as any + + // Subscribe nodeA + ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + // Add nodeB to the topic's peer set + if (!gsA.topics.has(topic)) { + gsA.topics.set(topic, new Set()) + } + gsA.topics.get(topic).add(bId) + + // Set nodeB as supporting sending but NOT requesting partial + gsA.peerPartialOpts.set(bId, new Map()) + gsA.peerPartialOpts.get(bId).set(topic, { + requestsPartial: false, + supportsSendingPartial: true + } as PartialSubscriptionOpts) + + // Spy on sendRpc + const sentRpcs: Array<{ peerId: string, rpc: RPC }> = [] + const origSendRpc = gsA.sendRpc.bind(gsA) + gsA.sendRpc = (id: string, rpc: RPC): boolean => { + sentRpcs.push({ peerId: id, rpc }) + return origSendRpc(id, rpc) + } + + // Publish partial + ctx.nodeA.pubsub.publishPartial({ + topic, + groupID: new Uint8Array([1, 2, 3]), + partialMessage: new Uint8Array([4, 5, 6]), + partsMetadata: new Uint8Array([0b1010]) + }) + + // Verify metadata-only partial was sent (no partialMessage field) + const sentToB = sentRpcs.find(s => s.peerId === bId) + 
expect(sentToB).to.not.be.undefined() + expect(sentToB?.rpc.partial).to.not.be.undefined() + expect(sentToB?.rpc.partial?.partsMetadata).to.deep.equal(new Uint8Array([0b1010])) + expect(sentToB?.rpc.partial?.partialMessage).to.be.undefined() + }) + + it('should not send partial to peers without partial opts', () => { + const topic = 'test-topic' + const bId = ctx.nodeB.components.peerId.toString() + const gsA = ctx.nodeA.pubsub as any + + // Subscribe nodeA + ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + // Add nodeB to the topic's peer set + if (!gsA.topics.has(topic)) { + gsA.topics.set(topic, new Set()) + } + gsA.topics.get(topic).add(bId) + + // Do NOT set any peerPartialOpts for nodeB + + // Spy on sendRpc + const sentRpcs: Array<{ peerId: string, rpc: RPC }> = [] + const origSendRpc = gsA.sendRpc.bind(gsA) + gsA.sendRpc = (id: string, rpc: RPC): boolean => { + sentRpcs.push({ peerId: id, rpc }) + return origSendRpc(id, rpc) + } + + ctx.nodeA.pubsub.publishPartial({ + topic, + groupID: new Uint8Array([1, 2, 3]), + partialMessage: new Uint8Array([4, 5, 6]), + partsMetadata: new Uint8Array([0b1010]) + }) + + // No partial RPC should be sent to nodeB + const sentToB = sentRpcs.find(s => s.peerId === bId && s.rpc.partial != null) + expect(sentToB).to.be.undefined() + }) + + it('should not send partial to peers subscribed to different topic', () => { + const topicA = 'topic-a' + const topicB = 'topic-b' + const bId = ctx.nodeB.components.peerId.toString() + const gsA = ctx.nodeA.pubsub as any + + // Subscribe nodeA to topic A + ctx.nodeA.pubsub.subscribePartial(topicA, { + requestsPartial: true, + supportsSendingPartial: true + }) + + // Add nodeB to topic A's peer set + if (!gsA.topics.has(topicA)) { + gsA.topics.set(topicA, new Set()) + } + gsA.topics.get(topicA).add(bId) + + // Set nodeB's partial opts for topic B only (not topic A) + gsA.peerPartialOpts.set(bId, new Map()) + 
gsA.peerPartialOpts.get(bId).set(topicB, { + requestsPartial: true, + supportsSendingPartial: true + } as PartialSubscriptionOpts) + + // Spy on sendRpc + const sentRpcs: Array<{ peerId: string, rpc: RPC }> = [] + const origSendRpc = gsA.sendRpc.bind(gsA) + gsA.sendRpc = (id: string, rpc: RPC): boolean => { + sentRpcs.push({ peerId: id, rpc }) + return origSendRpc(id, rpc) + } + + ctx.nodeA.pubsub.publishPartial({ + topic: topicA, + groupID: new Uint8Array([1, 2, 3]), + partialMessage: new Uint8Array([4, 5, 6]), + partsMetadata: new Uint8Array([0b1010]) + }) + + // No partial RPC should be sent to nodeB for topic A + const sentToB = sentRpcs.find(s => s.peerId === bId && s.rpc.partial != null) + expect(sentToB).to.be.undefined() + }) + + it('should create PartialMessageState on-demand when publishing', () => { + const topic = 'test-topic' + const gsA = ctx.nodeA.pubsub as any + + // Do NOT call subscribePartial, so no state exists yet + expect(gsA.partialMessageState.has(topic)).to.be.false() + + ctx.nodeA.pubsub.publishPartial({ + topic, + groupID: new Uint8Array([1, 2, 3]), + partialMessage: new Uint8Array([4, 5, 6]), + partsMetadata: new Uint8Array([0b1010]) + }) + + // State should be created on-demand + expect(gsA.partialMessageState.has(topic)).to.be.true() + const state = gsA.partialMessageState.get(topic) + expect(state.hasGroup(new Uint8Array([1, 2, 3]))).to.be.true() + }) + + it('should send partial data to eligible non-mesh peers', () => { + const topic = 'test-topic' + const bId = ctx.nodeB.components.peerId.toString() + const gsA = ctx.nodeA.pubsub as any + + ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + if (!gsA.topics.has(topic)) { + gsA.topics.set(topic, new Set()) + } + gsA.topics.get(topic).add(bId) + + // Ensure peer is not in mesh for this topic + const meshPeers = gsA.mesh.get(topic) + if (meshPeers != null) { + meshPeers.delete(bId) + } + + gsA.peerPartialOpts.set(bId, new Map()) + 
gsA.peerPartialOpts.get(bId).set(topic, { + requestsPartial: true, + supportsSendingPartial: true + } as PartialSubscriptionOpts) + + const sentRpcs: Array<{ peerId: string, rpc: RPC }> = [] + const origSendRpc = gsA.sendRpc.bind(gsA) + gsA.sendRpc = (id: string, rpc: RPC): boolean => { + sentRpcs.push({ peerId: id, rpc }) + return origSendRpc(id, rpc) + } + + ctx.nodeA.pubsub.publishPartial({ + topic, + groupID: new Uint8Array([7, 8, 9]), + partialMessage: new Uint8Array([1, 2, 3]), + partsMetadata: new Uint8Array([0b1111]) + }) + + const sentToB = sentRpcs.find((entry) => entry.peerId === bId && entry.rpc.partial != null) + expect(sentToB).to.not.be.undefined() + expect(sentToB?.rpc.partial?.partialMessage).to.deep.equal(new Uint8Array([1, 2, 3])) + }) +}) diff --git a/packages/gossipsub/test/partial-messages/subscription-signaling.spec.ts b/packages/gossipsub/test/partial-messages/subscription-signaling.spec.ts new file mode 100644 index 0000000000..9293c75972 --- /dev/null +++ b/packages/gossipsub/test/partial-messages/subscription-signaling.spec.ts @@ -0,0 +1,324 @@ +import { expect } from 'aegir/chai' +import { RPC } from '../../src/message/rpc.js' +import { setupTwoNodes, teardownTwoNodes } from './utils.js' +import type { TwoNodeContext } from './utils.js' +import type { PartialSubscriptionOpts } from '../../src/types.js' + +describe('partial messages - subscription signaling', () => { + let ctx: TwoNodeContext + + beforeEach(async () => { + ctx = await setupTwoNodes() + }) + + afterEach(async () => { + await teardownTwoNodes(ctx) + }) + + it('should include partial flags in SubOpts when subscribePartial is called', async () => { + const topic = 'test-topic' + + ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + // Verify partialTopics was updated + const partialOpts = ctx.nodeA.pubsub.partialTopics.get(topic) + expect(partialOpts).to.not.be.undefined() + 
expect(partialOpts?.requestsPartial).to.be.true() + expect(partialOpts?.supportsSendingPartial).to.be.true() + + // Verify the topic was subscribed + expect(ctx.nodeA.pubsub.getTopics()).to.include(topic) + }) + + it('should remove partial flags when unsubscribePartial is called', async () => { + const topic = 'test-topic' + + ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + expect(ctx.nodeA.pubsub.partialTopics.has(topic)).to.be.true() + + ctx.nodeA.pubsub.unsubscribePartial(topic) + + expect(ctx.nodeA.pubsub.partialTopics.has(topic)).to.be.false() + }) + + it('should track peer partial opts when receiving subscription with partial flags', async () => { + const topic = 'test-topic' + const aId = ctx.nodeA.components.peerId.toString() + + // Simulate nodeB receiving an RPC from nodeA with partial subscription flags + // This tests the handleReceivedRpc path that processes partial SubOpts + const gsB = ctx.nodeB.pubsub as any + const rpc: RPC = { + subscriptions: [{ + subscribe: true, + topic, + requestsPartial: true, + supportsSendingPartial: true + }], + messages: [] + } + + // Directly call the RPC processing on nodeB + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, rpc) + + // Verify nodeB tracked nodeA's partial opts + const peerOpts = gsB.peerPartialOpts.get(aId) + expect(peerOpts).to.not.be.undefined() + const topicOpts = peerOpts?.get(topic) + expect(topicOpts?.requestsPartial).to.be.true() + expect(topicOpts?.supportsSendingPartial).to.be.true() + + // Verify nodeB sees nodeA subscribed + const subscribers = ctx.nodeB.pubsub.getSubscribers(topic) + expect(subscribers.map(p => p.toString())).to.include(aId) + }) + + it('should remove peer partial opts on unsubscribe', async () => { + const topic = 'test-topic' + const aId = ctx.nodeA.components.peerId.toString() + const gsB = ctx.nodeB.pubsub as any + + // First subscribe + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, { + subscriptions: [{ 
+ subscribe: true, + topic, + requestsPartial: true, + supportsSendingPartial: true + }], + messages: [] + }) + + expect(gsB.peerPartialOpts.get(aId)?.has(topic)).to.be.true() + + // Then unsubscribe + gsB.handleReceivedRpc(ctx.nodeA.components.peerId, { + subscriptions: [{ + subscribe: false, + topic, + requestsPartial: true, + supportsSendingPartial: true + }], + messages: [] + }) + + // The topic should be removed from the peer's opts + expect(gsB.peerPartialOpts.get(aId)?.has(topic) ?? false).to.be.false() + }) + + it('should normalize received peer opts when requestsPartial is true', async () => { + const topic = 'test-topic' + const aId = ctx.nodeA.components.peerId.toString() + const gsB = ctx.nodeB.pubsub as any + + await gsB.handleReceivedRpc(ctx.nodeA.components.peerId, { + subscriptions: [{ + subscribe: true, + topic, + requestsPartial: true, + supportsSendingPartial: false + }], + messages: [] + }) + + const peerOpts = gsB.peerPartialOpts.get(aId)?.get(topic) + expect(peerOpts).to.not.be.undefined() + expect(peerOpts?.requestsPartial).to.be.true() + expect(peerOpts?.supportsSendingPartial).to.be.true() + }) + + it('should enforce supportsSendingPartial when requestsPartial is true', () => { + const topic = 'test-topic' + + // Per spec: "If a node requests partial messages, it MUST support sending partial messages." 
+ ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: false + }) + + const opts = ctx.nodeA.pubsub.partialTopics.get(topic) + expect(opts).to.not.be.undefined() + expect(opts?.requestsPartial).to.be.true() + expect(opts?.supportsSendingPartial).to.be.true() + }) + + it('should normalize outgoing SubOpts when requestsPartial is true', () => { + const topic = 'test-topic' + const bId = ctx.nodeB.components.peerId.toString() + const gsA = ctx.nodeA.pubsub as any + + const sentRpcs: Array<{ peerId: string, rpc: RPC }> = [] + const origSendRpc = gsA.sendRpc.bind(gsA) + gsA.sendRpc = (id: string, rpc: RPC): boolean => { + sentRpcs.push({ peerId: id, rpc }) + return origSendRpc(id, rpc) + } + + ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: false + }) + + const sentToB = sentRpcs.find(s => + s.peerId === bId && + s.rpc.subscriptions.some(sub => sub.topic === topic) + ) + expect(sentToB).to.not.be.undefined() + + const sub = sentToB?.rpc.subscriptions.find(s => s.topic === topic) + expect(sub?.requestsPartial).to.be.true() + expect(sub?.supportsSendingPartial).to.be.true() + }) + + it('should clear stale peer partial opts when peer re-subscribes without partial flags', async () => { + const topic = 'test-topic' + const aId = ctx.nodeA.components.peerId.toString() + const gsB = ctx.nodeB.pubsub as any + + // Initial subscription with partial flags + await gsB.handleReceivedRpc(ctx.nodeA.components.peerId, { + subscriptions: [{ + subscribe: true, + topic, + requestsPartial: true, + supportsSendingPartial: true + }], + messages: [] + }) + + expect(gsB.peerPartialOpts.get(aId)?.get(topic)?.requestsPartial).to.be.true() + + // Re-subscribe without partial flags (how unsubscribePartial re-advertises) + await gsB.handleReceivedRpc(ctx.nodeA.components.peerId, { + subscriptions: [{ + subscribe: true, + topic + }], + messages: [] + }) + + // Stale partial opts should be removed + 
expect(gsB.peerPartialOpts.get(aId)?.has(topic) ?? false).to.be.false() + }) + + it('should send updated SubOpts with partial flags to connected peers', () => { + const topic = 'test-topic' + const bId = ctx.nodeB.components.peerId.toString() + const gsA = ctx.nodeA.pubsub as any + + // Spy on sendRpc to capture what gets sent when subscribing + const sentRpcs: Array<{ peerId: string, rpc: RPC }> = [] + const origSendRpc = gsA.sendRpc.bind(gsA) + gsA.sendRpc = (id: string, rpc: RPC): boolean => { + sentRpcs.push({ peerId: id, rpc }) + return origSendRpc(id, rpc) + } + + // nodeA subscribes with partial support + ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + // Verify that an RPC was sent to nodeB with partial flags in SubOpts + const sentToB = sentRpcs.find(s => + s.peerId === bId && + s.rpc.subscriptions.some(sub => + sub.topic === topic && sub.requestsPartial === true + ) + ) + expect(sentToB).to.not.be.undefined() + const sub = sentToB?.rpc.subscriptions.find(s => s.topic === topic) + expect(sub?.requestsPartial).to.be.true() + expect(sub?.supportsSendingPartial).to.be.true() + }) + + it('should re-send subscription without partial flags on unsubscribePartial', () => { + const topic = 'test-topic' + const bId = ctx.nodeB.components.peerId.toString() + const gsA = ctx.nodeA.pubsub as any + + // First subscribe with partial + ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + + // Now spy on sendRpc to capture the re-send + const sentRpcs: Array<{ peerId: string, rpc: RPC }> = [] + const origSendRpc = gsA.sendRpc.bind(gsA) + gsA.sendRpc = (id: string, rpc: RPC): boolean => { + sentRpcs.push({ peerId: id, rpc }) + return origSendRpc(id, rpc) + } + + // Unsubscribe partial + ctx.nodeA.pubsub.unsubscribePartial(topic) + + // Verify that an RPC was sent to nodeB with SubOpts without partial flags + const sentToB = sentRpcs.find(s => + s.peerId === bId 
&& + s.rpc.subscriptions.some(sub => sub.topic === topic) + ) + expect(sentToB).to.not.be.undefined() + const sub = sentToB?.rpc.subscriptions.find(s => s.topic === topic) + // After unsubscribePartial, the topic is no longer in partialTopics, + // so sendSubscriptions won't include partial flags + expect(sub?.requestsPartial).to.be.undefined() + expect(sub?.supportsSendingPartial).to.be.undefined() + }) + + it('should handle supportsSendingPartial without requestsPartial', () => { + const topic = 'test-topic' + + ctx.nodeA.pubsub.subscribePartial(topic, { + requestsPartial: false, + supportsSendingPartial: true + }) + + const opts = ctx.nodeA.pubsub.partialTopics.get(topic) + expect(opts).to.not.be.undefined() + expect(opts?.requestsPartial).to.be.false() + expect(opts?.supportsSendingPartial).to.be.true() + + // Verify the topic was subscribed + expect(ctx.nodeA.pubsub.getTopics()).to.include(topic) + }) + + it('should include partial flags in outgoing SubOpts', () => { + const topic = 'test-topic' + const gsA = ctx.nodeA.pubsub as any + + // Set up partial topic + ctx.nodeA.pubsub.partialTopics.set(topic, { + requestsPartial: true, + supportsSendingPartial: true + }) + gsA.subscriptions.add(topic) + + // Build the SubOpts like sendSubscriptions does + const subOpts: RPC.SubOpts = { topic, subscribe: true } + const partialOpts = ctx.nodeA.pubsub.partialTopics.get(topic) + if (partialOpts != null) { + subOpts.requestsPartial = partialOpts.requestsPartial + subOpts.supportsSendingPartial = partialOpts.supportsSendingPartial + } + + // Verify the SubOpts contain partial flags + expect(subOpts.requestsPartial).to.be.true() + expect(subOpts.supportsSendingPartial).to.be.true() + + // Verify encoding preserves the flags + const encoded = RPC.SubOpts.encode(subOpts) + const decoded = RPC.SubOpts.decode(encoded) + expect(decoded.requestsPartial).to.equal(true) + expect(decoded.supportsSendingPartial).to.equal(true) + }) +}) diff --git 
a/packages/gossipsub/test/partial-messages/utils.ts b/packages/gossipsub/test/partial-messages/utils.ts new file mode 100644 index 0000000000..13412eb015 --- /dev/null +++ b/packages/gossipsub/test/partial-messages/utils.ts @@ -0,0 +1,28 @@ +import { stop } from '@libp2p/interface' +import { createComponents, connectPubsubNodes } from '../utils/create-pubsub.js' +import type { GossipSubAndComponents } from '../utils/create-pubsub.js' + +export interface TwoNodeContext { + nodeA: GossipSubAndComponents + nodeB: GossipSubAndComponents +} + +export async function setupTwoNodes (): Promise<TwoNodeContext> { + const nodeA = await createComponents({ + init: { + emitSelf: false + } + }) + const nodeB = await createComponents({ + init: { + emitSelf: false + } + }) + await connectPubsubNodes(nodeA, nodeB) + return { nodeA, nodeB } +} + +export async function teardownTwoNodes (ctx: TwoNodeContext): Promise<void> { + await stop(ctx.nodeA.pubsub, ...Object.values(ctx.nodeA.components)) + await stop(ctx.nodeB.pubsub, ...Object.values(ctx.nodeB.components)) +} diff --git a/packages/gossipsub/test/signature-policy.spec.ts b/packages/gossipsub/test/signature-policy.spec.ts index 0c1c3dcadd..510306e633 100644 --- a/packages/gossipsub/test/signature-policy.spec.ts +++ b/packages/gossipsub/test/signature-policy.spec.ts @@ -36,6 +36,9 @@ describe('signature policy', () => { it('should publish a message', async () => { const topic = 'foo' + // set up subscription-change listeners before subscribing and connecting + const subscriptionPromises = nodes.map(async (n) => pEvent(n.pubsub, 'subscription-change')) + // add subscriptions to each node nodes.forEach((n) => { n.pubsub.subscribe(topic) }) @@ -43,7 +46,7 @@ await connectAllPubSubNodes(nodes) // wait for subscriptions to be transmitted - await Promise.all(nodes.map(async (n) => pEvent(n.pubsub, 'subscription-change'))) + await Promise.all(subscriptionPromises) // await mesh rebalancing await
Promise.all(nodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat'))) @@ -56,6 +59,9 @@ describe('signature policy', () => { it('should forward a valid message', async () => { const topic = 'foo' + // set up subscription-change listeners before subscribing and connecting + const subscriptionPromises = nodes.map(async (n) => pEvent(n.pubsub, 'subscription-change')) + // add subscriptions to each node nodes.forEach((n) => { n.pubsub.subscribe(topic) }) @@ -63,7 +69,7 @@ describe('signature policy', () => { await Promise.all(Array.from({ length: numNodes - 1 }, async (_, i) => connectPubsubNodes(nodes[i], nodes[i + 1]))) // wait for subscriptions to be transmitted - await Promise.all(nodes.map(async (n) => pEvent(n.pubsub, 'subscription-change'))) + await Promise.all(subscriptionPromises) // await mesh rebalancing await Promise.all(nodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat'))) @@ -91,10 +97,10 @@ describe('signature policy', () => { // add subscriptions to each node nodes.forEach((n) => { n.pubsub.subscribe(topic) }) - // connect in a line + // connect in a line (not all nodes are connected - last node is isolated) await Promise.all(Array.from({ length: numNodes - 1 }, async (_, i) => connectPubsubNodes(nodes[i], nodes[i + 1]))) - // await mesh rebalancing + // await mesh rebalancing - subscriptions are exchanged during connection await Promise.all(nodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat'))) // publish a message on the topic @@ -134,6 +140,9 @@ describe('signature policy', () => { it('should publish a message', async () => { const topic = 'foo' + // set up subscription-change listeners before subscribing and connecting + const subscriptionPromises = nodes.map(async (n) => pEvent(n.pubsub, 'subscription-change')) + // add subscriptions to each node nodes.forEach((n) => { n.pubsub.subscribe(topic) }) @@ -141,7 +150,7 @@ describe('signature policy', () => { await connectAllPubSubNodes(nodes) // wait for subscriptions to be 
transmitted - await Promise.all(nodes.map(async (n) => pEvent(n.pubsub, 'subscription-change'))) + await Promise.all(subscriptionPromises) // await mesh rebalancing await Promise.all(nodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat'))) @@ -154,6 +163,9 @@ describe('signature policy', () => { it('should forward a valid message', async () => { const topic = 'foo' + // set up subscription-change listeners before subscribing and connecting + const subscriptionPromises = nodes.map(async (n) => pEvent(n.pubsub, 'subscription-change')) + // add subscriptions to each node nodes.forEach((n) => { n.pubsub.subscribe(topic) }) @@ -161,7 +173,7 @@ describe('signature policy', () => { await Promise.all(Array.from({ length: numNodes - 1 }, async (_, i) => connectPubsubNodes(nodes[i], nodes[i + 1]))) // wait for subscriptions to be transmitted - await Promise.all(nodes.map(async (n) => pEvent(n.pubsub, 'subscription-change'))) + await Promise.all(subscriptionPromises) // await mesh rebalancing await Promise.all(nodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat'))) @@ -189,10 +201,10 @@ describe('signature policy', () => { // add subscriptions to each node nodes.forEach((n) => { n.pubsub.subscribe(topic) }) - // connect in a line + // connect in a line (not all nodes are connected - last node is isolated) await Promise.all(Array.from({ length: numNodes - 1 }, async (_, i) => connectPubsubNodes(nodes[i], nodes[i + 1]))) - // await mesh rebalancing + // await mesh rebalancing - subscriptions are exchanged during connection await Promise.all(nodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat'))) // publish a message on the topic diff --git a/packages/gossipsub/test/utils/create-pubsub.ts b/packages/gossipsub/test/utils/create-pubsub.ts index 4af041fdaf..d01abcf14f 100644 --- a/packages/gossipsub/test/utils/create-pubsub.ts +++ b/packages/gossipsub/test/utils/create-pubsub.ts @@ -7,6 +7,7 @@ import { persistentPeerStore } from '@libp2p/peer-store' 
import { mockMuxer, multiaddrConnectionPair } from '@libp2p/utils' import { multiaddr } from '@multiformats/multiaddr' import { MemoryDatastore } from 'datastore-core' +import pWaitFor from 'p-wait-for' import { stubInterface } from 'sinon-ts' import { GossipSub as GossipSubClass } from '../../src/gossipsub.ts' import { gossipsub } from '../../src/index.js' @@ -99,6 +100,7 @@ export const connectPubsubNodes = async (a: GossipSubAndComponents, b: GossipSub for (const call of a.components.registrar.handle.getCalls()) { if (call.args[0] === evt.detail.protocol) { call.args[1](evt.detail, outboundConnection) + break } } }) @@ -107,6 +109,7 @@ export const connectPubsubNodes = async (a: GossipSubAndComponents, b: GossipSub for (const call of b.components.registrar.handle.getCalls()) { if (call.args[0] === evt.detail.protocol) { call.args[1](evt.detail, inboundConnection) + break } } }) @@ -150,6 +153,19 @@ export const connectPubsubNodes = async (a: GossipSubAndComponents, b: GossipSub } } } + + // Wait for both outbound and inbound streams to be established. Outbound + // streams are created asynchronously via the outboundInflightQueue. Inbound + // streams are created when the remote muxer receives the 'create' message + // and fires the 'stream' event, which invokes the registrar handler. 
+ const aId = a.components.peerId.toString() + const bId = b.components.peerId.toString() + await pWaitFor(() => { + const gsA = a.pubsub as any + const gsB = b.pubsub as any + return gsA.streamsOutbound.has(bId) && gsB.streamsOutbound.has(aId) && + gsA.streamsInbound.has(bId) && gsB.streamsInbound.has(aId) + }, { timeout: 10000 }) } export const connectAllPubSubNodes = async (components: GossipSubAndComponents[]): Promise<void> => { diff --git a/packages/utils/src/mock-muxer.ts b/packages/utils/src/mock-muxer.ts index c653033fb9..1ac70fd108 100644 --- a/packages/utils/src/mock-muxer.ts +++ b/packages/utils/src/mock-muxer.ts @@ -271,6 +271,7 @@ this.log.trace('createStream %s %s', direction, id) return new MockMuxedStream({ + protocol: '', ...this.streamOptions, ...options, id, @@ -278,8 +279,7 @@ log: this.log.newScope(`stream:${direction}:${id}`), sendMessage: this.sendMessage, encoding: this.encoding, - maxMessageSize: this.maxMessageSize - MESSAGE_OVERHEAD, - protocol: '' + maxMessageSize: this.maxMessageSize - MESSAGE_OVERHEAD }) } }