Skip to content

Commit 7e3a93b

Browse files
committed
*: plug in no-op metric implementations for nil fields
This is a follow-up to PR #33 with the following fixes: - Initialize nil metric fields with no-op implementations at construction time (server, client, pool) so that call sites don't need nil guards. - Unexport MeteredTransport behind a NewMeteredTransport constructor that handles nil counters.
1 parent 1614f86 commit 7e3a93b

7 files changed

Lines changed: 139 additions & 180 deletions

File tree

drpcclient/dialoptions.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,11 @@ func DialContext(ctx context.Context, address string, opts ...DialOption) (conn
134134
}
135135
}
136136

137+
collectMetrics := true
138+
if options.metrics == nil {
139+
collectMetrics = false
140+
options.metrics = &drpcmetrics.ClientMetrics{}
141+
}
137142
return drpcconn.NewWithOptions(netConn, drpcconn.Options{
138143
Manager: drpcmanager.Options{
139144
Reader: drpcwire.ReaderOptions{
@@ -144,6 +149,7 @@ func DialContext(ctx context.Context, address string, opts ...DialOption) (conn
144149
},
145150
SoftCancel: false,
146151
},
147-
Metrics: options.metrics,
152+
CollectMetrics: collectMetrics,
153+
Metrics: *options.metrics,
148154
}), nil
149155
}

drpcconn/conn.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,16 @@ type Options struct {
2525
// Manager controls the options we pass to the manager of this conn.
2626
Manager drpcmanager.Options
2727

28+
// TODO: (server): deprecate this
2829
// CollectStats controls whether the client should collect stats on the
2930
// rpcs it creates.
3031
CollectStats bool
3132

32-
// Metrics holds optional metrics the client will populate. If nil, no
33-
// metrics are recorded.
34-
Metrics *drpcmetrics.ClientMetrics
33+
// CollectMetrics controls whether the client should collect metrics
34+
CollectMetrics bool
35+
36+
// Metrics holds optional metrics the client will populate.
37+
Metrics drpcmetrics.ClientMetrics
3538
}
3639

3740
// Conn is a drpc client connection.
@@ -41,7 +44,7 @@ type Conn struct {
4144
mu sync.Mutex
4245
wbuf []byte
4346

44-
stats map[string]*drpcstats.Stats
47+
stats map[string]*drpcstats.Stats // TODO (server): deprecate
4548
}
4649

4750
var _ drpc.Conn = (*Conn)(nil)
@@ -56,18 +59,19 @@ func NewWithOptions(tr drpc.Transport, opts Options) *Conn {
5659
tr: tr,
5760
}
5861

59-
if opts.Metrics != nil {
60-
mt := &drpcmetrics.MeteredTransport{Transport: tr, BytesSent: opts.Metrics.BytesSent, BytesRecv: opts.Metrics.BytesRecv}
61-
tr = mt
62-
c.tr = tr
62+
if opts.CollectMetrics {
63+
mt := drpcmetrics.ToMeteredTransport(tr, opts.Metrics.BytesSent,
64+
opts.Metrics.BytesRecv)
65+
c.tr = mt
6366
}
6467

68+
// TODO: (server): deprecate
6569
if opts.CollectStats {
6670
drpcopts.SetManagerStatsCB(&opts.Manager.Internal, c.getStats)
6771
c.stats = make(map[string]*drpcstats.Stats)
6872
}
6973

70-
c.man = drpcmanager.NewWithOptions(tr, opts.Manager)
74+
c.man = drpcmanager.NewWithOptions(c.tr, opts.Manager)
7175

7276
return c
7377
}

drpcmetrics/metrics.go

Lines changed: 48 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,27 +11,37 @@ import (
1111
)
1212

1313
// Counter is a metric that can only be incremented (monotonically increasing).
14-
// The labels parameter contains key-value pairs for metric dimensions
15-
// (e.g. rpcService, rpcMethod). It may be nil when no
16-
// dimensional context is available.
17-
// The concrete type *must* provide a thread-safe implementation for these
18-
// methods.
14+
// The concrete type must provide a thread-safe implementation for the method.
1915
type Counter interface {
20-
Inc(labels map[string]string, v int64)
16+
Inc(v int64)
2117
}
2218

2319
// NoOpCounter is a Counter implementation that does nothing.
2420
type NoOpCounter struct{}
2521

