diff --git a/packages/gossipsub/src/gossipsub.ts b/packages/gossipsub/src/gossipsub.ts index d0436777fc..886a851b16 100644 --- a/packages/gossipsub/src/gossipsub.ts +++ b/packages/gossipsub/src/gossipsub.ts @@ -672,10 +672,11 @@ export class GossipSub extends TypedEventEmitter implements Typ } try { + const rawStream = await connection.newStream(this.protocols, { + runOnLimitedConnection: this.runOnLimitedConnection + }) const stream = new OutboundStream( - await connection.newStream(this.protocols, { - runOnLimitedConnection: this.runOnLimitedConnection - }), + rawStream, (e) => { this.log.error('outbound pipe error', e) }, { maxBufferSize: this.opts.maxOutboundBufferSize } ) @@ -690,6 +691,14 @@ export class GossipSub extends TypedEventEmitter implements Typ } this.metrics?.peersPerProtocol.inc({ protocol }, 1) + rawStream.addEventListener('close', () => { + if (this.streamsOutbound.get(id) === stream) { + this.streamsOutbound.delete(id) + this.floodsubPeers.delete(id) + this.metrics?.peersPerProtocol.inc({ protocol }, -1) + } + }, { once: true }) + // Immediately send own subscriptions via the newly attached stream if (this.subscriptions.size > 0) { this.log('send subscriptions to', id) diff --git a/packages/gossipsub/test/outbound-stream-lifecycle.spec.ts b/packages/gossipsub/test/outbound-stream-lifecycle.spec.ts new file mode 100644 index 0000000000..337a28e263 --- /dev/null +++ b/packages/gossipsub/test/outbound-stream-lifecycle.spec.ts @@ -0,0 +1,40 @@ +import { stop } from '@libp2p/interface' +import { expect } from 'aegir/chai' +import pWaitFor from 'p-wait-for' +import { createComponentsArray } from './utils/create-pubsub.ts' +import type { GossipSubAndComponents } from './utils/create-pubsub.ts' +import type { Stream } from '@libp2p/interface' + +describe('outbound stream lifecycle', () => { + let nodes: GossipSubAndComponents[] + + beforeEach(async () => { + nodes = await createComponentsArray({ number: 2, connected: true }) + }) + + afterEach(async () => { + await stop(...nodes.reduce((acc, curr) => acc.concat(curr.pubsub, ...Object.entries(curr.components)), [])) + }) + + it('removes an outbound stream from the registry when it closes', async () => { + const [nodeA, nodeB] = nodes + const bId = nodeB.components.peerId.toString() + + // wait until nodeA has opened an outbound stream to nodeB + await pWaitFor(() => nodeA.pubsub.streamsOutbound.has(bId), { timeout: 10000 }) + + const outboundStream = nodeA.pubsub.streamsOutbound.get(bId) + expect(outboundStream).to.not.be.undefined() + + // simulate the underlying stream being reset/closed while the connection + // itself stays open (e.g. a WebRTC stream reset or a relayed stream closing). + // reach the wrapped stream directly to mimic a transport-level reset + const { rawStream } = outboundStream as unknown as { rawStream: Stream } + rawStream.abort(new Error('simulated stream reset')) + + // the closed stream must be removed from the registry, otherwise it blocks + // every future outbound message to this peer for the life of the connection + await pWaitFor(() => !nodeA.pubsub.streamsOutbound.has(bId), { timeout: 10000 }) + expect(nodeA.pubsub.streamsOutbound.has(bId)).to.be.false() + }) +})