Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,5 @@ DRPC is proud to get as much done in as few lines of code as possible. It's the
| storj.io/drpc/drpcctx | 41 |
| storj.io/drpc/internal/drpcopts | 30 |
| storj.io/drpc/drpcstats | 25 |
| storj.io/drpc/drpcdebug | 22 |
| storj.io/drpc/drpcenc | 15 |
| **Total** | **3611** |
| **Total** | **3589** |
11 changes: 11 additions & 0 deletions debug_off.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

//go:build !drpcdebug

package drpc

// DebugEnabled controls whether debug logging is active. When false (the
// default), the compiler eliminates debug log callsites entirely so that
// callbacks passed to log helpers are never allocated or evaluated.
const DebugEnabled = false
10 changes: 10 additions & 0 deletions debug_on.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

//go:build drpcdebug

package drpc

// DebugEnabled controls whether debug logging is active. Build with
// -tags=drpcdebug to enable debug log evaluation.
const DebugEnabled = true
11 changes: 11 additions & 0 deletions drpcclient/dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type dialOptions struct {
// tlsConfig is an optional TLS configuration for secure connections.
tlsConfig *tls.Config

// logger is used to log errors and operational events on the connection.
logger drpc.Logger

// metrics holds optional metrics the conn will populate. No metrics are
// recorded if this is nil. When shouldRecord is set, metrics are recorded
// only when shouldRecord returns true.
Expand Down Expand Up @@ -94,6 +97,13 @@ func WithShouldRecordFunc(shouldRecord func() bool) DialOption {
}
}

// WithLogger returns a DialOption that sets the Logger for the connection.
func WithLogger(logger drpc.Logger) DialOption {
return func(o *dialOptions) {
o.logger = logger
}
}