2622
// Inc implements Counter.
27-
func (NoOpCounter) Inc(labels map[string]string, v int64) {}
23+
func (NoOpCounter) Inc(v int64) {}
24+
25+
// LabeledCounter is a Counter that accepts dimensional labels on each
26+
// increment. The labels parameter contains key-value pairs for metric
27+
// dimensions. It may be nil when no dimensional context is available.
28+
// The concrete type must provide a thread-safe implementation.
29+
type LabeledCounter interface {
30+
Inc(labels map[string]string, v int64)
31+
}
2832

29-
// Gauge is a metric that can increase and decrease (e.g. pool size,
30-
// active count). Update sets the gauge to the given absolute value.
33+
// NoOpLabeledCounter is a LabeledCounter implementation that does nothing.
34+
type NoOpLabeledCounter struct{}
35+
36+
// Inc implements LabeledCounter.
37+
func (NoOpLabeledCounter) Inc(labels map[string]string, v int64) {}
38+
39+
// Gauge is a metric that can increase and decrease (e.g. pool size).
40+
// Update sets the gauge to the given absolute value.
3141
//
3242
// Note: Gauge values may go up or down; Counter values must only increase.
33-
// The concrete type *must* provide a thread-safe implementation for these
34-
// methods.
43+
// The concrete type must provide a thread-safe implementation for the
44+
// method.
3545
type Gauge interface {
3646
Update(labels map[string]string, v int64)
3747
}
@@ -42,30 +52,43 @@ type NoOpGauge struct{}
4252
// Update implements Gauge.
4353
func (NoOpGauge) Update(labels map[string]string, v int64) {}
4454

