Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 61 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"io"
"net"
"runtime"
"strings"
"sync"
"syscall"
Expand Down Expand Up @@ -107,9 +108,18 @@ func chainUnaryInterceptors(interceptors []UnaryClientInterceptor, final Invoker
}
}

// NewClient creates a new ttrpc client using the given connection
// NewClient creates a new ttrpc client using the given connection.
// It is equivalent to [NewClientWithContext] with [context.Background] as
// the parent context.
func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
ctx, cancel := context.WithCancel(context.Background())
return NewClientWithContext(context.Background(), conn, opts...)
}

// NewClientWithContext creates a new ttrpc client using the given connection,
// deriving the client's internal context from ctx. Cancellation of ctx
// shuts the client down; the client's own Close does not cancel ctx.
func NewClientWithContext(ctx context.Context, conn net.Conn, opts ...ClientOpts) *Client {
ctx, cancel := context.WithCancel(ctx)
channel := newChannel(conn)
c := &Client{
codec: codec{},
Expand Down Expand Up @@ -434,10 +444,22 @@ func (c *Client) createStream(flags uint8, b []byte, recvBuf int) (*stream, erro
}

func (c *Client) deleteStream(s *stream) {
c.deleteStreamWithError(s, nil)
}

// deleteStreamWithError removes the stream from the client and closes it,
// propagating the supplied error to anyone still observing the stream via
// receive (the connection read loop) or RecvMsg. A nil error closes the
// stream with the default ErrClosed.
//
// The stream is closed before being removed from the map so that any
// in-flight message dispatch in the read loop observes recvErr through the
// normal receive path rather than falling through to "inactive stream".
func (c *Client) deleteStreamWithError(s *stream, err error) {
s.closeWithError(err)
c.streamLock.Lock()
delete(c.streams, s.id)
c.streamLock.Unlock()
s.closeWithError(nil)
}

func (c *Client) getStream(sid streamID) *stream {
Expand Down Expand Up @@ -522,12 +544,46 @@ func (c *Client) NewStream(ctx context.Context, desc *StreamDesc, service, metho
return nil, err
}

return &clientStream{
cs := &clientStream{
ctx: ctx,
s: s,
c: c,
desc: desc,
}, nil
}
// Attach a cleanup as a safety net for callers that drop the stream
// without consuming it to completion. In the common case the stream is
// already closed by the time GC reaches it and the cleanup is a no-op.
// If it is still open, the caller leaked the stream; force-close it
// with errStreamAbandoned so the connection's receive loop is not
// blocked by a buffer that will never drain, and the abandon surfaces
// in logs with a specific cause.
runtime.AddCleanup(cs, finalizeClientStream, clientStreamCleanupArgs{c: c, s: s})
return cs, nil
}

// clientStreamCleanupArgs carries the state needed by finalizeClientStream.
// It must not reference the *clientStream that the cleanup is attached to,
// otherwise the cleanup would never fire.
type clientStreamCleanupArgs struct {
c *Client
s *stream
}

// finalizeClientStream is the runtime.AddCleanup callback registered for each
// clientStream returned by NewStream. The fast path is the common case: the
// stream has already been closed (recvClose is closed) and there is nothing
// to do. The slow path indicates the caller dropped the stream without
// closing it; force-close the stream with errStreamAbandoned so the abandon
// surfaces through the connection read loop's "failed to handle message"
// log when a frame is in flight, and so the receive loop is not blocked by
// a buffer that will never drain.
func finalizeClientStream(args clientStreamCleanupArgs) {
select {
case <-args.s.recvClose:
return
default:
}
args.c.deleteStreamWithError(args.s, errStreamAbandoned)
}

func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) error {
Expand Down
8 changes: 8 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ var (
ErrStreamFull = errors.New("ttrpc: stream buffer full")
)

// errStreamAbandoned is set on a stream's recvErr by the runtime cleanup
// when the caller dropped a clientStream without closing it. It is not
// exported because it cannot reach external callers: by the time the
// cleanup runs every reference to the clientStream is gone, so no RecvMsg
// or dispatch is left to observe it. Its purpose is to differentiate the
// abandon case in the connection read loop's error log from a normal close.
var errStreamAbandoned = errors.New("ttrpc: stream abandoned by caller")

// OversizedMessageErr is used to indicate refusal to send an oversized message.
// It wraps a ResourceExhausted grpc Status together with the offending message
// length.
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/containerd/ttrpc

go 1.23
go 1.24

require (
github.com/containerd/log v0.1.0
Expand All @@ -13,4 +13,4 @@ require (
google.golang.org/protobuf v1.36.0
)

require github.com/sirupsen/logrus v1.9.3 // indirect
require github.com/sirupsen/logrus v1.9.3
18 changes: 12 additions & 6 deletions services.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,17 @@ func (s *serviceSet) handle(ctx context.Context, req *Request, respond func(*sta
return nil, status.Errorf(codes.Unimplemented, "method %v", req.Method)
}

// streamRecvBufferSize is the buffer size for stream recv channels. It
// should be large enough to absorb normal bursts without hitting the
// 1-second timeout fallback in receive/data, but small enough that
// per-stream memory overhead stays trivial.
const streamRecvBufferSize = 64
// streamRecvBufferSize is the buffer size for stream recv channels.
//
// Consumers are expected to either process incoming messages immediately
// or hand them off to a goroutine, so the buffer only needs to absorb
// brief scheduling jitter (a GC pause, a blocked syscall, the time to
// pass a value to a worker channel). 16 is comfortably above realistic
// jitter without inflating per-stream memory. Sustained slowness will
// hit the streamFullTimeout fallback in stream.receive / streamHandler.data,
// at which point either runtime.AddCleanup (for abandoned streams) or
// ErrStreamFull (for buggy held-but-unconsumed consumers) recovers.
const streamRecvBufferSize = 16

type streamHandler struct {
ctx context.Context
Expand Down Expand Up @@ -203,7 +209,7 @@ func (s *streamHandler) data(unmarshal Unmarshaler) error {
return nil
case <-s.ctx.Done():
return s.ctx.Err()
case <-time.After(time.Second):
case <-time.After(streamFullTimeout):
return ErrStreamFull
}
}
Expand Down
19 changes: 18 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,23 @@ type streamMessage struct {
payload []byte
}

// streamFullTimeout bounds how long the receive loop will wait for a stream's
// recv buffer to drain before giving up. The fallback prevents a single
// unconsumed stream from indefinitely blocking the connection-level receive
// loop.
//
// Most buffer fillups in practice are abandoned streams (the caller dropped
// the clientStream without consuming it), and those are handled faster by
// the runtime.AddCleanup attached in NewStream than by waiting out this
// timeout. Five seconds therefore primarily bounds head-of-line blocking
// for the rarer "held but not consumed" case (a goroutine leak in the
// caller), where neither the cleanup nor the contract that consumers
// process immediately or hand off to a goroutine can help.
//
// Exposed as a var (rather than const) so tests can extend it to observe
// the abandon-via-cleanup unblock path without racing the timeout.
var streamFullTimeout = 5 * time.Second

type stream struct {
id streamID
sender sender
Expand Down Expand Up @@ -92,7 +109,7 @@ func (s *stream) receive(ctx context.Context, msg *streamMessage) error {
return nil
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
case <-time.After(streamFullTimeout):
s.closeWithError(ErrStreamFull)
return ErrStreamFull
}
Expand Down
Loading
Loading