Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ restart), or you can supply your own disk-backed implementation.

Third, when you receive a message from another node, pass it to Node.Step:

func recvRaftRPC(ctx context.Context, m raftpb.Message) {
func recvRaftRPC(ctx context.Context, m *raftpb.Message) {
n.Step(ctx, m)
}

Expand Down
2 changes: 1 addition & 1 deletion example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

func applyToStore(_ []*pb.Entry) {}
func sendMessages(_ []pb.Message) {}
func sendMessages(_ []*pb.Message) {}
func saveStateToDisk(_ pb.HardState) {}
func saveToDisk(_ []*pb.Entry) {}

Expand Down
42 changes: 23 additions & 19 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type Ready struct {
//
// If it contains a MsgSnap message, the application MUST report back to raft
// when the snapshot has been received or has failed by calling ReportSnapshot.
Messages []pb.Message
Messages []*pb.Message

// MustSync indicates whether the HardState and Entries must be durably
// written to disk or if a non-durable write is permissible.
Expand Down Expand Up @@ -153,7 +153,7 @@ type Node interface {
ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error

// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
Step(ctx context.Context, msg pb.Message) error
Step(ctx context.Context, msg *pb.Message) error

// Ready returns a channel that returns the current point-in-time state.
// Users of the Node must call Advance after retrieving the state returned by Ready (unless
Expand Down Expand Up @@ -289,14 +289,14 @@ func RestartNode(c *Config) Node {
}

type msgWithResult struct {
m pb.Message
m *pb.Message
result chan error
}

// node is the canonical implementation of the Node interface
type node struct {
propc chan msgWithResult
recvc chan pb.Message
recvc chan *pb.Message
confc chan pb.ConfChangeV2
confstatec chan pb.ConfState
readyc chan Ready
Expand All @@ -312,7 +312,7 @@ type node struct {
func newNode(rn *RawNode) node {
return node{
propc: make(chan msgWithResult),
recvc: make(chan pb.Message),
recvc: make(chan *pb.Message),
confc: make(chan pb.ConfChangeV2),
confstatec: make(chan pb.ConfState),
readyc: make(chan Ready),
Expand Down Expand Up @@ -465,14 +465,18 @@ func (n *node) Tick() {
}

func (n *node) Campaign(ctx context.Context) error {
return n.step(ctx, pb.Message{Type: pb.MsgHup.Enum()})
return n.step(ctx, &pb.Message{Type: pb.MsgHup.Enum()})
}

func (n *node) Propose(ctx context.Context, data []byte) error {
return n.stepWait(ctx, pb.Message{Type: pb.MsgProp.Enum(), Entries: []*pb.Entry{{Data: data}}})
return n.stepWait(ctx, &pb.Message{Type: pb.MsgProp.Enum(), Entries: []*pb.Entry{{Data: data}}})
}

func (n *node) Step(ctx context.Context, m pb.Message) error {
func (n *node) Step(ctx context.Context, m *pb.Message) error {
// m should never be nil
if m == nil {
return errors.New("cannot step with nil message")
}
// Ignore unexpected local messages receiving over network.
if IsLocalMsg(m.GetType()) && !IsLocalMsgTarget(m.GetFrom()) {
// TODO: return an error?
Expand All @@ -481,12 +485,12 @@ func (n *node) Step(ctx context.Context, m pb.Message) error {
return n.step(ctx, m)
Comment thread
ahrtr marked this conversation as resolved.
}

func confChangeToMsg(c pb.ConfChangeI) (pb.Message, error) {
func confChangeToMsg(c pb.ConfChangeI) (*pb.Message, error) {
typ, data, err := pb.MarshalConfChange(c)
if err != nil {
return pb.Message{}, err
return nil, err
}
return pb.Message{Type: pb.MsgProp.Enum(), Entries: []*pb.Entry{{Type: typ.Enum(), Data: data}}}, nil
return &pb.Message{Type: pb.MsgProp.Enum(), Entries: []*pb.Entry{{Type: typ.Enum(), Data: data}}}, nil
}

func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error {
Expand All @@ -497,17 +501,17 @@ func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error {
return n.Step(ctx, msg)
}

func (n *node) step(ctx context.Context, m pb.Message) error {
func (n *node) step(ctx context.Context, m *pb.Message) error {
return n.stepWithWaitOption(ctx, m, false)
}

func (n *node) stepWait(ctx context.Context, m pb.Message) error {
func (n *node) stepWait(ctx context.Context, m *pb.Message) error {
return n.stepWithWaitOption(ctx, m, true)
}

// Step advances the state machine using msgs. The ctx.Err() will be returned,
// if any.
func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
func (n *node) stepWithWaitOption(ctx context.Context, m *pb.Message, wait bool) error {
if m.GetType() != pb.MsgProp {
select {
case n.recvc <- m:
Expand Down Expand Up @@ -580,7 +584,7 @@ func (n *node) Status() Status {

func (n *node) ReportUnreachable(id uint64) {
select {
case n.recvc <- pb.Message{Type: pb.MsgUnreachable.Enum(), From: new(id)}:
case n.recvc <- &pb.Message{Type: pb.MsgUnreachable.Enum(), From: new(id)}:
case <-n.done:
}
}
Expand All @@ -589,24 +593,24 @@ func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) {
rej := status == SnapshotFailure

select {
case n.recvc <- pb.Message{Type: pb.MsgSnapStatus.Enum(), From: new(id), Reject: new(rej)}:
case n.recvc <- &pb.Message{Type: pb.MsgSnapStatus.Enum(), From: new(id), Reject: new(rej)}:
case <-n.done:
}
}

func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) {
select {
// manually set 'from' and 'to', so that leader can voluntarily transfers its leadership
case n.recvc <- pb.Message{Type: pb.MsgTransferLeader.Enum(), From: new(transferee), To: new(lead)}:
case n.recvc <- &pb.Message{Type: pb.MsgTransferLeader.Enum(), From: new(transferee), To: new(lead)}:
case <-n.done:
case <-ctx.Done():
}
}

func (n *node) ForgetLeader(ctx context.Context) error {
return n.step(ctx, pb.Message{Type: pb.MsgForgetLeader.Enum()})
return n.step(ctx, &pb.Message{Type: pb.MsgForgetLeader.Enum()})
}

func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
return n.step(ctx, pb.Message{Type: pb.MsgReadIndex.Enum(), Entries: []*pb.Entry{{Data: rctx}}})
return n.step(ctx, &pb.Message{Type: pb.MsgReadIndex.Enum(), Entries: []*pb.Entry{{Data: rctx}}})
}
Loading
Loading