45-
// TODO (sujatha): Plug-in no-op implementation for nil metrics
46-
47-
// MeteredTransport wraps a Transport and increments byte counters on each
55+
// meteredTransport wraps a Transport and increments byte counters on each
4856
// Read and Write call.
49-
type MeteredTransport struct {
57+
type meteredTransport struct {
5058
drpc.Transport
51-
BytesSent Counter
52-
BytesRecv Counter
59+
bytesSent Counter
60+
bytesRecv Counter
61+
}
62+
63+
// ToMeteredTransport returns a transport that increments bytesSent and
64+
// bytesRecv on each Write and Read call respectively. Nil counters are
65+
// replaced with no-op implementations.
66+
func ToMeteredTransport(tr drpc.Transport, bytesSent,
67+
bytesRecv Counter) drpc.Transport {
68+
if bytesSent == nil {
69+
bytesSent = NoOpCounter{}
70+
}
71+
if bytesRecv == nil {
72+
bytesRecv = NoOpCounter{}
73+
}
74+
return &meteredTransport{Transport: tr, bytesSent: bytesSent,
75+
bytesRecv: bytesRecv}
5376
}
5477

55-
// Read reads from the underlying transport and increments BytesRecv.
56-
func (t *MeteredTransport) Read(p []byte) (n int, err error) {
78+
// Read reads from the underlying transport and increments bytesRecv.
79+
func (t *meteredTransport) Read(p []byte) (n int, err error) {
5780
n, err = t.Transport.Read(p)
58-
if n > 0 && t.BytesRecv != nil {
59-
t.BytesRecv.Inc(nil, int64(n))
81+
if n > 0 {
82+
t.bytesRecv.Inc(int64(n))
6083
}
6184
return n, err
6285
}
6386

64-
// Write writes to the underlying transport and increments BytesSent.
65-
func (t *MeteredTransport) Write(p []byte) (n int, err error) {
87+
// Write writes to the underlying transport and increments bytesSent.
88+
func (t *meteredTransport) Write(p []byte) (n int, err error) {
6689
n, err = t.Transport.Write(p)
67-
if n > 0 && t.BytesSent != nil {
68-
t.BytesSent.Inc(nil, int64(n))
90+
if n > 0 {
91+
t.bytesSent.Inc(int64(n))
6992
}
7093
return n, err
7194
}

drpcpool/pool.go

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ import (
1717
// PoolMetrics holds optional metrics for connection pool monitoring.
1818
type PoolMetrics struct {
1919
PoolSize drpcmetrics.Gauge
20-
ConnectionHitsTotal drpcmetrics.Counter
21-
ConnectionMissesTotal drpcmetrics.Counter
20+
ConnectionHitsTotal drpcmetrics.LabeledCounter
21+
ConnectionMissesTotal drpcmetrics.LabeledCounter
2222
}
2323

2424
// Options contains the options to configure a pool.
@@ -38,7 +38,7 @@ type Options struct {
3838

3939
// Metrics holds optional metrics the pool will populate. If nil,
4040
// no metrics are recorded.
41-
Metrics *PoolMetrics
41+
Metrics PoolMetrics
4242

4343
// Labels holds optional labels to be attached to all metrics.
4444
Labels map[string]string
@@ -61,36 +61,39 @@ func New[K comparable, V Conn](opts Options) *Pool[K, V] {
6161
opts: opts,
6262
entries: make(map[K]*list[K, V]),
6363
}
64+
65+
pool.initPoolMetrics()
66+
6467
// emit the metric (0 value) so it shows up as soon as the pool is created
6568
pool.updatePoolSize()
6669
return &pool
6770
}
6871

69-
func (p *Pool[K, V]) recordHit() {
70-
if p.opts.Metrics == nil {
71-
return
72+
// initPoolMetrics copies the caller-supplied metrics into the pool,
73+
// substituting no-op implementations for any nil fields.
74+
func (p *Pool[K, V]) initPoolMetrics() {
75+
metrics := &p.opts.Metrics
76+
if metrics.PoolSize == nil {
77+
metrics.PoolSize = drpcmetrics.NoOpGauge{}
7278
}
73-
if p.opts.Metrics.ConnectionHitsTotal != nil {
74-
p.opts.Metrics.ConnectionHitsTotal.Inc(p.opts.Labels, 1)
79+
if metrics.ConnectionHitsTotal == nil {
80+
metrics.ConnectionHitsTotal = drpcmetrics.NoOpLabeledCounter{}
7581
}
82+
if metrics.ConnectionMissesTotal == nil {
83+
metrics.ConnectionMissesTotal = drpcmetrics.NoOpLabeledCounter{}
84+
}
85+
}
86+
87+
func (p *Pool[K, V]) recordHit() {
88+
p.opts.Metrics.ConnectionHitsTotal.Inc(p.opts.Labels, 1)
7689
}
7790

7891
func (p *Pool[K, V]) recordMiss() {
79-
if p.opts.Metrics == nil {
80-
return
81-
}
82-
if p.opts.Metrics.ConnectionMissesTotal != nil {
83-
p.opts.Metrics.ConnectionMissesTotal.Inc(p.opts.Labels, 1)
84-
}
92+
p.opts.Metrics.ConnectionMissesTotal.Inc(p.opts.Labels, 1)
8593
}
8694

8795
func (p *Pool[K, V]) updatePoolSize() {
88-
if p.opts.Metrics == nil {
89-
return
90-
}
91-
if p.opts.Metrics.PoolSize != nil {
92-
p.opts.Metrics.PoolSize.Update(p.opts.Labels, int64(p.order.count))
93-
}
96+
p.opts.Metrics.PoolSize.Update(p.opts.Labels, int64(p.order.count))
9497
}
9598

9699
func (p *Pool[K, V]) log(what string, cb func() string) {
@@ -120,8 +123,9 @@ func (p *Pool[K, V]) Close() (err error) {
120123
// Get returns a new Conn that will use the provided dial function to create an
121124
// underlying conn to be cached by the Pool when Conn methods are invoked. It will
122125
// share any cached connections with other conns that use the same key.
123-
func (p *Pool[K, V]) Get(ctx context.Context, key K,
124-
dial func(ctx context.Context, key K) (V, error)) Conn {
126+
func (p *Pool[K, V]) Get(
127+
ctx context.Context, key K, dial func(ctx context.Context, key K) (V, error),
128+
) Conn {
125129
return &poolConn[K, V]{
126130
key: key,
127131
pool: p,

drpcpool/pool_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ func TestPoolMetrics_PutTakeClose(t *testing.T) {
448448

449449
pool := New[string, Conn](Options{
450450
Capacity: 10,
451-
Metrics: &PoolMetrics{
451+
Metrics: PoolMetrics{
452452
PoolSize: poolSize,
453453
ConnectionHitsTotal: hits,
454454
ConnectionMissesTotal: misses,
@@ -501,7 +501,7 @@ func TestPoolMetrics_Eviction(t *testing.T) {
501501
pool := New[string, Conn](Options{
502502
Capacity: 1,
503503
KeyCapacity: 1,
504-
Metrics: &PoolMetrics{
504+
Metrics: PoolMetrics{
505505
PoolSize: poolSize,
506506
ConnectionMissesTotal: misses,
507507
},
@@ -529,7 +529,7 @@ func TestPoolMetrics_Eviction(t *testing.T) {
529529
func TestPoolMetrics_NilFields(t *testing.T) {
530530
// All PoolMetrics fields are nil — should not panic.
531531
pool := New[string, Conn](Options{
532-
Metrics: &PoolMetrics{},
532+
Metrics: PoolMetrics{},
533533
})
534534

535535
conn := &callbackConn{

drpcserver/server.go

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -48,32 +48,20 @@ type Options struct {
4848

4949
// Metrics holds optional metrics the server will populate. If nil, no
5050
// metrics are recorded.
51-
Metrics *ServerMetrics
51+
Metrics ServerMetrics
5252
}
5353

5454
// ServerMetrics holds optional metrics that the server will populate during
5555
// operation.
5656
// Metrics are defined and registered by the caller (e.g. in CockroachDB) and
5757
// passed in; this package never imports a metrics library.
5858
type ServerMetrics struct {
59-
BytesSent drpcmetrics.Counter
60-
BytesRecv drpcmetrics.Counter
6159
TLSHandshakeErrors drpcmetrics.Counter
6260
}
6361

64-
// addTLSHandshakeError increments the TLS handshake error counter.
65-
func (m *ServerMetrics) addTLSHandshakeError() {
66-
if m != nil && m.TLSHandshakeErrors != nil {
67-
m.TLSHandshakeErrors.Inc(nil, 1)
68-
}
69-
}
70-
71-
// toMeteredTransport wraps tr with byte counters.
72-
func (m *ServerMetrics) toMeteredTransport(tr drpc.Transport) drpc.Transport {
73-
if m == nil {
74-
return tr
75-
}
76-
return &drpcmetrics.MeteredTransport{Transport: tr, BytesSent: m.BytesSent, BytesRecv: m.BytesRecv}
62+
// recordTLSHandshakeError increments the TLS handshake error counter.
63+
func (s *Server) recordTLSHandshakeError() {
64+
s.opts.Metrics.TLSHandshakeErrors.Inc(1)
7765
}
7866

7967
// Server is an implementation of drpc.Server to serve drpc connections.
@@ -103,12 +91,14 @@ func NewWithOptions(handler drpc.Handler, opts Options) *Server {
10391
opts: opts,
10492
handler: handler,
10593
}
106-
10794
if s.opts.CollectStats {
95+
// TODO: (server): deprecate stats
10896
drpcopts.SetManagerStatsCB(&s.opts.Manager.Internal, s.getStats)
10997
s.stats = make(map[string]*drpcstats.Stats)
11098
}
111-
99+
if s.opts.Metrics.TLSHandshakeErrors == nil {
100+
s.opts.Metrics.TLSHandshakeErrors = drpcmetrics.NoOpCounter{}
101+
}
112102
return s
113103
}
114104

@@ -156,12 +146,12 @@ func (s *Server) ServeOne(ctx context.Context, tr drpc.Transport) (err error) {
156146
// anyway.
157147
err := tlsConn.HandshakeContext(ctx)
158148
if err != nil {
159-
s.opts.Metrics.addTLSHandshakeError()
149+
s.recordTLSHandshakeError()
160150
return drpc.ConnectionError.New("server handshake [%q] failed: %w", tlsConn.RemoteAddr(), err)
161151
}
162152
if s.opts.TLSCipherRestrict != nil {
163153
if err := s.opts.TLSCipherRestrict(tlsConn); err != nil {
164-
s.opts.Metrics.addTLSHandshakeError()
154+
s.recordTLSHandshakeError()
165155
return drpc.ConnectionError.New("server handshake [%q] failed: %w", tlsConn.RemoteAddr(), err)
166156
}
167157
}
@@ -172,8 +162,6 @@ func (s *Server) ServeOne(ctx context.Context, tr drpc.Transport) (err error) {
172162
}
173163
}
174164

175-
tr = s.opts.Metrics.toMeteredTransport(tr)
176-
177165
man := drpcmanager.NewWithOptions(tr, s.opts.Manager)
178166
defer func() { err = errs.Combine(err, man.Close()) }()
179167

0 commit comments

Comments
 (0)