// WithContextDialer returns a DialOption that sets a custom dialer function
// to be used instead of the default net.Dialer.
func WithContextDialer(dialer func(context.Context, string) (net.Conn, error)) DialOption {
Expand Down Expand Up @@ -160,6 +170,7 @@ func DialContext(
},
SoftCancel: false,
},
Logger: options.logger,
ShouldRecord: options.shouldRecord,
Metrics: *options.metrics,
}), nil
Expand Down
24 changes: 21 additions & 3 deletions drpcconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ type Options struct {
// Manager controls the options we pass to the manager of this conn.
Manager drpcmanager.Options

// Logger is used to log errors and operational events. If nil,
// drpc.DefaultLogger is used.
Logger drpc.Logger

// TODO: (server): deprecate this
// CollectStats controls whether the client should collect stats on the
// rpcs it creates.
Expand Down Expand Up @@ -77,6 +81,9 @@ func NewWithOptions(tr drpc.Transport, opts Options) *Conn {
c.stats = make(map[string]*drpcstats.Stats)
}

if opts.Manager.Logger == nil {
opts.Manager.Logger = opts.Logger
}
c.man = drpcmanager.NewWithOptions(c.tr, opts.Manager)

return c
Expand Down Expand Up @@ -123,7 +130,9 @@ func (c *Conn) Close() (err error) { return c.man.Close() }

// Invoke issues the rpc on the transport serializing in, waits for a response, and
// deserializes it into out. Only one Invoke or Stream may be open at a time.
func (c *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) (err error) {
func (c *Conn) Invoke(
ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message,
) (err error) {
defer func() { err = drpc.ToRPCErr(err) }()

var metadata []byte
Expand Down Expand Up @@ -155,7 +164,14 @@ func (c *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, ou
return nil
}

func (c *Conn) doInvoke(stream *drpcstream.Stream, enc drpc.Encoding, rpc string, data []byte, metadata []byte, out drpc.Message) (err error) {
func (c *Conn) doInvoke(
stream *drpcstream.Stream,
enc drpc.Encoding,
rpc string,
data []byte,
metadata []byte,
out drpc.Message,
) (err error) {
if len(metadata) > 0 {
if err := stream.RawWrite(drpcwire.KindInvokeMetadata, metadata); err != nil {
return err
Expand All @@ -178,7 +194,9 @@ func (c *Conn) doInvoke(stream *drpcstream.Stream, enc drpc.Encoding, rpc string

// NewStream begins a streaming rpc on the connection. Only one Invoke or Stream may
// be open at a time.
func (c *Conn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (_ drpc.Stream, err error) {
func (c *Conn) NewStream(
ctx context.Context, rpc string, enc drpc.Encoding,
) (_ drpc.Stream, err error) {
defer func() { err = drpc.ToRPCErr(err) }()

var metadata []byte
Expand Down
23 changes: 23 additions & 0 deletions drpcconn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,29 @@ func TestConn_NewStreamSendsGrpcAndDrpcMetadata(t *testing.T) {
_ = s.CloseSend()
}

func TestConnDefaultLogger(t *testing.T) {
pc, ps := net.Pipe()
defer func() { _ = pc.Close() }()
defer func() { _ = ps.Close() }()

conn := NewWithOptions(pc, Options{})
defer func() { _ = conn.Close() }()
// Verify construction with nil Logger does not panic.
_ = conn
}

func TestConnCustomLogger(t *testing.T) {
pc, ps := net.Pipe()
defer func() { _ = pc.Close() }()
defer func() { _ = ps.Close() }()

var logger drpc.InMemLogger
conn := NewWithOptions(pc, Options{Logger: &logger})
defer func() { _ = conn.Close() }()
// Verify construction with custom Logger does not panic.
_ = conn
}

func TestConn_encodeMetadata(t *testing.T) {
pc, ps := net.Pipe()
defer func() { assert.NoError(t, pc.Close()) }()
Expand Down
19 changes: 0 additions & 19 deletions drpcdebug/README.md

This file was deleted.

8 changes: 0 additions & 8 deletions drpcdebug/doc.go

This file was deleted.

14 changes: 0 additions & 14 deletions drpcdebug/log_disabled.go

This file was deleted.

30 changes: 0 additions & 30 deletions drpcdebug/log_enabled.go

This file was deleted.

29 changes: 21 additions & 8 deletions drpcmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ import (

"github.com/zeebo/errs"
grpcmetadata "google.golang.org/grpc/metadata"

"storj.io/drpc"
"storj.io/drpc/drpcdebug"
"storj.io/drpc/drpcmetadata"
"storj.io/drpc/drpcsignal"
"storj.io/drpc/drpcstream"
Expand Down Expand Up @@ -60,6 +58,10 @@ type Options struct {
// handling. When enabled, the server stream will decode incoming metadata
// into grpc metadata in the context.
GRPCMetadataCompatMode bool

// Logger is used to log operational events. If nil, drpc.DefaultLogger is
// used.
Logger drpc.Logger
}

// Manager handles the logic of managing a transport for a drpc client or
Expand Down Expand Up @@ -100,9 +102,13 @@ func New(tr drpc.Transport) *Manager {
// NewWithOptions returns a new manager for the transport. It uses the provided
// options to manage details of how it uses it.
func NewWithOptions(tr drpc.Transport, opts Options) *Manager {
if opts.Logger == nil {
opts.Logger = drpc.DefaultLogger
}

m := &Manager{
tr: tr,
wr: drpcwire.NewWriter(tr, opts.WriterBufferSize),
wr: drpcwire.NewWriterWithLogger(tr, opts.WriterBufferSize, opts.Logger),
rd: drpcwire.NewReaderWithOptions(tr, opts.Reader),
opts: opts,

Expand Down Expand Up @@ -135,8 +141,8 @@ func NewWithOptions(tr drpc.Transport, opts Options) *Manager {
func (m *Manager) String() string { return fmt.Sprintf("<man %p>", m) }

func (m *Manager) log(what string, cb func() string) {
if drpcdebug.Enabled {
drpcdebug.Log(func() (_, _, _ string) { return m.String(), what, cb() })
if drpc.DebugEnabled {
m.opts.Logger.Debugf("%s %s %s", m.String(), what, cb())
}
}

Expand Down Expand Up @@ -298,8 +304,11 @@ func (m *Manager) manageReader() {
//

// newStream creates a stream value with the appropriate configuration for this manager.
func (m *Manager) newStream(ctx context.Context, sid uint64, kind drpc.StreamKind, rpc string) (*drpcstream.Stream, error) {
func (m *Manager) newStream(
ctx context.Context, sid uint64, kind drpc.StreamKind, rpc string,
) (*drpcstream.Stream, error) {
opts := m.opts.Stream
opts.Logger = m.opts.Logger
drpcopts.SetStreamKind(&opts.Internal, kind)
drpcopts.SetStreamRPC(&opts.Internal, rpc)
if cb := drpcopts.GetManagerStatsCB(&m.opts.Internal); cb != nil {
Expand Down Expand Up @@ -425,7 +434,9 @@ func (m *Manager) Close() error {
}

// NewClientStream starts a stream on the managed transport for use by a client.
func (m *Manager) NewClientStream(ctx context.Context, rpc string) (stream *drpcstream.Stream, err error) {
func (m *Manager) NewClientStream(
ctx context.Context, rpc string,
) (stream *drpcstream.Stream, err error) {
if err := m.acquireSemaphore(ctx); err != nil {
return nil, err
}
Expand All @@ -436,7 +447,9 @@ func (m *Manager) NewClientStream(ctx context.Context, rpc string) (stream *drpc
// NewServerStream starts a stream on the managed transport for use by a server.
// It does this by waiting for the client to issue an invoke message and
// returning the details.
func (m *Manager) NewServerStream(ctx context.Context) (stream *drpcstream.Stream, rpc string, err error) {
func (m *Manager) NewServerStream(
ctx context.Context,
) (stream *drpcstream.Stream, rpc string, err error) {
if err := m.acquireSemaphore(ctx); err != nil {
return nil, "", err
}
Expand Down
66 changes: 66 additions & 0 deletions drpcmanager/manager_debug_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.

//go:build drpcdebug

package drpcmanager

import (
"context"
"errors"
"io"
"net"
"strings"
"testing"

"github.com/zeebo/assert"
"storj.io/drpc"
"storj.io/drpc/drpctest"
"storj.io/drpc/drpcwire"
)

func TestManagerLoggerPropagation(t *testing.T) {
ctx := drpctest.NewTracker(t)
defer ctx.Close()

cconn, sconn := net.Pipe()
defer func() { _ = cconn.Close() }()
defer func() { _ = sconn.Close() }()

var logger drpc.InMemLogger

cman := NewWithOptions(cconn, Options{Logger: &logger})
defer func() { _ = cman.Close() }()

sman := New(sconn)
defer func() { _ = sman.Close() }()

ctx.Run(func(ctx context.Context) {
stream, err := cman.NewClientStream(ctx, "rpc")
assert.NoError(t, err)
defer func() { _ = stream.Close() }()

assert.NoError(t, stream.RawWrite(drpcwire.KindInvoke, []byte("invoke")))
assert.NoError(t, stream.RawWrite(drpcwire.KindMessage, []byte("message")))
assert.NoError(t, stream.RawFlush())

assert.NoError(t, stream.Close())
})

ctx.Run(func(ctx context.Context) {
stream, _, err := sman.NewServerStream(ctx)
assert.NoError(t, err)
defer func() { _ = stream.Close() }()

_, err = stream.RawRecv()
assert.NoError(t, err)

_, err = stream.RawRecv()
assert.That(t, errors.Is(err, io.EOF))
})

ctx.Wait()

logs := logger.String()
assert.That(t, strings.Contains(logs, "DEBUG:"))
}
Loading
Loading