Skip to content

Commit 7b73c9f

Browse files
*: propagate Logger through drpcmanager, drpcstream, drpcwire, and drpcpool
Thread the Logger interface through the entire DRPC stack so that all packages have access to structured logging: - drpcwire.Writer: add NewWriterWithLogger constructor, replace drpcdebug.Log calls with Logger.Debugf for buffer flush events. - drpcstream.Options: add Logger field, default to drpc.DefaultLogger. Replace drpcdebug.Log calls with Logger.Debugf for stream lifecycle events (HANDLE, SEND, FIN, FLUSH, CALL). - drpcmanager.Options: add Logger field, default to drpc.DefaultLogger. Replace drpcdebug.Log calls with Logger.Debugf for connection management events (WAIT, TERM, READ, STREAM, CANCEL, BUSY, UNFIN, CLEAN). Propagate Logger to Writer and Stream instances it creates. - drpcpool.Options: add Logger field, default to drpc.DefaultLogger. Replace drpcdebug.Log calls with Logger.Debugf for pool operations (CLOSE, TAKEN, PUT). - drpcserver: propagate its Logger to the managers it creates via ServeOne, ensuring the entire call stack shares the same logger. All packages now use Logger.Debugf instead of the build-tag-gated drpcdebug.Log. Since DefaultLogger.Debugf is a no-op, production behavior is unchanged. Custom Logger implementations (e.g. CockroachDB's) can route these to verbose/conditional logging. Co-Authored-By: roachdev-claude <roachdev-claude-bot@cockroachlabs.com>
1 parent a2ae205 commit 7b73c9f

13 files changed

Lines changed: 244 additions & 35 deletions

File tree

debug_off.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
// Copyright (C) 2019 Storj Labs, Inc.
2+
// See LICENSE for copying information.
3+
4+
//go:build !drpcdebug
5+
6+
package drpc
7+
8+
// DebugEnabled controls whether debug logging is active. When false (the
9+
// default), the compiler eliminates debug log callsites entirely so that
10+
// callbacks passed to log helpers are never allocated or evaluated.
11+
const DebugEnabled = false

debug_on.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
// Copyright (C) 2019 Storj Labs, Inc.
2+
// See LICENSE for copying information.
3+
4+
//go:build drpcdebug
5+
6+
package drpc
7+
8+
// DebugEnabled controls whether debug logging is active. Build with
9+
// -tags=drpcdebug to enable debug log evaluation.
10+
const DebugEnabled = true

drpcclient/dialoptions.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ type dialOptions struct {
3333
// tlsConfig is an optional TLS configuration for secure connections.
3434
tlsConfig *tls.Config
3535

36+
// logger is used to log errors and operational events on the connection.
37+
logger drpc.Logger
38+
3639
// metrics holds optional metrics the conn will populate. No metrics are
3740
// recorded if this is nil. When shouldRecord is set, metrics are recorded
3841
// only when shouldRecord returns true.
@@ -94,6 +97,13 @@ func WithShouldRecordFunc(shouldRecord func() bool) DialOption {
9497
}
9598
}
9699

100+
// WithLogger returns a DialOption that sets the Logger for the connection.
101+
func WithLogger(logger drpc.Logger) DialOption {
102+
return func(o *dialOptions) {
103+
o.logger = logger
104+
}
105+
}
106+
97107
// WithContextDialer returns a DialOption that sets a custom dialer function
98108
// to be used instead of the default net.Dialer.
99109
func WithContextDialer(dialer func(context.Context, string) (net.Conn, error)) DialOption {
@@ -160,6 +170,7 @@ func DialContext(
160170
},
161171
SoftCancel: false,
162172
},
173+
Logger: options.logger,
163174
ShouldRecord: options.shouldRecord,
164175
Metrics: *options.metrics,
165176
}), nil

