Skip to content

Commit 0c3d2e4

Browse files
authored
Merge pull request #430 from ahrtr/20260520_message_pointer
Pass `Message` as pointer
2 parents ddb40a8 + 3771b8e commit 0c3d2e4

23 files changed

Lines changed: 471 additions & 491 deletions

doc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ restart), or you can supply your own disk-backed implementation.
109109
110110
Third, when you receive a message from another node, pass it to Node.Step:
111111
112-
func recvRaftRPC(ctx context.Context, m raftpb.Message) {
112+
func recvRaftRPC(ctx context.Context, m *raftpb.Message) {
113113
n.Step(ctx, m)
114114
}
115115

example_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919
)
2020

2121
func applyToStore(_ []*pb.Entry) {}
22-
func sendMessages(_ []pb.Message) {}
22+
func sendMessages(_ []*pb.Message) {}
2323
func saveStateToDisk(_ pb.HardState) {}
2424
func saveToDisk(_ []*pb.Entry) {}
2525

node.go

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ type Ready struct {
107107
//
108108
// If it contains a MsgSnap message, the application MUST report back to raft
109109
// when the snapshot has been received or has failed by calling ReportSnapshot.
110-
Messages []pb.Message
110+
Messages []*pb.Message
111111

112112
// MustSync indicates whether the HardState and Entries must be durably
113113
// written to disk or if a non-durable write is permissible.
@@ -153,7 +153,7 @@ type Node interface {
153153
ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error
154154

155155
// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
156-
Step(ctx context.Context, msg pb.Message) error
156+
Step(ctx context.Context, msg *pb.Message) error
157157

158158
// Ready returns a channel that returns the current point-in-time state.
159159
// Users of the Node must call Advance after retrieving the state returned by Ready (unless
@@ -289,14 +289,14 @@ func RestartNode(c *Config) Node {
289289
}
290290

291291
type msgWithResult struct {
292-
m pb.Message
292+
m *pb.Message
293293
result chan error
294294
}
295295

296296
// node is the canonical implementation of the Node interface
297297
type node struct {
298298
propc chan msgWithResult
299-
recvc chan pb.Message
299+
recvc chan *pb.Message
300300
confc chan pb.ConfChangeV2
301301
confstatec chan pb.ConfState
302302
readyc chan Ready
@@ -312,7 +312,7 @@ type node struct {
312312
func newNode(rn *RawNode) node {
313313
return node{
314314
propc: make(chan msgWithResult),
315-
recvc: make(chan pb.Message),
315+
recvc: make(chan *pb.Message),
316316
confc: make(chan pb.ConfChangeV2),
317317
confstatec: make(chan pb.ConfState),
318318
readyc: make(chan Ready),
@@ -465,14 +465,18 @@ func (n *node) Tick() {
465465
}
466466

467467
func (n *node) Campaign(ctx context.Context) error {
468-
return n.step(ctx, pb.Message{Type: pb.MsgHup.Enum()})
468+
return n.step(ctx, &pb.Message{Type: pb.MsgHup.Enum()})
469469
}
470470

471471
func (n *node) Propose(ctx context.Context, data []byte) error {
472-
return n.stepWait(ctx, pb.Message{Type: pb.MsgProp.Enum(), Entries: []*pb.Entry{{Data: data}}})
472+
return n.stepWait(ctx, &pb.Message{Type: pb.MsgProp.Enum(), Entries: []*pb.Entry{{Data: data}}})
473473
}
474474

475-
func (n *node) Step(ctx context.Context, m pb.Message) error {
475+
func (n *node) Step(ctx context.Context, m *pb.Message) error {
476+
// m should never be nil
477+
if m == nil {
478+
return errors.New("cannot step with nil message")
479+
}
476480
// Ignore unexpected local messages receiving over network.
477481
if IsLocalMsg(m.GetType()) && !IsLocalMsgTarget(m.GetFrom()) {
478482
// TODO: return an error?
@@ -481,12 +485,12 @@ func (n *node) Step(ctx context.Context, m pb.Message) error {
481485
return n.step(ctx, m)
482486
}
483487

484-
func confChangeToMsg(c pb.ConfChangeI) (pb.Message, error) {
488+
func confChangeToMsg(c pb.ConfChangeI) (*pb.Message, error) {
485489
typ, data, err := pb.MarshalConfChange(c)
486490
if err != nil {
487-
return pb.Message{}, err
491+
return nil, err
488492
}
489-
return pb.Message{Type: pb.MsgProp.Enum(), Entries: []*pb.Entry{{Type: typ.Enum(), Data: data}}}, nil
493+
return &pb.Message{Type: pb.MsgProp.Enum(), Entries: []*pb.Entry{{Type: typ.Enum(), Data: data}}}, nil
490494
}
491495

492496
func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error {
@@ -497,17 +501,17 @@ func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error {
497501
return n.Step(ctx, msg)
498502
}
499503

500-
func (n *node) step(ctx context.Context, m pb.Message) error {
504+
func (n *node) step(ctx context.Context, m *pb.Message) error {
501505
return n.stepWithWaitOption(ctx, m, false)
502506
}
503507

504-
func (n *node) stepWait(ctx context.Context, m pb.Message) error {
508+
func (n *node) stepWait(ctx context.Context, m *pb.Message) error {
505509
return n.stepWithWaitOption(ctx, m, true)
506510
}
507511

508512
// Step advances the state machine using msgs. The ctx.Err() will be returned,
509513
// if any.
510-
func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
514+
func (n *node) stepWithWaitOption(ctx context.Context, m *pb.Message, wait bool) error {
511515
if m.GetType() != pb.MsgProp {
512516
select {
513517
case n.recvc <- m:
@@ -580,7 +584,7 @@ func (n *node) Status() Status {
580584

581585
func (n *node) ReportUnreachable(id uint64) {
582586
select {
583-
case n.recvc <- pb.Message{Type: pb.MsgUnreachable.Enum(), From: new(id)}:
587+
case n.recvc <- &pb.Message{Type: pb.MsgUnreachable.Enum(), From: new(id)}:
584588
case <-n.done:
585589
}
586590
}
@@ -589,24 +593,24 @@ func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) {
589593
rej := status == SnapshotFailure
590594

591595
select {
592-
case n.recvc <- pb.Message{Type: pb.MsgSnapStatus.Enum(), From: new(id), Reject: new(rej)}:
596+
case n.recvc <- &pb.Message{Type: pb.MsgSnapStatus.Enum(), From: new(id), Reject: new(rej)}:
593597
case <-n.done:
594598
}
595599
}
596600

597601
func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) {
598602
select {
599603
// manually set 'from' and 'to', so that leader can voluntarily transfers its leadership
600-
case n.recvc <- pb.Message{Type: pb.MsgTransferLeader.Enum(), From: new(transferee), To: new(lead)}:
604+
case n.recvc <- &pb.Message{Type: pb.MsgTransferLeader.Enum(), From: new(transferee), To: new(lead)}:
601605
case <-n.done:
602606
case <-ctx.Done():
603607
}
604608
}
605609

606610
func (n *node) ForgetLeader(ctx context.Context) error {
607-
return n.step(ctx, pb.Message{Type: pb.MsgForgetLeader.Enum()})
611+
return n.step(ctx, &pb.Message{Type: pb.MsgForgetLeader.Enum()})
608612
}
609613

610614
func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
611-
return n.step(ctx, pb.Message{Type: pb.MsgReadIndex.Enum(), Entries: []*pb.Entry{{Data: rctx}}})
615+
return n.step(ctx, &pb.Message{Type: pb.MsgReadIndex.Enum(), Entries: []*pb.Entry{{Data: rctx}}})
612616
}

0 commit comments

Comments
 (0)