Skip to content

Latest commit

 

History

History
470 lines (363 loc) · 9.26 KB

File metadata and controls

470 lines (363 loc) · 9.26 KB

4.5 限流算法

📍 导航返回目录 | 上一节:一致性哈希


为什么需要限流

保护系统

  • 防止系统过载
  • 保证服务稳定性
  • 公平分配资源

应用场景

  • API限流
  • 秒杀系统
  • 防止爬虫
  • DDoS防护

固定窗口算法

原理

在固定时间窗口内统计请求数。

import (
    "sync"
    "time"
)

type FixedWindowLimiter struct {
    limit    int
    window   time.Duration
    counter  int
    lastTime time.Time
    mu       sync.Mutex
}

func NewFixedWindowLimiter(limit int, window time.Duration) *FixedWindowLimiter {
    return &FixedWindowLimiter{
        limit:    limit,
        window:   window,
        lastTime: time.Now(),
    }
}

func (l *FixedWindowLimiter) Allow() bool {
    l.mu.Lock()
    defer l.mu.Unlock()
    
    now := time.Now()
    
    // 新窗口,重置计数器
    if now.Sub(l.lastTime) >= l.window {
        l.counter = 0
        l.lastTime = now
    }
    
    // 检查是否超限
    if l.counter < l.limit {
        l.counter++
        return true
    }
    
    return false
}

缺点:窗口边界突刺问题


滑动窗口算法

原理

将时间窗口分为多个小格子,更精细地控制流量。

import (
    "sync"
    "time"
)

type SlidingWindowLimiter struct {
    limit      int
    window     time.Duration
    slotCount  int
    slots      []int
    slotTime   time.Duration
    currentIdx int
    lastTime   time.Time
    mu         sync.Mutex
}

func NewSlidingWindowLimiter(limit int, window time.Duration, slotCount int) *SlidingWindowLimiter {
    return &SlidingWindowLimiter{
        limit:     limit,
        window:    window,
        slotCount: slotCount,
        slots:     make([]int, slotCount),
        slotTime:  window / time.Duration(slotCount),
        lastTime:  time.Now(),
    }
}

func (l *SlidingWindowLimiter) Allow() bool {
    l.mu.Lock()
    defer l.mu.Unlock()
    
    now := time.Now()
    
    // 计算时间差
    elapsed := now.Sub(l.lastTime)
    slotsToMove := int(elapsed / l.slotTime)
    
    // 移动窗口
    if slotsToMove > 0 {
        if slotsToMove >= l.slotCount {
            // 清空所有槽
            for i := range l.slots {
                l.slots[i] = 0
            }
        } else {
            // 移动部分槽
            for i := 0; i < slotsToMove; i++ {
                l.currentIdx = (l.currentIdx + 1) % l.slotCount
                l.slots[l.currentIdx] = 0
            }
        }
        l.lastTime = now
    }
    
    // 统计当前窗口内的请求数
    total := 0
    for _, count := range l.slots {
        total += count
    }
    
    if total < l.limit {
        l.slots[l.currentIdx]++
        return true
    }
    
    return false
}

令牌桶算法

原理

  • 固定速率产生令牌
  • 桶有容量上限
  • 请求消耗令牌
import (
    "sync"
    "time"
)

type TokenBucket struct {
    capacity      int           // 桶容量
    tokens        int           // 当前令牌数
    rate          int           // 每秒产生令牌数
    lastTokenTime time.Time
    mu            sync.Mutex
}

func NewTokenBucket(capacity, rate int) *TokenBucket {
    return &TokenBucket{
        capacity:      capacity,
        tokens:        capacity,
        rate:          rate,
        lastTokenTime: time.Now(),
    }
}

func (tb *TokenBucket) Allow() bool {
    tb.mu.Lock()
    defer tb.mu.Unlock()
    
    now := time.Now()
    
    // 计算应该产生的令牌数
    elapsed := now.Sub(tb.lastTokenTime)
    tokensToAdd := int(elapsed.Seconds() * float64(tb.rate))
    
    if tokensToAdd > 0 {
        tb.tokens += tokensToAdd
        if tb.tokens > tb.capacity {
            tb.tokens = tb.capacity
        }
        tb.lastTokenTime = now
    }
    
    // 消耗令牌
    if tb.tokens > 0 {
        tb.tokens--
        return true
    }
    
    return false
}

// AllowN 消耗N个令牌
func (tb *TokenBucket) AllowN(n int) bool {
    tb.mu.Lock()
    defer tb.mu.Unlock()
    
    now := time.Now()
    elapsed := now.Sub(tb.lastTokenTime)
    tokensToAdd := int(elapsed.Seconds() * float64(tb.rate))
    
    if tokensToAdd > 0 {
        tb.tokens += tokensToAdd
        if tb.tokens > tb.capacity {
            tb.tokens = tb.capacity
        }
        tb.lastTokenTime = now
    }
    
    if tb.tokens >= n {
        tb.tokens -= n
        return true
    }
    
    return false
}

