Skip to content

Commit e8d532c

Browse files
committed
Raise gRPC message size limits
1 parent 7f3f917 commit e8d532c

6 files changed

Lines changed: 42 additions & 21 deletions

File tree

adapter/test_util.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/Jille/raft-grpc-leader-rpc/leaderhealth"
1414
transport "github.com/Jille/raft-grpc-transport"
1515
"github.com/Jille/raftadmin"
16+
internalutil "github.com/bootjp/elastickv/internal"
1617
"github.com/bootjp/elastickv/kv"
1718
pb "github.com/bootjp/elastickv/proto"
1819
"github.com/bootjp/elastickv/store"
@@ -23,7 +24,6 @@ import (
2324
"github.com/stretchr/testify/require"
2425
"golang.org/x/sys/unix"
2526
"google.golang.org/grpc"
26-
"google.golang.org/grpc/credentials/insecure"
2727
)
2828

2929
func shutdown(nodes []Node) {
@@ -350,7 +350,7 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress) (
350350
r, tm, err := newRaft(strconv.Itoa(i), port.raftAddress, fsm, i == 0, cfg, electionTimeout)
351351
assert.NoError(t, err)
352352

353-
s := grpc.NewServer()
353+
s := grpc.NewServer(internalutil.GRPCServerOptions()...)
354354
trx := kv.NewTransaction(r)
355355
coordinator := kv.NewCoordinator(trx, r)
356356
relay := NewRedisPubSubRelay()
@@ -416,9 +416,7 @@ func newRaft(myID string, myAddress string, fsm raft.FSM, bootstrap bool, cfg ra
416416
Level: hclog.LevelFromString("WARN"),
417417
})
418418

419-
tm := transport.New(raft.ServerAddress(myAddress), []grpc.DialOption{
420-
grpc.WithTransportCredentials(insecure.NewCredentials()),
421-
})
419+
tm := transport.New(raft.ServerAddress(myAddress), internalutil.GRPCDialOptions())
422420

423421
r, err := raft.NewRaft(c, fsm, ldb, sdb, fss, tm.Transport())
424422
if err != nil {

cmd/server/demo.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"github.com/hashicorp/raft"
3131
"golang.org/x/sync/errgroup"
3232
"google.golang.org/grpc"
33-
"google.golang.org/grpc/credentials/insecure"
3433
)
3534

