Skip to content

Commit 7b24df1

Browse files
committed
client: force-close abandoned client streams via runtime cleanup
Attach a runtime.AddCleanup callback to every clientStream returned by NewStream so that callers who drop the stream without consuming it have the underlying *stream force-closed and removed from the connection's stream map. Without this safety net a leaked stream's recv buffer fills and the connection's read loop only recovers via the 1-second ErrStreamFull fallback, leaving streamID slots and goroutines pinned until that timeout fires. The cleanup sets recvErr to errStreamAbandoned (unexported - the cleanup runs after the clientStream is unreachable, so no caller is left to match on it as a sentinel) and the abandon surfaces in the receive loop's "failed to handle message" log via the existing error path. Also adds NewClientWithContext so callers - notably tests - can supply a parent context whose attached logger is used for the client's internal goroutines, and a unit test that drives GC, asserts errStreamAbandoned on the closed stream, verifies the connection is not deadlocked, and captures the abandon log through the supplied context's logger. Bumps the Go minimum to 1.24 for runtime.AddCleanup. Signed-off-by: Derek McGowan <derek@mcg.dev>
1 parent 95e1509 commit 7b24df1

7 files changed

Lines changed: 405 additions & 36 deletions

File tree

client.go

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"fmt"
2323
"io"
2424
"net"
25+
"runtime"
2526
"strings"
2627
"sync"
2728
"syscall"
@@ -107,9 +108,18 @@ func chainUnaryInterceptors(interceptors []UnaryClientInterceptor, final Invoker
107108
}
108109
}
109110

