Skip to content

Latest commit

 

History

History
367 lines (274 loc) · 7.21 KB

File metadata and controls

367 lines (274 loc) · 7.21 KB

5.6 Raft算法

📍 导航返回目录 | 上一节:Paxos


设计目标

Understandability(可理解性) - Raft相比Paxos更易懂

核心思想

  • 强Leader:只有Leader处理客户端请求
  • 日志复制:Leader将日志复制到Follower

三种角色

  1. Leader(领导者):处理所有客户端请求
  2. Follower(跟随者):被动响应RPC
  3. Candidate(候选人):选举期间的临时角色
Follower → Candidate → Leader
    ↑          ↓           ↓
    ←----------←-----------←

核心机制

1. Leader选举

触发条件

  • 启动时
  • Follower超时未收到Leader心跳

流程

1. Follower → Candidate(增加term,投票给自己)
2. 向所有节点发送RequestVote RPC
3. 获得多数派投票 → 成为Leader
4. 定期发送心跳维持地位

选举超时:随机150-300ms(避免分票)

2. 日志复制

Client → Leader
Leader → Followers(AppendEntries RPC)
Followers → Leader(ACK)
Leader收到多数派ACK → 提交日志

3. 安全性

选举限制

  • 候选人日志必须至少和多数节点一样新
  • 通过比较(term, index)判断

提交限制

  • 只有当前term的日志可以通过计数提交
  • 旧term的日志通过间接方式提交

Go简化实现

package main

import (
    "math/rand"
    "sync"
    "time"
)

type State int

const (
    Follower State = iota
    Candidate
    Leader
)

type LogEntry struct {
    Term    int
    Command interface{}
}

type Raft struct {
    mu sync.Mutex
    
    // 持久化状态
    currentTerm int
    votedFor    int
    log         []LogEntry
    
    // 易失状态
    commitIndex int
    lastApplied int
    
    // Leader状态
    nextIndex  []int
    matchIndex []int
    
    // 其他
    state       State
    electionTimer *time.Timer
    peers       []*Raft
    id          int
}

func NewRaft(id int, peers []*Raft) *Raft {
    rf := &Raft{
        id:          id,
        peers:       peers,
        state:       Follower,
        currentTerm: 0,
        votedFor:    -1,
        log:         []LogEntry{{Term: 0}},
    }
    
    rf.resetElectionTimer()
    return rf
}

// 选举超时,发起选举
func (rf *Raft) startElection() {
    rf.mu.Lock()
    rf.state = Candidate
    rf.currentTerm++
    rf.votedFor = rf.id
    term := rf.currentTerm
    rf.mu.Unlock()
    
    votes := 1  // 投给自己
    
    // 向所有节点请求投票
    for _, peer := range rf.peers {
        if peer.id == rf.id {
            continue
        }
        
        go func(p *Raft) {
            if p.RequestVote(rf.id, term) {
                rf.mu.Lock()
                votes++
                
                // 获得多数派投票
                if votes > len(rf.peers)/2 && rf.state == Candidate {
                    rf.becomeLeader()
                }
                rf.mu.Unlock()
            }
        }(peer)
    }
}

func (rf *Raft) becomeLeader() {
    rf.state = Leader
    
    // 初始化Leader状态
    rf.nextIndex = make([]int, len(rf.peers))
    rf.matchIndex = make([]int, len(rf.peers))
    
    for i := range rf.nextIndex {
        rf.nextIndex[i] = len(rf.log)
    }
    
    // 发送心跳
    go rf.sendHeartbeats()
}

func (rf *Raft) sendHeartbeats() {
    for rf.state == Leader {
        for _, peer := range rf.peers {
            if peer.id == rf.id {
                continue
            }
            
            go rf.appendEntries(peer)
        }
        
        time.Sleep(100 * time.Millisecond)
    }
}

// RequestVote RPC
func (rf *Raft) RequestVote(candidateID, term int) bool {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    
    // 拒绝旧term
    if term < rf.currentTerm {
        return false
    }
    
    // 发现更新的term
    if term > rf.currentTerm {
        rf.currentTerm = term
        rf.state = Follower
        rf.votedFor = -1
    }
    
    // 投票
    if rf.votedFor == -1 || rf.votedFor == candidateID {
        rf.votedFor = candidateID
        rf.resetElectionTimer()
        return true
    }
    
    return false
}

// AppendEntries RPC
func (rf *Raft) appendEntries(peer *Raft) {
    rf.mu.Lock()
    nextIdx := rf.nextIndex[peer.id]
    prevLogIndex := nextIdx - 1
    prevLogTerm := rf.log[prevLogIndex].Term
    entries := rf.log[nextIdx:]
    rf.mu.Unlock()
    
    success := peer.AppendEntries(rf.currentTerm, prevLogIndex, prevLogTerm, entries)
    
    if success {
        rf.mu.Lock()
        rf.nextIndex[peer.id] = nextIdx + len(entries)
        rf.matchIndex[peer.id] = rf.nextIndex[peer.id] - 1
        rf.mu.Unlock()
    } else {
        rf.mu.Lock()
        rf.nextIndex[peer.id]--
        rf.mu.Unlock()
    }
}

func (rf *Raft) AppendEntries(term, prevLogIndex, prevLogTerm int, entries []LogEntry) bool {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    
    // 拒绝旧term
    if term < rf.currentTerm {
        return false
    }
    
    // 更新term并转为Follower
    if term > rf.currentTerm {
        rf.currentTerm = term
        rf.state = Follower
        rf.votedFor = -1
    }
    
    // 重置选举定时器
    rf.resetElectionTimer()
    
    // 日志不匹配
    if prevLogIndex >= len(rf.log) || rf.log[prevLogIndex].Term != prevLogTerm {
        return false
    }
    
    // 追加日志
    rf.log = append(rf.log[:prevLogIndex+1], entries...)
    
    return true
}

func (rf *Raft) resetElectionTimer() {
    if rf.electionTimer != nil {
        rf.electionTimer.Stop()
    }
    
    timeout := time.Duration(150+rand.Intn(150)) * time.Millisecond
    rf.electionTimer = time.AfterFunc(timeout, func() {
        rf.startElection()
    })
}

关键特性

1. 任期(Term)

  • 逻辑时钟
  • 每次选举递增
  • 用于检测过期信息

2. 日志匹配特性

  • 如果两个日志在相同索引位置的term相同,则它们之前的所有日志都相同

3. 提交规则

  • Leader收到多数派确认后提交
  • 通过心跳间接提交旧term日志

应用

etcd

# etcd集群
etcd1: leader
etcd2: follower
etcd3: follower

# 写入
etcdctl put key value  # 只能在leader上执行

# 读取
etcdctl get key  # 可以在任意节点

Consul

  • 服务注册与发现
  • 健康检查
  • KV存储

Raft vs Paxos

特性 Raft Paxos
易懂性 ✅ 高 ❌ 低
强Leader ✅ 是 ❌ 否
日志连续性 ✅ 保证 ❌ 不保证
工程实现 ✅ 简单 ❌ 复杂

本章小结

关键要点

  • ✅ Raft通过强Leader简化了分布式共识
  • ✅ Leader选举 + 日志复制
  • ✅ Term机制保证一致性
  • ✅ 比Paxos更易理解和实现

扩展阅读


💡 思考题

  1. 为什么Raft比Paxos易懂?
  2. 选举超时为什么要随机化?
  3. 如何处理网络分区?

⏮️ 上一节:Paxos | ⏏️ 返回目录