Skip to content

Commit e806f5a

Browse files
committed
fixup! drpcpool: add support for ShouldRecord
1 parent 169cc9b commit e806f5a

4 files changed

Lines changed: 146 additions & 4 deletions

File tree

drpcconn/conn.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,11 @@ func NewWithOptions(tr drpc.Transport, opts Options) *Conn {
6161
tr: tr,
6262
}
6363

64+
shouldWrap := opts.ShouldRecord != nil
6465
if opts.ShouldRecord == nil {
6566
opts.ShouldRecord = func() bool { return false }
6667
}
67-
if opts.ShouldRecord() {
68+
if shouldWrap {
6869
mt := drpcmetrics.ToMeteredTransport(tr, opts.Metrics.BytesSent,
6970
opts.Metrics.BytesRecv, opts.ShouldRecord)
7071
c.tr = mt

drpcpool/pool.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,8 @@ func (p *Pool[K, V]) initPoolMetrics() {
7676
if p.opts.ShouldRecord == nil {
7777
p.opts.ShouldRecord = func() bool { return false }
7878
}
79-
if !p.opts.ShouldRecord() {
80-
return
81-
}
79+
80+
// initialize the metrics with no-op implementations if they are nil
8281
metrics := &p.opts.Metrics
8382
if metrics.PoolSize == nil {
8483
metrics.PoolSize = drpcmetrics.NoOpGauge{}

drpcpool/pool_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,64 @@ func TestPoolMetrics_ShouldRecordFalse(t *testing.T) {
588588
})
589589
}
590590

591+
func TestPoolMetrics_ShouldRecordDynamic(t *testing.T) {
592+
recording := false
593+
poolSize := &testPoolGauge{}
594+
hits := &testPoolCounter{}
595+
misses := &testPoolCounter{}
596+
597+
pool := New[string, Conn](Options{
598+
Capacity: 10,
599+
Metrics: PoolMetrics{
600+
PoolSize: poolSize,
601+
ConnectionHitsTotal: hits,
602+
ConnectionMissesTotal: misses,
603+
},
604+
ShouldRecord: func() bool { return recording },
605+
})
606+
defer func() { _ = pool.Close() }()
607+
608+
newConn := func() *callbackConn {
609+
return &callbackConn{
610+
ClosedFn: func() <-chan struct{} { return nil },
611+
UnblockedFn: func() <-chan struct{} { return closedCh },
612+
}
613+
}
614+
615+
// Recording is off — no metrics should be collected.
616+
pool.Put("key", newConn())
617+
pool.Take("key")
618+
pool.Take("miss")
619+
assert.Equal(t, poolSize.total, 0.0)
620+
assert.Equal(t, hits.total, 0.0)
621+
assert.Equal(t, misses.total, 0.0)
622+
623+
// Turn recording on — metrics should now be collected.
624+
recording = true
625+
pool.Put("key", newConn())
626+
assert.Equal(t, poolSize.total, 1.0)
627+
628+
_, ok := pool.Take("key")
629+
assert.That(t, ok)
630+
assert.Equal(t, hits.total, 1.0)
631+
assert.Equal(t, poolSize.total, 0.0)
632+
633+
_, ok = pool.Take("miss")
634+
assert.That(t, !ok)
635+
assert.Equal(t, misses.total, 1.0)
636+
637+
// Turn recording back off — counters should stop incrementing.
638+
recording = false
639+
pool.Put("key", newConn())
640+
assert.Equal(t, poolSize.total, 0.0) // unchanged
641+
642+
pool.Take("key")
643+
assert.Equal(t, hits.total, 1.0) // unchanged
644+
645+
pool.Take("miss2")
646+
assert.Equal(t, misses.total, 1.0) // unchanged
647+
}
648+
591649
func BenchmarkPool(b *testing.B) {
592650
ctx := drpctest.NewTracker(b)
593651
defer ctx.Close()

internal/integration/metrics_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,90 @@ func TestClientByteMetricsNotCollected(t *testing.T) {
165165
_ = conn.Close()
166166
}
167167

168+
func TestClientByteMetricsShouldRecordFalse(t *testing.T) {
169+
ctx := drpctest.NewTracker(t)
170+
defer ctx.Close()
171+
172+
sent := &testCounter{}
173+
recv := &testCounter{}
174+
175+
c1, c2 := net.Pipe()
176+
mux := drpcmux.New()
177+
assert.NoError(t, DRPCRegisterService(mux, standardImpl))
178+
srv := drpcserver.New(mux)
179+
ctx.Run(func(ctx2 context.Context) { _ = srv.ServeOne(ctx2, c1) })
180+
conn := drpcconn.NewWithOptions(c2, drpcconn.Options{
181+
Metrics: drpcmetrics.ClientMetrics{
182+
BytesSent: sent,
183+
BytesRecv: recv,
184+
},
185+
ShouldRecord: func() bool { return false },
186+
})
187+
cli := NewDRPCServiceClient(conn)
188+
189+
out, err := cli.Method1(ctx, in(1))
190+
assert.NoError(t, err)
191+
assert.True(t, Equal(out, &Out{Out: 1}))
192+
193+
// ShouldRecord returns false, so no metrics should be collected
194+
// even though the transport is wrapped.
195+
assert.Equal(t, sent.total(), 0.0)
196+
assert.Equal(t, recv.total(), 0.0)
197+
198+
_ = conn.Close()
199+
}
200+
201+
func TestClientByteMetricsShouldRecordDynamic(t *testing.T) {
202+
ctx := drpctest.NewTracker(t)
203+
defer ctx.Close()
204+
205+
recording := false
206+
sent := &testCounter{}
207+
recv := &testCounter{}
208+
209+
c1, c2 := net.Pipe()
210+
mux := drpcmux.New()
211+
assert.NoError(t, DRPCRegisterService(mux, standardImpl))
212+
srv := drpcserver.New(mux)
213+
ctx.Run(func(ctx2 context.Context) { _ = srv.ServeOne(ctx2, c1) })
214+
conn := drpcconn.NewWithOptions(c2, drpcconn.Options{
215+
Manager: drpcmanager.Options{},
216+
Metrics: drpcmetrics.ClientMetrics{
217+
BytesSent: sent,
218+
BytesRecv: recv,
219+
},
220+
ShouldRecord: func() bool { return recording },
221+
})
222+
cli := NewDRPCServiceClient(conn)
223+
224+
// Recording off — no metrics.
225+
out, err := cli.Method1(ctx, in(1))
226+
assert.NoError(t, err)
227+
assert.True(t, Equal(out, &Out{Out: 1}))
228+
assert.Equal(t, sent.total(), 0.0)
229+
assert.Equal(t, recv.total(), 0.0)
230+
231+
// Turn recording on — metrics should now be collected.
232+
recording = true
233+
out, err = cli.Method1(ctx, in(1))
234+
assert.NoError(t, err)
235+
assert.True(t, Equal(out, &Out{Out: 1}))
236+
assert.That(t, sent.total() > 0)
237+
assert.That(t, recv.total() > 0)
238+
239+
// Turn recording back off — counters should stop incrementing.
240+
sentBefore := sent.total()
241+
recvBefore := recv.total()
242+
recording = false
243+
out, err = cli.Method1(ctx, in(1))
244+
assert.NoError(t, err)
245+
assert.True(t, Equal(out, &Out{Out: 1}))
246+
assert.Equal(t, sent.total(), sentBefore)
247+
assert.Equal(t, recv.total(), recvBefore)
248+
249+
_ = conn.Close()
250+
}
251+
168252
func TestServerTLSHandshakeErrorMetric(t *testing.T) {
169253
tlsErrors := &testCounter{}
170254

0 commit comments

Comments
 (0)