优点

  • 允许突发流量(桶满时)
  • 平滑限流
  • 支持预消费

漏桶算法

原理

  • 固定速率处理请求
  • 请求进入队列
  • 队列满则拒绝
import (
    "sync"
    "time"
)

type LeakyBucket struct {
    capacity  int           // 桶容量
    water     int           // 当前水量
    rate      int           // 漏水速率(每秒)
    lastLeakTime time.Time
    mu        sync.Mutex
}

func NewLeakyBucket(capacity, rate int) *LeakyBucket {
    lb := &LeakyBucket{
        capacity:  capacity,
        water:     0,
        rate:      rate,
        lastLeakTime: time.Now(),
    }
    
    // 启动漏水协程
    go lb.leak()
    
    return lb
}

func (lb *LeakyBucket) Allow() bool {
    lb.mu.Lock()
    defer lb.mu.Unlock()
    
    // 检查桶是否满
    if lb.water < lb.capacity {
        lb.water++
        return true
    }
    
    return false
}

func (lb *LeakyBucket) leak() {
    ticker := time.NewTicker(time.Second / time.Duration(lb.rate))
    defer ticker.Stop()
    
    for range ticker.C {
        lb.mu.Lock()
        if lb.water > 0 {
            lb.water--
        }
        lb.mu.Unlock()
    }
}

优点

  • 流量整形
  • 平滑输出

缺点

  • 无法应对突发流量

Redis实现分布式限流

Lua脚本(令牌桶)

local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local requested = tonumber(ARGV[3])

local now = redis.call('TIME')
local timestamp = tonumber(now[1]) + tonumber(now[2]) / 1000000

local info = redis.call('HMGET', key, 'tokens', 'last_time')
local tokens = tonumber(info[1]) or capacity
local last_time = tonumber(info[2]) or timestamp

local delta = math.max(0, timestamp - last_time)
local new_tokens = math.min(capacity, tokens + delta * rate)

if new_tokens >= requested then
    new_tokens = new_tokens - requested
    redis.call('HMSET', key, 'tokens', new_tokens, 'last_time', timestamp)
    redis.call('EXPIRE', key, 3600)
    return 1
else
    return 0
end

Go客户端

import (
    "context"
    "github.com/go-redis/redis/v8"
)

type RedisRateLimiter struct {
    client   *redis.Client
    script   *redis.Script
    capacity int
    rate     float64
}

func NewRedisRateLimiter(client *redis.Client, capacity int, rate float64) *RedisRateLimiter {
    script := redis.NewScript(`
        -- Lua脚本内容
    `)
    
    return &RedisRateLimiter{
        client:   client,
        script:   script,
        capacity: capacity,
        rate:     rate,
    }
}

func (rl *RedisRateLimiter) Allow(ctx context.Context, key string) (bool, error) {
    result, err := rl.script.Run(ctx, rl.client, []string{key}, 
        rl.capacity, rl.rate, 1).Int()
    
    if err != nil {
        return false, err
    }
    
    return result == 1, nil
}

算法对比

算法 突发流量 平滑性 实现复杂度 适用场景
固定窗口 简单 粗粒度限流
滑动窗口 中等 API限流
令牌桶 中等 网络流量整形
漏桶 中等 削峰填谷

实战建议

选择策略

  1. API限流:滑动窗口或令牌桶
  2. 秒杀系统:令牌桶(预热)
  3. 防爬虫:固定窗口(IP维度)
  4. 流量整形:漏桶

多层限流

type MultiLevelLimiter struct {
    globalLimiter *TokenBucket  // 全局限流
    userLimiter   map[string]*TokenBucket  // 用户级限流
    ipLimiter     map[string]*TokenBucket  // IP级限流
}

func (ml *MultiLevelLimiter) Allow(userID, ip string) bool {
    // 1. 全局限流
    if !ml.globalLimiter.Allow() {
        return false
    }
    
    // 2. 用户级限流
    if limiter, ok := ml.userLimiter[userID]; ok {
        if !limiter.Allow() {
            return false
        }
    }
    
    // 3. IP级限流
    if limiter, ok := ml.ipLimiter[ip]; ok {
        if !limiter.Allow() {
            return false
        }
    }
    
    return true
}

本章小结

关键要点

  • ✅ 令牌桶支持突发流量,最常用
  • ✅ 漏桶平滑输出,适合流量整形
  • ✅ 滑动窗口解决固定窗口的边界问题
  • ✅ Redis实现分布式限流

扩展阅读


💡 思考题

  1. 令牌桶和漏桶的区别是什么?
  2. 如何实现分布式限流?
  3. 秒杀系统应该选择哪种限流算法?

⏮️ 上一节:一致性哈希 | ⏏️ 返回目录