110-
// NewClient creates a new ttrpc client using the given connection
111+
// NewClient creates a new ttrpc client using the given connection.
112+
// It is equivalent to [NewClientWithContext] with [context.Background] as
113+
// the parent context.
111114
func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
112-
ctx, cancel := context.WithCancel(context.Background())
115+
return NewClientWithContext(context.Background(), conn, opts...)
116+
}
117+
118+
// NewClientWithContext creates a new ttrpc client using the given connection,
119+
// deriving the client's internal context from ctx. Cancellation of ctx
120+
// shuts the client down; the client's own Close does not cancel ctx.
121+
func NewClientWithContext(ctx context.Context, conn net.Conn, opts ...ClientOpts) *Client {
122+
ctx, cancel := context.WithCancel(ctx)
113123
channel := newChannel(conn)
114124
c := &Client{
115125
codec: codec{},
@@ -434,10 +444,22 @@ func (c *Client) createStream(flags uint8, b []byte, recvBuf int) (*stream, erro
434444
}
435445

436446
func (c *Client) deleteStream(s *stream) {
447+
c.deleteStreamWithError(s, nil)
448+
}
449+
450+
// deleteStreamWithError removes the stream from the client and closes it,
451+
// propagating the supplied error to anyone still observing the stream via
452+
// receive (the connection read loop) or RecvMsg. A nil error closes the
453+
// stream with the default ErrClosed.
454+
//
455+
// The stream is closed before being removed from the map so that any
456+
// in-flight message dispatch in the read loop observes recvErr through the
457+
// normal receive path rather than falling through to "inactive stream".
458+
func (c *Client) deleteStreamWithError(s *stream, err error) {
459+
s.closeWithError(err)
437460
c.streamLock.Lock()
438461
delete(c.streams, s.id)
439462
c.streamLock.Unlock()
440-
s.closeWithError(nil)
441463
}
442464

443465
func (c *Client) getStream(sid streamID) *stream {
@@ -522,12 +544,46 @@ func (c *Client) NewStream(ctx context.Context, desc *StreamDesc, service, metho
522544
return nil, err
523545
}
524546

525-
return &clientStream{
547+
cs := &clientStream{
526548
ctx: ctx,
527549
s: s,
528550
c: c,
529551
desc: desc,
530-
}, nil
552+
}
553+
// Attach a cleanup as a safety net for callers that drop the stream
554+
// without consuming it to completion. In the common case the stream is
555+
// already closed by the time GC reaches it and the cleanup is a no-op.
556+
// If it is still open, the caller leaked the stream; force-close it
557+
// with errStreamAbandoned so the connection's receive loop is not
558+
// blocked by a buffer that will never drain, and the abandon surfaces
559+
// in logs with a specific cause.
560+
runtime.AddCleanup(cs, finalizeClientStream, clientStreamCleanupArgs{c: c, s: s})
561+
return cs, nil
562+
}
563+
564+
// clientStreamCleanupArgs carries the state needed by finalizeClientStream.
565+
// It must not reference the *clientStream that the cleanup is attached to,
566+
// otherwise the cleanup would never fire.
567+
type clientStreamCleanupArgs struct {
568+
c *Client
569+
s *stream
570+
}
571+
572+
// finalizeClientStream is the runtime.AddCleanup callback registered for each
573+
// clientStream returned by NewStream. The fast path is the common case: the
574+
// stream has already been closed (recvClose is closed) and there is nothing
575+
// to do. The slow path indicates the caller dropped the stream without
576+
// closing it; force-close the stream with errStreamAbandoned so the abandon
577+
// surfaces through the connection read loop's "failed to handle message"
578+
// log when a frame is in flight, and so the receive loop is not blocked by
579+
// a buffer that will never drain.
580+
func finalizeClientStream(args clientStreamCleanupArgs) {
581+
select {
582+
case <-args.s.recvClose:
583+
return
584+
default:
585+
}
586+
args.c.deleteStreamWithError(args.s, errStreamAbandoned)
531587
}
532588

533589
func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) error {

errors.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ var (
4444
ErrStreamFull = errors.New("ttrpc: stream buffer full")
4545
)
4646

47+
// errStreamAbandoned is set on a stream's recvErr by the runtime cleanup
48+
// when the caller dropped a clientStream without closing it. It is not
49+
// exported because it cannot reach external callers: by the time the
50+
// cleanup runs every reference to the clientStream is gone, so no RecvMsg
51+
// or dispatch is left to observe it. Its purpose is to differentiate the
52+
// abandon case in the connection read loop's error log from a normal close.
53+
var errStreamAbandoned = errors.New("ttrpc: stream abandoned by caller")
54+
4755
// OversizedMessageErr is used to indicate refusal to send an oversized message.
4856
// It wraps a ResourceExhausted grpc Status together with the offending message
4957
// length.

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/containerd/ttrpc
22

3-
go 1.23
3+
go 1.24
44

55
require (
66
github.com/containerd/log v0.1.0
@@ -13,4 +13,4 @@ require (
1313
google.golang.org/protobuf v1.36.0
1414
)
1515

16-
require github.com/sirupsen/logrus v1.9.3 // indirect
16+
require github.com/sirupsen/logrus v1.9.3

services.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -163,11 +163,17 @@ func (s *serviceSet) handle(ctx context.Context, req *Request, respond func(*sta
163163
return nil, status.Errorf(codes.Unimplemented, "method %v", req.Method)
164164
}
165165

166-
// streamRecvBufferSize is the buffer size for stream recv channels. It
167-
// should be large enough to absorb normal bursts without hitting the
168-
// 1-second timeout fallback in receive/data, but small enough that
169-
// per-stream memory overhead stays trivial.
170-
const streamRecvBufferSize = 64
166+
// streamRecvBufferSize is the buffer size for stream recv channels.
167+
//
168+
// Consumers are expected to either process incoming messages immediately
169+
// or hand them off to a goroutine, so the buffer only needs to absorb
170+
// brief scheduling jitter (a GC pause, a blocked syscall, the time to
171+
// pass a value to a worker channel). 16 is comfortably above realistic
172+
// jitter without inflating per-stream memory. Sustained slowness will
173+
// hit the streamFullTimeout fallback in stream.receive / streamHandler.data,
174+
// at which point either runtime.AddCleanup (for abandoned streams) or
175+
// ErrStreamFull (for buggy held-but-unconsumed consumers) recovers.
176+
const streamRecvBufferSize = 16
171177

172178
type streamHandler struct {
173179
ctx context.Context
@@ -203,7 +209,7 @@ func (s *streamHandler) data(unmarshal Unmarshaler) error {
203209
return nil
204210
case <-s.ctx.Done():
205211
return s.ctx.Err()
206-
case <-time.After(time.Second):
212+
case <-time.After(streamFullTimeout):
207213
return ErrStreamFull
208214
}
209215
}

stream.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,23 @@ type streamMessage struct {
2929
payload []byte
3030
}
3131

32+
// streamFullTimeout bounds how long the receive loop will wait for a stream's
33+
// recv buffer to drain before giving up. The fallback prevents a single
34+
// unconsumed stream from indefinitely blocking the connection-level receive
35+
// loop.
36+
//
37+
// Most buffer fillups in practice are abandoned streams (the caller dropped
38+
// the clientStream without consuming it), and those are handled faster by
39+
// the runtime.AddCleanup attached in NewStream than by waiting out this
40+
// timeout. Five seconds therefore primarily bounds head-of-line blocking
41+
// for the rarer "held but not consumed" case (a goroutine leak in the
42+
// caller), where neither the cleanup nor the contract that consumers
43+
// process immediately or hand off to a goroutine can help.
44+
//
45+
// Exposed as a var (rather than const) so tests can extend it to observe
46+
// the abandon-via-cleanup unblock path without racing the timeout.
47+
var streamFullTimeout = 5 * time.Second
48+
3249
type stream struct {
3350
id streamID
3451
sender sender
@@ -92,7 +109,7 @@ func (s *stream) receive(ctx context.Context, msg *streamMessage) error {
92109
return nil
93110
case <-ctx.Done():
94111
return ctx.Err()
95-
case <-time.After(time.Second):
112+
case <-time.After(streamFullTimeout):
96113
s.closeWithError(ErrStreamFull)
97114
return ErrStreamFull
98115
}

0 commit comments

Comments
 (0)