From bcc2bf186647854875dba64eea262d13fb6b32a0 Mon Sep 17 00:00:00 2001
From: Marcin Rataj
Date: Wed, 7 Jan 2026 01:19:16 +0100
Subject: [PATCH] fix(basic_host): set read deadline before multistream Close to prevent blocking

streamWrapper.Close() can block indefinitely when the remote peer is slow
or unresponsive during the multistream-select handshake completion. The
lazy multistream protocol negotiation defers reading the handshake
response until Close() is called. If the remote peer doesn't respond, the
read blocks forever, causing goroutine leaks.

This is particularly problematic for bitswap servers where taskWorkers
can get stuck trying to close streams after sending blocks.

The fix sets a read deadline (using DefaultNegotiationTimeout) before
calling the multistream Close(), ensuring the operation will time out
rather than block indefinitely.

Related: https://github.com/multiformats/go-multistream/issues/47
Related: https://github.com/multiformats/go-multistream/pull/48
---
 p2p/host/basic/basic_host.go               |  5 ++
 p2p/host/basic/basic_host_synctest_test.go | 81 ++++++++++++++++++++++
 2 files changed, 86 insertions(+)
 create mode 100644 p2p/host/basic/basic_host_synctest_test.go

diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go
index d98aa0e337..abd18795c9 100644
--- a/p2p/host/basic/basic_host.go
+++ b/p2p/host/basic/basic_host.go
@@ -683,6 +683,11 @@ func (s *streamWrapper) Write(b []byte) (int, error) {
 }
 
 func (s *streamWrapper) Close() error {
+	// Set a read deadline so Close() cannot block indefinitely while the lazy
+	// multistream-select handshake completes against a slow or unresponsive peer.
+	// Best-effort: an error from SetReadDeadline is deliberately ignored.
+	// See: https://github.com/multiformats/go-multistream/issues/47
+	_ = s.Stream.SetReadDeadline(time.Now().Add(DefaultNegotiationTimeout))
 	return s.rw.Close()
 }
 
diff --git a/p2p/host/basic/basic_host_synctest_test.go b/p2p/host/basic/basic_host_synctest_test.go
new file mode 100644
index 0000000000..eb78cfcbcd
--- /dev/null
+++ b/p2p/host/basic/basic_host_synctest_test.go
@@ -0,0 +1,81 @@
+//go:build go1.25
+
+package basichost_test
+
+import (
+	"testing"
+	"testing/synctest"
+	"time"
+
+	"github.com/libp2p/go-libp2p/core/network"
+	basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
+	"github.com/libp2p/go-libp2p/x/simlibp2p"
+
+	"github.com/stretchr/testify/require"
+)
+
+// TestStreamCloseDoesNotHangOnUnresponsivePeer_synctest verifies that
+// stream.Close() returns after DefaultNegotiationTimeout when the remote peer
+// never completes the multistream handshake. Without the read deadline set in
+// streamWrapper.Close(), this would hang indefinitely.
+func TestStreamCloseDoesNotHangOnUnresponsivePeer_synctest(t *testing.T) {
+	synctest.Test(t, func(t *testing.T) {
+		ctx := t.Context()
+
+		h1, h2 := simlibp2p.GetBasicHostPair(t)
+		defer h1.Close()
+		defer h2.Close()
+
+		const testProto = "/test/hang"
+
+		// Manually add protocol to peerstore so h1 thinks h2 supports it.
+		// This makes NewStream use lazy multistream (skipping negotiation until Close).
+		h1.Peerstore().AddProtocols(h2.ID(), testProto)
+
+		// h2 accepts streams at the network level but never responds to
+		// multistream protocol negotiation, simulating an unresponsive peer.
+		h2.Network().SetStreamHandler(func(s network.Stream) {
+			// Read incoming data but never write back - simulates unresponsive peer
+			buf := make([]byte, 1024)
+			for {
+				_, err := s.Read(buf)
+				if err != nil {
+					return
+				}
+			}
+		})
+
+		// Open stream to h2 - uses lazy multistream because protocol is "known"
+		s, err := h1.NewStream(ctx, h2.ID(), testProto)
+		require.NoError(t, err)
+
+		// Trigger the lazy handshake by writing data.
+		// The write succeeds (buffered), but the read handshake will block
+		// because h2 never sends a response.
+		_, err = s.Write([]byte("trigger handshake"))
+		require.NoError(t, err)
+
+		// Close() should return once DefaultNegotiationTimeout elapses because the
+		// fix sets a read deadline before calling the underlying Close().
+		// Buffered so the goroutine can exit even if the select below times out.
+		elapsedCh := make(chan time.Duration, 1)
+		go func() {
+			start := time.Now()
+			_ = s.Close()
+			elapsedCh <- time.Since(start)
+		}()
+
+		maxExpected := basichost.DefaultNegotiationTimeout
+		var elapsed time.Duration
+		select {
+		case elapsed = <-elapsedCh:
+		case <-time.After(maxExpected + time.Second):
+			t.Fatal("timeout waiting for Close()")
+		}
+
+		require.Equal(t, maxExpected, elapsed,
+			"Close() took %v, expected %v (DefaultNegotiationTimeout)", elapsed, maxExpected)
+
+		t.Logf("Close() returned in %v (limit: %v)", elapsed, maxExpected)
+	})
+}