3635
var (
@@ -206,7 +205,7 @@ func joinCluster(ctx context.Context, nodes []config) error {
206205
}
207206

208207
// Connect to leader
209-
conn, err := grpc.NewClient(leader.address, grpc.WithTransportCredentials(insecure.NewCredentials()))
208+
conn, err := grpc.NewClient(leader.address, internalutil.GRPCDialOptions()...)
210209
if err != nil {
211210
return fmt.Errorf("failed to dial leader: %w", err)
212211
}
@@ -322,7 +321,7 @@ func setupStorage(dir string) (raft.LogStore, raft.StableStore, raft.SnapshotSto
322321
}
323322

324323
func setupGRPC(r *raft.Raft, st store.MVCCStore, tm *transport.Manager, coordinator *kv.Coordinate, distServer *adapter.DistributionServer, relay *adapter.RedisPubSubRelay) (*grpc.Server, *adapter.GRPCServer) {
325-
s := grpc.NewServer()
324+
s := grpc.NewServer(internalutil.GRPCServerOptions()...)
326325
trx := kv.NewTransaction(r)
327326
routedStore := kv.NewLeaderRoutedStore(st, coordinator)
328327
gs := adapter.NewGRPCServer(routedStore, coordinator, adapter.WithCloseStore())
@@ -381,9 +380,7 @@ func run(ctx context.Context, eg *errgroup.Group, cfg config) error {
381380
})
382381

383382
// Transport
384-
tm := transport.New(raft.ServerAddress(cfg.address), []grpc.DialOption{
385-
grpc.WithTransportCredentials(insecure.NewCredentials()),
386-
})
383+
tm := transport.New(raft.ServerAddress(cfg.address), internalutil.GRPCDialOptions())
387384

388385
r, err := raft.NewRaft(c, fsm, ldb, sdb, fss, tm.Transport())
389386
if err != nil {

internal/grpc.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package internal
2+
3+
import (
4+
"google.golang.org/grpc"
5+
"google.golang.org/grpc/credentials/insecure"
6+
)
7+
8+
const GRPCMaxMessageBytes = 64 << 20
9+
10+
// GRPCServerOptions keeps Raft replication and the public/internal APIs aligned
11+
// on the same message-size budget.
12+
func GRPCServerOptions() []grpc.ServerOption {
13+
return []grpc.ServerOption{
14+
grpc.MaxRecvMsgSize(GRPCMaxMessageBytes),
15+
grpc.MaxSendMsgSize(GRPCMaxMessageBytes),
16+
}
17+
}
18+
19+
// GRPCDialOptions returns the common insecure dial options used by node-local
20+
// and node-to-node traffic.
21+
func GRPCDialOptions() []grpc.DialOption {
22+
return []grpc.DialOption{
23+
grpc.WithTransportCredentials(insecure.NewCredentials()),
24+
grpc.WithDefaultCallOptions(
25+
grpc.MaxCallRecvMsgSize(GRPCMaxMessageBytes),
26+
grpc.MaxCallSendMsgSize(GRPCMaxMessageBytes),
27+
),
28+
}
29+
}

kv/grpc_conn_cache.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ package kv
33
import (
44
"sync"
55

6+
internalutil "github.com/bootjp/elastickv/internal"
67
"github.com/cockroachdb/errors"
78
"github.com/hashicorp/raft"
89
"google.golang.org/grpc"
910
"google.golang.org/grpc/connectivity"
10-
"google.golang.org/grpc/credentials/insecure"
1111
)
1212

1313
// GRPCConnCache reuses gRPC connections per address. gRPC itself handles
@@ -75,9 +75,9 @@ func (c *GRPCConnCache) ConnFor(addr raft.ServerAddress) (*grpc.ClientConn, erro
7575
return conn, nil
7676
}
7777

78-
conn, err := grpc.NewClient(string(addr),
79-
grpc.WithTransportCredentials(insecure.NewCredentials()),
80-
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
78+
conn, err := grpc.NewClient(
79+
string(addr),
80+
append(internalutil.GRPCDialOptions(), grpc.WithDefaultCallOptions(grpc.WaitForReady(true)))...,
8181
)
8282
if err != nil {
8383
return nil, errors.WithStack(err)

main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ func raftMonitorRuntimes(runtimes []*raftGroupRuntime) []monitoring.RaftRuntime
287287

288288
func startRaftServers(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Group, runtimes []*raftGroupRuntime, shardStore *kv.ShardStore, coordinate kv.Coordinator, distServer *adapter.DistributionServer, relay *adapter.RedisPubSubRelay) error {
289289
for _, rt := range runtimes {
290-
gs := grpc.NewServer()
290+
gs := grpc.NewServer(internalutil.GRPCServerOptions()...)
291291
trx := kv.NewTransaction(rt.raft)
292292
grpcSvc := adapter.NewGRPCServer(shardStore, coordinate)
293293
pb.RegisterRawKVServer(gs, grpcSvc)

multiraft_runtime.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,11 @@ import (
77
"time"
88

99
transport "github.com/Jille/raft-grpc-transport"
10+
internalutil "github.com/bootjp/elastickv/internal"
1011
"github.com/bootjp/elastickv/internal/raftstore"
1112
"github.com/bootjp/elastickv/store"
1213
"github.com/cockroachdb/errors"
1314
"github.com/hashicorp/raft"
14-
"google.golang.org/grpc"
15-
"google.golang.org/grpc/credentials/insecure"
1615
)
1716

1817
type raftGroupRuntime struct {
@@ -117,9 +116,7 @@ func newRaftGroup(raftID string, group groupSpec, baseDir string, multi bool, bo
117116
return nil, nil, nil, errors.WithStack(err)
118117
}
119118

120-
tm = transport.New(raft.ServerAddress(group.address), []grpc.DialOption{
121-
grpc.WithTransportCredentials(insecure.NewCredentials()),
122-
})
119+
tm = transport.New(raft.ServerAddress(group.address), internalutil.GRPCDialOptions())
123120

124121
r, err := raft.NewRaft(c, fsm, raftStore, raftStore, fss, tm.Transport())
125122
if err != nil {

0 commit comments

Comments
 (0)