drpcconn/conn.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ type Options struct {
2525
// Manager controls the options we pass to the manager of this conn.
2626
Manager drpcmanager.Options
2727

28+
// Logger is used to log errors and operational events. If nil,
29+
// drpc.DefaultLogger is used.
30+
Logger drpc.Logger
31+
2832
// TODO: (server): deprecate this
2933
// CollectStats controls whether the client should collect stats on the
3034
// rpcs it creates.
@@ -77,6 +81,9 @@ func NewWithOptions(tr drpc.Transport, opts Options) *Conn {
7781
c.stats = make(map[string]*drpcstats.Stats)
7882
}
7983

84+
if opts.Manager.Logger == nil {
85+
opts.Manager.Logger = opts.Logger
86+
}
8087
c.man = drpcmanager.NewWithOptions(c.tr, opts.Manager)
8188

8289
return c
@@ -123,7 +130,9 @@ func (c *Conn) Close() (err error) { return c.man.Close() }
123130

124131
// Invoke issues the rpc on the transport serializing in, waits for a response, and
125132
// deserializes it into out. Only one Invoke or Stream may be open at a time.
126-
func (c *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) (err error) {
133+
func (c *Conn) Invoke(
134+
ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message,
135+
) (err error) {
127136
defer func() { err = drpc.ToRPCErr(err) }()
128137

129138
var metadata []byte
@@ -155,7 +164,14 @@ func (c *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, ou
155164
return nil
156165
}
157166

158-
func (c *Conn) doInvoke(stream *drpcstream.Stream, enc drpc.Encoding, rpc string, data []byte, metadata []byte, out drpc.Message) (err error) {
167+
func (c *Conn) doInvoke(
168+
stream *drpcstream.Stream,
169+
enc drpc.Encoding,
170+
rpc string,
171+
data []byte,
172+
metadata []byte,
173+
out drpc.Message,
174+
) (err error) {
159175
if len(metadata) > 0 {
160176
if err := stream.RawWrite(drpcwire.KindInvokeMetadata, metadata); err != nil {
161177
return err
@@ -178,7 +194,9 @@ func (c *Conn) doInvoke(stream *drpcstream.Stream, enc drpc.Encoding, rpc string
178194

179195
// NewStream begins a streaming rpc on the connection. Only one Invoke or Stream may
180196
// be open at a time.
181-
func (c *Conn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (_ drpc.Stream, err error) {
197+
func (c *Conn) NewStream(
198+
ctx context.Context, rpc string, enc drpc.Encoding,
199+
) (_ drpc.Stream, err error) {
182200
defer func() { err = drpc.ToRPCErr(err) }()
183201

184202
var metadata []byte

drpcconn/conn_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,29 @@ func TestConn_NewStreamSendsGrpcAndDrpcMetadata(t *testing.T) {
183183
_ = s.CloseSend()
184184
}
185185

186+
func TestConnDefaultLogger(t *testing.T) {
187+
pc, ps := net.Pipe()
188+
defer func() { _ = pc.Close() }()
189+
defer func() { _ = ps.Close() }()
190+
191+
conn := NewWithOptions(pc, Options{})
192+
defer func() { _ = conn.Close() }()
193+
// Verify construction with nil Logger does not panic.
194+
_ = conn
195+
}
196+
197+
func TestConnCustomLogger(t *testing.T) {
198+
pc, ps := net.Pipe()
199+
defer func() { _ = pc.Close() }()
200+
defer func() { _ = ps.Close() }()
201+
202+
var logger drpc.InMemLogger
203+
conn := NewWithOptions(pc, Options{Logger: &logger})
204+
defer func() { _ = conn.Close() }()
205+
// Verify construction with custom Logger does not panic.
206+
_ = conn
207+
}
208+
186209
func TestConn_encodeMetadata(t *testing.T) {
187210
pc, ps := net.Pipe()
188211
defer func() { assert.NoError(t, pc.Close()) }()

drpcmanager/manager.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@ import (
1515

1616
"github.com/zeebo/errs"
1717
grpcmetadata "google.golang.org/grpc/metadata"
18-
1918
"storj.io/drpc"
20-
"storj.io/drpc/drpcdebug"
2119
"storj.io/drpc/drpcmetadata"
2220
"storj.io/drpc/drpcsignal"
2321
"storj.io/drpc/drpcstream"
@@ -60,6 +58,10 @@ type Options struct {
6058
// handling. When enabled, the server stream will decode incoming metadata
6159
// into grpc metadata in the context.
6260
GRPCMetadataCompatMode bool
61+
62+
// Logger is used to log operational events. If nil, drpc.DefaultLogger is
63+
// used.
64+
Logger drpc.Logger
6365
}
6466

6567
// Manager handles the logic of managing a transport for a drpc client or
@@ -100,9 +102,13 @@ func New(tr drpc.Transport) *Manager {
100102
// NewWithOptions returns a new manager for the transport. It uses the provided
101103
// options to manage details of how it uses it.
102104
func NewWithOptions(tr drpc.Transport, opts Options) *Manager {
105+
if opts.Logger == nil {
106+
opts.Logger = drpc.DefaultLogger
107+
}
108+
103109
m := &Manager{
104110
tr: tr,
105-
wr: drpcwire.NewWriter(tr, opts.WriterBufferSize),
111+
wr: drpcwire.NewWriterWithLogger(tr, opts.WriterBufferSize, opts.Logger),
106112
rd: drpcwire.NewReaderWithOptions(tr, opts.Reader),
107113
opts: opts,
108114

@@ -135,8 +141,8 @@ func NewWithOptions(tr drpc.Transport, opts Options) *Manager {
135141
func (m *Manager) String() string { return fmt.Sprintf("<man %p>", m) }
136142

137143
func (m *Manager) log(what string, cb func() string) {
138-
if drpcdebug.Enabled {
139-
drpcdebug.Log(func() (_, _, _ string) { return m.String(), what, cb() })
144+
if drpc.DebugEnabled {
145+
m.opts.Logger.Debugf("%s %s %s", m.String(), what, cb())
140146
}
141147
}
142148

@@ -298,8 +304,11 @@ func (m *Manager) manageReader() {
298304
//
299305

300306
// newStream creates a stream value with the appropriate configuration for this manager.
301-
func (m *Manager) newStream(ctx context.Context, sid uint64, kind drpc.StreamKind, rpc string) (*drpcstream.Stream, error) {
307+
func (m *Manager) newStream(
308+
ctx context.Context, sid uint64, kind drpc.StreamKind, rpc string,
309+
) (*drpcstream.Stream, error) {
302310
opts := m.opts.Stream
311+
opts.Logger = m.opts.Logger
303312
drpcopts.SetStreamKind(&opts.Internal, kind)
304313
drpcopts.SetStreamRPC(&opts.Internal, rpc)
305314
if cb := drpcopts.GetManagerStatsCB(&m.opts.Internal); cb != nil {
@@ -425,7 +434,9 @@ func (m *Manager) Close() error {
425434
}
426435

427436
// NewClientStream starts a stream on the managed transport for use by a client.
428-
func (m *Manager) NewClientStream(ctx context.Context, rpc string) (stream *drpcstream.Stream, err error) {
437+
func (m *Manager) NewClientStream(
438+
ctx context.Context, rpc string,
439+
) (stream *drpcstream.Stream, err error) {
429440
if err := m.acquireSemaphore(ctx); err != nil {
430441
return nil, err
431442
}
@@ -436,7 +447,9 @@ func (m *Manager) NewClientStream(ctx context.Context, rpc string) (stream *drpc
436447
// NewServerStream starts a stream on the managed transport for use by a server.
437448
// It does this by waiting for the client to issue an invoke message and
438449
// returning the details.
439-
func (m *Manager) NewServerStream(ctx context.Context) (stream *drpcstream.Stream, rpc string, err error) {
450+
func (m *Manager) NewServerStream(
451+
ctx context.Context,
452+
) (stream *drpcstream.Stream, rpc string, err error) {
440453
if err := m.acquireSemaphore(ctx); err != nil {
441454
return nil, "", err
442455
}

drpcmanager/manager_debug_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Copyright (C) 2019 Storj Labs, Inc.
2+
// See LICENSE for copying information.
3+
4+
//go:build drpcdebug
5+
6+
package drpcmanager
7+
8+
import (
9+
"context"
10+
"errors"
11+
"io"
12+
"net"
13+
"strings"
14+
"testing"
15+
16+
"github.com/zeebo/assert"
17+
"storj.io/drpc"
18+
"storj.io/drpc/drpctest"
19+
"storj.io/drpc/drpcwire"
20+
)
21+
22+
func TestManagerLoggerPropagation(t *testing.T) {
23+
ctx := drpctest.NewTracker(t)
24+
defer ctx.Close()
25+
26+
cconn, sconn := net.Pipe()
27+
defer func() { _ = cconn.Close() }()
28+
defer func() { _ = sconn.Close() }()
29+
30+
var logger drpc.InMemLogger
31+
32+
cman := NewWithOptions(cconn, Options{Logger: &logger})
33+
defer func() { _ = cman.Close() }()
34+
35+
sman := New(sconn)
36+
defer func() { _ = sman.Close() }()
37+
38+
ctx.Run(func(ctx context.Context) {
39+
stream, err := cman.NewClientStream(ctx, "rpc")
40+
assert.NoError(t, err)
41+
defer func() { _ = stream.Close() }()
42+
43+
assert.NoError(t, stream.RawWrite(drpcwire.KindInvoke, []byte("invoke")))
44+
assert.NoError(t, stream.RawWrite(drpcwire.KindMessage, []byte("message")))
45+
assert.NoError(t, stream.RawFlush())
46+
47+
assert.NoError(t, stream.Close())
48+
})
49+
50+
ctx.Run(func(ctx context.Context) {
51+
stream, _, err := sman.NewServerStream(ctx)
52+
assert.NoError(t, err)
53+
defer func() { _ = stream.Close() }()
54+
55+
_, err = stream.RawRecv()
56+
assert.NoError(t, err)
57+
58+
_, err = stream.RawRecv()
59+
assert.That(t, errors.Is(err, io.EOF))
60+
})
61+
62+
ctx.Wait()
63+
64+
logs := logger.String()
65+
assert.That(t, strings.Contains(logs, "DEBUG:"))
66+
}

drpcmanager/manager_test.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ import (
1414

1515
"github.com/zeebo/assert"
1616
grpcmetadata "google.golang.org/grpc/metadata"
17+
"storj.io/drpc"
1718
"storj.io/drpc/drpcmetadata"
18-
1919
"storj.io/drpc/drpctest"
2020
"storj.io/drpc/drpcwire"
2121
)
@@ -295,3 +295,18 @@ func (b *blockedTransport) wait(p int, rw *bool) (int, error) {
295295
func (b *blockedTransport) Read(p []byte) (n int, err error) { return b.wait(len(p), &b.ro) }
296296
func (b *blockedTransport) Write(p []byte) (n int, err error) { return b.wait(len(p), &b.wo) }
297297
func (b *blockedTransport) Close() error { return nil }
298+
299+
func TestManagerDefaultLogger(t *testing.T) {
300+
tr := make(blockingTransport)
301+
man := New(tr)
302+
defer func() { _ = man.Close() }()
303+
assert.Equal(t, man.opts.Logger, drpc.DefaultLogger)
304+
}
305+
306+
func TestManagerCustomLogger(t *testing.T) {
307+
tr := make(blockingTransport)
308+
var logger drpc.InMemLogger
309+
man := NewWithOptions(tr, Options{Logger: &logger})
310+
defer func() { _ = man.Close() }()
311+
assert.Equal(t, man.opts.Logger, &logger)
312+
}

drpcpool/pool.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,11 @@ package drpcpool
55

66
import (
77
"context"
8-
"fmt"
98
"sync"
109
"time"
1110

1211
"github.com/zeebo/errs"
13-
"storj.io/drpc/drpcdebug"
12+
"storj.io/drpc"
1413
"storj.io/drpc/drpcmetrics"
1514
)
1615

@@ -43,6 +42,10 @@ type Options struct {
4342

4443
// Labels holds optional labels to be attached to all metrics.
4544
Labels map[string]string
45+
46+
// Logger is used to log operational events. If nil, drpc.DefaultLogger is
47+
// used.
48+
Logger drpc.Logger
4649
}
4750

4851
// Pool is a connection pool with key type K. It maintains a cache of connections
@@ -58,6 +61,9 @@ type Pool[K comparable, V Conn] struct {
5861

5962
// New constructs a new Pool with the provided Options.
6063
func New[K comparable, V Conn](opts Options) *Pool[K, V] {
64+
if opts.Logger == nil {
65+
opts.Logger = drpc.DefaultLogger
66+
}
6167
pool := Pool[K, V]{
6268
opts: opts,
6369
entries: make(map[K]*list[K, V]),
@@ -112,8 +118,8 @@ func (p *Pool[K, V]) updatePoolSize() {
112118
}
113119

114120
func (p *Pool[K, V]) log(what string, cb func() string) {
115-
if drpcdebug.Enabled {
116-
drpcdebug.Log(func() (_, _, _ string) { return fmt.Sprintf("<pül %p>", p), what, cb() })
121+
if drpc.DebugEnabled {
122+
p.opts.Logger.Debugf("<pool %p> %s %s", p, what, cb())
117123
}
118124
}
119125

drpcpool/pool_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,19 @@ func TestPoolMetrics_ShouldRecordDynamic(t *testing.T) {
646646
assert.Equal(t, misses.total, 1.0) // unchanged
647647
}
648648

649+
func TestPoolDefaultLogger(t *testing.T) {
650+
pool := New[string, Conn](Options{})
651+
defer func() { _ = pool.Close() }()
652+
assert.Equal(t, pool.opts.Logger, drpc.DefaultLogger)
653+
}
654+
655+
func TestPoolCustomLogger(t *testing.T) {
656+
var logger drpc.InMemLogger
657+
pool := New[string, Conn](Options{Logger: &logger})
658+
defer func() { _ = pool.Close() }()
659+
assert.Equal(t, pool.opts.Logger, &logger)
660+
}
661+
649662
func BenchmarkPool(b *testing.B) {
650663
ctx := drpctest.NewTracker(b)
651664
defer ctx.Close()

0 commit comments

Comments
 (0)