Skip to content

Commit edef12c

Browse files
committed
fixup! enable DRPC for sysbench
1 parent 0bdd179 commit edef12c

1 file changed

Lines changed: 30 additions & 3 deletions

File tree

pkg/rpc/drpc.go

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1616
"github.com/cockroachdb/cockroach/pkg/roachpb"
1717
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
18+
"github.com/cockroachdb/cockroach/pkg/util/envutil"
1819
"github.com/cockroachdb/cockroach/pkg/util/log"
1920
"github.com/cockroachdb/cockroach/pkg/util/stop"
2021
"github.com/cockroachdb/cockroach/pkg/util/tracing/drpcinterceptor"
@@ -29,6 +30,8 @@ import (
2930
"storj.io/drpc/drpcwire"
3031
)
3132

33+
var envExperimentalDRPCMuxEnabled = envutil.EnvOrDefaultBool("COCKROACH_EXPERIMENTAL_DRPC_MUX_ENABLED", true)
34+
3235
// Default idle connection timeout for DRPC connections in the pool.
3336
var defaultDRPCConnIdleTimeout = 5 * time.Minute
3437

@@ -45,6 +48,12 @@ func DialDRPC(
4548
rpcCtx *Context,
4649
) func(ctx context.Context, target string, class rpcbase.ConnectionClass) (drpc.Conn, error) {
4750
return func(ctx context.Context, target string, class rpcbase.ConnectionClass) (drpc.Conn, error) {
51+
if envExperimentalDRPCMuxEnabled {
52+
log.Dev.Infof(ctx, "dialing DRPC mux connection to %s", target)
53+
return dialDRPCMux(ctx, rpcCtx, target, class)
54+
}
55+
56+
log.Dev.Infof(ctx, "dialing DRPC non-mux connection to %s", target)
4857
transport := tcpTransport
4958
if rpcCtx.ContextOptions.AdvertiseAddr == target && rpcCtx.canLoopbackDial() {
5059
transport = loopbackTransport
@@ -78,6 +87,27 @@ func DialDRPC(
7887
}
7988
}
8089

90+
// dialDRPCMux establishes a multiplexed DRPC connection over a single
91+
// transport. Multiple concurrent streams are handled by the drpc Manager
92+
// natively, without requiring an external mux layer like yamux.
93+
func dialDRPCMux(
94+
ctx context.Context, rpcCtx *Context, target string, class rpcbase.ConnectionClass,
95+
) (drpc.Conn, error) {
96+
transport := tcpTransport
97+
if rpcCtx.ContextOptions.AdvertiseAddr == target && rpcCtx.canLoopbackDial() {
98+
transport = loopbackTransport
99+
}
100+
drpcDialOptions, err := rpcCtx.drpcDialOptionsInternal(ctx, target, class, transport)
101+
if err != nil {
102+
return nil, err
103+
}
104+
conn, err := drpcclient.DialContext(ctx, target, drpcDialOptions...)
105+
if err != nil {
106+
return nil, err
107+
}
108+
return drpcclient.NewClientConnWithOptions(ctx, conn, drpcDialOptions...)
109+
}
110+
81111
// drpcDialOptionsInternal is similar to grpcDialOptionsInternal but for
82112
// drpc connections.
83113
func (rpcCtx *Context) drpcDialOptionsInternal(
@@ -389,9 +419,6 @@ func NewDRPCServer(_ context.Context, rpcCtx *Context, opts ...ServerOption) (DR
389419
mux := drpcmux.NewWithInterceptors(unaryInterceptors, streamInterceptors)
390420

391421
d.Server = drpcserver.NewWithOptions(mux, drpcserver.Options{
392-
Log: func(err error) {
393-
log.Dev.Warningf(context.Background(), "drpc server error %v", err)
394-
},
395422
// The reader's max buffer size defaults to 4mb, and if it is exceeded (such
396423
// as happens with AddSSTable) the RPCs fail.
397424
Manager: drpcmanager.Options{

0 commit comments

Comments
 (0)