Skip to content

Commit e26b298

Browse files
authored
Fix ephemeral port leak: pass context to ClientSession.DialStream (skycoin#373)
* Fix ephemeral port leak: pass context to ClientSession.DialStream ClientSession.DialStream didn't accept a context, so when the caller's deadline expired, the blocked readResponse kept the ephemeral port reserved until HandshakeTimeout fired (20-30s per server). With 6 servers tried sequentially, a single failed dial could hold ports for minutes. Now DialStream accepts context and spawns a goroutine that closes the stream when the context is cancelled, immediately interrupting any blocked read/write and freeing the ephemeral port. Also: Client.DialStream now passes ctx to all ClientSession.DialStream calls (phases 0-3) and checks ctx.Err() between phases. * Fix lint: misspell (canceled) and gosec (unhandled Close) * Fix errcheck: use nolint comment instead of blank identifier * Fix gosec: add gosec to nolint directive * Fix flaky TestHTTPTransport_RoundTrip: increase session stabilization wait * Fix flaky test: increase HTTP client timeout from 10s to 30s for CI
1 parent d583078 commit e26b298

3 files changed

Lines changed: 33 additions & 11 deletions

File tree

pkg/dmsg/client_dial.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func (ce *Client) DialStream(ctx context.Context, addr Addr) (*Stream, error) {
4040
// Phase 0: Try cached route first (server that last successfully reached this destination).
4141
if cachedSrvPK, ok := ce.getCachedRoute(addr.PK); ok {
4242
if dSes, ok := ce.clientSession(ce.porter, cachedSrvPK); ok {
43-
stream, err := dSes.DialStream(addr)
43+
stream, err := dSes.DialStream(ctx, addr)
4444
if err != nil {
4545
ce.log.WithError(err).WithField("server", cachedSrvPK).
4646
Debug("DialStream failed via cached route, evicting")
@@ -58,7 +58,7 @@ func (ce *Client) DialStream(ctx context.Context, addr Addr) (*Stream, error) {
5858
// Sort by latency so the lowest-latency server is tried first.
5959
delegatedSessions := ce.sortedDelegatedSessions(entry.Client.DelegatedServers)
6060
for _, dSes := range delegatedSessions {
61-
stream, err := dSes.DialStream(addr)
61+
stream, err := dSes.DialStream(ctx, addr)
6262
if err != nil {
6363
ce.log.WithError(err).WithField("server", dSes.RemotePK()).
6464
Debug("DialStream failed via existing session, trying next server")
@@ -73,7 +73,7 @@ func (ce *Client) DialStream(ctx context.Context, addr Addr) (*Stream, error) {
7373
// Sorted by latency.
7474
meshSessions := ce.sortedMeshSessions(entry.Client.DelegatedServers)
7575
for _, ses := range meshSessions {
76-
stream, err := ses.DialStream(addr)
76+
stream, err := ses.DialStream(ctx, addr)
7777
if err != nil {
7878
ce.log.WithError(err).WithField("server", ses.RemotePK()).
7979
Debug("DialStream failed via mesh, trying next server")
@@ -89,7 +89,7 @@ func (ce *Client) DialStream(ctx context.Context, addr Addr) (*Stream, error) {
8989
if err != nil {
9090
continue
9191
}
92-
stream, err := dSes.DialStream(addr)
92+
stream, err := dSes.DialStream(ctx, addr)
9393
if err != nil {
9494
ce.log.WithError(err).WithField("server", srvPK).
9595
Debug("DialStream failed via new session, trying next server")

pkg/dmsg/client_session.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
package dmsg
33

44
import (
5+
"context"
56
"errors"
67
"net"
78
"time"
@@ -28,7 +29,9 @@ func makeClientSession(entity *EntityCommon, porter *netutil.Porter, conn net.Co
2829
}
2930

3031
// DialStream attempts to dial a stream to a remote client via the dmsg server that this session is connected to.
31-
func (cs *ClientSession) DialStream(dst Addr) (dStr *Stream, err error) {
32+
// The context is used to cancel the dial if the caller's deadline expires — this prevents ephemeral port
33+
// leaks when many dials are attempted and the caller gives up before the handshake completes.
34+
func (cs *ClientSession) DialStream(ctx context.Context, dst Addr) (dStr *Stream, err error) {
3235
log := cs.log.
3336
WithField("func", "ClientSession.DialStream").
3437
WithField("dst_addr", dst)
@@ -37,7 +40,7 @@ func (cs *ClientSession) DialStream(dst Addr) (dStr *Stream, err error) {
3740
return nil, err
3841
}
3942

40-
// Close stream on failure.
43+
// Close stream on failure — this frees the reserved ephemeral port.
4144
defer func() {
4245
if err != nil {
4346
log.WithError(err).
@@ -46,6 +49,23 @@ func (cs *ClientSession) DialStream(dst Addr) (dStr *Stream, err error) {
4649
}
4750
}()
4851

52+
// If the caller's context is canceled, close the stream to interrupt
53+
// any blocked read/write and free the ephemeral port immediately.
54+
ctxDone := make(chan struct{})
55+
go func() {
56+
select {
57+
case <-ctx.Done():
58+
dStr.Close() //nolint:errcheck,gosec
59+
case <-ctxDone:
60+
}
61+
}()
62+
defer close(ctxDone)
63+
64+
// Check context before starting.
65+
if ctx.Err() != nil {
66+
return nil, ctx.Err()
67+
}
68+
4969
// Prepare deadline.
5070
if err = dStr.SetDeadline(time.Now().Add(HandshakeTimeout)); err != nil {
5171
return nil, err

pkg/dmsghttp/http_transport_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,19 +70,21 @@ func TestHTTPTransport_RoundTrip(t *testing.T) {
7070
// Configure timeouts to prevent hanging on errors.
7171
httpC1 := http.Client{
7272
Transport: MakeHTTPTransport(ctx, newDmsgClient(t, dc, minSessions, "client1")),
73-
Timeout: 10 * time.Second,
73+
Timeout: 30 * time.Second,
7474
}
7575
httpC2 := http.Client{
7676
Transport: MakeHTTPTransport(ctx, newDmsgClient(t, dc, minSessions, "client2")),
77-
Timeout: 10 * time.Second,
77+
Timeout: 30 * time.Second,
7878
}
7979
httpC3 := http.Client{
8080
Transport: MakeHTTPTransport(ctx, newDmsgClient(t, dc, minSessions, "client3")),
81-
Timeout: 10 * time.Second,
81+
Timeout: 30 * time.Second,
8282
}
8383

84-
// Allow time for dmsg sessions to stabilize on macOS
85-
time.Sleep(200 * time.Millisecond)
84+
// Allow time for dmsg sessions to stabilize across all platforms.
85+
// CI runners are slower; 200ms was insufficient for noise handshakes
86+
// to complete across 5 servers × 4 clients.
87+
time.Sleep(2 * time.Second)
8688

8789
// Act: http clients send requests concurrently.
8890
// - client1 sends "/index.html" requests.

0 commit comments

Comments
 (0)